Hanzo

Commerce Event Catalog

Reference for all commerce events published to NATS/JetStream, including GA4 and Facebook CAPI normalization.

Commerce Event Catalog

Hanzo Commerce publishes domain events to Hanzo PubSub (NATS JetStream) for real-time downstream consumption. Events are published by events/publisher.go in the commerce service and carry normalized payloads compatible with GA4 Enhanced Ecommerce and Facebook Conversions API (CAPI).

- JetStream Stream: COMMERCE
- Subject Prefix: commerce.>
- Publisher: events/publisher.go in hanzoai/commerce
- Wire Format: JSON over NATS JetStream

Stream Configuration

// Stream created by the commerce service on startup.
// A single file-backed stream named COMMERCE captures every subject under the
// commerce.> prefix, bounded by both age and total size.
jetstream.StreamConfig{
    Name:      "COMMERCE",
    Subjects:  []string{"commerce.>"}, // every event subject in this catalog starts with "commerce."
    Storage:   jetstream.FileStorage,
    Replicas:  3,
    Retention: jetstream.LimitsPolicy,
    MaxAge:    30 * 24 * time.Hour, // 30 days
    MaxBytes:  10 * 1024 * 1024 * 1024, // 10 GB
    Discard:   jetstream.DiscardOld, // NOTE(review): presumably evicts the oldest messages once a limit is hit -- confirm against the JetStream docs
}

Events

commerce.order.created

Published when a new order is placed (checkout completed, payment authorized).

Subject: commerce.order.created

{
  "event": "commerce.order.created",
  "timestamp": "2026-03-03T12:00:00.000Z",
  "order_id": "ord_abc123",
  "organization_id": "org-hanzo",
  "currency": "USD",
  "total": 149.99,
  "subtotal": 139.99,
  "tax": 10.00,
  "shipping": 0.00,
  "items": [
    {
      "item_id": "prod_001",
      "item_name": "AI API Credits (1000)",
      "price": 49.99,
      "quantity": 2,
      "item_category": "credits",
      "item_variant": "1000-pack"
    },
    {
      "item_id": "prod_002",
      "item_name": "Pro Plan - Monthly",
      "price": 49.99,
      "quantity": 1,
      "item_category": "subscription",
      "item_variant": "monthly"
    }
  ],
  "customer": {
    "customer_id": "cus_456",
    "email_sha256": "a1b2c3d4e5f6...",
    "country": "US",
    "region": "CA"
  },
  "payment": {
    "method": "card",
    "provider": "stripe",
    "transaction_id": "pi_xyz789"
  }
}

commerce.order.completed

Published when an order is fulfilled (all items shipped or delivered).

Subject: commerce.order.completed

{
  "event": "commerce.order.completed",
  "timestamp": "2026-03-03T14:30:00.000Z",
  "order_id": "ord_abc123",
  "organization_id": "org-hanzo",
  "currency": "USD",
  "total": 149.99,
  "items": [
    {
      "item_id": "prod_001",
      "item_name": "AI API Credits (1000)",
      "price": 49.99,
      "quantity": 2
    }
  ],
  "fulfillment": {
    "method": "digital",
    "completed_at": "2026-03-03T14:30:00.000Z"
  }
}

commerce.order.refunded

Published when a full or partial refund is processed.

Subject: commerce.order.refunded

{
  "event": "commerce.order.refunded",
  "timestamp": "2026-03-04T09:15:00.000Z",
  "order_id": "ord_abc123",
  "organization_id": "org-hanzo",
  "currency": "USD",
  "refund_amount": 49.99,
  "refund_type": "partial",
  "reason": "customer_request",
  "items": [
    {
      "item_id": "prod_001",
      "item_name": "AI API Credits (1000)",
      "price": 49.99,
      "quantity": 1
    }
  ],
  "refund": {
    "refund_id": "ref_def456",
    "provider": "stripe",
    "transaction_id": "re_abc789"
  }
}

commerce.checkout.started

Published when a customer initiates checkout (cart converted to checkout session).

Subject: commerce.checkout.started

{
  "event": "commerce.checkout.started",
  "timestamp": "2026-03-03T11:55:00.000Z",
  "checkout_id": "chk_ghi789",
  "organization_id": "org-hanzo",
  "currency": "USD",
  "total": 149.99,
  "items": [
    {
      "item_id": "prod_001",
      "item_name": "AI API Credits (1000)",
      "price": 49.99,
      "quantity": 2
    },
    {
      "item_id": "prod_002",
      "item_name": "Pro Plan - Monthly",
      "price": 49.99,
      "quantity": 1
    }
  ],
  "customer": {
    "customer_id": "cus_456",
    "email_sha256": "a1b2c3d4e5f6..."
  }
}

