
Hanzo MQ

High-performance message queue and job processing built on Hanzo KV (Redis Streams). BullMQ-compatible protocol with reliable delivery, priority queues, and dead letter support.

Hanzo MQ (@hanzo/mq) is a distributed message queue and job processing system built on top of Hanzo KV (Redis/Valkey-compatible). It provides reliable, at-least-once delivery with priority queues, delayed jobs, dead letter queues, rate limiting, and parent-child job dependencies -- all backed by atomic Lua scripts running on Redis Streams.

Package: @hanzo/mq (v5.70.0)
Protocol: BullMQ-compatible
Backend: Hanzo KV / Valkey / Redis
Source: github.com/hanzoai/mq

Features

| Feature | Description |
| --- | --- |
| Redis Streams Backend | All operations use atomic Lua scripts on Redis Streams for consistency |
| At-Least-Once Delivery | Jobs are locked during processing and reclaimed on worker failure |
| Priority Queues | 2,097,152 priority levels (0 = highest) with O(log n) insertion |
| Delayed Jobs | Schedule jobs to execute after a specified delay in milliseconds |
| Dead Letter Queues | Failed jobs automatically move to a failed set after exhausting retries |
| Rate Limiting | Per-queue rate limits to protect downstream services |
| Consumer Groups | Multiple workers process jobs from the same queue with automatic load balancing |
| Repeatable Jobs | Cron-based and interval-based job scheduling with deduplication |
| Parent-Child Dependencies | Build complex job flows with FlowProducer -- children must complete before parents |
| Job Deduplication | Debounce and throttle duplicate jobs by custom ID |
| Sandboxed Workers | Isolate job processing in child processes or worker threads |
| Pause/Resume | Pause and resume queues without losing jobs |
| Global Events | Real-time event streaming for job lifecycle via Redis Streams |
| Backoff Strategies | Built-in exponential and fixed backoff, plus custom strategies |
| Telemetry | OpenTelemetry-compatible tracing for job spans |
| Multi-Language | SDKs for TypeScript, Python, Elixir, and PHP |

Architecture

┌─────────────────────────────────────────────────────────────────┐
│  Producers                                                      │
│                                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────────┐                  │
│  │ Queue    │  │ Queue    │  │ FlowProducer │                  │
│  │ .add()   │  │ .addBulk │  │ .add(tree)   │                  │
│  └────┬─────┘  └────┬─────┘  └──────┬───────┘                  │
│       │              │               │                          │
└───────┼──────────────┼───────────────┼──────────────────────────┘
        │              │               │
        v              v               v
┌─────────────────────────────────────────────────────────────────┐
│  Hanzo KV (Redis Streams)                                       │
│                                                                 │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│  │ waiting │  │ delayed │  │ active  │  │ events  │           │
│  │ (list)  │  │ (zset)  │  │ (list)  │  │(stream) │           │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │
│       │            │            │             │                 │
│  ┌────┴────┐  ┌────┴────┐  ┌───┴─────┐  ┌───┴──────┐          │
│  │priority │  │ sched.  │  │  lock   │  │completed │          │
│  │ (zset)  │  │ (timer) │  │(per-job)│  │  (set)   │          │
│  └─────────┘  └─────────┘  └─────────┘  └──────────┘          │
│                                          ┌──────────┐          │
│                                          │  failed  │          │
│                                          │  (set)   │          │
│                                          └──────────┘          │
└─────────────────────────────────────────────────────────────────┘
        │              │               │
        v              v               v
┌─────────────────────────────────────────────────────────────────┐
│  Consumers                                                      │
│                                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────────┐                  │
│  │ Worker 1 │  │ Worker 2 │  │ Worker N     │                  │
│  │(process) │  │(process) │  │(sandboxed)   │                  │
│  └──────────┘  └──────────┘  └──────────────┘                  │
│                                                                 │
│  ┌──────────────┐                                               │
│  │ QueueEvents  │  ← real-time lifecycle events                 │
│  └──────────────┘                                               │
└─────────────────────────────────────────────────────────────────┘

Job Lifecycle

Every job moves through a defined state machine:

add() ──> waiting ──> active ──> completed
  │                      │
  │                      ├──> failed (after all attempts)
  │                      │       └──> dead letter (configurable)
  │                      │
  │                      └──> delayed (on DelayedError)
  │
  └──> delayed (if delay > 0)
           │
           └──> waiting (when delay expires)
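The transitions above can be modeled as a small lookup table. This is a sketch for illustration only -- the real transitions run atomically inside Lua scripts on Hanzo KV, and internal states are omitted:

```typescript
// Simplified model of the job state machine. It mirrors the diagram above,
// not the engine itself.
type JobState = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed';

const TRANSITIONS: Record<JobState, JobState[]> = {
  waiting: ['active'],
  delayed: ['waiting'],                       // when the delay expires
  active: ['completed', 'failed', 'delayed'], // delayed via DelayedError
  completed: [],                              // terminal
  failed: [],                                 // terminal (dead letter is a pattern, not a state)
};

function canTransition(from: JobState, to: JobState): boolean {
  return TRANSITIONS[from].includes(to);
}
```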

Quick Start

Install

npm install @hanzo/mq

Add Jobs to a Queue

import { Queue } from '@hanzo/mq';

const queue = new Queue('email-notifications', {
  connection: {
    host: 'kv.hanzo.ai',
    port: 6379,
    password: process.env.KV_PASSWORD,
  },
});

// Add a single job
await queue.add('welcome-email', {
  to: '[email protected]',
  template: 'welcome',
  locale: 'en',
});

// Add with options
await queue.add('password-reset', {
  to: '[email protected]',
  token: 'abc123',
}, {
  priority: 1,           // high priority (0 = highest)
  attempts: 3,           // 3 attempts total (2 retries)
  backoff: {
    type: 'exponential',
    delay: 1000,          // retries after 1s, then 2s
  },
  removeOnComplete: 100,  // keep last 100 completed
  removeOnFail: 500,      // keep last 500 failed
});

// Add with delay
await queue.add('reminder', {
  to: '[email protected]',
}, {
  delay: 60 * 60 * 1000, // 1 hour from now
});
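
A custom jobId deduplicates: while a job with that ID exists, adding another job with the same ID is a no-op. A sketch using a deterministic ID -- the order-confirmation naming scheme here is illustrative, any stable unique string works:

```typescript
// Deterministic job IDs collapse duplicate submissions into a single job.
// The naming scheme is hypothetical -- pick one that is unique per unit of work.
function confirmationJobId(orderId: string): string {
  return `order-confirmation:${orderId}`;
}

// Usage, with a Queue instance as in the examples above:
//   await queue.add('order-confirmation', { orderId }, {
//     jobId: confirmationJobId(orderId),
//   });
```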

Process Jobs with Workers

import { Worker } from '@hanzo/mq';

const worker = new Worker('email-notifications', async (job) => {
  switch (job.name) {
    case 'welcome-email':
      await sendEmail(job.data.to, job.data.template);
      break;
    case 'password-reset':
      await sendResetEmail(job.data.to, job.data.token);
      break;
  }

  // Optionally report progress
  await job.updateProgress(100);

  return { sent: true, timestamp: Date.now() };
}, {
  connection: {
    host: 'kv.hanzo.ai',
    port: 6379,
    password: process.env.KV_PASSWORD,
  },
  concurrency: 5, // process 5 jobs in parallel
});

worker.on('completed', (job) => {
  console.log(`${job.id} completed: ${JSON.stringify(job.returnvalue)}`);
});

worker.on('failed', (job, err) => {
  console.error(`${job.id} failed: ${err.message}`);
});

Listen to Global Events

import { QueueEvents } from '@hanzo/mq';

const events = new QueueEvents('email-notifications', {
  connection: {
    host: 'kv.hanzo.ai',
    port: 6379,
    password: process.env.KV_PASSWORD,
  },
});

events.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} done`, returnvalue);
});

events.on('failed', ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});

events.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`);
});

Consumer Groups

Multiple workers consuming the same queue name form an implicit consumer group. Hanzo MQ uses atomic Redis operations to ensure each job is delivered to exactly one worker at a time.

import { Worker } from '@hanzo/mq';

// Deploy N instances -- each picks up jobs independently
const worker = new Worker('order-processing', async (job) => {
  await processOrder(job.data);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
  concurrency: 10,      // 10 concurrent jobs per worker instance
  name: `worker-${process.env.HOSTNAME}`, // track which worker handled each job
  lockDuration: 30000,   // 30s lock per job
  stalledInterval: 15000, // check for stalled jobs every 15s
  maxStalledCount: 2,    // allow 2 stall recoveries before failing
});

Scaling Pattern

                     ┌──────────────┐
                     │ Queue:       │
                     │ order-proc   │
                     └───────┬──────┘
                             │
              ┌──────────────┼──────────────┐
              v              v              v
       ┌────────────┐ ┌────────────┐ ┌────────────┐
       │ Worker A   │ │ Worker B   │ │ Worker C   │
       │ c=10       │ │ c=10       │ │ c=10       │
       │ pod-abc    │ │ pod-def    │ │ pod-ghi    │
       └────────────┘ └────────────┘ └────────────┘
              30 concurrent jobs total

Workers auto-balance load -- no coordinator needed. The stalled job checker ensures jobs from crashed workers are re-queued automatically.
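
One sizing consequence of the stalled checker: a job held by a crashed worker is only reclaimed after its lock expires and the next stalled check fires, so worst-case reclaim latency is roughly lockDuration + stalledInterval. A rough model, assuming the lock is never renewed:

```typescript
// Worst-case time before a crashed worker's job is re-queued: the lock must
// expire (lockDuration), then the next stalled sweep must run (up to
// stalledInterval later). Rough model for capacity planning only.
function maxReclaimLatencyMs(lockDuration: number, stalledInterval: number): number {
  return lockDuration + stalledInterval;
}

// With the settings from the consumer-group example above (30s lock, 15s sweep):
const worstCase = maxReclaimLatencyMs(30_000, 15_000); // 45s
```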

Dead Letter Queues

Jobs that fail after exhausting all retry attempts remain in the failed set. You can implement a dead letter queue pattern by monitoring failures and moving them to a separate queue:

import { Queue, Worker, QueueEvents } from '@hanzo/mq';

// Primary queue with a default retry policy (attempts and backoff are job
// options, so they belong in defaultJobOptions, not on the worker)
const primaryQueue = new Queue('payments', {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
  },
});
const dlq = new Queue('payments-dlq', {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});

// Worker throws to trigger a retry
const worker = new Worker('payments', async (job) => {
  const result = await chargeCard(job.data);
  if (!result.success) {
    throw new Error(`Payment failed: ${result.error}`);
  }
  return result;
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});

// Move permanently failed jobs to DLQ
worker.on('failed', async (job, err) => {
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    await dlq.add('failed-payment', {
      originalJobId: job.id,
      originalData: job.data,
      failedReason: err.message,
      failedAt: new Date().toISOString(),
    });
    console.error(`Job ${job.id} moved to DLQ after ${job.attemptsMade} attempts`);
  }
});

// DLQ processor for manual review or alerting
const dlqWorker = new Worker('payments-dlq', async (job) => {
  await alertOpsTeam(job.data);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
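
A common follow-up is replaying dead letters once the root cause is fixed. A sketch of mapping the DLQ payload written above back into queue.add() arguments -- the 'charge' job name and the replay ID scheme are assumptions:

```typescript
// Shape of the DLQ payload written by the 'failed' handler above.
interface DeadLetter {
  originalJobId: string;
  originalData: unknown;
  failedReason: string;
  failedAt: string;
}

// Turn a dead letter back into (name, data, opts) arguments for re-adding to
// the primary queue. The deterministic replay jobId prevents double-replays.
function replayArgs(dl: DeadLetter): [string, unknown, { jobId: string }] {
  return ['charge', dl.originalData, { jobId: `replay:${dl.originalJobId}` }];
}

// Usage: await primaryQueue.add(...replayArgs(deadLetter));
```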

Parent-Child Job Dependencies

The FlowProducer class creates job trees where parent jobs wait for all children to complete before executing:

import { FlowProducer, Worker } from '@hanzo/mq';

const flow = new FlowProducer({
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});

// Build a dependency tree
await flow.add({
  name: 'generate-report',
  queueName: 'reports',
  data: { reportId: 'monthly-2026-02' },
  children: [
    {
      name: 'fetch-sales',
      queueName: 'data-pipeline',
      data: { source: 'sales', month: '2026-02' },
    },
    {
      name: 'fetch-analytics',
      queueName: 'data-pipeline',
      data: { source: 'analytics', month: '2026-02' },
    },
    {
      name: 'fetch-support',
      queueName: 'data-pipeline',
      data: { source: 'support', month: '2026-02' },
    },
  ],
});

// Children execute first (in parallel)
const pipelineWorker = new Worker('data-pipeline', async (job) => {
  const data = await fetchData(job.data.source, job.data.month);
  return data; // return value accessible to parent
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});

// Parent executes after all children complete
const reportWorker = new Worker('reports', async (job) => {
  const childValues = await job.getChildrenValues();
  return buildReport(job.data.reportId, childValues);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
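
getChildrenValues() resolves to an object mapping each child's job key to its return value. A sketch of combining the parallel fetch results into one report input -- the FetchResult shape is hypothetical:

```typescript
// Hypothetical return shape of each fetch-* child job above.
interface FetchResult {
  source: string;
  rows: number;
}

// Merge child return values (keyed by child job key) into a single summary.
function combineChildValues(children: Record<string, FetchResult>): {
  sources: string[];
  totalRows: number;
} {
  const results = Object.values(children);
  return {
    sources: results.map((r) => r.source).sort(),
    totalRows: results.reduce((sum, r) => sum + r.rows, 0),
  };
}
```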

Repeatable and Scheduled Jobs

Schedule recurring jobs with cron expressions or fixed intervals:

import { Queue } from '@hanzo/mq';

const queue = new Queue('scheduled-tasks', {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});

// Cron schedule -- every day at 3:00 AM UTC
await queue.upsertJobScheduler(
  'daily-cleanup',
  { pattern: '0 3 * * *' },
  {
    name: 'cleanup',                             // job name
    data: { maxAge: 30 * 24 * 60 * 60 * 1000 },  // job data
    opts: { attempts: 2 },                       // job options
  },
);

// Fixed interval -- every 5 minutes
await queue.upsertJobScheduler(
  'health-check',
  { every: 5 * 60 * 1000 },
  {
    name: 'ping',
    data: { targets: ['api', 'db', 'kv'] },
    opts: { removeOnComplete: 10 },
  },
);
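
Note that pattern (cron) and every (fixed interval) are alternatives: a scheduler uses one or the other, never both, mirroring BullMQ's behavior of rejecting repeat options that define both. A tiny guard expressing that rule as a sketch:

```typescript
// Repeat options for a job scheduler: cron pattern or fixed interval.
interface RepeatOpts {
  pattern?: string; // cron expression
  every?: number;   // fixed interval in ms
}

// A repeat spec must choose exactly one scheduling mode.
function isValidRepeat(opts: RepeatOpts): boolean {
  return (opts.pattern !== undefined) !== (opts.every !== undefined);
}
```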

Rate Limiting

Protect downstream services by limiting how fast workers process jobs:

import { Worker } from '@hanzo/mq';

const worker = new Worker('api-calls', async (job) => {
  return await callExternalAPI(job.data);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
  limiter: {
    max: 100,        // max 100 jobs
    duration: 60000, // per 60 seconds
  },
  concurrency: 10,
});
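
The limiter above caps throughput at max jobs per duration milliseconds across all workers on the queue. A simplified sliding-window model of that rule -- the real accounting happens atomically inside Hanzo KV, so this is illustration only:

```typescript
// Would a new job be allowed to start, given the start times of recently
// processed jobs? Sliding-window sketch of "max jobs per duration ms".
function withinLimit(
  recentStarts: number[],
  now: number,
  max: number,
  duration: number,
): boolean {
  const inWindow = recentStarts.filter((t) => now - t < duration);
  return inWindow.length < max;
}
```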

Configuration Reference

Queue Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| connection | ConnectionOptions | localhost:6379 | Hanzo KV / Redis connection |
| prefix | string | "bull" | Key prefix for all queue keys |
| defaultJobOptions | JobOptions | {} | Default options applied to all jobs |
| streams.events.maxLen | number | 10000 | Max events in the event stream |
| skipMetasUpdate | boolean | false | Skip metadata updates (read-only mode) |

Job Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| priority | number | 0 | Priority level (0 = highest, max 2,097,152) |
| delay | number | 0 | Delay in ms before job becomes processable |
| attempts | number | 1 | Total retry attempts |
| backoff | number \| BackoffOptions | -- | Backoff strategy for retries |
| lifo | boolean | false | Add to front of queue instead of back |
| removeOnComplete | boolean \| number \| KeepJobs | false | Auto-remove completed jobs |
| removeOnFail | boolean \| number \| KeepJobs | false | Auto-remove failed jobs |
| jobId | string | auto-generated | Custom job ID for deduplication |
| sizeLimit | number | -- | Max payload size in bytes |
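
Built-in exponential backoff computes delay * 2^(attempts - 1). A custom strategy is just a function from attempt count to a delay in milliseconds; the sketch below shows only the curve itself (capping the growth is an assumption, and wiring it into worker settings is omitted):

```typescript
// Capped exponential backoff: 1s, 2s, 4s, ... up to a 60s ceiling.
function cappedExponentialMs(attemptsMade: number): number {
  const base = 1000 * 2 ** (attemptsMade - 1);
  return Math.min(base, 60_000);
}
```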

Worker Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| concurrency | number | 1 | Parallel jobs per worker |
| lockDuration | number | 30000 | Job lock TTL in ms |
| stalledInterval | number | 30000 | Stalled job check interval in ms |
| maxStalledCount | number | 1 | Max stall recoveries before failing |
| limiter | RateLimiterOptions | -- | Rate limiting configuration |
| drainDelay | number | 5 | Seconds to wait when queue is empty |
| removeOnComplete | KeepJobs | -- | Default removal policy for completed jobs |
| removeOnFail | KeepJobs | -- | Default removal policy for failed jobs |
| useWorkerThreads | boolean | false | Use worker threads instead of child processes |
| name | string | -- | Worker name stored on processed jobs |
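
For sandboxed processing (the useWorkerThreads option and the Sandboxed Workers feature), the Worker constructor takes a file path instead of an inline function, and that module's default export receives the job. A minimal processor sketch -- the file name and job shape are illustrative, and the export statement is omitted so the snippet stands alone:

```typescript
// processor.ts -- in a real file this function would be the default export,
// and you would construct the worker with the file path instead of a
// function. It runs in a child process or worker thread, so CPU-heavy work
// here cannot block the main event loop.
function processJob(job: { data: { n: number } }): { sum: number } {
  let total = 0;
  for (let i = 1; i <= job.data.n; i++) total += i; // CPU-bound loop is safe in a sandbox
  return { sum: total }; // returning a promise also works
}
```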

Connection Options

// Direct connection
{
  host: 'kv.hanzo.ai',
  port: 6379,
  password: 'your-password',
  db: 0,
  tls: {},  // enable TLS
}

// Using an existing ioredis instance
import Redis from 'ioredis';
const redis = new Redis('redis://kv.hanzo.ai:6379');
const queue = new Queue('myqueue', { connection: redis });

Hanzo Console Integration

Hanzo Console uses @hanzo/mq internally for background job processing with 27+ queues. The Console worker deployment processes these queues:

// Example: Console-style multi-queue worker setup
import { Worker } from '@hanzo/mq';

const QUEUE_NAMES = [
  'trace-upsert',
  'ingestion-processing',
  'eval-execution',
  'batch-export',
  'email-notification',
  'dataset-run',
  'legacy-ingestion',
  // ... 20+ more queues
];

const connection = {
  host: process.env.REDIS_HOST || 'kv.hanzo.ai',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD,
};

for (const queueName of QUEUE_NAMES) {
  new Worker(queueName, processors[queueName], {
    connection,
    concurrency: 5,
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  });
}
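
On deploys, close every worker before the process exits so in-flight jobs finish instead of stalling; worker.close() resolves once active jobs complete. A sketch against a minimal close() shape so the pattern is visible without a live connection:

```typescript
// Anything with an async close() -- Worker instances qualify.
interface Closable {
  close(): Promise<void>;
}

// Drain all workers in parallel; resolves with how many were closed.
async function shutdownAll(workers: Closable[]): Promise<number> {
  await Promise.all(workers.map((w) => w.close()));
  return workers.length;
}

// Real wiring would look like:
//   process.on('SIGTERM', async () => {
//     await shutdownAll(workers);
//     process.exit(0);
//   });
```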

Multi-Language SDKs

Hanzo MQ provides SDKs beyond the primary TypeScript package:

Python

pip install hanzo-mq

from hanzo_mq import Queue, Worker

queue = Queue("tasks", {"connection": {"host": "kv.hanzo.ai", "port": 6379}})

async def process_job(job, token):
    # your processing logic
    return {"processed": job.data["input"]}

async def main():
    await queue.add("process", {"input": "data"}, {"attempts": 3})

    worker = Worker(
        "tasks",
        process_job,
        {"connection": {"host": "kv.hanzo.ai", "port": 6379}},
    )

Elixir and PHP

Community-maintained SDKs are available in the elixir/ and php/ directories of the source repository. They share the same Lua command scripts and maintain wire-level compatibility.
