Hanzo MQ
High-performance message queue and job processing built on Hanzo KV (Redis Streams). BullMQ-compatible protocol with reliable delivery, priority queues, and dead letter support.
Hanzo MQ (@hanzo/mq) is a distributed message queue and job processing system built on top of Hanzo KV (Redis/Valkey-compatible). It provides reliable, at-least-once delivery with priority queues, delayed jobs, dead letter queues, rate limiting, and parent-child job dependencies -- all backed by atomic Lua scripts running on Redis Streams.
Package: @hanzo/mq (v5.70.0)
Protocol: BullMQ-compatible
Backend: Hanzo KV / Valkey / Redis
Source: github.com/hanzoai/mq
Features
| Feature | Description |
|---|---|
| Redis Streams Backend | All operations use atomic Lua scripts on Redis Streams for consistency |
| At-Least-Once Delivery | Jobs are locked during processing and reclaimed on worker failure |
| Priority Queues | 2,097,152 priority levels (0 = highest) with O(log n) insertion |
| Delayed Jobs | Schedule jobs to execute after a specified delay in milliseconds |
| Dead Letter Queues | Failed jobs automatically move to a failed set after exhausting retries |
| Rate Limiting | Per-queue rate limits to protect downstream services |
| Consumer Groups | Multiple workers process jobs from the same queue with automatic load balancing |
| Repeatable Jobs | Cron-based and interval-based job scheduling with deduplication |
| Parent-Child Dependencies | Build complex job flows with FlowProducer -- children must complete before parents |
| Job Deduplication | Debounce and throttle duplicate jobs by custom ID |
| Sandboxed Workers | Isolate job processing in child processes or worker threads |
| Pause/Resume | Pause and resume queues without losing jobs |
| Global Events | Real-time event streaming for job lifecycle via Redis Streams |
| Backoff Strategies | Built-in exponential and fixed backoff, plus custom strategies |
| Telemetry | OpenTelemetry-compatible tracing for job spans |
| Multi-Language | SDKs for TypeScript, Python, Elixir, and PHP |
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Producers │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Queue │ │ Queue │ │ FlowProducer │ │
│ │ .add() │ │ .addBulk │ │ .add(tree) │ │
│ └────┬─────┘ └────┬─────┘ └──────┬───────┘ │
│ │ │ │ │
└───────┼──────────────┼───────────────┼──────────────────────────┘
│ │ │
v v v
┌─────────────────────────────────────────────────────────────────┐
│ Hanzo KV (Redis Streams) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ waiting │ │ delayed │ │ active │ │ events │ │
│ │ (list) │ │ (zset) │ │ (list) │ │(stream) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ ┌────┴────┐ ┌────┴────┐ ┌───┴─────┐ ┌───┴──────┐ │
│ │priority │ │ sched. │ │ lock │ │completed │ │
│ │ (zset) │ │ (timer) │ │(per-job)│ │ (set) │ │
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
│ ┌──────────┐ │
│ │ failed │ │
│ │ (set) │ │
│ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │
v v v
┌─────────────────────────────────────────────────────────────────┐
│ Consumers │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker N │ │
│ │(process) │ │(process) │ │(sandboxed) │ │
│ └──────────┘ └──────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ │
│ │ QueueEvents │ ← real-time lifecycle events │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Job Lifecycle
Every job moves through a defined state machine:
add() ──> waiting ──> active ──> completed
│ │
│ ├──> failed (after all attempts)
│ │ │
│ │ └──> dead letter (configurable)
│ │
│ └──> delayed (on DelayedError)
│
└──> delayed (if delay > 0)
│
                └──> waiting (when delay expires)