GA4 Enhanced Ecommerce Normalization

All commerce events use the GA4 Enhanced Ecommerce items[] schema for product data. This means any consumer can pass the items[] array through to Google Analytics 4 unchanged; only the event name has to be mapped (see GA4 Event Mapping below).

Item Schema

Every items[] entry follows the GA4 item format:

| Field | Type | Description |
| --- | --- | --- |
| item_id | string | Product ID (maps to GA4 item_id) |
| item_name | string | Product display name |
| price | number | Unit price in the event currency |
| quantity | number | Number of units |
| item_category | string | Primary category (optional) |
| item_category2 | string | Secondary category (optional) |
| item_variant | string | Variant (size, plan tier, etc.) (optional) |
| item_brand | string | Brand name (optional) |
| coupon | string | Applied coupon code (optional) |
| discount | number | Discount amount per unit (optional) |
| index | number | Position in the list (optional) |

GA4 Event Mapping

| Commerce Event | GA4 Event |
| --- | --- |
| commerce.checkout.started | begin_checkout |
| commerce.order.created | purchase |
| commerce.order.completed | purchase (with fulfillment data) |
| commerce.order.refunded | refund |

Forwarding to GA4

import { connect } from 'hanzo-pubsub';

// Commerce event -> GA4 event name, mirroring the "GA4 Event Mapping" table.
// A Map (rather than a plain object) keeps lookups free of inherited keys.
const GA4_EVENT_MAP = new Map([
  ['commerce.checkout.started', 'begin_checkout'],
  ['commerce.order.created', 'purchase'],
  ['commerce.order.completed', 'purchase'],
  ['commerce.order.refunded', 'refund'],
]);

// Build the Measurement Protocol URL once; searchParams takes care of escaping.
// GA4_MEASUREMENT_ID and GA4_API_SECRET are expected to come from your own configuration.
const ga4Endpoint = new URL('https://www.google-analytics.com/mp/collect');
ga4Endpoint.searchParams.set('measurement_id', GA4_MEASUREMENT_ID);
ga4Endpoint.searchParams.set('api_secret', GA4_API_SECRET);

const nc = await connect({ servers: 'pubsub.hanzo.ai:4222' });
const js = nc.jetstream();

const consumer = await js.consumers.get('COMMERCE', 'ga4-forwarder');
const messages = await consumer.consume();

for await (const msg of messages) {
  let event;
  try {
    event = JSON.parse(new TextDecoder().decode(msg.data));
  } catch (err) {
    // A payload that cannot be parsed will never succeed; stop redelivery
    // instead of letting one bad message crash the forwarder.
    console.error('ga4-forwarder: dropping malformed message', err);
    msg.term();
    continue;
  }

  const ga4Name = GA4_EVENT_MAP.get(event.event);
  if (ga4Name === undefined) {
    // No GA4 equivalent for this event: acknowledge it so it is not redelivered.
    msg.ack();
    continue;
  }

  try {
    // Forward to GA4 Measurement Protocol
    const response = await fetch(ga4Endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        client_id: event.customer?.customer_id || 'anonymous',
        events: [{
          name: ga4Name,
          params: {
            currency: event.currency,
            // `??` so that a legitimate total of 0 is not replaced by refund_amount.
            value: event.total ?? event.refund_amount,
            items: event.items,
            // Absent on checkout events; JSON.stringify omits undefined values.
            transaction_id: event.order_id,
          },
        }],
      }),
    });

    if (!response.ok) {
      throw new Error(`GA4 responded with HTTP ${response.status}`);
    }

    // Acknowledge only after GA4 accepted the request.
    msg.ack();
  } catch (err) {
    // Network or HTTP failure: ask JetStream to redeliver rather than losing the event.
    console.error(`ga4-forwarder: failed to forward ${event.event}`, err);
    msg.nak();
  }
}

Facebook Conversions API Normalization

Commerce events include SHA256-hashed customer data for server-side Facebook CAPI integration. All personally identifiable information is hashed before publishing to the event stream.

Hashed Fields

