Hanzo

Hanzo Tasks

Durable workflow execution engine with automatic retries, saga compensation, and multi-tenant isolation.

Hanzo Tasks

Hanzo Tasks is a durable workflow execution engine for building invincible distributed applications. Define complex business logic as code -- workflows survive process crashes, infrastructure failures, and deployments. Every step is tracked, retried on failure, and can be inspected or replayed from the console.

Hanzo Tasks provides durable workflow execution with multi-tenant namespace isolation via Hanzo IAM, secrets management via Hanzo KMS, and unified observability through the Hanzo O11y stack.

Endpoint: tasks.hanzo.ai (UI) / tasks-api.hanzo.ai:443 (gRPC) Console: console.hanzo.ai > Tasks SDKs: Go, Python, TypeScript, Java Source: github.com/hanzoai/tasks

Features

FeatureDescription
Durable ExecutionWorkflows survive crashes, restarts, and deployments -- state is persisted after every step
Automatic RetriesConfigurable retry policies with exponential backoff, max attempts, and non-retryable error types
Saga CompensationDefine compensation logic that runs when a multi-step workflow partially fails
Scheduled WorkflowsCron expressions and fixed intervals with guaranteed execution and catch-up
Signals & QueriesSend data to running workflows (signals) or read their state (queries) without interrupting execution
Child WorkflowsCompose complex pipelines from reusable workflow building blocks
Continue-as-NewLong-running workflows reset history to prevent unbounded growth
Multi-Tenant NamespacesEach Hanzo org gets an isolated namespace with independent workflow history
IAM IntegrationOIDC-based auth via Hanzo IAM -- RBAC controls who can start, signal, or cancel workflows
KMS SecretsWorkflow activities access secrets via Hanzo KMS -- no plaintext credentials in code
Full History & ReplayEvery workflow execution is recorded -- replay failed workflows for debugging
OTEL ObservabilityDistributed tracing, metrics, and logs flow into the Hanzo O11y stack
Open APIStandard workflow SDKs and CLI tools work unchanged

Architecture

┌────────────────────────────────────────────────────┐
│                  Hanzo Console                      │
│          (Workflow browser, execution viewer)       │
└──────────────────────┬─────────────────────────────┘
                       │ gRPC
┌──────────────────────▼─────────────────────────────┐
│              Hanzo Tasks Server                     │
│   ┌──────────┬──────────┬──────────┬────────────┐  │
│   │ Frontend │ History  │ Matching │  Worker    │  │
│   │ Service  │ Service  │ Service  │  Service   │  │
│   └──────────┴──────────┴──────────┴────────────┘  │
│         Multi-tenant namespace routing              │
│         IAM OIDC authorizer                         │
└──────────────────────┬─────────────────────────────┘

            ┌──────────▼──────────┐
            │   Hanzo SQL         │
            │   (PostgreSQL)      │
            └─────────────────────┘

Workers connect to the Tasks server via gRPC, poll for work on task queues, execute activities, and report results back. The server persists all state to PostgreSQL.

Quick Start

Python SDK

from hanzo.tasks import Client, workflow, activity
import asyncio

@activity.defn
async def send_email(to: str, subject: str) -> str:
    # Your activity logic here
    return f"Sent to {to}"

@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, user_email: str) -> str:
        # Step 1: Send welcome email (retried automatically on failure)
        result = await workflow.execute_activity(
            send_email,
            args=[user_email, "Welcome to Hanzo!"],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Wait for user to complete profile (up to 7 days)
        await workflow.wait_condition(
            lambda: self.profile_completed,
            timeout=timedelta(days=7),
        )

        return "Onboarding complete"

    @workflow.signal
    async def complete_profile(self):
        self.profile_completed = True

async def main():
    client = await Client.connect(
        "tasks-api.hanzo.ai:443",
        # IAM token for multi-tenant auth
        api_key="your-hanzo-api-key",
        namespace="your-org",  # Hanzo org = Tasks namespace
    )

    handle = await client.start_workflow(
        OnboardingWorkflow.run,
        "new-user@example.com",
        id="onboard-user-123",
        task_queue="onboarding",
    )

    result = await handle.result()

Go SDK

package main

import (
    "context"
    "github.com/hanzoai/tasks/client"
    "github.com/hanzoai/tasks/worker"
    "github.com/hanzoai/tasks/workflow"
    "github.com/hanzoai/tasks/activity"
)

func OnboardingWorkflow(ctx workflow.Context, email string) error {
    opts := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &tasks.RetryPolicy{MaximumAttempts: 3},
    }
    ctx = workflow.WithActivityOptions(ctx, opts)

    err := workflow.ExecuteActivity(ctx, SendEmail, email, "Welcome!").Get(ctx, nil)
    if err != nil {
        return err
    }

    // Wait for signal (human-in-the-loop)
    var completed bool
    signalChan := workflow.GetSignalChannel(ctx, "complete-profile")
    signalChan.Receive(ctx, &completed)

    return nil
}

