Hanzo Tasks
Durable workflow execution engine with automatic retries, saga compensation, and multi-tenant isolation.
Hanzo Tasks
Hanzo Tasks is a durable workflow execution engine for building invincible distributed applications. Define complex business logic as code -- workflows survive process crashes, infrastructure failures, and deployments. Every step is tracked, retried on failure, and can be inspected or replayed from the console.
Hanzo Tasks provides durable workflow execution with multi-tenant namespace isolation via Hanzo IAM, secrets management via Hanzo KMS, and unified observability through the Hanzo O11y stack.
Endpoint: tasks.hanzo.ai (UI) / tasks-api.hanzo.ai:443 (gRPC)
Console: console.hanzo.ai > Tasks
SDKs: Go, Python, TypeScript, Java
Source: github.com/hanzoai/tasks
Features
| Feature | Description |
|---|---|
| Durable Execution | Workflows survive crashes, restarts, and deployments -- state is persisted after every step |
| Automatic Retries | Configurable retry policies with exponential backoff, max attempts, and non-retryable error types |
| Saga Compensation | Define compensation logic that runs when a multi-step workflow partially fails |
| Scheduled Workflows | Cron expressions and fixed intervals with guaranteed execution and catch-up |
| Signals & Queries | Send data to running workflows (signals) or read their state (queries) without interrupting execution |
| Child Workflows | Compose complex pipelines from reusable workflow building blocks |
| Continue-as-New | Long-running workflows reset history to prevent unbounded growth |
| Multi-Tenant Namespaces | Each Hanzo org gets an isolated namespace with independent workflow history |
| IAM Integration | OIDC-based auth via Hanzo IAM -- RBAC controls who can start, signal, or cancel workflows |
| KMS Secrets | Workflow activities access secrets via Hanzo KMS -- no plaintext credentials in code |
| Full History & Replay | Every workflow execution is recorded -- replay failed workflows for debugging |
| OTEL Observability | Distributed tracing, metrics, and logs flow into the Hanzo O11y stack |
| Open API | Standard workflow SDKs and CLI tools work unchanged |
Architecture
┌────────────────────────────────────────────────────┐
│ Hanzo Console │
│ (Workflow browser, execution viewer) │
└──────────────────────┬─────────────────────────────┘
│ gRPC
┌──────────────────────▼─────────────────────────────┐
│ Hanzo Tasks Server │
│ ┌──────────┬──────────┬──────────┬────────────┐ │
│ │ Frontend │ History │ Matching │ Worker │ │
│ │ Service │ Service │ Service │ Service │ │
│ └──────────┴──────────┴──────────┴────────────┘ │
│ Multi-tenant namespace routing │
│ IAM OIDC authorizer │
└──────────────────────┬─────────────────────────────┘
│
┌──────────▼──────────┐
│ Hanzo SQL │
│ (PostgreSQL) │
└─────────────────────┘Workers connect to the Tasks server via gRPC, poll for work on task queues, execute activities, and report results back. The server persists all state to PostgreSQL.
Quick Start
Python SDK
from hanzo.tasks import Client, workflow, activity
import asyncio
@activity.defn
async def send_email(to: str, subject: str) -> str:
# Your activity logic here
return f"Sent to {to}"
@workflow.defn
class OnboardingWorkflow:
@workflow.run
async def run(self, user_email: str) -> str:
# Step 1: Send welcome email (retried automatically on failure)
result = await workflow.execute_activity(
send_email,
args=[user_email, "Welcome to Hanzo!"],
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=3),
)
# Step 2: Wait for user to complete profile (up to 7 days)
await workflow.wait_condition(
lambda: self.profile_completed,
timeout=timedelta(days=7),
)
return "Onboarding complete"
@workflow.signal
async def complete_profile(self):
self.profile_completed = True
async def main():
client = await Client.connect(
"tasks-api.hanzo.ai:443",
# IAM token for multi-tenant auth
api_key="your-hanzo-api-key",
namespace="your-org", # Hanzo org = Tasks namespace
)
handle = await client.start_workflow(
OnboardingWorkflow.run,
"new-user@example.com",
id="onboard-user-123",
task_queue="onboarding",
)
result = await handle.result()Go SDK
package main
import (
"context"
"github.com/hanzoai/tasks/client"
"github.com/hanzoai/tasks/worker"
"github.com/hanzoai/tasks/workflow"
"github.com/hanzoai/tasks/activity"
)
func OnboardingWorkflow(ctx workflow.Context, email string) error {
opts := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &tasks.RetryPolicy{MaximumAttempts: 3},
}
ctx = workflow.WithActivityOptions(ctx, opts)
err := workflow.ExecuteActivity(ctx, SendEmail, email, "Welcome!").Get(ctx, nil)
if err != nil {
return err
}
// Wait for signal (human-in-the-loop)
var completed bool
signalChan := workflow.GetSignalChannel(ctx, "complete-profile")
signalChan.Receive(ctx, &completed)
return nil
}TypeScript SDK
import { Client, Connection } from '@hanzo/tasks';
const connection = await Connection.connect({
address: 'tasks-api.hanzo.ai:443',
});
const client = new Client({
connection,
namespace: 'your-org',
});
const handle = await client.workflow.start('OnboardingWorkflow', {
args: ['new-user@example.com'],
taskQueue: 'onboarding',
workflowId: 'onboard-user-123',
});
const result = await handle.result();Use Cases
AI Agent Orchestration
@workflow.defn
class AgentPipeline:
@workflow.run
async def run(self, prompt: str):
plan = await workflow.execute_activity(plan_task, prompt)
for step in plan.steps:
result = await workflow.execute_activity(
execute_step, step,
retry_policy=RetryPolicy(maximum_attempts=5),
)
if result.needs_human_review:
approved = await workflow.wait_condition(
lambda: self.approved, timeout=timedelta(hours=24)
)Commerce Saga (Compensation)
@workflow.defn
class OrderSaga:
@workflow.run
async def run(self, order):
# Each step has a compensating action
charge = await workflow.execute_activity(charge_card, order)
try:
await workflow.execute_activity(reserve_inventory, order)
await workflow.execute_activity(ship_order, order)
except Exception:
# Compensate: refund the charge
await workflow.execute_activity(refund_charge, charge)
raiseScheduled Jobs (Replace CronJobs)
# Start a cron workflow
handle = await client.start_workflow(
DailyReportWorkflow.run,
id="daily-report",
task_queue="reports",
cron_schedule="0 9 * * MON-FRI", # 9 AM weekdays
)Multi-Tenancy
Each Hanzo organization maps to a Tasks namespace:
| Hanzo Concept | Tasks Concept |
|---|---|
| Organization | Namespace |
| Project | Namespace prefix (org/project) |
| IAM User | Workflow identity |
| IAM Role | Namespace permission set |
| KMS Secret | Activity credential |
Namespace isolation ensures one org's workflows cannot see or affect another's.
Pricing
Hanzo Tasks follows usage-based pricing:
| Tier | Actions/mo | Price | Includes |
|---|---|---|---|
| Free | 25,000 | $0 | 1 namespace, 7-day retention |
| Pro | 1,000,000 | $0.035/1K actions | 10 namespaces, 30-day retention |
| Team | 10,000,000 | $0.025/1K actions | Unlimited namespaces, 90-day retention |
| Enterprise | Unlimited | Custom | Dedicated resources, 365-day retention, SLA |
An action is a workflow action: workflow start, activity completion, signal, timer, etc.
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
TASKS_ENDPOINT | Tasks server gRPC endpoint | tasks-api.hanzo.ai:443 |
TASKS_NAMESPACE | Default namespace (org name) | - |
HANZO_API_KEY | API key for authentication | - |
TASKS_TLS_ENABLED | Enable TLS for gRPC | true |
Self-Hosted
# compose.yml
services:
tasks:
image: ghcr.io/hanzoai/tasks:latest
ports:
- "7233:7233"
environment:
DB: postgres12
POSTGRES_SEEDS: your-postgres-host
POSTGRES_USER: tasks
POSTGRES_PWD: ${TASKS_DB_PASSWORD}Related Services
- Hanzo MQ -- Job queues for simpler async processing
- Hanzo PubSub -- Event streaming backbone
- Hanzo Stream -- Kafka-compatible streaming
- Hanzo Cron -- Lightweight scheduled jobs (powered by Tasks)
- Hanzo Flow -- Visual workflow builder (UI for Tasks)
How is this guide?
Last updated on