| Field | Hashing | Description |
| --- | --- | --- |
| customer.email_sha256 | SHA256(lowercase(trim(email))) | Hashed email address |
| customer.phone_sha256 | SHA256(digits_only(phone)) | Hashed phone number (when available) |
| customer.first_name_sha256 | SHA256(lowercase(trim(first_name))) | Hashed first name (when available) |
| customer.last_name_sha256 | SHA256(lowercase(trim(last_name))) | Hashed last name (when available) |

CAPI Event Mapping

| Commerce Event | Facebook Event |
| --- | --- |
| commerce.checkout.started | InitiateCheckout |
| commerce.order.created | Purchase |
| commerce.order.refunded | (no direct mapping -- use custom event) |

Forwarding to Facebook CAPI

import { createHash } from 'node:crypto';

// Commerce event -> Facebook standard event, mirroring the "CAPI Event Mapping" table.
// Events with no standard equivalent (e.g. commerce.order.refunded) are left out
// on purpose and are skipped by the loop below.
const FB_EVENT_MAP = new Map([
  ['commerce.checkout.started', 'InitiateCheckout'],
  ['commerce.order.created', 'Purchase'],
]);

/** Returns the lowercase hex SHA-256 digest of a string. */
const sha256Hex = (value) => createHash('sha256').update(value).digest('hex');

/**
 * Builds the CAPI `user_data` object from an event's `customer` section.
 *
 * Email, phone and name hashes are published pre-hashed and pass through as-is.
 * `country` and `region` are published in clear text, so they are normalized
 * (trimmed, lowercased) and hashed here. Missing fields are omitted entirely:
 * wrapping `undefined` in an array would serialize as `[null]`.
 *
 * @param {object} [customer] - The `customer` object of a commerce event.
 * @returns {Object<string, string[]>} user_data with one-element arrays per present field.
 */
function buildUserData(customer) {
  const source = customer ?? {};
  const normalizeAndHash = (raw) =>
    typeof raw === 'string' && raw.trim() !== ''
      ? sha256Hex(raw.trim().toLowerCase())
      : undefined;

  const candidates = {
    em: source.email_sha256,        // already SHA256
    ph: source.phone_sha256,        // already SHA256
    fn: source.first_name_sha256,   // already SHA256
    ln: source.last_name_sha256,    // already SHA256
    country: normalizeAndHash(source.country),
    st: normalizeAndHash(source.region),
  };

  return Object.fromEntries(
    Object.entries(candidates)
      .filter(([, value]) => value != null && value !== '')
      .map(([key, value]) => [key, [value]]),
  );
}

// PIXEL_ID and FB_ACCESS_TOKEN are expected to come from your own configuration.
const capiEndpoint = new URL(`https://graph.facebook.com/v19.0/${PIXEL_ID}/events`);
capiEndpoint.searchParams.set('access_token', FB_ACCESS_TOKEN);

// `messages` is a JetStream consumer iterator obtained the same way as in the
// GA4 example. NOTE(review): presumably this forwarder should bind its own
// durable consumer rather than share the GA4 one -- confirm.
for await (const msg of messages) {
  let event;
  try {
    event = JSON.parse(new TextDecoder().decode(msg.data));
  } catch (err) {
    // A payload that cannot be parsed will never succeed; stop redelivery.
    console.error('capi-forwarder: dropping malformed message', err);
    msg.term();
    continue;
  }

  const fbName = FB_EVENT_MAP.get(event.event);
  if (fbName === undefined) {
    // No Facebook standard event for this commerce event: acknowledge and move on.
    msg.ack();
    continue;
  }

  const fbEvent = {
    event_name: fbName,
    event_time: Math.floor(new Date(event.timestamp).getTime() / 1000),
    action_source: 'website',
    user_data: buildUserData(event.customer),
    custom_data: {
      currency: event.currency,
      // `??` so that a legitimate total of 0 is not replaced by refund_amount.
      value: event.total ?? event.refund_amount,
      contents: event.items?.map((item) => ({
        id: item.item_id,
        quantity: item.quantity,
        item_price: item.price,
      })),
      content_type: 'product',
      order_id: event.order_id,
    },
  };

  try {
    const response = await fetch(capiEndpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ data: [fbEvent] }),
    });

    if (!response.ok) {
      throw new Error(`Facebook CAPI responded with HTTP ${response.status}`);
    }

    // Acknowledge only after the Graph API accepted the event.
    msg.ack();
  } catch (err) {
    // Network or HTTP failure: ask JetStream to redeliver rather than losing the event.
    console.error(`capi-forwarder: failed to forward ${event.event}`, err);
    msg.nak();
  }
}

