Hanzo Stream
Kafka-compatible streaming gateway built on Hanzo PubSub
Hanzo Stream is a stateless Kafka wire protocol gateway built on Hanzo PubSub. Standard Kafka clients connect on :9092; the gateway translates every Kafka API call into PubSub JetStream operations. All storage, replication, and durability live in PubSub — the gateway holds no state and scales horizontally.
Endpoint: stream.hanzo.ai:9092
Gateway: api.hanzo.ai/v1/stream/*
Source: github.com/hanzoai/stream
Features
- Kafka Wire Protocol: Native compatibility with Kafka clients -- no code changes required
- Stateless Gateway: All state lives in Hanzo PubSub; multiple brokers can share the same PubSub cluster
- Topics and Partitions: Create topics with configurable partition counts for parallel consumption
- Consumer Groups: Coordinated consumers with automatic offset tracking via PubSub KV
- Compression: GZIP, Snappy, LZ4, and ZSTD compression on the wire
- Horizontal Scaling: Deploy as many broker instances as needed behind a load balancer
- PubSub-Backed: Durable, replicated storage in Hanzo PubSub with configurable replication factor
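Partitions are the unit of parallelism: records carrying the same key always land on the same partition, preserving per-key ordering while different keys fan out across consumers. A minimal sketch of how key-based partitioning behaves (illustrative only -- real Kafka clients use murmur2 hashing; `simple_partition` is a hypothetical stand-in):

```python
import zlib

def simple_partition(key: bytes, num_partitions: int) -> int:
    # Stable hash of the key, modulo the partition count.
    # (Kafka clients actually use murmur2; crc32 keeps this sketch stdlib-only.)
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition...
assert simple_partition(b"user-42", 3) == simple_partition(b"user-42", 3)

# ...while many distinct keys spread across all partitions.
partitions = {simple_partition(f"user-{i}".encode(), 3) for i in range(100)}
print(sorted(partitions))  # [0, 1, 2]
```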
Supported Kafka APIs
| API | Key | Description |
|---|---|---|
| Produce | 0 | Write records to topic partitions |
| Fetch | 1 | Read records from topic partitions |
| ListOffsets | 2 | Query earliest/latest offsets |
| Metadata | 3 | Discover topics, partitions, and brokers |
| OffsetCommit | 8 | Commit consumer group offsets |
| OffsetFetch | 9 | Retrieve committed offsets |
| FindCoordinator | 10 | Locate the group coordinator broker |
| JoinGroup | 11 | Join a consumer group |
| Heartbeat | 12 | Maintain consumer group membership |
| SyncGroup | 14 | Distribute partition assignments |
| ApiVersions | 18 | Negotiate supported API versions |
| CreateTopics | 19 | Create new topics |
| InitProducerId | 22 | Initialize idempotent producer |
| DescribeConfigs | 32 | Query broker and topic configuration |
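A gateway like this routes each incoming request by its Kafka API key. The table above can be mirrored as a toy dispatch map (a sketch, not the gateway's actual code; real brokers advertise what they serve through ApiVersions and reject keys they do not support):

```python
# Kafka API keys the gateway handles, as listed in the table above.
SUPPORTED_APIS = {
    0: "Produce", 1: "Fetch", 2: "ListOffsets", 3: "Metadata",
    8: "OffsetCommit", 9: "OffsetFetch", 10: "FindCoordinator",
    11: "JoinGroup", 12: "Heartbeat", 14: "SyncGroup",
    18: "ApiVersions", 19: "CreateTopics", 22: "InitProducerId",
    32: "DescribeConfigs",
}

def dispatch(api_key: int) -> str:
    # API keys outside this set would be rejected by the gateway.
    return SUPPORTED_APIS.get(api_key, "UNSUPPORTED")

print(dispatch(0))   # Produce
print(dispatch(13))  # UNSUPPORTED (LeaveGroup, key 13, is not in the table)
```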
Quick Start
Connect using any standard Kafka client on port 9092.
kafkacat / kcat
```bash
# Create a topic and produce messages
echo "hello world" | kcat -b stream.hanzo.ai:9092 -t my-topic -P

# Consume from the beginning
kcat -b stream.hanzo.ai:9092 -t my-topic -C -o beginning
```

Kafka CLI Tools
```bash
# Create a topic
kafka-topics.sh --create --topic events \
  --partitions 3 \
  --bootstrap-server stream.hanzo.ai:9092

# Produce
kafka-console-producer.sh \
  --bootstrap-server stream.hanzo.ai:9092 \
  --topic events

# Consume
kafka-console-consumer.sh \
  --bootstrap-server stream.hanzo.ai:9092 \
  --topic events \
  --from-beginning
```

Python (kafka-python)
```python
from kafka import KafkaProducer, KafkaConsumer

# Producer
producer = KafkaProducer(
    bootstrap_servers=["stream.hanzo.ai:9092"],
    compression_type="zstd",
)
producer.send("events", b"hello from python")
producer.flush()

# Consumer
consumer = KafkaConsumer(
    "events",
    bootstrap_servers=["stream.hanzo.ai:9092"],
    group_id="my-group",
    auto_offset_reset="earliest",
)
for msg in consumer:
    print(f"{msg.topic}:{msg.partition}:{msg.offset} = {msg.value}")
```

Go (sarama)
```go
package main

import (
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Compression = sarama.CompressionZSTD

	// Producer
	producer, err := sarama.NewSyncProducer(
		[]string{"stream.hanzo.ai:9092"}, config,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "events",
		Value: sarama.StringEncoder("hello from go"),
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("sent to partition %d at offset %d\n", partition, offset)

	// Consumer
	consumer, err := sarama.NewConsumer(
		[]string{"stream.hanzo.ai:9092"}, config,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("events", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	for msg := range pc.Messages() {
		fmt.Printf("%s:%d:%d = %s\n",
			msg.Topic, msg.Partition, msg.Offset, msg.Value)
	}
}
```

Java
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

// Producer
Properties props = new Properties();
props.put("bootstrap.servers", "stream.hanzo.ai:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "zstd");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("events", "hello from java"));
producer.close();

// Consumer
Properties cprops = new Properties();
cprops.put("bootstrap.servers", "stream.hanzo.ai:9092");
cprops.put("group.id", "my-group");
cprops.put("auto.offset.reset", "earliest");
cprops.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
cprops.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
consumer.subscribe(Collections.singletonList("events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("%s:%d:%d = %s%n",
            record.topic(), record.partition(),
            record.offset(), record.value());
    }
}
```

Architecture
```
┌─────────────────────────────────────────────────────────────┐
│           Kafka Clients (producers / consumers)             │
│   kafkacat, sarama, kafka-python, Java KafkaClient, ...     │
└──────────────────────┬──────────────────────────────────────┘
                       │ Kafka wire protocol (TCP :9092)
                       v
┌─────────────────────────────────────────────────────────────┐
│              Hanzo Stream Gateway (stateless)               │
│                                                             │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐          │
│   │ Instance │      │ Instance │      │ Instance │          │
│   │    1     │      │    2     │      │    3     │          │
│   └─────┬────┘      └─────┬────┘      └─────┬────┘          │
│         │                 │                 │               │
│         └─────────────────┼─────────────────┘               │
│                           │                                 │
│               Protocol Translation                          │
│               (Kafka <-> PubSub/JS)                         │
├───────────────────────────┴─────────────────────────────────┤
│                 Hanzo PubSub (JetStream)                    │
│                                                             │
│   Streams:                    KV Store:                     │
│   kafka-events-0              kafka-consumer-offsets        │
│   kafka-events-1                group.topic.partition       │
│   kafka-events-2                  -> committed offset       │
│   kafka-logs-0                                              │
│                                                             │
│           Replication, Durability, Clustering               │
└─────────────────────────────────────────────────────────────┘
```

How It Works
Hanzo Stream translates Kafka concepts to PubSub JetStream primitives:
| Kafka Concept | PubSub JetStream Equivalent |
|---|---|
| Topic foo, Partition N | Stream kafka-foo-N, Subject kafka.foo.N |
| Produce record | Publish to stream subject, returns sequence as offset |
| Fetch at offset | Get message at sequence (offset + 1) |
| Consumer group offsets | KV bucket kafka-consumer-offsets |
| Create topic (N partitions) | Create N streams |
| Topic metadata | Stream info queries |
One stream per partition ensures a clean 1:1 mapping between Kafka offsets and PubSub sequences. The broker translates between Kafka's 0-based offsets and PubSub's 1-based sequences automatically.
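Under the mapping in the table above, the name and offset translation is purely mechanical. A sketch (function names here are illustrative, not the gateway's actual API):

```python
def stream_name(topic: str, partition: int) -> str:
    # Topic foo, partition N -> JetStream stream kafka-foo-N
    return f"kafka-{topic}-{partition}"

def subject(topic: str, partition: int) -> str:
    # ...with records published on subject kafka.foo.N
    return f"kafka.{topic}.{partition}"

def offset_to_sequence(offset: int) -> int:
    # Kafka offsets are 0-based; JetStream sequences are 1-based.
    return offset + 1

def sequence_to_offset(seq: int) -> int:
    return seq - 1

print(stream_name("events", 2))   # kafka-events-2
print(subject("events", 2))       # kafka.events.2
print(offset_to_sequence(0))      # 1
print(sequence_to_offset(42))     # 41
```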
Stateless Design
The broker maintains no local state. All data, metadata, and consumer offsets are stored in Hanzo PubSub. This means:
- Horizontal scaling: Add or remove broker instances at any time
- Zero data loss: No state to lose on broker restart
- Simple operations: No leader election, no rebalancing, no split-brain concerns
- Shared nothing: Every instance is identical and interchangeable
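Because committed offsets live in a shared PubSub KV bucket rather than on any one broker, every gateway instance sees the same group state. A toy in-memory stand-in for the kafka-consumer-offsets bucket, using the group.topic.partition key layout shown in the diagram (illustrative only; the real store is PubSub KV):

```python
class OffsetStore:
    """In-memory sketch of the kafka-consumer-offsets KV bucket."""

    def __init__(self):
        self.kv = {}

    def commit(self, group: str, topic: str, partition: int, offset: int):
        # Keys follow the group.topic.partition layout.
        self.kv[f"{group}.{topic}.{partition}"] = offset

    def fetch(self, group: str, topic: str, partition: int) -> int:
        # -1 means "no committed offset", matching Kafka's convention.
        return self.kv.get(f"{group}.{topic}.{partition}", -1)

store = OffsetStore()
store.commit("my-group", "events", 0, 41)
print(store.fetch("my-group", "events", 0))  # 41
print(store.fetch("my-group", "events", 1))  # -1 (nothing committed yet)
```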
Configuration
| Flag | Default | Description |
|---|---|---|
| --pubsub-url | nats://localhost:4222 | Hanzo PubSub server URL |
| --pubsub-creds | | Optional PubSub credentials file |
| --port | 9092 | Port for Kafka client connections |
| --host | localhost | Advertised hostname |
| --node-id | 1 | Broker node ID |
| --replicas | 1 | PubSub JetStream replication factor |
| --storage | file | PubSub JetStream storage type (file or memory) |
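The defaults above can be mirrored in a small argparse sketch (illustrative; the actual binary's flag parsing may differ):

```python
import argparse

# Flags and defaults from the configuration table.
parser = argparse.ArgumentParser(prog="hanzo-stream")
parser.add_argument("--pubsub-url", default="nats://localhost:4222")
parser.add_argument("--pubsub-creds", default=None)  # optional credentials file
parser.add_argument("--port", type=int, default=9092)
parser.add_argument("--host", default="localhost")   # advertised hostname
parser.add_argument("--node-id", type=int, default=1)
parser.add_argument("--replicas", type=int, default=1)
parser.add_argument("--storage", choices=["file", "memory"], default="file")

args = parser.parse_args([])  # no flags -> all defaults
print(args.port, args.storage)  # 9092 file
```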
Self-Hosted
Run your own Hanzo Stream instance with Docker:
```bash
docker run -d --name stream \
  -p 9092:9092 \
  ghcr.io/hanzoai/stream:latest \
  --pubsub-url nats://your-pubsub:4222 \
  --host 0.0.0.0
```

Or build from source:

```bash
git clone https://github.com/hanzoai/stream.git
cd stream
go build -o hanzo-stream .
./hanzo-stream --pubsub-url nats://localhost:4222
```

Performance
Hanzo Stream is optimized for high-throughput streaming workloads:
```
┌────────────────────────────────────────────────────────────────┐
│ Producer Benchmark (1M records, 500B each, acks=1)             │
│                                                                │
│ Throughput: ~100,000 records/sec (~47 MB/sec)                  │
│ Latency: 2.6 ms avg | 1 ms p50 | 14 ms p95 | 53 ms p99         │
└────────────────────────────────────────────────────────────────┘
```

Run your own benchmark with standard Kafka tools:
```bash
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 500 \
  --throughput 100000 \
  --producer-props \
    acks=1 \
    batch.size=16384 \
    linger.ms=5 \
    bootstrap.servers=stream.hanzo.ai:9092
```

Related Services
- Hanzo MQ: High-performance message queue and job processing built on Hanzo KV (Redis Streams). BullMQ-compatible protocol with reliable delivery, priority queues, and dead-letter support.
- Hanzo PubSub: High-performance publish/subscribe messaging with persistent streaming, key-value store, and object storage.