Quick Start
Install
npm install @hanzo/mq
Add Jobs to a Queue
import { Queue } from '@hanzo/mq';
const queue = new Queue('email-notifications', {
connection: {
host: 'kv.hanzo.ai',
port: 6379,
password: process.env.KV_PASSWORD,
},
});
// Add a single job
await queue.add('welcome-email', {
to: '[email protected]',
template: 'welcome',
locale: 'en',
});
// Add with options
await queue.add('password-reset', {
to: '[email protected]',
token: 'abc123',
}, {
priority: 1, // high priority (0 = highest)
attempts: 3, // retry up to 3 times
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s
},
removeOnComplete: 100, // keep last 100 completed
removeOnFail: 500, // keep last 500 failed
});
// Add with delay
await queue.add('reminder', {
to: '[email protected]',
}, {
delay: 60 * 60 * 1000, // 1 hour from now
});
Process Jobs with Workers
import { Worker } from '@hanzo/mq';
const worker = new Worker('email-notifications', async (job) => {
switch (job.name) {
case 'welcome-email':
await sendEmail(job.data.to, job.data.template);
break;
case 'password-reset':
await sendResetEmail(job.data.to, job.data.token);
break;
}
// Optionally report progress
await job.updateProgress(100);
return { sent: true, timestamp: Date.now() };
}, {
connection: {
host: 'kv.hanzo.ai',
port: 6379,
password: process.env.KV_PASSWORD,
},
concurrency: 5, // process 5 jobs in parallel
});
worker.on('completed', (job) => {
console.log(`${job.id} completed: ${JSON.stringify(job.returnvalue)}`);
});
worker.on('failed', (job, err) => {
console.error(`${job.id} failed: ${err.message}`);
});
Listen to Global Events
import { QueueEvents } from '@hanzo/mq';
const events = new QueueEvents('email-notifications', {
connection: {
host: 'kv.hanzo.ai',
port: 6379,
password: process.env.KV_PASSWORD,
},
});
events.on('completed', ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} done`, returnvalue);
});
events.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed: ${failedReason}`);
});
events.on('progress', ({ jobId, data }) => {
console.log(`Job ${jobId} progress: ${data}%`);
});
Consumer Groups
Multiple workers consuming the same queue name form an implicit consumer group. Hanzo MQ uses atomic Redis operations to ensure each job is delivered to exactly one worker at a time.
import { Worker } from '@hanzo/mq';
// Deploy N instances -- each picks up jobs independently
const worker = new Worker('order-processing', async (job) => {
await processOrder(job.data);
}, {
connection: { host: 'kv.hanzo.ai', port: 6379 },
concurrency: 10, // 10 concurrent jobs per worker instance
name: `worker-${process.env.HOSTNAME}`, // track which worker handled each job
lockDuration: 30000, // 30s lock per job
stalledInterval: 15000, // check for stalled jobs every 15s
maxStalledCount: 2, // allow 2 stall recoveries before failing
});
Scaling Pattern
┌──────────────┐
│ Queue: │
│ order-proc │
└──────┬───────┘
│
┌─────────────┼─────────────┐
v v v
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Worker A │ │ Worker B │ │ Worker C │
│ c=10 │ │ c=10 │ │ c=10 │
│ pod-abc │ │ pod-def │ │ pod-ghi │
└────────────┘ └────────────┘ └────────────┘
       30 concurrent jobs total
