Hanzo PubSub
High-performance publish/subscribe messaging with persistent streaming, key-value store, and object storage
Hanzo PubSub
Hanzo PubSub is a high-performance message broker for publish/subscribe messaging, request/reply, queue groups, and persistent streaming. It serves as the foundational messaging layer across the Hanzo platform -- powering event-driven architectures, real-time data pipelines, microservice communication, and edge-to-cloud connectivity.
Endpoint: pubsub.hanzo.ai:4222
WebSocket: wss://pubsub.hanzo.ai:5222
Monitoring: pubsub.hanzo.ai:8222
Gateway: api.hanzo.ai/v1/pubsub/*
Features
- Publish/Subscribe: Subject-based messaging with wildcard subscriptions (
>,*) for flexible topic hierarchies - Request/Reply: Synchronous request/reply with automatic inbox routing and timeouts
- Queue Groups: Distribute messages across subscriber groups for automatic load balancing
- JetStream: Persistent streaming with at-least-once and exactly-once delivery, replay policies, and consumer groups
- Key-Value Store: Distributed key-value storage built on JetStream with watch, history, and TTL support
- Object Store: Large object storage (files, blobs) with chunking, SHA-256 integrity, and metadata
- Clustering: Full-mesh cluster replication with Raft consensus for JetStream fault tolerance
- Super-Clusters: Gateway connections between clusters for global multi-region topologies
- Leaf Nodes: Lightweight edge nodes that bridge to hub clusters with subject filtering
- WebSocket: Native WebSocket transport for browser and edge clients on port 5222
- TLS/mTLS: End-to-end encryption with mutual TLS, certificate pinning, and OCSP stapling
- Multi-Tenancy: Account-based isolation with per-account JetStream limits and subject import/export
Architecture
┌───────────────────────────────────────────────────────────────────────┐
│ Clients │
│ Go (pubsub-go) Python JavaScript Rust WebSocket CLI │
└──────────┬────────────────────┬───────────────────────┬───────────────┘
│ TCP :4222 │ WS :5222 │ Leaf :7422
v v v
┌───────────────────────────────────────────────────────────────────────┐
│ Hanzo PubSub Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PubSub-1 │──│ PubSub-2 │──│ PubSub-3 │ Full-mesh │
│ │ (leader) │ │ │ │ │ route :6222 │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────┬───────┘────────┬───────┘ │
│ │ │ │
│ ┌──────┴──────┐ ┌─────┴──────┐ │
│ │ JetStream │ │ Accounts │ │
│ │ (Raft R3) │ │ (multi- │ │
│ │ │ │ tenant) │ │
│ └──────┬──────┘ └────────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ┌─┴──────┐ ┌───┴────┐ ┌─────┴──┐ │
│ │Streams │ │ KV │ │ Object │ │
│ │ │ │ Store │ │ Store │ │
│ └────────┘ └────────┘ └────────┘ │
├───────────────────────────────────────────────────────────────────────┤
│ Gateway :7222 (super-cluster) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Region A │◄──────►│ Region B │◄──────►│ Region C │ │
│ │ Cluster │ │ Cluster │ │ Cluster │ │
│ └──────────┘ └──────────┘ └──────────┘ │
├───────────────────────────────────────────────────────────────────────┤
│ Leaf Nodes :7422 (edge) │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Edge-1 │ │ Edge-2 │ │ Edge-3 │ Filtered subject bridging │
│ └────────┘ └────────┘ └────────┘ │
└───────────────────────────────────────────────────────────────────────┘Port Allocation
| Port | Protocol | Purpose |
|---|---|---|
| 4222 | TCP | Client connections |
| 5222 | WebSocket | Browser and edge clients |
| 6222 | TCP | Cluster route connections |
| 7222 | TCP | Gateway (super-cluster) connections |
| 7422 | TCP | Leaf node connections |
| 8222 | HTTP | Monitoring and metrics |
Quick Start
CLI
# Publish a message
pubsub pub orders.created '{"id":"order-123","total":59.99}'
# Subscribe with wildcard
pubsub sub "orders.>"
# Request/reply
pubsub request inventory.check '{"sku":"ABC-100"}' --timeout 2s
# Queue group subscriber
pubsub sub orders.created --queue workersGo
package main
import (
"context"
"fmt"
"log"
"time"
pubsub "github.com/hanzoai/pubsub-go"
"github.com/hanzoai/pubsub-go/jetstream"
)
func main() {
nc, err := pubsub.Connect("pubsub.hanzo.ai:4222",
pubsub.UserCredentials("user.creds"),
)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Core pub/sub
nc.Subscribe("orders.>", func(msg *pubsub.Msg) {
fmt.Printf("Received: %s\n", msg.Data)
})
nc.Publish("orders.created", []byte(`{"id":"order-123"}`))
// Request/reply
resp, _ := nc.Request("inventory.check",
[]byte(`{"sku":"ABC-100"}`), 2*time.Second)
fmt.Printf("Reply: %s\n", resp.Data)
// JetStream persistent streaming
ctx := context.Background()
js, _ := jetstream.New(nc)
stream, _ := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"},
Storage: jetstream.FileStorage,
Replicas: 3,
})
js.Publish(ctx, "orders.created", []byte(`{"id":"order-456"}`))
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "order-processor",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: "orders.created",
})
cons.Consume(func(msg jetstream.Msg) {
fmt.Printf("Stream msg: %s\n", msg.Data())
msg.Ack()
})
}Python
import asyncio, json
from hanzo.pubsub import connect
async def main():
nc = await connect("pubsub.hanzo.ai:4222")
# Subscribe
async def handler(msg):
print(f"Received on {msg.subject}: {json.loads(msg.data)}")
await nc.subscribe("orders.>", cb=handler)
# Publish
await nc.publish("orders.created",
json.dumps({"id": "order-789"}).encode())
# JetStream
js = nc.jetstream()
await js.add_stream(name="EVENTS", subjects=["events.>"])
ack = await js.publish("events.user.signup",
json.dumps({"user": "alice"}).encode())
sub = await js.pull_subscribe("events.>", durable="my-worker")
for msg in await sub.fetch(batch=10, timeout=5):
print(f"JetStream: {msg.data.decode()}")
await msg.ack()
await nc.close()
asyncio.run(main())JavaScript
import { connect, StringCodec, jetstream, jetstreamManager } from "hanzo-pubsub";
const nc = await connect({ servers: "pubsub.hanzo.ai:4222" });
const sc = StringCodec();
// Core pub/sub
const sub = nc.subscribe("orders.>");
(async () => {
for await (const msg of sub) {
console.log(`Received: ${sc.decode(msg.data)}`);
}
})();
nc.publish("orders.created", sc.encode('{"id":"order-101"}'));
// JetStream
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "ORDERS", subjects: ["orders.>"] });
const js = jetstream(nc);
await js.publish("orders.created", sc.encode('{"id":"order-202"}'));
const consumer = await js.consumers.get("ORDERS", "processor");
const messages = await consumer.consume();
for await (const msg of messages) {
console.log(`Stream: ${sc.decode(msg.data)}`);
msg.ack();
}
await nc.close();JetStream
JetStream adds persistent streaming to Hanzo PubSub. Messages published to JetStream-enabled subjects are durably stored and can be replayed, filtered, and consumed by multiple independent consumers.
Streams
A stream captures messages from one or more subjects into ordered, append-only storage.
| Property | Description |
|---|---|
Name | Unique stream identifier |
Subjects | Subjects to capture (supports wildcards) |
Storage | file (disk) or memory (RAM) |
Replicas | Number of replicas (1, 3, or 5) for Raft consensus |
Retention | limits (default), interest, or workqueue |
MaxMsgs | Maximum number of messages to retain |
MaxBytes | Maximum total bytes to retain |
MaxAge | Maximum age of messages (e.g., 24h) |
Discard | old (drop oldest) or new (reject new) when limits hit |
Consumers
Consumers track position in a stream and deliver messages to subscribers.
| Type | Description |
|---|---|
| Durable | Server-side state survives client disconnects; named and resumable |
| Ephemeral | Automatically cleaned up after idle timeout |
| Ordered | Lightweight client-side ordered delivery with auto-recovery |
Deliver policies: all, last, new, by_start_sequence, by_start_time, last_per_subject
Ack policies: explicit (manual ack), none (fire-and-forget), all (cumulative)
Exactly-Once Delivery
JetStream supports exactly-once semantics using message deduplication on the publish side (via Msg-Id header) and double-ack on the consume side.
Key-Value Store
Hanzo PubSub includes a distributed Key-Value store built on top of JetStream. It provides familiar KV semantics with stream-backed replication and real-time change notifications.
ctx := context.Background()
js, _ := jetstream.New(nc)
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SESSIONS",
History: 5,
TTL: 1 * time.Hour,
Replicas: 3,
})
kv.Put(ctx, "user.alice", []byte(`{"role":"admin"}`))
entry, _ := kv.Get(ctx, "user.alice")
fmt.Printf("Value: %s (rev %d)\n", entry.Value(), entry.Revision())
// Watch for real-time changes
watcher, _ := kv.WatchAll(ctx)
for entry := range watcher.Updates() {
if entry != nil {
fmt.Printf("Changed: %s = %s\n", entry.Key(), entry.Value())
}
}Capabilities: Get, Put, Delete, Purge, Create (CAS), Update (CAS), Watch (wildcards), History, TTL, configurable replicas (1/3/5).
Object Store
Storage for large objects (files, model weights, datasets) with automatic chunking, SHA-256 integrity, and metadata tracking.
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: "MODELS",
Replicas: 3,
})
file, _ := os.Open("model.safetensors")
os.Put(ctx, jetstream.ObjectMeta{Name: "llm/v1/weights"}, file)
reader, _ := os.Get(ctx, "llm/v1/weights")
// ... stream large object from readerConfiguration
# /etc/pubsub/server.conf
listen: 0.0.0.0:4222
max_payload: 8MB
max_connections: 65536
jetstream {
store_dir: /data/pubsub/jetstream
max_memory: 4GB
max_storage: 100GB
domain: hanzo
}
server_name: pubsub-1
cluster {
name: hanzo-pubsub
listen: 0.0.0.0:6222
routes: [
pubsub://pubsub-1:6222
pubsub://pubsub-2:6222
pubsub://pubsub-3:6222
]
}
websocket {
listen: 0.0.0.0:5222
tls {
cert_file: /etc/pubsub/tls/cert.pem
key_file: /etc/pubsub/tls/key.pem
}
}
leafnodes {
listen: 0.0.0.0:7422
}
gateway {
name: region-us
listen: 0.0.0.0:7222
gateways: [
{ name: region-eu, urls: ["pubsub://eu-pubsub-1:7222"] }
{ name: region-ap, urls: ["pubsub://ap-pubsub-1:7222"] }
]
}
tls {
cert_file: /etc/pubsub/tls/cert.pem
key_file: /etc/pubsub/tls/key.pem
ca_file: /etc/pubsub/tls/ca.pem
verify: true
}
http_port: 8222Key Parameters
| Parameter | Default | Description |
|---|---|---|
listen | 0.0.0.0:4222 | Client listener address and port |
max_payload | 1MB | Maximum message payload size |
max_connections | 65536 | Maximum concurrent client connections |
jetstream.max_memory | auto | Maximum memory for JetStream memory storage |
jetstream.max_storage | auto | Maximum disk for JetStream file storage |
jetstream.store_dir | /tmp/pubsub | JetStream data directory |
cluster.name | -- | Cluster name for route discovery |
websocket.listen | -- | WebSocket listener (disabled if unset) |
leafnodes.listen | -- | Leaf node listener (disabled if unset) |
gateway.name | -- | Super-cluster gateway name |
tls.verify | false | Enable mutual TLS client verification |
Monitoring Endpoints
| Endpoint | Description |
|---|---|
GET /varz | General server information and statistics |
GET /connz | Active client connection details |
GET /routez | Cluster route information |
GET /subsz | Subscription routing tree |
GET /jsz | JetStream account and usage information |
GET /healthz | Health check (returns 200 when ready) |
GET /gatewayz | Super-cluster gateway status |
GET /leafz | Leaf node connection details |
Docker
docker run -d --name pubsub \
-p 4222:4222 -p 5222:5222 -p 8222:8222 \
-v pubsub-data:/data/pubsub \
ghcr.io/hanzoai/pubsub:latest \
-js -sd /data/pubsub/jetstreamRelated Services
How is this guide?
Last updated on