Hanzo PubSub

High-performance publish/subscribe messaging with persistent streaming, key-value store, and object storage

Hanzo PubSub is a high-performance message broker for publish/subscribe messaging, request/reply, queue groups, and persistent streaming. It serves as the foundational messaging layer across the Hanzo platform -- powering event-driven architectures, real-time data pipelines, microservice communication, and edge-to-cloud connectivity.

Endpoint:   pubsub.hanzo.ai:4222
WebSocket:  wss://pubsub.hanzo.ai:5222
Monitoring: pubsub.hanzo.ai:8222
Gateway:    api.hanzo.ai/v1/pubsub/*

Features

  • Publish/Subscribe: Subject-based messaging with wildcard subscriptions (* matches exactly one token, > matches one or more trailing tokens) for flexible topic hierarchies
  • Request/Reply: Synchronous request/reply with automatic inbox routing and timeouts
  • Queue Groups: Distribute messages across subscriber groups for automatic load balancing
  • JetStream: Persistent streaming with at-least-once and exactly-once delivery, replay policies, and consumer groups
  • Key-Value Store: Distributed key-value storage built on JetStream with watch, history, and TTL support
  • Object Store: Large object storage (files, blobs) with chunking, SHA-256 integrity, and metadata
  • Clustering: Full-mesh cluster replication with Raft consensus for JetStream fault tolerance
  • Super-Clusters: Gateway connections between clusters for global multi-region topologies
  • Leaf Nodes: Lightweight edge nodes that bridge to hub clusters with subject filtering
  • WebSocket: Native WebSocket transport for browser and edge clients on port 5222
  • TLS/mTLS: End-to-end encryption with mutual TLS, certificate pinning, and OCSP stapling
  • Multi-Tenancy: Account-based isolation with per-account JetStream limits and subject import/export
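
The wildcard rules behind subject-based subscriptions can be sketched in a few lines. The function below is an illustration only, not the broker's implementation. It assumes the conventional token-wise semantics: subjects are split on ".", "*" matches exactly one token, and ">" matches one or more tokens and must come last. The name subject_matches is hypothetical.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches `pattern` under token-wise wildcard rules."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # ">" is only valid as the final token and must consume at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # pattern is longer than the subject
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without a trailing ">", both sides must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)


if __name__ == "__main__":
    for pattern, subject in [
        ("orders.>", "orders.created"),
        ("orders.>", "orders.eu.created"),
        ("orders.*", "orders.eu.created"),
        ("orders.>", "orders"),
    ]:
        print(pattern, subject, subject_matches(pattern, subject))
```

Under these assumptions, "orders.>" covers every subject below "orders", while "orders.*" covers only subjects that are exactly two tokens long.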

Architecture

┌───────────────────────────────────────────────────────────────────────┐
│  Clients                                                              │
│  Go (pubsub-go)  Python  JavaScript  Rust  WebSocket  CLI            │
└──────────┬────────────────────┬───────────────────────┬───────────────┘
           │ TCP :4222          │ WS :5222              │ Leaf :7422
           v                   v                       v
┌───────────────────────────────────────────────────────────────────────┐
│  Hanzo PubSub Cluster                                                 │
│                                                                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                   │
│  │  PubSub-1   │──│  PubSub-2   │──│  PubSub-3   │   Full-mesh      │
│  │  (leader)   │  │             │  │             │   route :6222    │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘                   │
│         │                │                │                           │
│         └────────┬───────┘────────┬───────┘                           │
│                  │                │                                    │
│           ┌──────┴──────┐  ┌─────┴──────┐                             │
│           │  JetStream  │  │  Accounts  │                             │
│           │  (Raft R3)  │  │  (multi-   │                             │
│           │             │  │   tenant)  │                             │
│           └──────┬──────┘  └────────────┘                             │
│                  │                                                     │
│    ┌─────────────┼─────────────┐                                      │
│    │             │             │                                       │
│  ┌─┴──────┐ ┌───┴────┐ ┌─────┴──┐                                    │
│  │Streams │ │  KV    │ │ Object │                                     │
│  │        │ │ Store  │ │ Store  │                                     │
│  └────────┘ └────────┘ └────────┘                                     │
├───────────────────────────────────────────────────────────────────────┤
│  Gateway :7222 (super-cluster)                                        │
│  ┌──────────┐        ┌──────────┐        ┌──────────┐                 │
│  │ Region A │◄──────►│ Region B │◄──────►│ Region C │                 │
│  │ Cluster  │        │ Cluster  │        │ Cluster  │                 │
│  └──────────┘        └──────────┘        └──────────┘                 │
├───────────────────────────────────────────────────────────────────────┤
│  Leaf Nodes :7422 (edge)                                              │
│  ┌────────┐  ┌────────┐  ┌────────┐                                   │
│  │ Edge-1 │  │ Edge-2 │  │ Edge-3 │  Filtered subject bridging       │
│  └────────┘  └────────┘  └────────┘                                   │
└───────────────────────────────────────────────────────────────────────┘

Port Allocation

Port  Protocol   Purpose
4222  TCP        Client connections
5222  WebSocket  Browser and edge clients
6222  TCP        Cluster route connections
7222  TCP        Gateway (super-cluster) connections
7422  TCP        Leaf node connections
8222  HTTP       Monitoring and metrics

Quick Start

CLI

# Publish a message
pubsub pub orders.created '{"id":"order-123","total":59.99}'

# Subscribe with wildcard
pubsub sub "orders.>"

# Request/reply
pubsub request inventory.check '{"sku":"ABC-100"}' --timeout 2s

# Queue group subscriber
pubsub sub orders.created --queue workers
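
To make the difference between a plain subscriber and a queue group subscriber concrete, here is a small in-memory sketch. It is not the broker's code: TinyBroker is a hypothetical class, it matches subjects exactly (no wildcards), and it picks queue group members round-robin for determinism, whereas a real server may choose members differently.

```python
from collections import defaultdict
from itertools import count


class TinyBroker:
    """In-memory illustration of plain vs. queue-group delivery (exact subjects only)."""

    def __init__(self):
        self._plain = defaultdict(list)  # subject -> callbacks
        self._groups = defaultdict(lambda: defaultdict(list))  # subject -> group -> callbacks
        self._turn = count()  # drives round-robin member selection

    def subscribe(self, subject, callback, queue=None):
        if queue is None:
            self._plain[subject].append(callback)
        else:
            self._groups[subject][queue].append(callback)

    def publish(self, subject, data):
        # Every plain subscriber receives every message.
        for callback in self._plain[subject]:
            callback(data)
        # Each queue group receives the message once, on a single member.
        turn = next(self._turn)
        for members in self._groups[subject].values():
            members[turn % len(members)](data)


if __name__ == "__main__":
    broker = TinyBroker()
    worker_a, worker_b, audit = [], [], []
    broker.subscribe("orders.created", worker_a.append, queue="workers")
    broker.subscribe("orders.created", worker_b.append, queue="workers")
    broker.subscribe("orders.created", audit.append)
    for order_id in range(4):
        broker.publish("orders.created", order_id)
    print(worker_a, worker_b, audit)
```

In this sketch the two "workers" members split the four messages between them, while the plain subscriber sees all four.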

Go

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    pubsub "github.com/hanzoai/pubsub-go"
    "github.com/hanzoai/pubsub-go/jetstream"
)

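func main() {
    nc, err := pubsub.Connect("pubsub.hanzo.ai:4222",
        pubsub.UserCredentials("user.creds"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Core pub/sub
    if _, err := nc.Subscribe("orders.>", func(msg *pubsub.Msg) {
        fmt.Printf("Received: %s\n", msg.Data)
    }); err != nil {
        log.Fatal(err)
    }
    if err := nc.Publish("orders.created", []byte(`{"id":"order-123"}`)); err != nil {
        log.Fatal(err)
    }

    // Request/reply: returns an error if no responder answers within 2s
    resp, err := nc.Request("inventory.check",
        []byte(`{"sku":"ABC-100"}`), 2*time.Second)
    if err != nil {
        log.Printf("request failed: %v", err)
    } else {
        fmt.Printf("Reply: %s\n", resp.Data)
    }

    // JetStream persistent streaming
    ctx := context.Background()
    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }

    stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"orders.>"},
        Storage:  jetstream.FileStorage,
        Replicas: 3,
    })
    if err != nil {
        log.Fatal(err)
    }

    if _, err := js.Publish(ctx, "orders.created", []byte(`{"id":"order-456"}`)); err != nil {
        log.Fatal(err)
    }

    cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:       "order-processor",
        AckPolicy:     jetstream.AckExplicitPolicy,
        FilterSubject: "orders.created",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Consume returns immediately and delivers messages in the background
    cc, err := cons.Consume(func(msg jetstream.Msg) {
        fmt.Printf("Stream msg: %s\n", msg.Data())
        msg.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cc.Stop()

    // Keep the process alive long enough for the callbacks to run
    time.Sleep(5 * time.Second)
}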

Python

import asyncio, json
from hanzo.pubsub import connect

async def main():
    nc = await connect("pubsub.hanzo.ai:4222")

    # Subscribe
    async def handler(msg):
        print(f"Received on {msg.subject}: {json.loads(msg.data)}")
    await nc.subscribe("orders.>", cb=handler)

    # Publish
    await nc.publish("orders.created",
        json.dumps({"id": "order-789"}).encode())

    # JetStream
    js = nc.jetstream()
    await js.add_stream(name="EVENTS", subjects=["events.>"])
    ack = await js.publish("events.user.signup",
        json.dumps({"user": "alice"}).encode())

    sub = await js.pull_subscribe("events.>", durable="my-worker")
    for msg in await sub.fetch(batch=10, timeout=5):
        print(f"JetStream: {msg.data.decode()}")
        await msg.ack()

    await nc.close()

asyncio.run(main())

JavaScript

import { connect, StringCodec, jetstream, jetstreamManager } from "hanzo-pubsub";

const nc = await connect({ servers: "pubsub.hanzo.ai:4222" });
const sc = StringCodec();

// Core pub/sub
const sub = nc.subscribe("orders.>");
(async () => {
  for await (const msg of sub) {
    console.log(`Received: ${sc.decode(msg.data)}`);
  }
})();
nc.publish("orders.created", sc.encode('{"id":"order-101"}'));

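// JetStream
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "ORDERS", subjects: ["orders.>"] });

// Create the durable consumer first; consumers.get() fails if "processor"
// does not exist. Field names assume the client mirrors the server's
// consumer configuration keys.
await jsm.consumers.add("ORDERS", {
  durable_name: "processor",
  ack_policy: "explicit",
});

const js = jetstream(nc);
await js.publish("orders.created", sc.encode('{"id":"order-202"}'));

const consumer = await js.consumers.get("ORDERS", "processor");
const messages = await consumer.consume();
for await (const msg of messages) {
  console.log(`Stream: ${sc.decode(msg.data)}`);
  msg.ack();
  break; // stop after the first message so the connection can be closed
}
await nc.close();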

JetStream

JetStream adds persistent streaming to Hanzo PubSub. Messages published to JetStream-enabled subjects are durably stored and can be replayed, filtered, and consumed by multiple independent consumers.

Streams

A stream captures messages from one or more subjects into ordered, append-only storage.

Property   Description
Name       Unique stream identifier
Subjects   Subjects to capture (supports wildcards)
Storage    file (disk) or memory (RAM)
Replicas   Number of replicas (1, 3, or 5) for Raft consensus
Retention  limits (default), interest, or workqueue
MaxMsgs    Maximum number of messages to retain
MaxBytes   Maximum total bytes to retain
MaxAge     Maximum age of messages (e.g., 24h)
Discard    old (drop oldest) or new (reject new) when limits hit
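
How MaxMsgs interacts with the Discard setting can be shown with a toy model. The sketch below is an assumption-laden illustration, not JetStream's storage engine: BoundedStream is a hypothetical class, it enforces only a message-count limit (no MaxBytes or MaxAge), and it keeps everything in memory.

```python
from collections import deque


class BoundedStream:
    """Append-only message log with a MaxMsgs-style limit and a discard policy."""

    def __init__(self, max_msgs, discard="old"):
        if max_msgs < 1:
            raise ValueError("max_msgs must be at least 1")
        if discard not in ("old", "new"):
            raise ValueError("discard must be 'old' or 'new'")
        self.max_msgs = max_msgs
        self.discard = discard
        self.messages = deque()  # (sequence, payload) pairs, oldest first
        self.last_seq = 0

    def append(self, payload):
        """Store a payload; return its sequence number, or None if it was rejected."""
        if len(self.messages) >= self.max_msgs:
            if self.discard == "new":
                return None  # limit reached: reject the incoming message
            self.messages.popleft()  # limit reached: drop the oldest message
        self.last_seq += 1
        self.messages.append((self.last_seq, payload))
        return self.last_seq


if __name__ == "__main__":
    keep_latest = BoundedStream(max_msgs=2, discard="old")
    keep_first = BoundedStream(max_msgs=2, discard="new")
    for payload in ("a", "b", "c"):
        keep_latest.append(payload)
        keep_first.append(payload)
    print(list(keep_latest.messages))
    print(list(keep_first.messages))
```

With discard set to "old" the stream always holds the most recent messages; with "new" the publisher learns that the stream is full and can back off or retry.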

Consumers

Consumers track position in a stream and deliver messages to subscribers.

Type       Description
Durable    Server-side state survives client disconnects; named and resumable
Ephemeral  Automatically cleaned up after idle timeout
Ordered    Lightweight client-side ordered delivery with auto-recovery

Deliver policies: all, last, new, by_start_sequence, by_start_time, last_per_subject

Ack policies: explicit (manual ack), none (fire-and-forget), all (cumulative)
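
The three ack policies differ in which delivered messages remain outstanding after an acknowledgment. The following sketch models only that bookkeeping, under the assumption that "all" acknowledges every sequence up to and including the acked one. AckTracker is a hypothetical helper, not part of any client library.

```python
class AckTracker:
    """Tracks which delivered sequence numbers still await acknowledgment."""

    def __init__(self, policy):
        if policy not in ("explicit", "all", "none"):
            raise ValueError("policy must be 'explicit', 'all', or 'none'")
        self.policy = policy
        self.pending = set()

    def delivered(self, seq):
        # With "none" nothing is tracked: delivery is fire-and-forget.
        if self.policy != "none":
            self.pending.add(seq)

    def ack(self, seq):
        if self.policy == "explicit":
            # Only the acknowledged message is settled.
            self.pending.discard(seq)
        elif self.policy == "all":
            # Cumulative: the ack settles this message and every earlier one.
            self.pending = {s for s in self.pending if s > seq}


if __name__ == "__main__":
    for policy in ("explicit", "all", "none"):
        tracker = AckTracker(policy)
        for seq in (1, 2, 3, 4):
            tracker.delivered(seq)
        tracker.ack(3)
        print(policy, sorted(tracker.pending))
```

Anything left in pending is what a server would consider eligible for redelivery once its ack wait expires.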

Exactly-Once Delivery

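JetStream supports exactly-once semantics by combining two mechanisms: message deduplication on the publish side (keyed on the Msg-Id header, so a retried publish with the same ID is stored only once) and a double-ack on the consume side (the server confirms that it received the consumer's acknowledgment).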
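
Publish-side deduplication can be pictured as a lookup of recently seen message IDs. The sketch below is illustrative only: DedupWindow is a hypothetical class, the window length is an assumed parameter (the text above does not state one), and time is passed in explicitly so the behavior is deterministic.

```python
class DedupWindow:
    """Remembers message IDs for a limited time and flags repeats as duplicates."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self._seen = {}  # msg_id -> time the ID was first accepted

    def accept(self, msg_id, now):
        """Return True if the message should be stored, False if it is a duplicate."""
        # Forget IDs that have aged out of the window.
        self._seen = {m: t for m, t in self._seen.items() if now - t < self.window}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now
        return True


if __name__ == "__main__":
    window = DedupWindow(window_seconds=120)  # assumed value for illustration
    print(window.accept("order-123", now=0))    # first publish
    print(window.accept("order-123", now=30))   # retry inside the window
    print(window.accept("order-123", now=121))  # same ID after the window expired
```

The practical consequence is that a publisher can safely retry after a timeout, as long as it reuses the same message ID and retries within the window.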

Key-Value Store

Hanzo PubSub includes a distributed Key-Value store built on top of JetStream. It provides familiar KV semantics with stream-backed replication and real-time change notifications.

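ctx := context.Background()
js, err := jetstream.New(nc)
if err != nil {
    log.Fatal(err)
}

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
    Bucket:   "SESSIONS",
    History:  5,
    TTL:      1 * time.Hour,
    Replicas: 3,
})
if err != nil {
    log.Fatal(err)
}

if _, err := kv.Put(ctx, "user.alice", []byte(`{"role":"admin"}`)); err != nil {
    log.Fatal(err)
}
entry, err := kv.Get(ctx, "user.alice")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Value: %s (rev %d)\n", entry.Value(), entry.Revision())

// Watch for real-time changes (blocks until the watcher is stopped)
watcher, err := kv.WatchAll(ctx)
if err != nil {
    log.Fatal(err)
}
defer watcher.Stop()
for entry := range watcher.Updates() {
    // A nil entry marks the end of the initial values; live updates follow
    if entry != nil {
        fmt.Printf("Changed: %s = %s\n", entry.Key(), entry.Value())
    }
}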

Capabilities: Get, Put, Delete, Purge, Create (CAS), Update (CAS), Watch (wildcards), History, TTL, configurable replicas (1/3/5).
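
The compare-and-swap (CAS) behavior of Create and Update is easiest to see in a toy model. The sketch below is an in-memory illustration, not the store's implementation. RevisionedKV and ConflictError are hypothetical names, and the single bucket-wide revision counter is an assumption made to mirror a stream-backed store.

```python
class ConflictError(Exception):
    """Raised when a CAS precondition does not hold."""


class RevisionedKV:
    """In-memory key-value map where every write gets a new revision number."""

    def __init__(self):
        self._data = {}  # key -> (revision, value)
        self._rev = 0    # bucket-wide counter, incremented on every write

    def _write(self, key, value):
        self._rev += 1
        self._data[key] = (self._rev, value)
        return self._rev

    def put(self, key, value):
        """Unconditional write."""
        return self._write(key, value)

    def create(self, key, value):
        """Write only if the key does not exist yet."""
        if key in self._data:
            raise ConflictError(f"{key!r} already exists")
        return self._write(key, value)

    def update(self, key, value, last_revision):
        """Write only if the key is still at the revision the caller last saw."""
        current = self._data.get(key)
        if current is None or current[0] != last_revision:
            raise ConflictError(f"{key!r} is not at revision {last_revision}")
        return self._write(key, value)

    def get(self, key):
        return self._data[key]


if __name__ == "__main__":
    kv = RevisionedKV()
    rev = kv.create("user.alice", "admin")
    rev = kv.update("user.alice", "viewer", last_revision=rev)
    try:
        kv.update("user.alice", "owner", last_revision=1)  # stale revision
    except ConflictError as exc:
        print("rejected:", exc)
    print(kv.get("user.alice"))
```

Two writers that both read revision 1 cannot both succeed with update: the second one is rejected and has to re-read before retrying.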

Object Store

Storage for large objects (files, model weights, datasets) with automatic chunking, SHA-256 integrity, and metadata tracking.

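// Named "store" rather than "os" so the standard library os package stays usable
store, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
    Bucket:   "MODELS",
    Replicas: 3,
})
if err != nil {
    log.Fatal(err)
}

file, err := os.Open("model.safetensors")
if err != nil {
    log.Fatal(err)
}
defer file.Close()
store.Put(ctx, jetstream.ObjectMeta{Name: "llm/v1/weights"}, file)

reader, _ := store.Get(ctx, "llm/v1/weights")
// ... stream large object from reader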
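
Chunking with a SHA-256 integrity check, as described at the top of this section, can be sketched with the standard library alone. This is an illustration of the idea rather than the Object Store's wire format: chunk_object and reassemble are hypothetical helpers, and the 128 KiB default chunk size is an assumed value.

```python
import hashlib
import io


def chunk_object(reader, chunk_size=128 * 1024):
    """Split a binary stream into chunks; return (chunks, hex SHA-256 of the whole object)."""
    digest = hashlib.sha256()
    chunks = []
    while True:
        block = reader.read(chunk_size)
        if not block:
            break
        digest.update(block)  # hash incrementally so the object never has to fit in one buffer
        chunks.append(block)
    return chunks, digest.hexdigest()


def reassemble(chunks, expected_digest):
    """Join chunks and verify the digest before handing the data back."""
    data = b"".join(chunks)
    if hashlib.sha256(data).hexdigest() != expected_digest:
        raise ValueError("object failed SHA-256 integrity check")
    return data


if __name__ == "__main__":
    payload = bytes(range(256)) * 10  # 2560 bytes of sample data
    chunks, digest = chunk_object(io.BytesIO(payload), chunk_size=1000)
    print(len(chunks), digest)
    assert reassemble(chunks, digest) == payload
```

Storing the digest alongside the object's metadata lets a reader detect a missing or corrupted chunk before using the data.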

Configuration

# /etc/pubsub/server.conf

listen: 0.0.0.0:4222
max_payload: 8MB
max_connections: 65536

jetstream {
  store_dir: /data/pubsub/jetstream
  max_memory: 4GB
  max_storage: 100GB
  domain: hanzo
}

server_name: pubsub-1
cluster {
  name: hanzo-pubsub
  listen: 0.0.0.0:6222
  routes: [
    pubsub://pubsub-1:6222
    pubsub://pubsub-2:6222
    pubsub://pubsub-3:6222
  ]
}

websocket {
  listen: 0.0.0.0:5222
  tls {
    cert_file: /etc/pubsub/tls/cert.pem
    key_file:  /etc/pubsub/tls/key.pem
  }
}

leafnodes {
  listen: 0.0.0.0:7422
}

gateway {
  name: region-us
  listen: 0.0.0.0:7222
  gateways: [
    { name: region-eu, urls: ["pubsub://eu-pubsub-1:7222"] }
    { name: region-ap, urls: ["pubsub://ap-pubsub-1:7222"] }
  ]
}

tls {
  cert_file: /etc/pubsub/tls/cert.pem
  key_file:  /etc/pubsub/tls/key.pem
  ca_file:   /etc/pubsub/tls/ca.pem
  verify:    true
}

http_port: 8222

Key Parameters

Parameter              Default       Description
listen                 0.0.0.0:4222  Client listener address and port
max_payload            1MB           Maximum message payload size
max_connections        65536         Maximum concurrent client connections
jetstream.max_memory   auto          Maximum memory for JetStream memory storage
jetstream.max_storage  auto          Maximum disk for JetStream file storage
jetstream.store_dir    /tmp/pubsub   JetStream data directory
cluster.name           --            Cluster name for route discovery
websocket.listen       --            WebSocket listener (disabled if unset)
leafnodes.listen       --            Leaf node listener (disabled if unset)
gateway.name           --            Super-cluster gateway name
tls.verify             false         Enable mutual TLS client verification

Monitoring Endpoints

Endpoint       Description
GET /varz      General server information and statistics
GET /connz     Active client connection details
GET /routez    Cluster route information
GET /subsz     Subscription routing tree
GET /jsz       JetStream account and usage information
GET /healthz   Health check (returns 200 when ready)
GET /gatewayz  Super-cluster gateway status
GET /leafz     Leaf node connection details
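
A readiness probe against the health endpoint needs only the Python standard library. The sketch below makes two assumptions that the table does not state: the monitoring port is served over plain HTTP, and the host is reachable from where the script runs. monitoring_url and is_healthy are hypothetical helper names.

```python
import urllib.request


def monitoring_url(host, endpoint, port=8222, scheme="http"):
    """Build the URL for a monitoring endpoint such as 'healthz' or 'varz'."""
    return f"{scheme}://{host}:{port}/{endpoint.lstrip('/')}"


def is_healthy(host, port=8222, timeout=2.0):
    """Return True if GET /healthz answers with HTTP 200, False on any failure."""
    url = monitoring_url(host, "healthz", port=port)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # Covers connection errors, timeouts, and non-2xx HTTP responses.
        return False


if __name__ == "__main__":
    print(monitoring_url("pubsub.hanzo.ai", "healthz"))
    print(is_healthy("pubsub.hanzo.ai"))
```

The same URL helper works for the other endpoints in the table, for example monitoring_url(host, "jsz") for JetStream usage.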

Docker

docker run -d --name pubsub \
  -p 4222:4222 -p 5222:5222 -p 8222:8222 \
  -v pubsub-data:/data/pubsub \
  ghcr.io/hanzoai/pubsub:latest \
  -js -sd /data/pubsub/jetstream