Consuming Events

Go Consumer

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    pubsub "github.com/hanzoai/pubsub-go"
    "github.com/hanzoai/pubsub-go/jetstream"
)

// CommerceEvent holds the top-level fields this example reads from a commerce
// event payload. Any other keys in the JSON (items, customer, payment, ...)
// are not declared here and are therefore ignored when decoding.
type CommerceEvent struct {
    Event          string  `json:"event"`           // event name, e.g. "commerce.order.created"
    Timestamp      string  `json:"timestamp"`       // kept as the raw string from the payload (not parsed into time.Time)
    OrderID        string  `json:"order_id"`        // e.g. "ord_abc123"
    OrganizationID string  `json:"organization_id"` // e.g. "org-hanzo"
    Currency       string  `json:"currency"`        // e.g. "USD"
    Total          float64 `json:"total"`           // not present on commerce.order.refunded payloads (they carry refund_amount), so it stays 0 there
}

// main connects to Hanzo PubSub, binds a durable consumer on the COMMERCE
// stream filtered to commerce.order.> subjects, and prints one line per
// received event. It runs until the process is terminated.
func main() {
    nc, err := pubsub.Connect("pubsub.hanzo.ai:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    ctx := context.Background()
    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatalf("create JetStream context: %v", err)
    }

    consumer, err := js.CreateOrUpdateConsumer(ctx, "COMMERCE", jetstream.ConsumerConfig{
        Durable:       "my-commerce-processor",
        AckPolicy:     jetstream.AckExplicitPolicy,
        FilterSubject: "commerce.order.>",
    })
    if err != nil {
        log.Fatalf("create consumer: %v", err)
    }

    // Consume registers the handler and returns right away; messages are
    // delivered to the callback in the background.
    if _, err := consumer.Consume(func(msg jetstream.Msg) {
        var event CommerceEvent
        if err := json.Unmarshal(msg.Data(), &event); err != nil {
            // An undecodable payload will not improve on redelivery, so
            // terminate it instead of printing a zero-valued event.
            log.Printf("dropping undecodable message: %v", err)
            if termErr := msg.Term(); termErr != nil {
                log.Printf("term failed: %v", termErr)
            }
            return
        }
        fmt.Printf("[%s] Order %s: $%.2f %s\n",
            event.Event, event.OrderID, event.Total, event.Currency)
        if err := msg.Ack(); err != nil {
            log.Printf("ack failed for order %s: %v", event.OrderID, err)
        }
    }); err != nil {
        log.Fatalf("start consuming: %v", err)
    }

    // Block forever: without this main would return immediately and the
    // process would exit before any message is handled.
    select {}
}

Python Consumer

import asyncio
import json
from hanzo.pubsub import connect

async def main():
    """Pull commerce events from the COMMERCE stream and print a one-line summary of each.

    Connects to Hanzo PubSub, creates a durable pull subscription named
    "my-python-processor" on every ``commerce.>`` subject, then fetches
    messages in batches of up to 10 in an endless loop, acknowledging each
    message after it has been printed.
    """
    nc = await connect("pubsub.hanzo.ai:4222")
    js = nc.jetstream()

    sub = await js.pull_subscribe(
        "commerce.>",
        durable="my-python-processor",
        stream="COMMERCE",
    )

    while True:
        try:
            messages = await sub.fetch(batch=10, timeout=5)
        except asyncio.TimeoutError:
            # NOTE(review): assumes hanzo.pubsub mirrors nats-py, where fetch()
            # raises a TimeoutError (an asyncio.TimeoutError subclass) when no
            # message arrives within `timeout` -- confirm. An idle stream is
            # not an error for a long-running consumer, so just poll again.
            continue

        for msg in messages:
            event = json.loads(msg.data.decode())
            # checkout events carry no order_id and refunds carry no total,
            # hence the defaults.
            print(f"[{event['event']}] Order {event.get('order_id', 'N/A')}: "
                  f"${event.get('total', 0):.2f}")
            await msg.ack()

asyncio.run(main())

Hanzo Commerce -- the headless e-commerce platform that publishes these events

Hanzo PubSub (NATS JetStream) -- the messaging layer for event delivery

Kafka-compatible gateway for consuming events with Kafka clients

Behavioral analytics event catalog for the @hanzo/insights ecosystem

How is this guide?

Last updated on

On this page