Workers auto-balance load -- no coordinator needed. The stalled job checker ensures jobs from crashed workers are re-queued automatically.
Dead Letter Queues
Jobs that fail after exhausting all retry attempts remain in the failed set. You can implement a dead letter queue pattern by monitoring failures and moving them to a separate queue:
import { Queue, Worker, QueueEvents } from '@hanzo/mq';
// Primary queue with a retry policy applied to every job.
// Note: attempts and backoff are job options, so they belong in
// defaultJobOptions (or per-add() options), not in the Worker options.
const primaryQueue = new Queue('payments', {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
  },
});
const dlq = new Queue('payments-dlq', {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
// Worker that processes payment jobs
const worker = new Worker('payments', async (job) => {
  const result = await chargeCard(job.data);
  if (!result.success) {
    throw new Error(`Payment failed: ${result.error}`);
  }
  return result;
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
// Move permanently failed jobs to DLQ
worker.on('failed', async (job, err) => {
if (job.attemptsMade >= job.opts.attempts) {
await dlq.add('failed-payment', {
originalJobId: job.id,
originalData: job.data,
failedReason: err.message,
failedAt: new Date().toISOString(),
});
console.error(`Job ${job.id} moved to DLQ after ${job.attemptsMade} attempts`);
}
});
// DLQ processor for manual review or alerting
const dlqWorker = new Worker('payments-dlq', async (job) => {
  await alertOpsTeam(job.data);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
Parent-Child Job Dependencies
The FlowProducer class creates job trees where parent jobs wait for all children to complete before executing:
import { FlowProducer, Worker } from '@hanzo/mq';
const flow = new FlowProducer({
connection: { host: 'kv.hanzo.ai', port: 6379 },
});
// Build a dependency tree
await flow.add({
name: 'generate-report',
queueName: 'reports',
data: { reportId: 'monthly-2026-02' },
children: [
{
name: 'fetch-sales',
queueName: 'data-pipeline',
data: { source: 'sales', month: '2026-02' },
},
{
name: 'fetch-analytics',
queueName: 'data-pipeline',
data: { source: 'analytics', month: '2026-02' },
},
{
name: 'fetch-support',
queueName: 'data-pipeline',
data: { source: 'support', month: '2026-02' },
},
],
});
// Children execute first (in parallel)
const pipelineWorker = new Worker('data-pipeline', async (job) => {
  const data = await fetchData(job.data.source, job.data.month);
  return data; // return value accessible to parent
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
// Parent executes after all children complete
const reportWorker = new Worker('reports', async (job) => {
  const childValues = await job.getChildrenValues();
  return buildReport(job.data.reportId, childValues);
}, {
  connection: { host: 'kv.hanzo.ai', port: 6379 },
});
Repeatable and Scheduled Jobs
Schedule recurring jobs with cron expressions or fixed intervals:
import { Queue } from '@hanzo/mq';
const queue = new Queue('scheduled-tasks', {
connection: { host: 'kv.hanzo.ai', port: 6379 },
});
// Cron schedule -- every day at 3:00 AM UTC.
// upsertJobScheduler takes a scheduler ID, repeat options, and a
// job template ({ name, data, opts }), matching the BullMQ signature.
await queue.upsertJobScheduler(
  'daily-cleanup',
  { pattern: '0 3 * * *' },
  {
    name: 'cleanup',
    data: { maxAge: 30 * 24 * 60 * 60 * 1000 },
    opts: { attempts: 2 },
  },
);
// Fixed interval -- every 5 minutes
await queue.upsertJobScheduler(
  'health-check',
  { every: 5 * 60 * 1000 },
  {
    name: 'ping',
    data: { targets: ['api', 'db', 'kv'] },
    opts: { removeOnComplete: 10 },
  },
);
Rate Limiting
Protect downstream services by limiting how fast workers process jobs:
import { Worker } from '@hanzo/mq';
const worker = new Worker('api-calls', async (job) => {
return await callExternalAPI(job.data);
}, {
connection: { host: 'kv.hanzo.ai', port: 6379 },
limiter: {
max: 100, // max 100 jobs
duration: 60000, // per 60 seconds
},
concurrency: 10,
});
Configuration Reference
Queue Options
| Option | Type | Default | Description |
|---|---|---|---|
| connection | ConnectionOptions | localhost:6379 | Hanzo KV / Redis connection |
| prefix | string | "bull" | Key prefix for all queue keys |
| defaultJobOptions | JobOptions | {} | Default options applied to all jobs |
| streams.events.maxLen | number | 10000 | Max events in the event stream |
| skipMetasUpdate | boolean | false | Skip metadata updates (read-only mode) |
Job Options
| Option | Type | Default | Description |
|---|---|---|---|
| priority | number | 0 | Priority level (0 = highest, max 2,097,152) |
| delay | number | 0 | Delay in ms before job becomes processable |
| attempts | number | 1 | Total attempts, including the first run |
| backoff | number \| BackoffOptions | -- | Backoff strategy for retries |
| lifo | boolean | false | Add to front of queue instead of back |
| removeOnComplete | boolean \| number \| KeepJobs | false | Auto-remove completed jobs |
| removeOnFail | boolean \| number \| KeepJobs | false | Auto-remove failed jobs |
| jobId | string | auto-generated | Custom job ID for deduplication |
| sizeLimit | number | -- | Max payload size in bytes |
Worker Options
| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | number | 1 | Parallel jobs per worker |
| lockDuration | number | 30000 | Job lock TTL in ms |
| stalledInterval | number | 30000 | Stalled job check interval in ms |
| maxStalledCount | number | 1 | Max stall recoveries before failing |
| limiter | RateLimiterOptions | -- | Rate limiting configuration |
| drainDelay | number | 5 | Seconds to wait when queue is empty |
| removeOnComplete | KeepJobs | -- | Default removal policy for completed jobs |
| removeOnFail | KeepJobs | -- | Default removal policy for failed jobs |
| useWorkerThreads | boolean | false | Use worker threads instead of child processes |
| name | string | -- | Worker name stored on processed jobs |
Connection Options
// Direct connection
{
host: 'kv.hanzo.ai',
port: 6379,
password: 'your-password',
db: 0,
tls: {}, // enable TLS
}
// Using an existing ioredis instance
import Redis from 'ioredis';
const redis = new Redis('redis://kv.hanzo.ai:6379');
const queue = new Queue('myqueue', { connection: redis });
Hanzo Console Integration
Hanzo Console uses @hanzo/mq internally for background job processing with 27+ queues. The Console worker deployment processes these queues:
// Example: Console-style multi-queue worker setup
import { Worker } from '@hanzo/mq';
const QUEUE_NAMES = [
'trace-upsert',
'ingestion-processing',
'eval-execution',
'batch-export',
'email-notification',
'dataset-run',
'legacy-ingestion',
// ... 20+ more queues
];
const connection = {
host: process.env.REDIS_HOST || 'kv.hanzo.ai',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD,
};
for (const queueName of QUEUE_NAMES) {
new Worker(queueName, processors[queueName], {
connection,
concurrency: 5,
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
});
});
Multi-Language SDKs
Hanzo MQ provides SDKs beyond the primary TypeScript package:
Python
pip install hanzo-mq
from hanzo_mq import Queue, Worker
queue = Queue("tasks", {"connection": {"host": "kv.hanzo.ai", "port": 6379}})
await queue.add("process", {"input": "data"}, {"attempts": 3})
worker = Worker(
"tasks",
process_job,
{"connection": {"host": "kv.hanzo.ai", "port": 6379}},
)
Elixir and PHP
Community-maintained SDKs are available in the elixir/ and php/ directories of the source repository. They share the same Lua command scripts and maintain wire-level compatibility.