Hanzo Stream

Kafka-compatible streaming gateway built on Hanzo PubSub

Hanzo Stream is a stateless Kafka wire protocol gateway built on Hanzo PubSub. Standard Kafka clients connect on :9092; the gateway translates every Kafka API call into PubSub JetStream operations. All storage, replication, and durability live in PubSub — the gateway holds no state and scales horizontally.

Endpoint: stream.hanzo.ai:9092
Gateway:  api.hanzo.ai/v1/stream/*
Source:   github.com/hanzoai/stream

Features

  • Kafka Wire Protocol: Native compatibility with Kafka clients -- no code changes required
  • Stateless Gateway: All state lives in Hanzo PubSub; multiple brokers can share the same PubSub cluster
  • Topics and Partitions: Create topics with configurable partition counts for parallel consumption
  • Consumer Groups: Coordinated consumers with automatic offset tracking via Hanzo PubSub KV
  • Compression: GZIP, Snappy, LZ4, and ZSTD compression on the wire
  • Horizontal Scaling: Deploy as many broker instances as needed behind a load balancer
  • Hanzo PubSub Backed: Durable, replicated storage with configurable replication factor

Supported Kafka APIs

API              Key  Description
Produce          0    Write records to topic partitions
Fetch            1    Read records from topic partitions
ListOffsets      2    Query earliest/latest offsets
Metadata         3    Discover topics, partitions, and brokers
OffsetCommit     8    Commit consumer group offsets
OffsetFetch      9    Retrieve committed offsets
FindCoordinator  10   Locate the group coordinator broker
JoinGroup        11   Join a consumer group
Heartbeat        12   Maintain consumer group membership
SyncGroup        14   Distribute partition assignments
ApiVersions      18   Negotiate supported API versions
CreateTopics     19   Create new topics
InitProducerId   22   Initialize idempotent producer
DescribeConfigs  32   Query broker and topic configuration

Quick Start

Connect using any standard Kafka client on port 9092.

kafkacat / kcat

# Create a topic and produce messages
echo "hello world" | kcat -b stream.hanzo.ai:9092 -t my-topic -P

# Consume from the beginning
kcat -b stream.hanzo.ai:9092 -t my-topic -C -o beginning

Kafka CLI Tools

# Create a topic
kafka-topics.sh --create --topic events \
  --partitions 3 \
  --bootstrap-server stream.hanzo.ai:9092

# Produce
kafka-console-producer.sh \
  --bootstrap-server stream.hanzo.ai:9092 \
  --topic events

# Consume
kafka-console-consumer.sh \
  --bootstrap-server stream.hanzo.ai:9092 \
  --topic events \
  --from-beginning

Python (kafka-python)

from kafka import KafkaProducer, KafkaConsumer

# Producer
producer = KafkaProducer(
    bootstrap_servers=["stream.hanzo.ai:9092"],
    compression_type="zstd",
)
producer.send("events", b"hello from python")
producer.flush()

# Consumer
consumer = KafkaConsumer(
    "events",
    bootstrap_servers=["stream.hanzo.ai:9092"],
    group_id="my-group",
    auto_offset_reset="earliest",
)
for msg in consumer:
    print(f"{msg.topic}:{msg.partition}:{msg.offset} = {msg.value}")

Go (sarama)

package main

import (
    "fmt"
    "log"

    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Compression = sarama.CompressionZSTD

    // Producer
    producer, err := sarama.NewSyncProducer(
        []string{"stream.hanzo.ai:9092"}, config,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: "events",
        Value: sarama.StringEncoder("hello from go"),
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("sent to partition %d at offset %d\n", partition, offset)

    // Consumer
    consumer, err := sarama.NewConsumer(
        []string{"stream.hanzo.ai:9092"}, config,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    pc, err := consumer.ConsumePartition("events", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatal(err)
    }
    for msg := range pc.Messages() {
        fmt.Printf("%s:%d:%d = %s\n",
            msg.Topic, msg.Partition, msg.Offset, msg.Value)
    }
}

Java

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.*;

// Producer
Properties props = new Properties();
props.put("bootstrap.servers", "stream.hanzo.ai:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "zstd");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("events", "hello from java"));
producer.close();

// Consumer
Properties cprops = new Properties();
cprops.put("bootstrap.servers", "stream.hanzo.ai:9092");
cprops.put("group.id", "my-group");
cprops.put("auto.offset.reset", "earliest");
cprops.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
cprops.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
consumer.subscribe(Collections.singletonList("events"));
while (true) {
    // poll(long) is deprecated; use the Duration overload
    ConsumerRecords<String, String> records =
        consumer.poll(java.time.Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("%s:%d:%d = %s%n",
            record.topic(), record.partition(),
            record.offset(), record.value());
    }
}

Architecture

┌─────────────────────────────────────────────────────────────┐
│  Kafka Clients (producers / consumers)                      │
│  kafkacat, sarama, kafka-python, Java KafkaClient, ...      │
└──────────────────────┬──────────────────────────────────────┘
                       │ Kafka wire protocol (TCP :9092)
                       v
┌─────────────────────────────────────────────────────────────┐
│  Hanzo Stream Gateway (stateless)                           │
│                                                             │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐                 │
│  │ Instance │   │ Instance │   │ Instance │                 │
│  │    1     │   │    2     │   │    3     │                 │
│  └─────┬────┘   └─────┬────┘   └─────┬────┘                 │
│        │              │              │                      │
│        └──────────────┼──────────────┘                      │
│                       │                                     │
│     Protocol Translation (Kafka <-> PubSub/JetStream)       │
├───────────────────────┴─────────────────────────────────────┤
│  Hanzo PubSub (JetStream)                                   │
│                                                             │
│  Streams:                     KV Store:                     │
│    kafka-events-0               kafka-consumer-offsets      │
│    kafka-events-1                 group.topic.partition     │
│    kafka-events-2                 -> committed offset       │
│    kafka-logs-0                                             │
│                                                             │
│  Replication, Durability, Clustering                        │
└─────────────────────────────────────────────────────────────┘

How It Works

Hanzo Stream translates Kafka concepts to PubSub JetStream primitives:

Kafka Concept                PubSub JetStream Equivalent
Topic foo, Partition N       Stream kafka-foo-N, Subject kafka.foo.N
Produce record               Publish to stream subject; returns sequence as offset
Fetch at offset              Get message at sequence (offset + 1)
Consumer group offsets       KV bucket kafka-consumer-offsets
Create topic (N partitions)  Create N streams
Topic metadata               Stream info queries

One stream per partition ensures a clean 1:1 mapping between Kafka offsets and PubSub sequences. The broker translates between Kafka's 0-based offsets and PubSub's 1-based sequences automatically.
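This naming and off-by-one translation can be sketched as a few helpers (a sketch only; the gateway's internal implementation is not shown here, but the rules follow the table above):

```python
def stream_name(topic: str, partition: int) -> str:
    """A Kafka topic partition maps 1:1 to a JetStream stream: kafka-<topic>-<N>."""
    return f"kafka-{topic}-{partition}"

def subject_name(topic: str, partition: int) -> str:
    """Records for a partition are published on a single subject: kafka.<topic>.<N>."""
    return f"kafka.{topic}.{partition}"

def offset_to_sequence(kafka_offset: int) -> int:
    """Kafka offsets are 0-based; JetStream sequences are 1-based."""
    return kafka_offset + 1

def sequence_to_offset(sequence: int) -> int:
    """Reverse mapping, applied when returning produce/fetch responses."""
    return sequence - 1

# A fetch at Kafka offset 0 on topic "events", partition 0 becomes a read of
# sequence 1 from stream "kafka-events-0".
```

Because each stream holds exactly one partition, offsets stay dense and monotonic per partition, matching Kafka client expectations without any extra index.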

Stateless Design

The broker maintains no local state. All data, metadata, and consumer offsets are stored in Hanzo PubSub. This means:

  • Horizontal scaling: Add or remove broker instances at any time
  • Zero data loss: No state to lose on broker restart
  • Simple operations: No leader election, no rebalancing, no split-brain concerns
  • Shared nothing: Every instance is identical and interchangeable

Configuration

Flag            Default                Description
--pubsub-url    nats://localhost:4222  Hanzo PubSub server URL
--pubsub-creds                         Optional PubSub credentials file
--port          9092                   Port for Kafka client connections
--host          localhost              Advertised hostname
--node-id       1                      Broker node ID
--replicas      1                      Hanzo PubSub replication factor
--storage       file                   Hanzo PubSub storage type (file or memory)
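For example, a second broker advertising a public hostname with replicated file storage might be started like this (a sketch: `stream.example.com` and `pubsub-1` are placeholder hostnames, and `hanzo-stream` is the binary produced by the build steps below):

```shell
./hanzo-stream \
  --pubsub-url nats://pubsub-1:4222 \
  --host stream.example.com \
  --port 9092 \
  --node-id 2 \
  --replicas 3 \
  --storage file
```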

Self-Hosted

Run your own Hanzo Stream instance with Docker:

docker run -d --name stream \
  -p 9092:9092 \
  ghcr.io/hanzoai/stream:latest \
  --pubsub-url nats://your-pubsub:4222 \
  --host 0.0.0.0

Or build from source:

git clone https://github.com/hanzoai/stream.git
cd stream
go build -o hanzo-stream .
./hanzo-stream --pubsub-url nats://localhost:4222

Performance

Hanzo Stream is optimized for high-throughput streaming workloads:

┌────────────────────────────────────────────────────────────────┐
│  Producer Benchmark (1M records, 500B each, acks=1)            │
│                                                                │
│  Throughput:  ~100,000 records/sec  (~47 MB/sec)               │
│  Latency:     2.6 ms avg | 1 ms p50 | 14 ms p95 | 53 ms p99    │
└────────────────────────────────────────────────────────────────┘

Run your own benchmark with standard Kafka tools:

kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 500 \
  --throughput 100000 \
  --producer-props \
    acks=1 \
    batch.size=16384 \
    linger.ms=5 \
    bootstrap.servers=stream.hanzo.ai:9092