TypeScript SDK

import { Client, Connection } from '@hanzo/tasks';

const connection = await Connection.connect({
  address: 'tasks-api.hanzo.ai:443',
});

const client = new Client({
  connection,
  namespace: 'your-org',
});

const handle = await client.workflow.start('OnboardingWorkflow', {
  args: ['new-user@example.com'],
  taskQueue: 'onboarding',
  workflowId: 'onboard-user-123',
});

const result = await handle.result();

Use Cases

AI Agent Orchestration

@workflow.defn
class AgentPipeline:
    @workflow.run
    async def run(self, prompt: str):
        plan = await workflow.execute_activity(plan_task, prompt)
        for step in plan.steps:
            result = await workflow.execute_activity(
                execute_step, step,
                retry_policy=RetryPolicy(maximum_attempts=5),
            )
            if result.needs_human_review:
                approved = await workflow.wait_condition(
                    lambda: self.approved, timeout=timedelta(hours=24)
                )

Commerce Saga (Compensation)

@workflow.defn
class OrderSaga:
    @workflow.run
    async def run(self, order):
        # Each step has a compensating action
        charge = await workflow.execute_activity(charge_card, order)
        try:
            await workflow.execute_activity(reserve_inventory, order)
            await workflow.execute_activity(ship_order, order)
        except Exception:
            # Compensate: refund the charge
            await workflow.execute_activity(refund_charge, charge)
            raise

Scheduled Jobs (Replace CronJobs)

# Start a cron workflow
handle = await client.start_workflow(
    DailyReportWorkflow.run,
    id="daily-report",
    task_queue="reports",
    cron_schedule="0 9 * * MON-FRI",  # 9 AM weekdays
)

Multi-Tenancy

Each Hanzo organization maps to a Tasks namespace:

Hanzo ConceptTasks Concept
OrganizationNamespace
ProjectNamespace prefix (org/project)
IAM UserWorkflow identity
IAM RoleNamespace permission set
KMS SecretActivity credential

Namespace isolation ensures one org's workflows cannot see or affect another's.

Pricing

Hanzo Tasks follows usage-based pricing:

TierActions/moPriceIncludes
Free25,000$01 namespace, 7-day retention
Pro1,000,000$0.035/1K actions10 namespaces, 30-day retention
Team10,000,000$0.025/1K actionsUnlimited namespaces, 90-day retention
EnterpriseUnlimitedCustomDedicated resources, 365-day retention, SLA

An action is a workflow action: workflow start, activity completion, signal, timer, etc.

Configuration

Environment Variables

VariableDescriptionDefault
TASKS_ENDPOINTTasks server gRPC endpointtasks-api.hanzo.ai:443
TASKS_NAMESPACEDefault namespace (org name)-
HANZO_API_KEYAPI key for authentication-
TASKS_TLS_ENABLEDEnable TLS for gRPCtrue

Self-Hosted

# compose.yml
services:
  tasks:
    image: ghcr.io/hanzoai/tasks:latest
    ports:
      - "7233:7233"
    environment:
      DB: postgres12
      POSTGRES_SEEDS: your-postgres-host
      POSTGRES_USER: tasks
      POSTGRES_PWD: ${TASKS_DB_PASSWORD}
  • Hanzo MQ -- Job queues for simpler async processing
  • Hanzo PubSub -- Event streaming backbone
  • Hanzo Stream -- Kafka-compatible streaming
  • Hanzo Cron -- Lightweight scheduled jobs (powered by Tasks)
  • Hanzo Flow -- Visual workflow builder (UI for Tasks)

How is this guide?

Last updated on

On this page