realtime · intermediate · ~14-22h

Ticker

One price tick reaches every open dashboard in order; offline clients replay what they missed.

You'll build the realtime backbone of an operations control room: products whose price and stock many people watch at once. When an operator changes a price or a sale decrements stock, the change is committed once to Postgres, appended to a Redis Stream as an ordered, durable event, and fanned out over Server-Sent Events to every open dashboard within ~100ms. Each client remembers the last Stream id it saw, so on reconnect it replays exactly the events it missed — no gaps, no full refetch — whether the server is written in Go or TypeScript.

backend Go · database PostgreSQL · ai Gemini API · cloud Google Cloud
The one idea that makes this project click

The read path is a bounded tail of a Redis Stream plus replay-from-last-id. Every dashboard XREADs a capped window of the change feed and, on reconnect, replays from the last Stream id it saw, or asks for a fresh snapshot if that id was already trimmed. The result: no committed change is missed and no reader blocks forever.

Production spec — the contract you're building toward

A live operations dashboard where one operator's price or stock change reaches every open window in order within ~100ms, and a briefly-offline window replays exactly what it missed. The contract is five HTTP routes plus one SSE event shape (data / replaying / resync frames + keepalive) over a Redis Stream, implemented at parity by a Go and a TypeScript backend.

Why this stack

Realtime is a propagation problem, not a storage problem — a change in one place must appear, correctly and in order, in every watcher, including ones briefly offline. Redis Streams is the load-bearing piece because it is an append-only, ordered log with stable ids and replay-from-id, which plain pub/sub (at-most-once, no replay) and request/response Postgres (no fan-out) cannot provide. Postgres stays the durable source of truth and the home of the one-line atomic oversell guard; the Stream is the change feed derived from it. The backend language is a swappable shell around that store — you build the same fan-out in Go and TypeScript and watch the realtime store, not the language, carry the weight.

What you'll be able to do

  • Tell at-most-once pub/sub apart from a durable, ordered, replayable log (Redis Streams), and know when each is correct
  • Model every state change as an event — commit to Postgres, then `XADD` an ordered change event with a stable id
  • Fan a change feed out to many live clients over SSE so that N server instances behave like one
  • Implement reconnect-and-replay — persist the last Stream id per client and `XREAD` only what was missed, idempotently
  • Enforce optimistic concurrency with a per-row version and a conditional UPDATE, returning a clean 409 plus the current value
  • Guard overselling with one atomic `UPDATE ... WHERE stock >= qty`, and contrast it with Aurora's full-transaction lesson
  • Build a live web dashboard where price and stock update across two windows at once, with a Live/Reconnecting/Replaying status
  • Implement the same fan-out server in Go and TypeScript and see the realtime store, not the language, carry the weight

TechFit — which tools actually suit this build

TechFit — how well each technology suits this project (editorial 1–5).
Technology | Fit | Role | Why
Redis (spotlight) | 5/5 | The realtime store: the change feed, the fan-out source, and the reconnect-replay log. | Streams give append-only ordering, stable ids, and XREAD-from-id replay that pub/sub cannot.
Go | 5/5 | Default fan-out server tailing the Stream and pushing SSE. | Goroutines plus go-redis make one reader and many subscribers simple, with a tiny deploy.
PostgreSQL | 4/5 | Durable source of truth; home of the version column and the atomic oversell guard. | ACID writes and a one-line conditional UPDATE keep the truth correct, but it cannot fan out or replay.
TypeScript | 4/5 | Edge-friendly alternative server (Hono on Bun or Node). | One typed language down to the browser, and async iterators that model an SSE stream cleanly.
Google Cloud | 4/5 | Optional host: Cloud Run for the stateless SSE server, alongside managed or self-hosted Redis. | Serverless containers fit the stateless fan-out; it is never required to see the project work.
Jetpack Compose | 4/5 | Optional native Android client beside the web dashboard, consuming the same SSE stream. | collectAsStateWithLifecycle makes a live board that ticks per event simple; unlike the browser, the SSE client manages reconnect/Last-Event-ID itself.
Gemini API | 3/5 | Optional: summarise the recent event stream into a restock or price-anomaly note. | A useful read-only add-on, but not load-bearing for realtime correctness.
MongoDB | 3/5 | Alternative change feed (not chosen) via change streams. | Change streams exist, but resume-token replay is clunkier than stable Stream ids for this job.

The build

Your path filters the steps below — pick a backend, a frontend, and any optional modules.

Optional modules — off by default
  • AI Anomaly Note: Summarise the recent change stream with Gemini into a read-only restock / price-anomaly note — surfaced for a human, never auto-applied.
  • Native Jetpack Compose client: A second, native Android client beside the bundled web dashboard — consume the same SSE stream with OkHttp, hold the board in Compose state, and do the same reconnect-and-replay by Last-Event-ID.

Learn the language itself: Go · TypeScript

Pick your backend (Go or TypeScript) above; the steps adapt, and the bundled web dashboard is the same for both. Watch the spotlight: by the fan-out step, one operator’s price change is committed once to Postgres, appended to a Redis Stream, and pushed to every open browser within ~100ms — and a window that was briefly offline replays exactly the events it missed instead of refetching everything. The offline-replay demo near the end is where you’ll see a window catch up — that’s the marquee moment to build toward.

How this differs from Aurora Commerce. Same domain (price and stock), orthogonal lesson. Aurora is about in-transaction correctness — checkout as one BEGIN … COMMIT, where the oversell guard is the whole story. Ticker reduces that guard to a single deliberate atomic line and spends the rest of the build on what Aurora never touches: propagating a committed change, live and in order, to many watchers, with reconnect-replay. The server is the single arbiter of each row — there is no merge (that’s Concord’s CRDT lesson), and clients reconcile against a shared feed by Stream offset, not one device’s offline edits (that’s Vitals).

Stand up Postgres + Redis with one docker compose

Beginner

Start Postgres and Redis together with one Docker Compose file and export their two connection strings — so the durable store and the realtime store both run locally and your server knows where to find each.

New in this step
Docker Compose

A YAML file that defines and runs containers together — here one Postgres and one Redis — so every learner gets the identical, throwaway pair with one command.

docker compose quickstart
DATABASE_URL

The Postgres connection string in an environment variable; reading it from the env means the same build runs locally, in CI, and in the cloud.

twelve-factor config environment track ↗
REDIS_URL

The Redis address in an environment variable, shaped redis://host:port, so the realtime store is configured the same way as the database.

redis connection url format track ↗
redis:// connection string

The single-line address of a Redis server (redis://localhost:6379) that the client parses to connect.

redis url scheme docs ↗
Why one compose file — and it costs nothing

Both services are free, official open-source images — postgres:16 and redis:7 — so the whole backend runs locally with no account, quota, or card. Compose gives every learner the same versions and a clean reset (docker compose down -v). Your server reads DATABASE_URL and REDIS_URL from the environment, so the identical build runs locally, in CI, and in the cloud — only the connection strings change.

Costs nothing. redis:7 and postgres:16 are open-source images; XADD, XREAD, and consumer groups are core Redis, not a paid tier.

docker-compose.yml
# docker-compose.yml
services:
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: dev
      POSTGRES_DB: ticker
    ports: ["5432:5432"]
  redis:
    image: redis:7
    ports: ["6379:6379"]
Run it
docker compose up -d
export DATABASE_URL="postgres://postgres:dev@localhost:5432/ticker?sslmode=disable"
export REDIS_URL="redis://localhost:6379"
redis-cli -u "$REDIS_URL" ping   # -> PONG
What success looks like

Both containers are up and reachable. docker compose ps shows db and redis as running, and the redis-cli ... ping prints PONG. The two env vars are now exported in this shell, so the server will read them.
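Since the server only learns where Postgres and Redis live from those two variables, it can help to validate them once at startup. The sketch below is one hedged way to do that; `loadConfig` and `Config` are hypothetical names introduced for illustration, and the accepted schemes (`postgres:`, `postgresql:`, `redis:`, `rediss:`) are an assumption about what you will deploy against.

```typescript
// Hypothetical startup check; loadConfig and Config are illustrative names, not part of the build.
type Config = { databaseUrl: string; redisUrl: string };

function loadConfig(env: Record<string, string | undefined>): Config {
  const databaseUrl = env["DATABASE_URL"];
  const redisUrl = env["REDIS_URL"];
  if (!databaseUrl) throw new Error("DATABASE_URL is not set");
  if (!redisUrl) throw new Error("REDIS_URL is not set");

  // new URL() throws on malformed input, so a typo fails at startup, not on the first query.
  const dbScheme = new URL(databaseUrl).protocol;
  const redisScheme = new URL(redisUrl).protocol;
  if (dbScheme !== "postgres:" && dbScheme !== "postgresql:") {
    throw new Error("DATABASE_URL must use postgres://, got " + dbScheme);
  }
  if (redisScheme !== "redis:" && redisScheme !== "rediss:") {
    throw new Error("REDIS_URL must use redis://, got " + redisScheme);
  }
  return { databaseUrl, redisUrl };
}

// The same values this step exports; a real server would pass process.env instead.
const localConfig = loadConfig({
  DATABASE_URL: "postgres://postgres:dev@localhost:5432/ticker?sslmode=disable",
  REDIS_URL: "redis://localhost:6379",
});
console.log(localConfig.redisUrl);
```

Failing fast here keeps a missing export from surfacing later as a confusing connection error inside a request handler.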

Why a realtime DB is a change feed and fan-out, not a faster query

Beginner

Before any code, lock in the core idea — realtime is a change feed that fans out and can be replayed, not a faster query — because that one distinction is why pub/sub and Streams are not interchangeable and why Streams is the spotlight here.

New in this step
change feed

A running list of what changed (not the current value) so watchers can react to each event as it happens.

change data capture stream
fan-out

Delivering one change to many subscribers at once, instead of each one re-querying for it.

pub sub fan out pattern
replay

Re-reading past events from a chosen point so a client that was away can apply exactly what it missed.

event log replay from offset
Redis pub/sub

Fire-and-forget messaging: at-most-once, with no memory — a subscriber that was offline never learns of the change.

redis pub sub vs streams docs ↗
Redis Streams

An append-only log with stable ordered ids and read-from-any-id replay — the one primitive that gives change feed, fan-out, and replay together.

redis streams introduction track ↗ docs ↗
stream id `<ms>-<seq>`

Every Stream entry’s id is a millisecond timestamp plus a sequence counter, monotonic and ordered — the marker you replay from.

redis stream entry id format
Pub/sub forgets; a Stream remembers

A dashboard that shows live prices needs three things from its data layer: a change feed (tell me when something changed), fan-out (tell everyone at once), and replay (tell me what I missed while I was away). Redis pub/sub gives you the first two and not the third: it is fire-and-forget and at-most-once, so a tab that was asleep when a price changed never learns about it. Redis Streams are an append-only log — every entry gets a stable, ordered id (<ms>-<seq>), entries persist until you trim them, and XREAD can start reading from any id to replay the gap. That third property is the whole project: it is what lets a briefly-offline client catch up exactly instead of refetching everything.

Postgres is the durable truth, but a request/response store cannot push or replay, which is precisely why a realtime store sits in front of it. The Redis track introduces pub/sub from redis-cli and mentions Streams; here we make Streams load-bearing.
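The difference between forgetting and remembering can be shown without Redis at all. The following is a toy, in-memory sketch: `PubSub` and `ChangeLog` are hypothetical classes that imitate the two delivery models, and the integer ids are a stand-in for real `<ms>-<seq>` Stream ids. It is meant to illustrate the property, not to mirror Redis internals.

```typescript
type Entry = { id: number; payload: string };
type Handler = (payload: string) => void;

// At-most-once: a message reaches only the handlers subscribed at publish time.
class PubSub {
  private handlers: Handler[] = [];
  subscribe(handler: Handler): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }
  publish(payload: string): void {
    this.handlers.forEach((h) => h(payload));
  }
}

// Append-only log: entries persist, so a reader can ask for everything after an id.
class ChangeLog {
  private entries: Entry[] = [];
  append(payload: string): number {
    const id = this.entries.length + 1; // toy counter standing in for <ms>-<seq>
    this.entries.push({ id, payload });
    return id;
  }
  readAfter(lastSeenId: number): Entry[] {
    return this.entries.filter((e) => e.id > lastSeenId);
  }
}

const bus = new PubSub();
const changeLog = new ChangeLog();
const heardLive: string[] = [];

// Send every change down both paths so the two models can be compared.
function change(payload: string): number {
  bus.publish(payload);
  return changeLog.append(payload);
}

const goOffline = bus.subscribe((p) => { heardLive.push(p); });
const lastSeenId = change("price=1399"); // the tab is online and sees this one
goOffline();                             // the tab sleeps
change("price=1350");
change("price=1300");
bus.subscribe((p) => { heardLive.push(p); }); // back online; pub/sub has nothing to replay
const missed = changeLog.readAfter(lastSeenId).map((e) => e.payload);

console.log(heardLive); // only the change published while the tab was subscribed
console.log(missed);    // the two changes made while it was away, in order
```

The pub/sub path never recovers the two offline changes, while the log hands them back from the remembered id. That is the property the rest of the build depends on.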

Feel fan-out and replay in raw redis-cli first

Beginner

Prove the spotlight to yourself before any server code — in two terminals, watch XREAD BLOCK park and wake on a new XADD, then reconnect from an explicit id and watch it replay a gap — so the later server steps re-implement something you have already seen, not an act of faith.

New in this step
`XADD`

Appends one entry to a Stream and returns its assigned ordered id; * lets Redis pick the next id.

redis xadd command docs ↗
`XREAD`

Reads Stream entries after a given id — the live tail and the replay both come from this one command.

redis xread command docs ↗
`XREAD BLOCK 0`

Parks the connection waiting for the next entry instead of returning empty; 0 means wait forever (we move to a bounded block in the server).

redis xread block
`$` live-tail id

A special id meaning only entries appended from now on — start here to watch live, not from history.

redis xread dollar last id
replay from an explicit id

Calling XREAD with a real id you noted earlier returns every entry after it immediately — that is the reconnect path.

redis xread from id replay
See it work in two windows before you build it

The whole project rests on two Stream behaviours; feel them by hand now so the later server steps are a re-implementation of something you have already seen, not an act of faith. In window A, XREAD BLOCK 0 parks — it returns nothing until window B appends an entry, then wakes with that entry and its id. That is the live fan-out path. Then reconnect: call XREAD again but start from an explicit id you noted earlier, and Redis hands back every entry after it immediately — that is replay-from-id, the reconnect path. Two commands, both halves of the build, in your terminal in under a minute.

Two terminals against the Compose Redis
# Terminal A — park on the live tail ("$" = only entries appended from now on):
redis-cli -u "$REDIS_URL" XREAD BLOCK 0 STREAMS warehouse:1 '$'
# (it hangs here, waiting)

# Terminal B — append one event; note the id it prints, e.g. 1718900000000-0:
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1399 stock 50 version 1
# -> back in Terminal A, the entry appears instantly with its id

# Terminal B — append two more, then REPLAY from the first id (in Terminal A, Ctrl-C first):
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1350 stock 49 version 2
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1300 stock 48 version 3
redis-cli -u "$REDIS_URL" XREAD STREAMS warehouse:1 1718900000000-0   # use YOUR first id
# -> returns the two entries AFTER that id, immediately — that is reconnect-replay
What success looks like

Terminal A’s XREAD BLOCK 0 hangs (the park), then prints the entry the instant Terminal B XADDs it — that is live fan-out. The replay XREAD STREAMS warehouse:1 <your-first-id> returns the two later entries immediately, in id order, and nothing before that id:

1) 1) "warehouse:1"
   2) 1) 1) "1718900001000-0"
         2) 1) "product_id" 2) "1" 3) "price" 4) "1350" ...
      2) 1) "1718900002000-0"
         2) 1) "product_id" 2) "1" 3) "price" 4) "1300" ...
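Replay-from-id only works because ids are ordered by millisecond first and sequence second, which is not the same as string order. Here is a small sketch of that comparison, useful if a client ever needs to sort or de-duplicate entries itself. The helper names (`parseStreamId`, `compareStreamIds`, `entriesAfter`) are hypothetical, and the code assumes both parts fit in a JavaScript safe integer, which holds for timestamps but is not guaranteed for the 64-bit sequence part.

```typescript
type StreamId = { ms: number; seq: number };

// Split "<ms>-<seq>" into numbers; rejects the special ids such as "$" or "*".
function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error("not a <ms>-<seq> stream id: " + id);
  const ms = Number(match[1]);
  const seq = Number(match[2]);
  if (!Number.isSafeInteger(ms) || !Number.isSafeInteger(seq)) {
    throw new Error("stream id part too large for this sketch: " + id);
  }
  return { ms, seq };
}

// Returns -1, 0, or 1: millisecond part decides first, sequence breaks ties.
function compareStreamIds(a: string, b: string): number {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  if (x.ms !== y.ms) return x.ms < y.ms ? -1 : 1;
  if (x.seq !== y.seq) return x.seq < y.seq ? -1 : 1;
  return 0;
}

// The client-side mirror of XREAD from an explicit id: strictly after, never including it.
function entriesAfter(ids: string[], lastSeen: string): string[] {
  return ids.filter((id) => compareStreamIds(id, lastSeen) > 0);
}

const seenIds = ["1718900000000-0", "1718900001000-0", "1718900002000-0"];
console.log(entriesAfter(seenIds, "1718900000000-0")); // the two later ids

// Plain string comparison gets same-millisecond entries wrong once seq reaches two digits.
const tenth: string = "1718900000000-10";
const ninth: string = "1718900000000-9";
console.log(tenth < ninth);                   // true as strings, which is the wrong order
console.log(compareStreamIds(tenth, ninth));  // 1, because seq 10 comes after seq 9
```

In the server itself Redis does this ordering for you; the sketch matters mainly on the client, where ids arrive as opaque strings.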

Schema: products with price, stock, and a version

Beginner

Create a products table whose version column and CHECK constraints make every later guarantee possible — the version seeds optimistic concurrency and idempotent replay, and the CHECKs keep stock and price non-negative no matter which client writes.

New in this step
version column

An integer you bump on every write; later it rejects stale writes and lets a replayed event apply at-most-once by comparison.

optimistic concurrency version column track ↗
CHECK constraint

A rule the row must satisfy or the write is rejected, so CHECK (stock >= 0) makes negative stock impossible from any client.

postgres check constraint docs ↗
BIGINT GENERATED ALWAYS AS IDENTITY

The modern auto-incrementing 64-bit primary key (the successor to serial).

postgres identity column docs ↗
money as BIGINT cents

Integers can’t drift the way floats do; store 1499, not 14.99.

why store money as integer cents
TIMESTAMPTZ

A timestamp stored as a UTC instant, so updated_at is unambiguous across time zones.

postgres timestamptz vs timestamp docs ↗
The version column is the seed of optimistic concurrency

Money is BIGINT cents, never float. CHECK (stock >= 0) and CHECK (price >= 0) make impossible data unwritable by any client. The new idea versus a plain catalog is version: an integer you bump on every write. It lets two things work later: a stale write is rejected by a conditional UPDATE (optimistic concurrency), and a replayed event is applied idempotently by comparing versions. updated_at gives every change a wall-clock timestamp for the dashboard; DEFAULT now() only fills it on INSERT, so each later UPDATE has to set it explicitly.

db/schema.sql
CREATE TABLE products (
  id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  name        TEXT    NOT NULL,
  price       BIGINT  NOT NULL CHECK (price >= 0),   -- cents
  stock       INTEGER NOT NULL CHECK (stock >= 0),
  version     INTEGER NOT NULL DEFAULT 0,
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

INSERT INTO products (name, price, stock) VALUES
  ('Aurora Mug', 1499, 50),
  ('Aurora Tee', 2999, 12),
  ('Sticker Pack', 499, 200);
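To see what the version column buys you before any SQL runs, here is an in-memory sketch of the optimistic-concurrency check. Everything in it is illustrative: `setPrice`, `WriteResult`, and the `products` map are hypothetical, and the SQL in the comment is an assumed shape for the conditional UPDATE rather than the exact statement the write path uses.

```typescript
type Product = { id: number; price: number; stock: number; version: number };
type WriteResult =
  | { status: "updated"; row: Product }
  | { status: "conflict"; current: Product };

// In-memory stand-in for the products table, seeded like the first row of db/schema.sql.
const products = new Map<number, Product>();
products.set(1, { id: 1, price: 1499, stock: 50, version: 0 });

// Assumed SQL shape this imitates:
//   UPDATE products SET price = $1, version = version + 1
//   WHERE id = $2 AND version = $3 RETURNING price, stock, version
function setPrice(id: number, newPrice: number, expectedVersion: number): WriteResult {
  const row = products.get(id);
  if (row === undefined) throw new Error("no such product: " + id);
  if (!Number.isInteger(newPrice) || newPrice < 0) {
    throw new Error("price must be non-negative integer cents"); // what CHECK (price >= 0) would reject
  }
  if (row.version !== expectedVersion) {
    return { status: "conflict", current: { ...row } }; // zero rows matched: answer 409 with the current row
  }
  const next: Product = { ...row, price: newPrice, version: row.version + 1 };
  products.set(id, next);
  return { status: "updated", row: next };
}

// Two operators both loaded the row at version 0.
const first = setPrice(1, 1399, 0);  // wins the race; version becomes 1
const second = setPrice(1, 1450, 0); // stale; gets the current row back instead
console.log(first.status, second.status);
```

The loser learns the current price and version in the same response, so it can retry against fresh state instead of silently overwriting the winner.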

Load the schema and seed rows into Postgres

Beginner

Run db/schema.sql against the running Postgres yourself — Compose loads nothing automatically — so the products table and its three seed rows exist before any code queries them.

New in this step
`psql`

Postgres’s command-line client; you point it at DATABASE_URL to run SQL against the running container.

postgres psql cli docs ↗
`psql -f`

Runs every statement in a .sql file in order — here it creates the table and inserts the seed rows in one shot.

psql run sql file docs ↗
`docker compose exec`

Runs a command inside a running container, so you can pipe the schema through the db container when you have no local psql.

docker compose exec command
postgres default superuser

The image’s built-in postgres user, used by DATABASE_URL because Compose set only the password and db name (not POSTGRES_USER).

postgres docker default user
Why this is its own step — nothing auto-loads it

The Compose file mounts no init script, so the table does not appear by magic — you load db/schema.sql yourself once, after docker compose up. The connection string’s user is postgres (the image’s default superuser, because Compose set only POSTGRES_PASSWORD and POSTGRES_DB); if you ever set POSTGRES_USER, update DATABASE_URL to match. After this load, GET /products will return exactly the three seed products the rest of the build assumes.

Load the schema (either form works)
# If you have a local psql client:
psql "$DATABASE_URL" -f db/schema.sql

# No local psql? Pipe it through the db container instead:
docker compose exec -T db psql -U postgres -d ticker < db/schema.sql

# Confirm the three seed rows landed:
psql "$DATABASE_URL" -c "SELECT id, name, price, stock, version FROM products ORDER BY id;"
What success looks like

The SELECT returns exactly three rows in id order, each with version 0:

 id |     name     | price | stock | version
----+--------------+-------+-------+---------
  1 | Aurora Mug   |  1499 |    50 |       0
  2 | Aurora Tee   |  2999 |    12 |       0
  3 | Sticker Pack |   499 |   200 |       0
(3 rows)

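Now GET /products has something to return. Don't re-run schema.sql against a loaded database: CREATE TABLE errors on the existing table and, because psql keeps going after an error by default, the INSERT then adds the three seed rows a second time. For a clean reload, drop the volume (docker compose down -v), bring the services back up (docker compose up -d), and load the schema again.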

Model the change event you XADD to a Stream

Intermediate

Design the change event before writing code — a self-contained { product_id, price, stock, version } appended to one fixed Stream key — so every later step (write path, fan-out, replay, idempotency) shares the same shape and key. This build commits to one warehouse, warehouse:1.

New in this step
change event

The four fields a mutation produces (product_id, price, stock, version) — enough state to apply it without reading anything else.

self-contained event payload design
`warehouse:1` fixed key

The single Stream key this build commits to everywhere — write routes append to it, the feed tails it, the dashboard reads it.

redis stream key naming
Stream field values are strings

Redis stores every Stream field as text, so numbers are encoded as decimal strings on XADD and parsed back on read.

redis stream field types
`*` auto-id

Passing * to XADD tells Redis to assign the next monotonic <ms>-<seq> id and return it.

redis xadd auto id asterisk
`XLEN`

Reports how many entries a Stream currently holds — a quick way to confirm an event was appended.

redis xlen command docs ↗
`XRANGE`

Lists entries between two ids (- to + is oldest to newest); COUNT caps how many you get back.

redis xrange command docs ↗
Design the event before the code

Every mutation produces exactly one event, and the event carries enough state to apply it without reading anything else: { product_id, price, stock, version }. You append it with XADD warehouse:1 * ...; Redis assigns the ordered id <ms>-<seq> and returns it.

Keying a Stream per warehouse (warehouse:{id}) is the general design — it keeps each feed small and lets a dashboard subscribe to just the warehouse it watches. To keep this build focused, we commit to a single warehouse: warehouse:1 is the fixed key everywhere — the write routes XADD to it, GET /stream tails it, and the dashboard reads it. (Generalising to a request-supplied warehouse — GET /stream?warehouse=N, write routes keyed by the product’s warehouse — is a clean exercise; if you do it, make every step use the same id, or a dashboard keyed to warehouse:2 will silently see nothing.)

Because the event is self-contained and the version is monotonic, a client can apply events from anywhere — fresh, replayed, or duplicated — and still converge (you lean on this in the idempotency step). One Redis fact to internalise now: Stream field values are strings, so encode numbers as decimal text and parse them on the way out.

The event, by hand in redis-cli
# Append a change event; * = let Redis assign the next ordered id
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1399 stock 50 version 1
# -> "1718900000000-0"   (the stable id, shaped <ms>-<seq>)

redis-cli -u "$REDIS_URL" XLEN warehouse:1                  # how many events are buffered
redis-cli -u "$REDIS_URL" XRANGE warehouse:1 - + COUNT 5    # oldest 5 entries  (- is the min id, + is the max id)
What success looks like

XADD returns the assigned id shaped <ms>-<seq> (e.g. 1718900000000-0), XLEN warehouse:1 reports the count of buffered events, and XRANGE lists each entry with its four fields — every value a string, since Stream field values are always strings:

1) 1) "1718900000000-0"
   2) 1) "product_id" 2) "1" 3) "price" 4) "1399" 5) "stock" 6) "50" 7) "version" 8) "1"
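Two properties of the event can be sketched in a few lines: the string round trip that Stream fields force on you, and the version check that makes applying an event safe to repeat. The names here (`ChangeEvent`, `encodeEvent`, `decodeEvent`, `applyEvent`) are hypothetical, and the board is a plain `Map` standing in for whatever state the dashboard actually holds.

```typescript
type ChangeEvent = { product_id: number; price: number; stock: number; version: number };

// Numbers go onto the Stream as decimal text, because field values are always strings.
function encodeEvent(ev: ChangeEvent): Record<string, string> {
  return {
    product_id: String(ev.product_id),
    price: String(ev.price),
    stock: String(ev.stock),
    version: String(ev.version),
  };
}

// Parse the strings back, rejecting anything missing or non-numeric.
function decodeEvent(fields: Record<string, string>): ChangeEvent {
  const read = (name: string): number => {
    const raw = fields[name];
    if (raw === undefined || !/^\d+$/.test(raw)) {
      throw new Error("bad or missing field: " + name);
    }
    return Number(raw);
  };
  return {
    product_id: read("product_id"),
    price: read("price"),
    stock: read("stock"),
    version: read("version"),
  };
}

// Apply an event only if it is newer than what the board already holds.
// Returns true when the board changed, false for a stale or duplicate event.
function applyEvent(board: Map<number, ChangeEvent>, ev: ChangeEvent): boolean {
  const current = board.get(ev.product_id);
  if (current !== undefined && ev.version <= current.version) return false;
  board.set(ev.product_id, ev);
  return true;
}

const board = new Map<number, ChangeEvent>();
const v1 = decodeEvent(encodeEvent({ product_id: 1, price: 1399, stock: 50, version: 1 }));
const v2: ChangeEvent = { product_id: 1, price: 1350, stock: 49, version: 2 };

// Live delivery followed by a replay of the same two events.
const applied = [
  applyEvent(board, v1),
  applyEvent(board, v2),
  applyEvent(board, v1),
  applyEvent(board, v2),
];
console.log(applied); // the first two apply; the replayed duplicates are skipped
```

Because the event carries the full post-write state, skipping a stale one loses nothing: the board already reflects a version at least as new.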

Scaffold the Go fan-out server

Go · Beginner

Stand up one Go binary that opens a pgxpool and a go-redis client and serves both the dashboard and a GET /products snapshot — the single process every later route hangs off.

New in this step
pgx and `pgxpool`

The standard Postgres driver for Go; pgxpool is its connection pool, opened once at startup and reused per request.

jackc pgx pgxpool track ↗ docs ↗
go-redis

The standard Go Redis client (github.com/redis/go-redis/v9); built from redis.ParseURL(REDIS_URL).

go-redis v9 client track ↗
`http.ServeMux` (1.22 method patterns)

Go’s built-in router; since 1.22 a pattern can include the method and path, e.g. GET /products.

go 1.22 http servemux method patterns docs ↗
`http.FileServer`

Serves files from a directory and auto-returns index.html at /, so one binary also serves the dashboard.

go http fileserver static docs ↗
context (`ctx`)

A per-request value that carries cancellation and deadlines; pass r.Context() to every DB and Redis call.

go context package docs ↗
`PORT` env var

The listen port read from the environment (default 8080) so the later multi-instance step can run a second copy.

read port from environment go
Why pgx + go-redis, and one binary that serves the UI

pgx is the standard Postgres driver for Go and github.com/redis/go-redis/v9 the standard Redis client; both take a context on every call. The server is deliberately one process doing three jobs — REST writes, an SSE stream, and serving the static public/index.html — so a learner runs one binary and opens one URL. Always pass r.Context() and use parameters ($1); never string-concatenate SQL.

Set up the module
go mod init github.com/you/ticker
go get github.com/jackc/pgx/v5
go get github.com/redis/go-redis/v9
Pool, client, snapshot endpoint
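// cmd/server/main.go (essentials)
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/redis/go-redis/v9"
)

// Product mirrors one row of the products table.
type Product struct {
	ID      int64  `json:"id"`
	Name    string `json:"name"`
	Price   int64  `json:"price"` // cents
	Stock   int    `json:"stock"`
	Version int    `json:"version"`
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil { log.Fatal(err) }
	defer pool.Close()

	opt, err := redis.ParseURL(os.Getenv("REDIS_URL"))
	if err != nil { log.Fatal(err) }
	rdb := redis.NewClient(opt) // the write and stream routes in later steps use this client
	defer rdb.Close()

	mux := http.NewServeMux()
	mux.Handle("GET /", http.FileServer(http.Dir("public"))) // serves public/index.html
	mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
		rows, err := pool.Query(r.Context(),
			`SELECT id, name, price, stock, version FROM products ORDER BY id`)
		if err != nil {
			http.Error(w, "query failed", http.StatusInternalServerError)
			return
		}
		defer rows.Close()
		out := []Product{} // non-nil so an empty table encodes as [] rather than null
		for rows.Next() {
			var p Product
			if err := rows.Scan(&p.ID, &p.Name, &p.Price, &p.Stock, &p.Version); err != nil {
				http.Error(w, "scan failed", http.StatusInternalServerError)
				return
			}
			out = append(out, p)
		}
		if err := rows.Err(); err != nil { // surfaces errors hit mid-iteration
			http.Error(w, "read failed", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(out)
	})

	port := os.Getenv("PORT") // honor PORT so the multi-instance step can run two copies
	if port == "" {
		port = "8080"
	}
	log.Fatal(http.ListenAndServe(":"+port, mux))
}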
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Go engineer in this repo.
Context: Postgres via env DATABASE_URL, Redis via REDIS_URL; schema loaded from db/schema.sql; github.com/jackc/pgx/v5 and github.com/redis/go-redis/v9.
Task: Scaffold cmd/server with a pgxpool, a go-redis client, a GET /products snapshot, and a static file server for ./public.
Requirements:
- Pool and client created once at startup, closed on shutdown; every call takes r.Context().
- Money is int64 cents; parameterised queries only; the redis client is built from redis.ParseURL(REDIS_URL).
- GET /products returns 200 with a JSON array ordered by id; GET / serves public/index.html via http.FileServer(http.Dir("public")) (it serves index.html at / automatically). The listen port reads PORT, default 8080.
Tests / acceptance:
- `go build ./...` passes; `curl -s localhost:8080/` returns the dashboard HTML; after loading the schema, `curl -s localhost:8080/products | jq length` returns 3.
Output: a unified diff plus a one-line note on pool sizing.
What success looks like

go build ./... is clean and the binary starts on :8080. curl -s localhost:8080/ returns the dashboard HTML, and once the schema is loaded curl -s localhost:8080/products | jq length prints 3 — the snapshot as JSON numbers (price, stock, version), ordered by id:

[{"id":1,"name":"Aurora Mug","price":1499,"stock":50,"version":0}, ...]

Scaffold the TypeScript edge server

TypeScript · Beginner

Stand up one Hono app on Bun that connects node-postgres and node-redis and serves both the dashboard and a GET /products snapshot — the TypeScript twin of the Go server, byte-identical at the contract.

New in this step
Hono

A tiny web framework built on the Web Request/Response API, so the same app runs on Bun, Node, and the edge.

hono web framework track ↗
Bun

A fast all-in-one JavaScript runtime whose built-in server reads the app’s default export.

bun runtime server docs ↗
node-postgres `Pool`

The pg package’s Postgres connection pool — the TypeScript counterpart to Go’s pgxpool.

node-postgres pool pg track ↗
node-redis (`redis@^5`)

The redis package, pinned to v5 because its Stream and blocking-read behaviour is version-sensitive.

node-redis v5 client track ↗
`serveStatic` path fallback

Hono’s bun static handler maps a path to a file, so a bare / doesn’t auto-serve index.html; the path: catch-all does.

hono servestatic spa fallback
Bun default-export port + fetch

Bun starts a server from export default { port, fetch }; reading process.env.PORT lets a second instance run elsewhere.

bun server default export fetch
Why Hono + Bun — one app, two runtimes

Hono is a tiny web framework that speaks the Web Request/Response API, so the same app runs on Bun, Node, and the edge. Bun is a fast, batteries-included runtime whose built-in server reads the app’s default export. You’ll use pg (node-postgres) for the durable store and the redis package (node-redis, pinned to v5) for Streams. Keep request handlers free of Node-only APIs so the app stays portable — exactly the discipline the TypeScript track builds.

One static-serving gotcha worth getting right now: Hono’s bun serveStatic({ root }) maps the request path to a file under root, so a bare GET / resolves to the directory and does not reliably auto-serve index.html the way Go’s http.FileServer does. The documented SPA pattern is a catch-all that points at the file directly — serveStatic({ path: "./public/index.html" }) — registered after your API routes. That keeps GET / returning the dashboard, so the page works identically on both backends.

Set up the project
bun init -y
bun add hono pg redis@^5      # pin node-redis v5 — its Stream/blocking-read behaviour is version-sensitive
# Node alternative: npm add hono pg redis@^5 @hono/node-server
App, clients, snapshot endpoint
// src/server.ts
import { Hono } from "hono";
import { serveStatic } from "hono/bun";
import { Pool } from "pg";
import { createClient } from "redis";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

const app = new Hono();

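app.get("/products", async (c) => {
  const { rows } = await pool.query(
    "SELECT id, name, price, stock, version FROM products ORDER BY id",
  );
  // node-postgres returns BIGINT (int8) columns as strings by default. Convert id and price
  // so the JSON carries numbers, matching the Go server's snapshot.
  return c.json(rows.map((r) => ({ ...r, id: Number(r.id), price: Number(r.price) })));
});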

// Serve the single dashboard file. Hono's bun serveStatic maps a request path to a file under `root`,
// so a request for "/" resolves to the *directory*, not index.html. Use the `path:` form as the SPA
// fallback so GET / returns the dashboard. (Register this AFTER your API routes so it only catches the rest.)
app.get("*", serveStatic({ path: "./public/index.html" }));

const port = Number(process.env.PORT) || 8080; // honor PORT for the multi-instance step
export default { port, fetch: app.fetch };       // Bun reads port + fetch from the default export
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior TypeScript engineer in this repo building an edge-portable server.
Context: Bun runtime; hono, pg (node-postgres), and redis (node-redis, pinned redis@^5) installed; Postgres via DATABASE_URL, Redis via REDIS_URL; schema loaded from db/schema.sql.
Task: Build src/server.ts — a Hono app with GET /products (snapshot) and static serving of ./public, exported as a Bun server whose port reads PORT (default 8080).
Requirements:
- Create the pg Pool and the node-redis client once; await redis.connect() at startup.
- GET /products returns the rows as JSON ordered by id; prices stay integer cents.
- Serve public/index.html at / using the catch-all path fallback: app.get("*", serveStatic({ path: "./public/index.html" })), registered AFTER the API routes; no Node-only APIs in request handlers so the app stays portable.
- The exported port reads process.env.PORT (Number(process.env.PORT) || 8080) so a second instance can run on another port.
Tests / acceptance:
- `bun run src/server.ts` starts; `curl -s localhost:8080/` returns the dashboard HTML (not a 404); after loading the schema, `curl -s localhost:8080/products | jq length` returns 3.
Output: a unified diff plus the one-line Node entrypoint variant using @hono/node-server's serve({ fetch: app.fetch }).
What success looks like

bun run src/server.ts starts on :8080. curl -s localhost:8080/ returns the dashboard HTML (not a 404 — the path: catch-all serves it), and curl -s localhost:8080/products | jq length prints 3. The body is byte-identical to the Go server’s snapshot — same id order, same JSON-number fields — which is the parity the rest of the build leans on.

★ The write path: commit to Postgres, then XADD the event

Intermediate

Make every write two ordered steps — commit to Postgres first, then XADD the post-write state to warehouse:1 — so the Stream never advertises a change that did not actually happen. This is the spotlight’s spine; the next step wires it to HTTP routes.

New in this step
commit-before-publish

Persist to the durable store first, then publish the event, so the feed can never advertise a change that did not commit.

transactional outbox commit before publish
conditional UPDATE

An UPDATE ... WHERE that only changes a row when the predicate still holds (matching version, or stock >= qty); zero rows means it did not.

postgres conditional update where docs ↗
`UPDATE ... RETURNING`

Hands back columns from the rows the UPDATE touched, so you get the post-write price/stock/version in one round trip to serialise into the XADD.

postgres update returning clause docs ↗
expectedVersion

The version the client last saw, passed into the WHERE; if it no longer matches, someone else won the race and zero rows update.

optimistic concurrency expected version
`XADD MAXLEN ~ 10000`

Appends the event while capping the Stream to ~10000 entries; ~ trims in efficient batches (a little over is fine).

redis xadd maxlen approximate docs ↗
Commit first, then publish — and why that order

This is the spine of the project. A write does two things: it updates the row in Postgres (the durable truth) and appends an event to the Stream (the thing watchers read). Do them in that order — Postgres commits first, so the Stream never advertises a change that didn’t actually happen. The UPDATE ... RETURNING price, stock, version hands you the post-write state in one round trip; you serialise exactly that into the XADD. Because the event id is assigned by Redis and is monotonic, the Stream is now the canonical order of changes — which is what makes fan-out and replay agree across every client and every server instance.
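Since clients remember the last Stream id they saw, it helps to see what "monotonic" means for those ids. Redis Stream ids have the form milliseconds-sequence, and both parts must be compared as numbers, not as plain strings. A minimal sketch, assuming ids without leading zeros (which is how Redis generates them); compareStreamIds and isNewer are names made up for this example:

```typescript
// Compare two non-negative decimal strings without converting to number,
// so values beyond Number.MAX_SAFE_INTEGER still compare exactly.
function compareDecimal(a: string, b: string): number {
  if (a.length !== b.length) return a.length < b.length ? -1 : 1; // more digits = larger
  if (a === b) return 0;
  return a < b ? -1 : 1; // same length: lexicographic order equals numeric order
}

// Stream ids look like "<milliseconds>-<sequence>", e.g. "1718900000000-0".
// Returns a negative number, zero, or a positive number, as Array.prototype.sort expects.
function compareStreamIds(a: string, b: string): number {
  const [aMs = "0", aSeq = "0"] = a.split("-");
  const [bMs = "0", bSeq = "0"] = b.split("-");
  return compareDecimal(aMs, bMs) || compareDecimal(aSeq, bSeq);
}

// True when `incoming` comes strictly after `lastSeen` in Stream order.
function isNewer(incoming: string, lastSeen: string): boolean {
  return compareStreamIds(incoming, lastSeen) > 0;
}
```

A client could use isNewer to ignore a frame it has already applied, for example after a reconnect that overlaps with a replay.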

This step gives you the two store operations — setPrice(id, newPrice, expectedVersion) and sell(id, qty) — as the SQL + commit-then-XADD order. The HTTP routes that call them (PUT /products/{id}/price, POST /products/{id}/sell) are the very next step, so the dashboard has real endpoints to hit.

Commit, then XADD (language-agnostic order)
Run these in your terminal / editor
1. BEGIN
2. UPDATE products SET price = $new, version = version + 1, updated_at = now()
        WHERE id = $id AND version = $expected
        RETURNING price, stock, version
3. COMMIT
4. XADD warehouse:1 MAXLEN ~ 10000 * product_id $id price $price stock $stock version $version
        -> returns the event id, e.g. 1718900000000-0   (MAXLEN ~ bounds the Stream; see the bounding step)
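The order above can be sketched as one function. This is an illustration under stated assumptions, not the project's implementation: Store and Feed are hypothetical seams standing in for the Postgres pool and the Redis client, and memoryStore / memoryFeed are in-memory stand-ins so the ordering can run without either service.

```typescript
interface Row { id: number; price: number; stock: number; version: number }

// Hypothetical seams (not a real library API).
interface Store {
  // Conditional UPDATE ... RETURNING; resolves to null when zero rows matched.
  updatePrice(id: number, price: number, expectedVersion: number): Promise<Row | null>;
  // Plain SELECT by id, used only on the zero-row path.
  findById(id: number): Promise<Row | null>;
}
interface Feed {
  // Stands in for XADD warehouse:1 MAXLEN ~ 10000 * ...; resolves to the event id.
  append(fields: Record<string, string>): Promise<string>;
}

type WriteResult =
  | { found: true; conflict: false; row: Row; eventId: string }
  | { found: true; conflict: true; row: Row }
  | { found: false };

async function setPrice(
  store: Store, feed: Feed, id: number, price: number, expectedVersion: number,
): Promise<WriteResult> {
  const row = await store.updatePrice(id, price, expectedVersion); // steps 1-3: commit first
  if (row === null) {
    const current = await store.findById(id); // zero rows: stale version, or unknown id?
    return current ? { found: true, conflict: true, row: current } : { found: false };
  }
  const eventId = await feed.append({ // step 4: publish only what committed
    product_id: String(row.id),
    price: String(row.price),
    stock: String(row.stock),
    version: String(row.version),
  });
  return { found: true, conflict: false, row, eventId };
}

// In-memory stand-ins, for illustration only.
function memoryStore(rows: Row[]): Store {
  const byId = new Map<number, Row>();
  for (const r of rows) byId.set(r.id, { ...r });
  return {
    async updatePrice(id, price, expectedVersion) {
      const r = byId.get(id);
      if (!r || r.version !== expectedVersion) return null; // WHERE id = $id AND version = $expected
      const next = { ...r, price, version: r.version + 1 };
      byId.set(id, next);
      return next;
    },
    async findById(id) {
      return byId.get(id) ?? null;
    },
  };
}

function memoryFeed() {
  const events: Record<string, string>[] = [];
  let seq = 0;
  return {
    events,
    async append(fields: Record<string, string>): Promise<string> {
      events.push(fields);
      seq += 1;
      return `0-${seq}`; // fake id; real ids are assigned by Redis
    },
  };
}
```

The point of the shape is that feed.append is unreachable unless the conditional update returned a row, so a zero-row write cannot produce an event.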
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior backend engineer in this repo (use the selected backend).
Context: Postgres has products(id, name, price, stock, version); a Redis client is connected. This build uses one fixed Stream key, "warehouse:1".
Task: Implement the two write-path store operations — setPrice(id, newPrice, expectedVersion) and sell(id, qty) — that commit to Postgres, then XADD the change event. (The HTTP routes that call these are the next step.)
Requirements:
- setPrice updates Postgres first: `... SET price=$1, version=version+1, updated_at=now() WHERE id=$2 AND version=$3 RETURNING price, stock, version`; only on a committed (one-row) result do you XADD.
- sell updates atomically: `... SET stock=stock-$1, version=version+1, updated_at=now() WHERE id=$2 AND stock>=$1 RETURNING price, stock, version`; XADD only on a one-row result.
- XADD to "warehouse:1" with auto-id * and fields product_id, price, stock, version (values as decimal strings); capture the returned event id. Add MAXLEN ~ 10000.
- Never XADD before the commit succeeds; a zero-row result is NOT an event.
- A zero-row UPDATE has two causes — stale version / insufficient stock (row exists) or unknown id (no row) — and the conditional UPDATE alone cannot tell them apart. On zero rows, re-SELECT the row by id: if present it is a conflict / out-of-stock; if absent the id is unknown.
- Return enough to the caller to form the HTTP response: on success the post-write {id,name,price,stock,version}; on a zero-row failure both a found signal (the route maps not-found to 404) and, when found, the current row (the route maps it to 409).
Tests / acceptance:
- After a successful price change, XLEN warehouse:1 increases by exactly 1 and XRANGE shows the new price and version.
- A failed (zero-row) setPrice or sell appends no event (XLEN unchanged).
Output: a unified diff plus a one-paragraph note on why commit-before-XADD is the correct order.
What success looks like

A successful setPrice/sell commits the row and appends exactly one event: XLEN warehouse:1 grows by 1 and XREVRANGE warehouse:1 + - COUNT 1 shows the post-write price/stock/version. A zero-row write (stale version, insufficient stock, or unknown id) commits nothing and appends nothing — XLEN is unchanged, proving the Stream never advertises a change that did not happen:

# after one successful price change of product 1:
XLEN warehouse:1            -> (integer) 1
XADD ... returns           -> "1718900000000-0"   # the new event's id
# after a stale-version PUT: XLEN warehouse:1 -> (integer) 1  (no new event)

Wire the write routes: PUT price and POST sell

Go Intermediate

Give the dashboard real endpoints by exposing the write path as PUT /products/{id}/price and POST /products/{id}/sell — each maps the store result to the contract (200+row, 409+current row, 404 unknown id) and fans out via the write path’s XADD on success.

New in this step
`r.PathValue`

Reads a named path segment from a Go 1.22 route pattern, e.g. the {id} in PUT /products/{id}/price.

go 1.22 r.pathvalue docs ↗
`json.NewDecoder`

Decodes the request body’s JSON into a struct; a malformed body is your 400 branch.

go json newdecoder decode body docs ↗
`pgx.ErrNoRows`

The error a RETURNING query gives when the conditional UPDATE matched zero rows — the signal you must disambiguate.

pgx errnorows no rows track ↗
re-SELECT disambiguation

On zero rows, re-SELECT the row by id: present ⇒ stale/out-of-stock (409 + current row); absent ⇒ unknown id (404).

distinguish 404 from 409 conditional update
status codes 200 / 409 / 404

Success returns 200 + the new row, a version/stock conflict returns 409 + the live row to rebase to, an unknown id returns 404.

http status 200 409 404 meaning
The routes are the deliverable — and the zero-row case is two different errors

The dashboard’s slider PUTs {price, version} and its Sell button POSTs {qty}; without these routes both 404. Each handler is thin: parse the JSON body and the {id} path value (Go 1.22’s r.PathValue("id")), call the matching store op from the write path, and translate the result to the HTTP contract. The XADD already lives inside the write path, so a successful route automatically fans the change out.

The subtle part is the zero-row case. The conditional UPDATE ... WHERE id=$id AND version=$expected (or ... AND stock>=$qty) returns no row for two different reasons: the version was stale / the stock was insufficient (the row exists), or there is no such id at all. pgx.ErrNoRows from the UPDATE’s RETURNING cannot tell them apart — so on zero rows you re-SELECT the row by id alone. If that re-SELECT finds the row, it is a real conflict / out-of-stock: return 409 with the current {price, stock, version} so the client rebases to the live value. If the re-SELECT finds nothing, the id is unknown: return 404. One extra indexed PK lookup, only on the cold failure path, is what makes the two errors distinguishable — a route that always returned 409 could never surface a genuine bad id.
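The three outcomes form a small decision table, which is easier to check as a pure function than inside a handler. The sketch below is written in TypeScript only to keep this page's added examples in one language; the Go writeResult follows the same table. StoreResult, HttpReply, and toReply are hypothetical names, and the 404 body follows the {"error":"not_found"} shape used by the TypeScript server.

```typescript
interface Product { id: number; name: string; price: number; stock: number; version: number }

// What a write-path op hands back (hypothetical shape for this sketch).
type StoreResult =
  | { found: false }                                   // re-SELECT found no row
  | { found: true; failed: false; row: Product }       // one row updated
  | { found: true; failed: true; row: Product; reason: "conflict" | "out_of_stock" };

interface HttpReply { status: 200 | 404 | 409; body: unknown }

function toReply(r: StoreResult): HttpReply {
  if (!r.found) return { status: 404, body: { error: "not_found" } };
  if (!r.failed) return { status: 200, body: r.row };
  // 409 carries the live row so the client can rebase; sell adds an "error" field.
  const body = r.reason === "out_of_stock" ? { error: "out_of_stock", ...r.row } : r.row;
  return { status: 409, body };
}
```

Keeping the mapping separate from the store op means a route that always answered 409 would show up as a failing case in a table-driven test rather than as a confusing dashboard error.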

The two write routes (Go, net/http 1.22 path values)
Run these in your terminal / editor
type priceReq struct {
	Price   int64 `json:"price"`
	Version int   `json:"version"`
}
type sellReq struct {
	Qty int `json:"qty"`
}

// PUT /products/{id}/price  — body {price, version}
mux.HandleFunc("PUT /products/{id}/price", func(w http.ResponseWriter, r *http.Request) {
	id, _ := strconv.ParseInt(r.PathValue("id"), 10, 64)
	var req priceReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "bad_request", http.StatusBadRequest)
		return
	}
	// conditional UPDATE ... WHERE id=$ AND version=$ RETURNING price, stock, version
	// found=false => the re-SELECT found no such id; conflict=true => row exists but version was stale.
	p, conflict, found, err := setPrice(r.Context(), pool, rdb, id, req.Price, req.Version)
	writeResult(w, p, conflict, found, err)
})

// POST /products/{id}/sell  — body {qty}
mux.HandleFunc("POST /products/{id}/sell", func(w http.ResponseWriter, r *http.Request) {
	id, _ := strconv.ParseInt(r.PathValue("id"), 10, 64)
	var req sellReq
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Qty < 1 {
		http.Error(w, "bad_request", http.StatusBadRequest)
		return
	}
	// atomic UPDATE ... WHERE id=$ AND stock>=$qty RETURNING price, stock, version
	p, outOfStock, found, err := sell(r.Context(), pool, rdb, id, req.Qty)
	writeResult(w, p, outOfStock, found, err)
})

// writeResult maps the store result to the contract:
//   404 unknown id  |  409 + current row on conflict/out-of-stock  |  200 + product on success.
func writeResult(w http.ResponseWriter, p Product, failed bool, found bool, err error) {
	if err != nil {
		http.Error(w, "server_error", http.StatusInternalServerError) // http.Error sends text/plain
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if !found { // the re-SELECT found no row for this id
		// Written as JSON rather than via http.Error (which forces text/plain),
		// so the 404 body matches the TypeScript server's {"error":"not_found"}.
		w.WriteHeader(http.StatusNotFound)
		_ = json.NewEncoder(w).Encode(map[string]string{"error": "not_found"})
		return
	}
	if failed {
		w.WriteHeader(http.StatusConflict) // 409; body is the current row (+ "error" for sell)
	}
	_ = json.NewEncoder(w).Encode(p)
}
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Go engineer in this repo.
Context: cmd/server already has the pool, the go-redis client (rdb), GET /products, and the write-path store ops setPrice(ctx,pool,rdb,id,price,version) and sell(ctx,pool,rdb,id,qty) that commit then XADD to "warehouse:1". Go 1.22 net/http path values.
Task: Add PUT /products/{id}/price and POST /products/{id}/sell that call the write path and return the contract.
Requirements:
- Parse {id} via r.PathValue("id"); parse the JSON body ({price,version} for price, {qty} for sell); a malformed body or qty < 1 is 400.
- On a one-row update: 200 with the post-write product JSON {id,name,price,stock,version}. The store op XADDs on success — do not XADD in the handler.
- On a zero-row update, disambiguate by re-SELECTing the row by id: if the row exists it is a conflict / out-of-stock -> 409 with the CURRENT row JSON (for sell include an "error":"out_of_stock" field); if the re-SELECT finds nothing the id is unknown -> 404. The store op returns both signals (e.g. a found bool plus a conflict/outOfStock bool). Emit no event on either failure.
- price/stock/version are JSON numbers in these bodies (cents for price).
Tests / acceptance:
- curl -X PUT localhost:8080/products/1/price -d '{"price":1399,"version":0}' returns 200 and {"version":1,...}; repeating with the stale version 0 returns 409 with the current row.
- curl -X POST localhost:8080/products/1/sell -d '{"qty":1}' returns 200 with stock-1; selling more than stock returns 409 {"error":"out_of_stock",...}.
- curl -X PUT localhost:8080/products/9999/price -d '{"price":1,"version":0}' (and the same for /sell) returns 404, not 409 — the id does not exist.
Output: a unified diff plus a one-paragraph note on why a re-SELECT on the zero-row path is what separates 404 from 409.
What success looks like

The three branches of the contract are reachable. A first PUT /products/1/price with {"price":1399,"version":0} returns 200 with version bumped to 1; repeating it with the now-stale version:0 returns 409 carrying the current row (so the client can rebase). Selling more than stock returns 409 with {"error":"out_of_stock",...}, and a write to a missing id returns 404, not 409 — the re-SELECT is what makes that distinction real:

PUT /products/1/price {"price":1399,"version":0}  -> 200 {"id":1,...,"version":1}
PUT /products/1/price {"price":1500,"version":0}  -> 409 {"id":1,...,"version":1}
PUT /products/9999/price {...}                    -> 404

Wire the write routes: PUT price and POST sell

TypeScript Intermediate

Give the dashboard real endpoints by exposing the write path as two Hono routes (PUT /products/:id/price, POST /products/:id/sell) whose 200/409/404 bodies are byte-identical to the Go server — the parity the project claims.

New in this step
`c.req.param`

Reads a named route parameter, e.g. the :id in PUT /products/:id/price.

hono path param c.req.param track ↗
`c.req.json`

Parses the request body as JSON into a typed shape; a non-integer field or qty < 1 is your 400 branch.

hono parse json body
`result.rowCount`

node-postgres’s count of rows the query affected; 0 means the conditional UPDATE matched nothing — the case to disambiguate.

node-postgres rowcount track ↗
`c.json(body, status)`

Returns a JSON response with an explicit status, e.g. c.json(row, 409) for a conflict carrying the current row.

hono c.json status code
re-SELECT disambiguation

On a zero-row result, re-SELECT the row by id: present ⇒ 409 + current row; absent ⇒ 404 — identical to the Go server.

distinguish 404 from 409 conditional update
Thin handlers over the write path — and the zero-row case is two errors

The dashboard’s slider PUTs {price, version} and its Sell button POSTs {qty}; without these routes both 404. Each Hono handler reads the :id param (c.req.param("id")) and the JSON body (await c.req.json()), calls the matching write-path op, and maps the result to the contract. The XADD lives inside the write path, so a successful route fans the change out automatically.

The zero-row case carries two distinct errors, exactly as in the Go server. The conditional UPDATE matches no row when the version is stale / stock is insufficient (the row exists) or when there is no such id — result.rowCount === 0 (node-postgres) cannot tell which. So on a zero-row result, re-SELECT the row by id: if it comes back, return c.json(currentRow, 409) (a real conflict or out-of-stock, the client rebases to the live value); if it comes back empty, return c.json({ error: "not_found" }, 404). The write-path op returns both signals (e.g. { row, conflict, found }) so the handler stays a thin mapper — and the two backends return byte-identical 404/409/200 responses.
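Before the write-path op runs, the body has to pass the 400 branch. If the body is not valid JSON, parsing fails before any integer check is reached, so it can help to treat "malformed" and "wrong type" as the same outcome. A minimal sketch working on the raw body text; parseObject, parsePriceBody, and parseSellBody are hypothetical helpers, not part of Hono:

```typescript
// Parse raw text into a plain JSON object, or null when it is malformed or not an object.
function parseObject(raw: string): Record<string, unknown> | null {
  try {
    const v: unknown = JSON.parse(raw);
    if (typeof v !== "object" || v === null || Array.isArray(v)) return null;
    return v as Record<string, unknown>;
  } catch {
    return null; // malformed JSON
  }
}

function isInt(v: unknown): v is number {
  return typeof v === "number" && Number.isInteger(v);
}

// Body of PUT /products/:id/price. A null result maps to 400 bad_request.
function parsePriceBody(raw: string): { price: number; version: number } | null {
  const o = parseObject(raw);
  if (!o) return null;
  const price = o["price"];
  const version = o["version"];
  if (!isInt(price) || !isInt(version)) return null;
  return { price, version };
}

// Body of POST /products/:id/sell. qty must be an integer of at least 1.
function parseSellBody(raw: string): { qty: number } | null {
  const o = parseObject(raw);
  if (!o) return null;
  const qty = o["qty"];
  if (!isInt(qty) || qty < 1) return null;
  return { qty };
}
```

In a handler this would pair with reading the body as text and returning c.json({ error: "bad_request" }, 400) whenever the parser yields null.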

The two write routes (Hono + node-postgres)
Run these in your terminal / editor
// PUT /products/:id/price — body {price, version}
app.put("/products/:id/price", async (c) => {
  const id = Number(c.req.param("id"));
  const { price, version } = await c.req.json<{ price: number; version: number }>();
  if (!Number.isInteger(price) || !Number.isInteger(version)) {
    return c.json({ error: "bad_request" }, 400);
  }
  // found=false => the re-SELECT found no such id; conflict=true => row exists but version was stale.
  const { row, conflict, found } = await setPrice(pool, redis, id, price, version); // commits, then XADD
  if (!found) return c.json({ error: "not_found" }, 404);
  return conflict ? c.json(row, 409) : c.json(row, 200);
});

// POST /products/:id/sell — body {qty}
app.post("/products/:id/sell", async (c) => {
  const id = Number(c.req.param("id"));
  const { qty } = await c.req.json<{ qty: number }>();
  if (!Number.isInteger(qty) || qty < 1) {
    return c.json({ error: "bad_request" }, 400);
  }
  const { row, outOfStock, found } = await sell(pool, redis, id, qty); // atomic decrement, then XADD
  if (!found) return c.json({ error: "not_found" }, 404);
  return outOfStock ? c.json({ error: "out_of_stock", ...row }, 409) : c.json(row, 200);
});
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior TypeScript engineer in this repo.
Context: src/server.ts has the pg Pool, the node-redis client, GET /products, and the write-path ops setPrice(pool,redis,id,price,version) and sell(pool,redis,id,qty) that commit then XADD to "warehouse:1". Hono on Bun.
Task: Add PUT /products/:id/price and POST /products/:id/sell that call the write path and return the contract, matching the Go server exactly.
Requirements:
- Read :id via c.req.param; parse the JSON body ({price,version} or {qty}); a non-integer field or qty < 1 is 400.
- On a one-row update: 200 with the post-write product {id,name,price,stock,version}. The write-path op XADDs on success — do not XADD in the handler.
- On a zero-row update, disambiguate by re-SELECTing the row by id: if the row exists -> 409 with the CURRENT row (for sell add "error":"out_of_stock"); if the re-SELECT finds nothing -> 404 {"error":"not_found"}. The write-path op returns both signals ({ row, conflict|outOfStock, found }). Emit no event on either failure.
- price/stock/version are JSON numbers (cents for price); register these routes BEFORE the serveStatic catch-all.
Tests / acceptance:
- PUT /products/1/price {"price":1399,"version":0} -> 200 {"version":1,...}; repeat with version 0 -> 409 current row.
- POST /products/1/sell {"qty":1} -> 200 stock-1; over-sell -> 409 {"error":"out_of_stock",...}.
- PUT /products/9999/price and POST /products/9999/sell -> 404, not 409 (no such id). Behaviour byte-identical to the Go server.
Output: a unified diff plus a one-paragraph note on keeping the 404/409/200 body shapes identical across backends.
What success looks like

Byte-identical to the Go server. The same PUT/POST/over-sell/missing-id sequence returns the same 200/409/409 out_of_stock/404 codes and the same JSON bodies (the 409 carries the current row, with "error":"out_of_stock" added for sell). A learner can swap backends mid-build and the dashboard cannot tell — that is the parity the project claims.

Optimistic concurrency: conditional UPDATE, stale write returns 409

Intermediate

Let writers race without locks and detect the loser — the conditional UPDATE succeeds only if the client’s version still matches, and a mismatch returns 409 + the current row so the client rebases in one retry.

New in this step
optimistic concurrency

Assume writes rarely collide: let them race, then reject any whose version changed underneath — no lock held across the edit.

optimistic concurrency control track ↗
pessimistic locking

The opposite approach: lock the row before editing so others wait. It is simpler to reason about, but slower when conflicts are rare, because every edit pays for a lock it seldom needs.

pessimistic locking select for update
rebase on conflict

Because the 409 body carries the same price, stock, and version that a change event would, the client adopts the fresh value and retries once, the same way it would on fan-out.

rebase to current value on 409
Optimistic concurrency without locks

Many operators may edit the same product. Rather than lock rows, you let writers race and detect the loser: the write carries the version the client last saw, and UPDATE ... WHERE id = $id AND version = $expected bumps the version only if nothing changed underneath. Zero rows means someone else won the version race; the write-path step already covers how a zero-row result re-SELECTs by id to return 409 (row present) versus 404 (no such id). What this step adds is the rebase: because the 409 body carries the same price, stock, and version that a change event would, the client reconciles to the fresh value the same way whether it learns of it by conflict or by fan-out, so a stale write costs one retry, not a lock held across the edit. That makes optimistic concurrency cheaper than pessimistic locking exactly when conflicts are rare.

The conditional UPDATE
Run these in your terminal / editor
UPDATE products
   SET price = $1, version = version + 1, updated_at = now()
 WHERE id = $2 AND version = $3
RETURNING price, stock, version;
-- 0 rows -> re-SELECT the row by id: found = version conflict (HTTP 409 + that row); not found = unknown id (HTTP 404)
-- 1 row  -> success: XADD the returned state as a change event
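On the client side, the rebase can be sketched as a single retry. This is a minimal illustration, assuming a hypothetical put function that sends PUT /products/{id}/price with {price, version} and resolves to the status plus the parsed body; setPriceWithRebase is likewise a made-up name.

```typescript
// Fields the client cares about in a 200 or 409 body; a 404 body has only `error`.
interface ReplyBody { price?: number; stock?: number; version?: number; error?: string }
interface PutReply { status: number; body: ReplyBody }

// Hypothetical transport: performs the PUT and parses the JSON response.
type PutPrice = (price: number, version: number) => Promise<PutReply>;

async function setPriceWithRebase(
  put: PutPrice, price: number, lastSeenVersion: number,
): Promise<PutReply> {
  const first = await put(price, lastSeenVersion);
  if (first.status !== 409 || first.body.version === undefined) return first;
  // The 409 body is the current row: adopt its version and retry exactly once.
  // A second 409 is returned to the caller rather than retried in a loop.
  return put(price, first.body.version);
}
```

Whether to retry automatically or to show the operator the fresh value first is a product decision; the sketch only shows that the 409 body contains everything the retry needs.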

The oversell guard in one atomic line

Intermediate

Make overselling impossible with a single atomic statement (... WHERE stock >= qty) so two clients racing for the last unit serialise and exactly one wins. This one line is the whole concurrency story here, unlike Aurora Commerce's full transaction.

New in this step
oversell guard

The WHERE stock >= qty predicate that lets the decrement match a row only when enough stock remains — no row, no sale.

prevent overselling sql where stock
atomic statement

A single UPDATE runs as one indivisible operation, so the check and the decrement can’t be split by another writer.

atomic single statement update docs ↗
row lock

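Postgres locks the matched row until the updating transaction ends (here, the single autocommitted statement), so two clients racing for the last unit serialise: under the default READ COMMITTED isolation the second writer waits, re-checks stock >= qty against the committed row, and matches none.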

postgres row lock during update docs ↗
One line here; a whole transaction there

A sale decrements stock and, like every write, bumps the version and emits an event. The safety is one atomic statement: UPDATE products SET stock = stock - $qty, version = version + 1 WHERE id = $id AND stock >= $qty. The stock >= $qty predicate is the guard: Postgres row-locks the matched row for the duration of the statement, so two clients racing for the last unit serialise, and exactly one affects a row while the other matches none. The zero-row outcome then flows through the same disambiguation the write-path step defines (re-SELECT by id → 409 out_of_stock if the row exists, 404 if it does not). This is deliberately the same primitive Aurora uses, but here it is a single line, not the lesson: Aurora Commerce spends a whole BEGIN … COMMIT making a multi-row checkout atomic; Ticker needs only this one guard and spends its energy on propagating the result of the sale to every watcher. Same guarantee, opposite emphasis.

Atomic decrement = the oversell guard
Run these in your terminal / editor
UPDATE products
   SET stock = stock - $1, version = version + 1, updated_at = now()
 WHERE id = $2 AND stock >= $1
RETURNING price, stock, version;
-- 0 rows -> re-SELECT by id: found = insufficient stock (409 "out_of_stock"); not found = unknown id (404). No event either way.
-- 1 row  -> sale committed: XADD the new {price, stock, version} to the Stream
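To see why the check and the decrement must be one step, it helps to model both variants in memory. This is only a model, not Postgres: a synchronous function body here plays the role of the single locked statement, and the split read/write pair plays the role of a SELECT followed by a separate UPDATE. All names are hypothetical.

```typescript
interface StockRow { stock: number; version: number }

// Mirrors UPDATE ... SET stock = stock - qty, version = version + 1 WHERE stock >= qty:
// check and decrement happen together, and a failed check changes nothing.
function guardedSell(row: StockRow, qty: number): boolean {
  if (row.stock < qty) return false; // zero rows matched
  row.stock -= qty;
  row.version += 1;
  return true; // one row updated
}

// Anti-pattern for contrast: the check and the write are separate steps,
// so another caller can run its own check in between.
function racyCheck(row: StockRow, qty: number): boolean {
  return row.stock >= qty;
}
function racyWrite(row: StockRow, qty: number): void {
  row.stock -= qty;
  row.version += 1;
}

// Two buyers race for the last unit, guarded: exactly one wins.
const guarded: StockRow = { stock: 1, version: 0 };
const guardedResults = [guardedSell(guarded, 1), guardedSell(guarded, 1)];

// Same race, unguarded: both checks pass before either write lands.
const racy: StockRow = { stock: 1, version: 0 };
const bothPassed = racyCheck(racy, 1) && racyCheck(racy, 1);
if (bothPassed) {
  racyWrite(racy, 1);
  racyWrite(racy, 1); // stock is now negative: an oversell
}
```

In the guarded run the row ends at stock 0 with one version bump; in the unguarded run both buyers "succeed" and stock goes below zero, which is the outcome the WHERE stock >= qty predicate rules out.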

★ Tail the Stream and fan out over SSE

Go Advanced

Turn GET /stream into a live fan-out — loop on a bounded XREAD BLOCK and write one SSE id:/data: frame per event (plus a keepalive on each idle timeout) — so every connected dashboard, across every instance, tails the same Stream.

New in this step
Server-Sent Events (`text/event-stream`)

A long-lived HTTP response that streams text events to the browser one-way — exactly the live feed shape.

server sent events sse
`id:` / `data:` frames

Each SSE event is id: <stream-id> then data: <json> then a blank line; id: is what the browser echoes back on reconnect.

sse event stream frame format
`http.Flusher`

The interface whose Flush() pushes buffered bytes to the client immediately — without it the browser sees nothing until a buffer fills.

go http flusher streaming docs ↗
bounded `XREAD BLOCK`

XREAD BLOCK 15000 parks for at most 15s, so a closed tab frees its reader instead of blocking forever like BLOCK 0.

redis xread block timeout
`redis.Nil` on timeout

The error go-redis returns when the bounded block elapsed with no entries — your cue to heartbeat and re-check the connection.

go-redis redis.nil error
`:keepalive` heartbeat

A comment line (:-prefixed) EventSource ignores but idle proxies don’t — it keeps the connection from being killed.

sse keepalive comment line
One reader, many subscribers — and N instances act as one

Server-Sent Events is just a long-lived HTTP response with Content-Type: text/event-stream; you write id: <stream-id> then data: <json> then a blank line per event, and flush. The source of those events is XREAD BLOCK 15000 STREAMS warehouse:1 $, which parks the connection until the next entry is appended (or the block elapses), then returns it with its id. Because every server instance tails the same Stream, you can run N servers and they all deliver the same ordered events — the Stream, not the process, is the fan-out point, so horizontal scaling is free. Pass the id the client last saw (the next step wires Last-Event-ID) instead of $ to begin with replay. Flush after every write (http.Flusher) or the browser sees nothing until a buffer fills.
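The frame layout described above is small enough to pin down as a pure function. A minimal sketch, in TypeScript to keep this page's added examples in one language; sseFrame and KEEPALIVE are names made up for this example, and the Go handler produces the same layout with fmt.Fprintf.

```typescript
// One SSE event: an id line, a data line, then a blank line that ends the event.
// JSON.stringify never emits a raw newline, so the payload stays on a single data: line.
function sseFrame(id: string, fields: Record<string, string>): string {
  return `id: ${id}\ndata: ${JSON.stringify(fields)}\n\n`;
}

// A line starting with ":" is an SSE comment. EventSource skips it,
// but the bytes still travel over the connection, which keeps idle proxies from closing it.
const KEEPALIVE = ":keepalive\n\n";

const exampleFrame = sseFrame("1718900000000-0", {
  product_id: "1",
  price: "1399",
  stock: "50",
  version: "4",
});
```

The id line is what the browser sends back as Last-Event-ID on reconnect, so it should always be the Stream id of the entry, never a counter local to the server process.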

SSE fan-out (go-redis + http.Flusher)
Run these in your terminal / editor
// GET /stream — Server-Sent Events fan-out from a Redis Stream
mux.HandleFunc("GET /stream", func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	flusher, ok := w.(http.Flusher)
	if !ok { http.Error(w, "streaming unsupported", 500); return }

	stream := "warehouse:1"
	lastID := r.Header.Get("Last-Event-ID") // empty on the first connect
	if lastID == "" { lastID = "$" }         // "$" = only events appended from now on

	for {
		// Bounded block, NOT Block: 0 — a 15s read deadline lets a closed tab
		// (cancelled r.Context()) free this reader instead of parking forever.
		res, err := rdb.XRead(r.Context(), &redis.XReadArgs{
			Streams: []string{stream, lastID},
			Block:   15 * time.Second,
			Count:   500,
		}).Result() // XRead returns *XStreamSliceCmd; .Result() gives ([]XStream, error)
		if err == redis.Nil { // block elapsed with no entries
			fmt.Fprint(w, ":keepalive\n\n") // comment line; EventSource ignores it, proxies stay alive
			flusher.Flush()
			if r.Context().Err() != nil { return } // tab closed during the idle window
			continue
		}
		if err != nil { return } // client gone or context cancelled
		for _, msg := range res[0].Messages {
			lastID = msg.ID
			payload, _ := json.Marshal(msg.Values)
			fmt.Fprintf(w, "id: %s\ndata: %s\n\n", msg.ID, payload)
		}
		flusher.Flush()
	}
})
Agent prompt — paste into an agent with repo access

Before you write it: when a data frame goes out on /stream, which of price, stock, version are JSON numbers and which are strings — and why does the wire format differ from GET /products?

For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Go engineer in this repo.
Context: go-redis v9 client at rdb; Stream key "warehouse:1"; the server already serves /products and the dashboard.
Task: Add GET /stream that fans out the Redis Stream to the client over Server-Sent Events.
Requirements:
- Set Content-Type: text/event-stream and Cache-Control: no-cache; assert http.Flusher and Flush() after writing.
- Read the start id from the Last-Event-ID header; default to "$" (only-new) when absent.
- Loop on rdb.XRead with XReadArgs{Streams: []string{stream, lastID}, Block: 15 * time.Second, Count: 500}. Do NOT use Block: 0 — in go-redis v9 a cancelled context does not interrupt a parked Block: 0 read, leaking a connection per closed tab.
- On a timeout (err == redis.Nil) write a ":keepalive\n\n" comment, Flush(), and return if r.Context().Err() != nil so a closed tab is released within ~15s; otherwise continue.
- For each message write "id: <id>\ndata: <json>\n\n", advance lastID to its id, then Flush(); return on any other error.
Tests / acceptance:
- curl -N localhost:8080/stream stays open; an XADD to warehouse:1 (or a price change) appears as an SSE frame within ~100ms; an idle connection still receives a ":keepalive" comment about every 15s.
- Reconnecting with -H "Last-Event-ID: <id>" replays only events after that id.
- Closing the client (Ctrl-C) frees the server-side reader within ~15s (the goroutine returns on the next timeout).
Output: a unified diff plus a one-paragraph note on why a bounded block beats Block: 0 for connection cleanup.
What success looks like

curl -N localhost:8080/stream stays open. A price change (or a raw XADD) appears within ~100ms as a data frame — an id: line carrying the Stream id and a data: line whose JSON fields are strings (forwarded msg.Values), terminated by a blank line. On an idle connection you see a :keepalive comment about every 15s (the bounded-block timeout), which EventSource ignores but proxies don’t:

id: 1718900000000-0
data: {"price":"1399","product_id":"1","stock":"50","version":"4"}

:keepalive
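Because the data line forwards Stream field values, every field arrives as a string, while GET /products returns JSON numbers. A dashboard therefore needs a small decoding step before it compares or renders values. A minimal sketch, assuming the four field names shown above; ChangeEvent, toInt, and decodeEvent are hypothetical names:

```typescript
interface ChangeEvent { productId: number; price: number; stock: number; version: number }

// Accepts only non-negative decimal strings such as "1399".
function toInt(v: unknown): number | null {
  return typeof v === "string" && /^\d+$/.test(v) ? Number(v) : null;
}

// Turns the text of one data: line into numbers, or null if anything is missing or malformed.
function decodeEvent(data: string): ChangeEvent | null {
  let raw: unknown;
  try {
    raw = JSON.parse(data);
  } catch {
    return null;
  }
  if (typeof raw !== "object" || raw === null) return null;
  const f = raw as Record<string, unknown>;
  const productId = toInt(f["product_id"]);
  const price = toInt(f["price"]);
  const stock = toInt(f["stock"]);
  const version = toInt(f["version"]);
  if (productId === null || price === null || stock === null || version === null) return null;
  return { productId, price, stock, version };
}
```

In a browser this would typically run inside the EventSource message handler, with the event's data string passed to decodeEvent. Field order inside the JSON object does not matter to the decoder.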

★ Tail the Stream and fan out over SSE

TypeScript Advanced

Build the same live fan-out in Hono — streamSSE over a dedicated Redis connection with a bounded xRead BLOCK, writeSSE per event, keepalive on idle — so the wire output is contract-identical to the Go server.

New in this step
`streamSSE` / `writeSSE`

Hono’s helper that turns a handler into a text/event-stream response; writeSSE({ id, data }) emits one frame.

hono streamsse writesse track ↗
dedicated `redis.duplicate()` connection

A blocking read ties up its connection, so run it on a duplicated client opened per stream — never the shared one your REST routes use.

node-redis duplicate blocking connection
bounded `xRead BLOCK`

xRead(..., { BLOCK: 15000 }) parks at most 15s; node-redis also documents a BLOCK 0 foot-gun that can fail to resolve on a duplicated connection.

node-redis xread block timeout
`null` on timeout

node-redis returns null when the bounded block elapsed with no entries — your cue to heartbeat and re-check the loop guard.

node-redis xread returns null
`sse.closed` / `sse.aborted`

Flags Hono sets when the client disconnects; re-checking them each loop frees the duplicated reader within ~15s.

hono sse closed aborted
Async iteration models an SSE stream cleanly

Hono’s streamSSE helper turns a handler into a text/event-stream response and gives you a stream object with writeSSE({ id, data }). The events come from node-redis xRead. One rule matters: a blocking xRead ties up its connection, so it must never run on your shared client. node-redis documents createClientPool() for isolating blocking commands; for a long-lived per-client stream the simplest correct choice is a dedicated connection via redis.duplicate() that you open on connect and close on disconnect. If the blocking read ran on the shared client your REST handlers use, the whole app would stall behind it. Each connected client gets its own reader, loops awaiting the next event, advances its last id, and writes a frame. The id you start from is the Last-Event-ID header (replay) or $ (only-new).
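The per-client loop has three moving parts: a bounded read, a heartbeat on timeout, and an id that only moves forward. Stripped of Hono and node-redis it can be sketched as below. ReadAfter and Sink are hypothetical seams (a bounded xRead that resolves to null on timeout, and the SSE stream object), and tail is a name made up for this example.

```typescript
interface StreamMessage { id: string; message: Record<string, string> }

// Stands in for a bounded blocking read: entries after lastId, or null when the block timed out.
type ReadAfter = (lastId: string) => Promise<StreamMessage[] | null>;

// Stands in for the SSE stream object.
interface Sink {
  closed: boolean;
  writeEvent(id: string, data: string): Promise<void>;
  writeKeepalive(): Promise<void>;
}

// Runs until the sink reports closed; resolves to the last id delivered
// (returned only so the sketch is easy to assert on).
async function tail(readAfter: ReadAfter, sink: Sink, startId: string): Promise<string> {
  let lastId = startId;
  while (!sink.closed) {
    const batch = await readAfter(lastId);
    if (batch === null) {
      await sink.writeKeepalive(); // idle: heartbeat, then let the loop guard re-check
      continue;
    }
    for (const msg of batch) {
      lastId = msg.id; // advance so the next read starts after this entry
      await sink.writeEvent(msg.id, JSON.stringify(msg.message));
    }
  }
  return lastId;
}
```

Because the read is bounded, the loop guard runs at least once per timeout window even when no events arrive, which is what lets a closed tab release its dedicated connection.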

SSE fan-out (Hono streamSSE + node-redis)
Run these in your terminal / editor
import { streamSSE } from "hono/streaming";

app.get("/stream", (c) => {
  const stream = "warehouse:1";
  let lastID = c.req.header("Last-Event-ID") ?? "$"; // "$" = only new events

  return streamSSE(c, async (sse) => {
    const reader = redis.duplicate(); // blocking reads need their own connection
    await reader.connect();
    try {
      while (!sse.closed && !sse.aborted) {
        // Bounded block, NOT BLOCK: 0 — node-redis can fail to resolve a BLOCK 0 xRead on a
        // duplicated connection (#2258), and a parked BLOCK 0 never re-checks sse.closed, leaking
        // the reader when the tab closes. A 15s block returns null on timeout so we re-check below.
        const res = await reader.xRead(
          { key: stream, id: lastID },
          { BLOCK: 15000, COUNT: 500 },
        );
        if (!res) {
          await sse.write(":keepalive\n\n"); // comment line; EventSource ignores it, proxies stay alive
          continue;                          // loop guard re-checks sse.closed/aborted before re-blocking
        }
        for (const msg of res[0].messages) {
          lastID = msg.id;
          await sse.writeSSE({ id: msg.id, data: JSON.stringify(msg.message) });
        }
      }
    } finally {
      await reader.quit(); // free the dedicated connection once the client is gone
    }
  });
});
Agent prompt — paste into an agent with repo access

Before you write it: a learner closes the browser tab during an idle connection. With a bounded 15s BLOCK, what is the longest the dedicated reader can stay alive before the loop notices and quits it — and what would BLOCK: 0 do instead?

For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior TypeScript engineer in this repo.
Context: Hono app; node-redis client at redis (already connected); Stream key "warehouse:1"; streamSSE from "hono/streaming".
Task: Add GET /stream that fans out the Redis Stream to the client over SSE using a per-client async loop.
Requirements:
- Use streamSSE(c, async (sse) => { ... }); start id from the Last-Event-ID header, default "$".
- Run the blocking xRead on a DEDICATED connection, not the shared client: open redis.duplicate() (node-redis also documents createClientPool() for this), await connect(), and reader.quit() in finally.
- Loop while (!sse.closed && !sse.aborted) awaiting reader.xRead({ key: stream, id: lastID }, { BLOCK: 15000, COUNT: 500 }). Do NOT use BLOCK: 0 — node-redis can fail to resolve a BLOCK 0 xRead on a duplicated connection (#2258), and a parked BLOCK 0 never re-checks sse.closed, leaking the reader when the tab closes.
- On a timeout (xRead returns null) await sse.write(":keepalive\n\n") then continue, so the loop guard re-checks sse.closed/aborted and a closed tab is released within ~15s. For each message writeSSE({ id, data }) and advance lastID.
Tests / acceptance:
- curl -N localhost:8080/stream stays open; a price change appears as an SSE frame within ~100ms (parity with the Go server); an idle connection still receives a ":keepalive" comment about every 15s.
- Reconnecting with -H "Last-Event-ID: <id>" replays only events after that id.
- Closing the client frees the duplicated connection within ~15s (the finally block runs reader.quit()).
Output: a unified diff plus a one-paragraph note on why a bounded block beats BLOCK 0 for connection cleanup, and why a blocking xRead needs its own connection.
What success looks like

Contract-identical wire output to the Go server: the same id: / data: frames carrying the same four string-valued fields. curl -N localhost:8080/stream delivers a data: frame within ~100ms of a change and the same :keepalive comment about every 15s on an idle connection. The field values are strings on both backends because each forwards Redis’s string-valued entry verbatim (msg.message in node-redis). Key order is not part of the contract: Go’s json.Marshal over a map sorts keys alphabetically while node-redis preserves insertion order, so the same frame can serialise its fields in a different order; clients read by name and coerce with Number(...). The shared SSE contract is exactly why the dashboard never knows which backend it is talking to.
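
Because key order is outside the contract, a parity check is more robust when it compares parsed fields than when it compares raw bytes. A minimal sketch, assuming the four fields are named product_id, price, stock and version as in the dashboard’s applyEvent; TickerEvent and normalizeEvent are illustrative names, and the sample payloads are invented for the example:

```typescript
// Illustrative names: TickerEvent and normalizeEvent are not part of either backend.
interface TickerEvent {
  productId: number;
  price: number;
  stock: number;
  version: number;
}

// Parse one SSE data: payload. Fields are read by name, so key order is irrelevant,
// and every value is coerced because Redis Stream fields arrive as strings.
function normalizeEvent(data: string): TickerEvent {
  const raw = JSON.parse(data) as Record<string, string>;
  return {
    productId: Number(raw["product_id"]),
    price: Number(raw["price"]),
    stock: Number(raw["stock"]),
    version: Number(raw["version"]),
  };
}

// The same change as each backend might serialise it (sample values are made up).
const fromGo = '{"price":"1299","product_id":"1","stock":"4","version":"7"}';
const fromTs = '{"product_id":"1","price":"1299","stock":"4","version":"7"}';

const goEvent = normalizeEvent(fromGo);
const tsEvent = normalizeEvent(fromTs);
console.log(JSON.stringify(goEvent) === JSON.stringify(tsEvent)); // true
```

Comparing the normalized objects rather than the data: strings keeps a Go-versus-TypeScript parity test from failing on serialisation order alone.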

Client offset protocol: every event carries its Stream id

Intermediate

Pin down the one rule that makes replay possible — the unit of progress is the Stream id: the server stamps it on every SSE frame and the client remembers the last id it applied, so a reconnect resumes exactly where it left off.

New in this step
`EventSource`

The browser’s built-in SSE client — it auto-reconnects and remembers the last id: for you, so the live path needs no bespoke bookkeeping.

`Last-Event-ID`

The header EventSource resends on auto-reconnect, carrying the last id it saw, so the server knows where to resume.

opaque, monotonic id

Treat the Stream id as a black-box marker that only ever increases — compare and store it, never compute the next one yourself.

`localStorage`

Browser key-value storage; persist the last id here when you want replay to survive a full page reload, then send it explicitly.

The id is the whole protocol

Reconnect-and-replay works because of one shared rule — the unit of progress is the Stream id. The server already writes it as the SSE id: field; the browser’s EventSource automatically remembers the last id: it saw and, on an auto-reconnect, sends it back as the Last-Event-ID request header. So the client needs no bespoke bookkeeping for the live path — the platform does it. For a catch-up that must survive a full page reload, persist the last id yourself (e.g. localStorage) and send it explicitly. Either way the server’s job is identical: start reading just after the given id. Treat the id as opaque and monotonic; never try to compute the next one yourself.
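
For the reload case, one possible shape is sketched below. It is an assumption-heavy illustration rather than part of the project’s contract: the storage key and the last_id query parameter are hypothetical (EventSource cannot set request headers, and the servers in this project read only Last-Event-ID, so they would need to accept such a parameter), and KeyValueStore stands in for localStorage so the sketch also runs outside a browser:

```typescript
// Minimal subset of the browser Storage API; pass localStorage in a real page.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

const LAST_ID_KEY = "ticker:last-id"; // hypothetical storage key

// Remember the id of every applied frame. The id is stored as an opaque string.
function rememberId(store: KeyValueStore, id: string): void {
  if (id !== "") store.setItem(LAST_ID_KEY, id);
}

// Build the stream URL for a cold start.
// ASSUMPTION: the server accepts the id as a `last_id` query parameter.
function streamUrl(base: string, store: KeyValueStore): string {
  const id = store.getItem(LAST_ID_KEY);
  return id === null ? base : base + "?last_id=" + encodeURIComponent(id);
}

// In-memory stand-in for localStorage, used only by this example.
function memoryStore(): KeyValueStore {
  const entries = new Map<string, string>();
  return {
    getItem: (k) => entries.get(k) ?? null,
    setItem: (k, v) => {
      entries.set(k, v);
    },
  };
}

const store = memoryStore();
console.log(streamUrl("/stream", store)); // /stream
rememberId(store, "1718900000000-0");
console.log(streamUrl("/stream", store)); // /stream?last_id=1718900000000-0
```

In the browser, rememberId would be called from onmessage with ev.lastEventId, and the id is only stored and echoed back, never parsed or incremented.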

Reconnect-and-replay: Last-Event-ID drives XREAD from that id

Go Advanced

Make reconnects replay the gap with no second code path — when Last-Event-ID is a real id the same XREAD loop returns the missed events first, so the client catches up before live resumes, and a trimmed id falls back to a fresh snapshot.

New in this step
`replaying N` frame

A one-off event: replaying SSE frame whose data is the count of missed events, guarded on > 0 so a clean reconnect never flashes Replaying 0.

`resync` frame

A one-off event: resync frame telling the client to re-GET /products when its gap is no longer in the Stream.

`XINFO STREAM` / `first-entry`

XINFO STREAM warehouse:1 reports stream metadata; its first-entry id is the oldest entry still retained.

trimmed id

A Last-Event-ID older than first-entry — the gap was trimmed away, so it is unrecoverable from the Stream and needs a snapshot.

component-wise id comparison

Split each <ms>-<seq> on - and compare ms then seq as integers — a string compare wrongly orders 5-2 after 5-10.

Replay is the same loop, started from a different id

You already pass Last-Event-ID into XREAD, and the elegant part is that no second code path is needed. XREAD BLOCK 15000 STREAMS warehouse:1 1718900000000-0 returns every entry after that id immediately (Redis ranges are id-ordered — the bounded block only matters once you have caught up to the live tail), so a client that was offline for ten seconds receives its seven missed events back-to-back, then the loop blocks ~15s at a time for the next live one. Two refinements make it production-honest:

  1. Count the gap. The first XREAD batch after a reconnect is the missed set; emit a one-off event: replaying frame with its length so the dashboard can show Replaying N events. Guard it on a real (non-null) batch so a clean reconnect that lands on an idle timeout never flashes a count. With Count: 500, a gap of more than 500 events arrives over several batches and the frame reports only the first batch’s length.
  2. Handle a trimmed id. If the client’s Last-Event-ID is older than the Stream’s first retained entry (XINFO STREAM warehouse:1 exposes first-entry), the gap is unrecoverable from the Stream — send a one-off event: resync telling the client to re-GET /products for a fresh snapshot, then continue live. Replay restores deltas; the snapshot restores a baseline when the deltas are gone.

The block stays bounded here too (Block: 15 * time.Second, never Block: 0): once the backlog drains, this is the live tail again, so the same redis.Nil-on-timeout → keepalive → context-recheck cleanup applies.

Replay the gap, then go live
Run these in your terminal / editor
// Stream ids are <ms>-<seq> integer pairs — compare component-wise, NOT lexically
// ("5-2" > "5-10" is true as strings but false as ids). idOlder reports a < b.
func idOlder(a, b string) bool {
	ams, aseq := splitID(a)
	bms, bseq := splitID(b)
	if ams != bms {
		return ams < bms
	}
	return aseq < bseq
}

func splitID(id string) (ms, seq int64) {
	if i := strings.IndexByte(id, '-'); i >= 0 {
		ms, _ = strconv.ParseInt(id[:i], 10, 64)
		seq, _ = strconv.ParseInt(id[i+1:], 10, 64)
	}
	return
}

// Extend the fan-out loop: check for a trimmed id, then the first real batch is the missed set.
if lastID != "$" {
	info, err := rdb.XInfoStream(r.Context(), stream).Result()
	// resync when the oldest retained entry is NEWER than the client's last id (the gap was trimmed).
	if err == nil && idOlder(lastID, info.FirstEntry.ID) {
		fmt.Fprint(w, "event: resync\ndata: 1\n\n")
		flusher.Flush()
		lastID = "$" // skip the unrecoverable gap and jump to the live tail
	}
}

first := true
for {
	res, err := rdb.XRead(r.Context(), &redis.XReadArgs{
		Streams: []string{stream, lastID}, Block: 15 * time.Second, Count: 500,
	}).Result() // .Result() unwraps the *XStreamSliceCmd into ([]XStream, error)
	if err == redis.Nil { // idle timeout, no entries: keepalive and re-check the tab
		first = false // nothing to replay, so any later batch is live rather than a gap
		fmt.Fprint(w, ":keepalive\n\n")
		flusher.Flush()
		if r.Context().Err() != nil { return }
		continue
	}
	if err != nil { return }
	msgs := res[0].Messages
	if first && lastID != "$" && len(msgs) > 0 {
		fmt.Fprintf(w, "event: replaying\ndata: %d\n\n", len(msgs)) // -> pill: "Replaying N" (guarded >0)
	}
	first = false // only the first batch after connect can be the missed set
	for _, msg := range msgs {
		lastID = msg.ID
		payload, _ := json.Marshal(msg.Values)
		fmt.Fprintf(w, "id: %s\ndata: %s\n\n", msg.ID, payload)
	}
	flusher.Flush()
}
Agent prompt — paste into an agent with repo access

A client reconnects with a Last-Event-ID that has already been trimmed from the Stream. What single frame does the server send before any live events resume, and what does the client do in response?

For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Go engineer in this repo.
Context: GET /stream already tails the Stream with a bounded XREAD (Block: 15 * time.Second) from a start id, keepaliving on each redis.Nil timeout; XINFO STREAM exposes the first retained entry id.
Task: Extend /stream so a reconnect with Last-Event-ID replays exactly the missed events, with a status signal and a trimmed-id fallback.
Requirements:
- When lastID is a real id, the first non-empty XREAD batch is the missed events; emit one "event: replaying" frame with that count, guarded on len(msgs) > 0 so a clean reconnect never flashes "Replaying 0".
- Keep the block bounded (Block: 15 * time.Second, never Block: 0); the redis.Nil timeout path still writes ":keepalive\n\n" and returns when r.Context() is done.
- Advance lastID per message so live events resume seamlessly after the backlog drains.
- If lastID is older than XINFO STREAM's first-entry id, emit "event: resync\ndata: 1\n\n" (the client re-fetches GET /products) instead of a partial replay, and reset lastID to "$" to resume the live tail. Compare the two ids COMPONENT-WISE, never lexically: split each "<ms>-<seq>" on "-", compare ms as an int64, then seq as an int64. A string compare wrongly orders "5-2" after "5-10" and falsely resyncs once a seq reaches 10, i.e. once more than 10 events share one millisecond.
Tests / acceptance:
- Connect, note the last id, disconnect, make 5 changes, reconnect with Last-Event-ID: the client receives "replaying 5", then exactly those 5 events, ending identical to a fresh snapshot.
- A Last-Event-ID older than the trim window yields a resync event with data: 1, not missing data.
Output: a unified diff plus a one-paragraph note on how the snapshot + Stream together guarantee no gaps.
What success looks like

Reconnect with a real Last-Event-ID and the first batch is the gap: one event: replaying frame whose data is the missed count (guarded > 0, so a clean reconnect never flashes Replaying 0), then exactly those missed ids in order, then live frames resume. Reconnect with an id older than the Stream’s first retained entry and you instead get one event: resync frame, telling the client to re-GET /products — no partial replay:

# reconnect after missing 5 events:
event: replaying
data: 5

id: 1718900001000-0
data: {"product_id":"1",...}
...
# reconnect with a trimmed id:
event: resync
data: 1
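
To check this output in a test, the raw response body can be split back into frames. The parser below is a simplified sketch, not a full implementation of the SSE specification: it assumes a complete body with \n line endings, and SseFrame and parseFrames are illustrative names. The # annotation lines in the sample above are not part of the wire format, so the example input omits them:

```typescript
interface SseFrame {
  event: string; // "message" when the frame has no event: line
  id?: string;
  data: string;
}

// Split a complete SSE body into frames. Lines starting with ":" are comments
// (the keepalive), and a block without any data: line produces no frame.
function parseFrames(body: string): SseFrame[] {
  const out: SseFrame[] = [];
  for (const block of body.split("\n\n")) {
    let eventName = "message";
    let id: string | undefined;
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.startsWith(":")) continue;
      const sep = line.indexOf(":");
      const field = sep === -1 ? line : line.slice(0, sep);
      let value = sep === -1 ? "" : line.slice(sep + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one optional leading space
      if (field === "event") eventName = value;
      else if (field === "id") id = value;
      else if (field === "data") dataLines.push(value);
    }
    if (dataLines.length === 0) continue;
    const frame: SseFrame = { event: eventName, data: dataLines.join("\n") };
    if (id !== undefined) frame.id = id;
    out.push(frame);
  }
  return out;
}

const wire =
  "event: replaying\ndata: 2\n\n" +
  'id: 1718900001000-0\ndata: {"product_id":"1"}\n\n' +
  ":keepalive\n\n" +
  'id: 1718900001000-1\ndata: {"product_id":"2"}\n\n';

const parsed = parseFrames(wire);
console.log(parsed.map((f) => f.event).join(",")); // replaying,message,message
```

The replaying frame carries no id, so it never advances the client’s Last-Event-ID; only the data frames do.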

Reconnect-and-replay with Last-Event-ID (parity)

TypeScript Advanced

Mirror the replay logic in TypeScript so both servers behave identically — the first xRead batch after a reconnect is the missed set: emit replaying N, resume live, and fall back to resync on a trimmed id.

New in this step
`replaying N` frame

A one-off { event: replaying, data: String(count) } SSE frame, guarded on length > 0 so a clean reconnect never flashes Replaying 0.

`resync` frame

A one-off { event: resync, data: "1" } frame telling the client to re-GET /products when its gap is no longer retained.

`xInfoStream` + `first-entry` key

node-redis v5 returns the oldest retained id under the hyphenated key info["first-entry"].id, not info.firstEntry (which is undefined and would dead-end the resync).

trimmed id

A Last-Event-ID older than the oldest retained entry — the gap is gone from the Stream, so the client needs a fresh snapshot.

component-wise id comparison

Split each <ms>-<seq> on - and compare ms then seq as numbers — a string compare wrongly orders 5-2 after 5-10.

Same protocol, async-iterator shape

The TypeScript path is the Go path in different syntax. The dedicated reader’s first non-empty xRead from a real Last-Event-ID returns the missed batch; you writeSSE({ event: "replaying", data: String(batch.length) }), stream the frames, advance lastID, and the loop then blocks ~15s at a time for live events. The trimmed-id case uses node-redis xInfoStream(stream) to read the oldest retained id, which node-redis v5 returns under the hyphenated key info["first-entry"].id, not info.firstEntry (that property is undefined, which would silently disable the whole resync branch). If the client’s id is older, write a resync event and let it re-GET /products. Compare the two ids component-wise on <ms>-<seq> (split on -, compare the ms as a number, then the seq as a number): a string compare wrongly orders 5-2 after 5-10 and falsely resyncs once a sequence number reaches 10, that is, once an eleventh event lands in one millisecond. Keeping the two servers behaviourally identical is the point: a learner can swap backends and the dashboard cannot tell. That parity includes the same bounded block (BLOCK: 15000, never BLOCK: 0), the null-on-timeout keepalive, and the sse.closed/sse.aborted re-check that frees the duplicated connection when the tab closes.

Replay then live (node-redis)
Run these in your terminal / editor
// Stream ids are <ms>-<seq> integer pairs — compare component-wise, NOT lexically
// ("5-2" > "5-10" is true as strings but false as ids). idOlder(a, b) reports a < b.
function idOlder(a: string, b: string): boolean {
  const [ams, aseq] = a.split("-").map(Number);
  const [bms, bseq] = b.split("-").map(Number);
  return ams !== bms ? ams < bms : aseq < bseq;
}

if (lastID !== "$") {
  const info = await reader.xInfoStream(stream).catch(() => null);
  // node-redis v5 returns the oldest entry under the HYPHENATED key "first-entry",
  // not info.firstEntry (that is undefined, which would silently kill the resync branch).
  const firstEntry = info?.["first-entry"];
  if (firstEntry && idOlder(lastID, firstEntry.id)) {
    await sse.writeSSE({ event: "resync", data: "1" });
    lastID = "$"; // skip the unrecoverable gap and jump to the live tail
  }
}

let first = true;
while (!sse.closed && !sse.aborted) {
  const res = await reader.xRead(
    { key: stream, id: lastID },
    { BLOCK: 15000, COUNT: 500 }, // bounded, never BLOCK: 0 — see the fan-out step
  );
  if (!res) {
    first = false; // nothing to replay, so any later batch is live rather than a gap
    await sse.write(":keepalive\n\n"); // idle timeout: keepalive, then the guard re-checks closed/aborted
    continue;
  }
  const msgs = res[0].messages;
  if (first && lastID !== "$" && msgs.length > 0) {
    await sse.writeSSE({ event: "replaying", data: String(msgs.length) }); // guarded >0
  }
  first = false; // only the first batch after connect can be the missed set
  for (const msg of msgs) {
    lastID = msg.id;
    await sse.writeSSE({ id: msg.id, data: JSON.stringify(msg.message) });
  }
}
Agent prompt — paste into an agent with repo access

A client reconnects with a Last-Event-ID after missing 5 events. In what order do the frames arrive (replaying, the 5 data frames, then live), and which pair of ids would a lexical (string) compare order wrongly once a sequence number within one millisecond reaches 10?

For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior TypeScript engineer in this repo.
Context: GET /stream tails the Stream on a dedicated node-redis connection with a bounded xRead ({ BLOCK: 15000 }), keepaliving on each null timeout and re-checking sse.closed/sse.aborted; node-redis v5 xInfoStream returns the oldest retained id under the HYPHENATED key info["first-entry"].id (NOT info.firstEntry, which is undefined).
Task: Extend /stream so a reconnect with Last-Event-ID replays exactly the missed events, matching the Go server.
Requirements:
- The first non-empty xRead batch from a real lastID is the missed events; writeSSE an { event: "replaying", data: String(count) } before the data frames, guarded on msgs.length > 0 so a clean reconnect never flashes "Replaying 0".
- Keep the block bounded ({ BLOCK: 15000 }, never BLOCK: 0); on the null timeout await sse.write(":keepalive\n\n") and continue so the loop guard re-checks sse.closed/sse.aborted.
- Advance lastID per message; resume the live bounded-BLOCK loop afterwards.
- If lastID is older than the oldest retained id, writeSSE an { event: "resync", data: "1" } so the client re-fetches GET /products, and reset lastID to "$" to resume the live tail. Read that id as info["first-entry"].id (node-redis v5's hyphenated key; info.firstEntry is undefined and would dead-end the resync). Compare COMPONENT-WISE, never lexically: split each "<ms>-<seq>" on "-", compare ms as a number, then seq as a number. A string compare wrongly orders "5-2" after "5-10" and falsely resyncs once a seq reaches 10, i.e. once more than 10 events share one millisecond.
Tests / acceptance:
- Disconnect, make 5 changes, reconnect with Last-Event-ID: receive "replaying 5", then those 5 events; final state equals a fresh snapshot (parity with Go).
- An id older than the trim window yields a resync event with data: 1.
Output: a unified diff plus a one-paragraph note on keeping Go and TS replay behaviour identical.
What success looks like

Identical replay frames to the Go server. A reconnect after K missed events sends event: replaying with data: K, then those K ids in order; a trimmed id sends event: resync with data: 1. Because the trimmed check compares the ids component-wise on <ms>-<seq> (and reads the oldest entry via the hyphenated first-entry key), it does not falsely resync once more than ten events share one millisecond, which is the foot-gun a lexical compare would hit.
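
The foot-gun is easy to reproduce in isolation. The sketch below restates the component-wise comparison with a hypothetical splitId helper and contrasts it with the plain string comparison on two ids that share a millisecond; the id values are made up for the example:

```typescript
// Split "<ms>-<seq>" into two numbers. An id without a "-" is treated as seq 0.
function splitId(id: string): [number, number] {
  const dash = id.indexOf("-");
  if (dash === -1) return [Number(id), 0];
  return [Number(id.slice(0, dash)), Number(id.slice(dash + 1))];
}

// Reports whether id a is strictly older than id b: ms first, then seq.
function idOlder(a: string, b: string): boolean {
  const [ams, aseq] = splitId(a);
  const [bms, bseq] = splitId(b);
  return ams !== bms ? ams < bms : aseq < bseq;
}

const lastId = "1718900000000-10"; // the client's last applied id (eleventh event in that ms)
const firstEntry = "1718900000000-2"; // the oldest retained entry

console.log(lastId < firstEntry); // true: the string compare would trigger a false resync
console.log(idOlder(lastId, firstEntry)); // false: seq 10 is not older than seq 2
```

Both ids parse safely as JavaScript numbers here because millisecond timestamps are far below 2^53.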

Make replay idempotent: apply by (id, version)

Intermediate

Funnel every event — live, replayed, or duplicated — through one reducer keyed by product id and version, accepting only strictly-newer versions, so at-least-once delivery is safe without the cost of exactly-once.

New in this step
idempotency

Applying the same event twice has the same effect as once — the property that makes a re-delivered or duplicated frame harmless.

at-least-once vs exactly-once

Replay may re-deliver events (at-least-once); guaranteeing exactly-once is expensive, so you make apply idempotent instead.

reducer keyed by `(id, version)`

One function holding the current version per product that accepts an event only if its version is strictly newer — collapsing duplicates and out-of-order arrivals.

Idempotency is what makes at-least-once safe

Replay and live delivery can overlap — a reconnect might re-deliver an event you already applied, or two servers might both forward the same change. That’s fine if applying an event is idempotent. The rule: hold the current version per product, and when an event arrives apply it only if it is newer than the version you already show; otherwise drop it. Because version is monotonic per row, this collapses duplicates, tolerates out-of-order arrivals during the replay-to-live handoff, and means you never have to guarantee exactly-once delivery (which is expensive and, here, unnecessary). The same predicate works for snapshots: a snapshot is just a set of (id, version) baselines you apply the same way.

Idempotent apply (the client's one reducer)
Run these in your terminal / editor
// state: Map<productId, {price, stock, version}>
// Stream fields arrive as strings, so coerce before comparing: as strings, "10" <= "9" is true.
function applyEvent(state, e) {
  const version = Number(e.version);
  const cur = state.get(e.product_id);
  if (cur && version <= cur.version) return false; // stale or duplicate: ignore
  state.set(e.product_id, { price: Number(e.price), stock: Number(e.stock), version });
  return true; // changed: re-render this row
}
What success looks like

The reducer is a clean dedupe. Feed it the same product at version 1 then 2: the first returns true (row updates), the second returns true (newer wins). Re-feed version 2 (a duplicate id from an overlapping replay) and it returns false — state unchanged, no re-render. Feed version 1 after 2 (out-of-order) and it also returns false. That is what makes at-least-once replay safe without exactly-once delivery.
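
That walkthrough can be run as a short script. This is a sketch under the assumption that events carry the four string fields product_id, price, stock and version; Row, WireEvent and makeEvent are illustrative names. A fifth call with version 10 is included because it is the case a string comparison would get wrong:

```typescript
interface Row {
  price: number;
  stock: number;
  version: number;
}

// Wire shape: every field is a string, as forwarded from the Redis Stream.
interface WireEvent {
  product_id: string;
  price: string;
  stock: string;
  version: string;
}

// Accept an event only when its version is strictly newer than the stored one.
function applyEvent(rows: Map<number, Row>, e: WireEvent): boolean {
  const id = Number(e.product_id);
  const version = Number(e.version);
  const cur = rows.get(id);
  if (cur && version <= cur.version) return false; // stale or duplicate
  rows.set(id, { price: Number(e.price), stock: Number(e.stock), version });
  return true;
}

// Helper for the example: one product, varying version and price.
function makeEvent(version: string, price: string): WireEvent {
  return { product_id: "1", price, stock: "5", version };
}

const rows = new Map<number, Row>();
const results = [
  applyEvent(rows, makeEvent("1", "1000")), // first sight of the product
  applyEvent(rows, makeEvent("2", "1100")), // newer wins
  applyEvent(rows, makeEvent("2", "1100")), // duplicate from an overlapping replay
  applyEvent(rows, makeEvent("1", "1000")), // out-of-order arrival
  applyEvent(rows, makeEvent("10", "1200")), // numerically newer than 2
];
console.log(results.join(",")); // true,true,false,false,true
```

Applying a snapshot through the same function works unchanged, since a snapshot row is just another (id, version) pair.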

Build the live dashboard (single index.html)

Web dashboard Beginner

Make the payoff something you can see — one zero-build public/index.html that fetches the snapshot, opens an EventSource to /stream, and renders each product with a slider, a Sell button, and a Sold-Out badge that ticks live across windows.

New in this step
`fetch`

The browser’s built-in HTTP call — used for the initial GET /products snapshot and for the PUT/POST writes.

`EventSource.onmessage`

Fires for each default data: frame; you JSON.parse(ev.data) and feed it to the same idempotent reducer.

zero-build static file

One self-contained index.html the backend serves at / — no framework, no bundler, nothing to install.

coercing with `Number()`

Stream data: fields arrive as strings, so wrap price/stock/version in Number(...) before comparing or rendering.

Zero-install on purpose

The payoff has to be something you see, so the dashboard is one static file the backend already serves at /. It uses two browser built-ins: fetch for the snapshot and writes, and EventSource for the live feed (it handles reconnect and Last-Event-ID for you). The render is a tiny reducer over the applyEvent rule from the last step. Open http://localhost:8080/ in two browser windows (load it from the backend, not from disk, because the script calls the API on its own origin): drag the price slider in window A and, once you release it, window B’s number ticks within ~100ms; click Sell 1 and stock drops in both; when stock hits 0 the Sold-Out badge flips, and a second window trying to sell the last unit gets a live Out of stock toast. That is the atomic guard, made visible.

public/index.html
Run these in your terminal / editor
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Ticker — live control room</title>
<style>
  body { font: 15px system-ui, sans-serif; margin: 24px; background: #0b0f17; color: #e8edf5; }
  .row { display: flex; align-items: center; gap: 16px; padding: 12px; border: 1px solid #243049; border-radius: 10px; margin: 8px 0; }
  .name { font-weight: 700; min-width: 130px; }
  .price { font-variant-numeric: tabular-nums; min-width: 84px; transition: color .1s; }
  .flash { color: #f5b14c; }
  .sold-out { color: #ff6b6b; font-weight: 700; }
  button { font: inherit; cursor: pointer; border-radius: 8px; border: 1px solid #355; background: #16203a; color: inherit; padding: 6px 12px; }
  #toast { position: fixed; bottom: 20px; right: 20px; background: #3a1620; border: 1px solid #a44; padding: 10px 14px; border-radius: 8px; opacity: 0; transition: opacity .2s; }
  #toast.show { opacity: 1; }
</style>
</head>
<body>
<h1>Ticker <span id="pill">connecting…</span></h1>
<div id="list"></div>
<div id="toast"></div>
<script>
const API = location.origin;            // the backend serves this file, so same origin
const state = new Map();                // id -> { id, name, price, stock, version }

// Idempotent: only a strictly-newer version wins. Stream fields arrive as strings, so coerce.
function applyEvent(raw) {
  const id = Number(raw.product_id ?? raw.id);
  const version = Number(raw.version);
  const cur = state.get(id);
  if (cur && version <= cur.version) return;
  state.set(id, { id, name: raw.name ?? cur?.name ?? ("Product " + id), price: Number(raw.price), stock: Number(raw.stock), version });
  renderRow(id);
}

const fmt = (cents) => "$" + (cents / 100).toFixed(2);

function renderRow(id) {
  const p = state.get(id);
  let el = document.getElementById("p" + id);
  if (!el) {
    el = document.createElement("div");
    el.className = "row";
    el.id = "p" + id;
    el.innerHTML =
      '<span class="name"></span>' +
      '<input type="range" min="0" max="10000" step="1" />' +
      '<span class="price"></span><span class="stock"></span>' +
      '<button>Sell 1</button>';
    el.querySelector("input").addEventListener("change", (ev) => setPrice(id, Number(ev.target.value)));
    el.querySelector("button").addEventListener("click", () => sell(id));
    document.getElementById("list").appendChild(el);
  }
  el.querySelector(".name").textContent = p.name;
  el.querySelector("input").value = p.price;
  const price = el.querySelector(".price");
  price.textContent = fmt(p.price);
  price.classList.add("flash");                           // amber highlight on every render
  setTimeout(() => price.classList.remove("flash"), 400); // then clear it, so it flashes instead of staying amber
  el.querySelector(".stock").innerHTML =
    p.stock > 0 ? (p.stock + " in stock") : '<span class="sold-out">SOLD OUT</span>';
}

async function setPrice(id, price) {
  const p = state.get(id);
  const r = await fetch(API + "/products/" + id + "/price", {
    method: "PUT", headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ price, version: p.version }),
  });
  applyEvent(await r.json());            // 200 confirms, 409 rebases to the current row
}

async function sell(id) {
  const r = await fetch(API + "/products/" + id + "/sell", {
    method: "POST", headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ qty: 1 }),
  });
  const body = await r.json();
  if (r.status === 409) { toast("Out of stock"); return; }
  applyEvent(body);
}

function toast(msg) {
  const t = document.getElementById("toast");
  t.textContent = msg; t.classList.add("show");
  setTimeout(() => t.classList.remove("show"), 2000);
}

async function start() {
  const snapshot = await (await fetch(API + "/products")).json();
  for (const p of snapshot) applyEvent(p);
  const es = new EventSource(API + "/stream");
  es.onmessage = (ev) => applyEvent(JSON.parse(ev.data));
  window._es = es;                       // the next step adds the status-pill listeners
}
start();
</script>
</body>
</html>
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Front-end engineer in this repo (vanilla JS, no framework, no build step).
Context: The backend serves this file at / and exposes GET /products (snapshot), PUT /products/:id/price {price,version}, POST /products/:id/sell {qty}, and GET /stream (SSE). Stream event fields arrive as strings.
Task: Write public/index.html — a single self-contained file that renders each product with a price slider, a Sell button, a Sold-Out badge, and live updates from /stream.
Requirements:
- One idempotent applyEvent(raw) keyed by product id that ignores any event whose version is not newer; coerce string fields to numbers.
- fetch the snapshot first, then open new EventSource("/stream") and apply each message; format cents as currency for display only.
- A slider change PUTs {price, version}; Sell POSTs {qty:1}; a 409 on sell shows an "Out of stock" toast; a 409 on price rebases to the returned current value.
Tests / acceptance:
- Open in two windows: a slider drag in one updates the other within ~100ms; selling the last unit flips both to SOLD OUT and toasts the loser.
Output: the complete public/index.html, no commentary.
What success looks like

Open http://localhost:8080/ in two windows. Dragging the price slider in window A makes window B’s number tick within ~100ms — the change went Postgres → XADD → SSE → both browsers. Clicking Sell 1 drops stock in both; selling the last unit flips both to SOLD OUT, and the second window’s attempt to sell it gets a live Out of stock toast (the 409 from the atomic guard). No polling, no refresh — the board is genuinely live across windows.

The Live / Reconnecting / Replaying status pill + the offline demo

Web dashboard Intermediate

Surface the durable-realtime machinery with a status pill — green Live, amber Reconnecting…, Replaying N while a backlog drains — then break the network on purpose so you can watch a window catch up.

New in this step
`onopen` / `onerror`

EventSource fires onopen when connected and onerror when the connection drops (then it auto-reconnects) — your Live and Reconnecting cues.

`addEventListener` for custom events

Named server frames (event: replaying, event: resync) arrive via addEventListener("replaying", …), not onmessage.

DevTools offline throttle

The browser’s network panel can force a tab Offline, so you can simulate a dropped connection and watch the reconnect-replay.

Make the machinery visible, then break the network on purpose

The whole point of durable realtime is invisible when everything is online, so surface it. EventSource fires onopen when connected and onerror when the connection drops (it then auto-reconnects, re-sending Last-Event-ID); listen to those for Live and Reconnecting…. Your server emits a custom replaying event with a count at the start of a reconnect, so addEventListener("replaying", …) shows Replaying N events while the gap drains, and once the replayed frames have been applied the pill returns to Live. A resync event tells the dashboard to re-GET /products.

Now the money demo: open two windows, in DevTools set window B’s network to Offline, change prices and sell in window A for ten seconds, then bring B back Online — the pill reads Reconnecting…, then Replaying 7 events, the backlog fast-forwards, and B ends byte-identical to A. No gaps, no full refetch.

The status-pill additions to index.html
Run these in your terminal / editor
// Replace the EventSource setup inside start() with connect(); keep applyEvent/renderRow as-is.
function setPill(text, color) {
  const pill = document.getElementById("pill");
  pill.textContent = text;
  pill.style.color = color;
}

let replayLeft = 0; // replayed frames still expected before the pill may return to Live

function connect() {
  const es = new EventSource(API + "/stream");
  es.onopen = () => { replayLeft = 0; setPill("● Live", "#3FD27A"); };
  es.onerror = () => setPill("● Reconnecting…", "#f5b14c"); // EventSource auto-retries for us
  es.onmessage = (ev) => {
    applyEvent(JSON.parse(ev.data));
    if (replayLeft > 0) replayLeft--;       // still draining the replayed backlog
    else setPill("● Live", "#3FD27A");      // first live frame after the replay
  };
  es.addEventListener("replaying", (ev) => {
    replayLeft = Number(ev.data) || 0;      // data = count of frames about to be replayed
    setPill("● Replaying " + ev.data + " events", "#f5b14c");
  });
  es.addEventListener("resync", async () => {
    const snap = await (await fetch(API + "/products")).json();
    for (const p of snap) applyEvent(p);
  });
  return es;
}
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Front-end engineer in this repo (vanilla JS).
Context: public/index.html already renders products and applies SSE events. The server sends data frames, a custom "replaying" event (data = count) on reconnect, and a "resync" event when the client's id was trimmed.
Task: Add a status pill reflecting Live / Reconnecting… / Replaying N, and handle resync.
Requirements:
- EventSource onopen -> "Live"; onerror -> "Reconnecting…" (do NOT manually reconnect — the browser does, re-sending Last-Event-ID).
- addEventListener("replaying", e => show "Replaying {e.data} events"); addEventListener("resync", refetch GET /products).
- Keep applyEvent idempotent so replayed events that overlap live ones are ignored.
Tests / acceptance:
- DevTools offline demo: take the tab offline, make 7 changes elsewhere, go online -> the pill shows Reconnecting…, then Replaying 7 events, and the tab ends identical to a freshly loaded one.
Output: the updated index.html (or a unified diff), no commentary.
What success looks like

The machinery becomes visible. On connect the pill goes green ● Live; pull window B offline in DevTools and it flips amber ● Reconnecting…. Make 7 changes in window A, then bring B back online: B’s EventSource auto-reconnects with its Last-Event-ID, the server replays the gap, the pill briefly reads ● Replaying 7 events, the rows fast-forward, and the next live frame returns it to ● Live — B is now byte-identical to A, with no full refetch.
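The idempotency the replay relies on comes down to one comparison. The sketch below shows it in Go for consistency with the backend, assuming each product's version increases by at least one on every committed change. The Tick and Board types are illustrative. The dashboard's applyEvent would make the same check in JavaScript.

```go
package main

import "fmt"

// Tick mirrors the fields carried by a change event (illustrative type).
type Tick struct {
	ProductID string
	Price     int64
	Stock     int
	Version   int
}

// Board holds the latest known state per product.
type Board map[string]Tick

// Apply stores t only if it is newer than what the board already holds and
// reports whether the board changed. A replayed frame that overlaps a live
// one has an equal or lower version, so it is ignored.
func (b Board) Apply(t Tick) bool {
	if cur, ok := b[t.ProductID]; ok && t.Version <= cur.Version {
		return false
	}
	b[t.ProductID] = t
	return true
}

func main() {
	b := Board{}
	fmt.Println(b.Apply(Tick{"1", 1499, 49, 5})) // true: first sighting
	fmt.Println(b.Apply(Tick{"1", 1499, 49, 5})) // false: duplicate
	fmt.Println(b.Apply(Tick{"1", 1399, 48, 6})) // true: newer version
}
```

With this rule, applying the same event twice, or applying a snapshot row after a newer live frame, leaves the board unchanged.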

Multi-instance fan-out: two servers, one Stream

Advanced

Prove the design scales by running a second instance on another port — because both tail the same Redis Stream, a write through A surfaces on a dashboard connected to B, so fan-out grows horizontally without one line of fan-out code changing.

New in this step
horizontal scaling

Adding more identical instances to handle load, rather than making one bigger — only possible if instances need no shared in-memory state.

horizontal scaling stateless server
in-process fan-out

Pushing changes through a Go channel or a Node EventEmitter reaches only clients on that process — it breaks the moment you run two.

in-memory pub sub eventemitter
sticky sessions

Pinning a client to one instance; not needed here because every instance sees every event, so any instance can serve any dashboard.

sticky sessions load balancer
the Stream as fan-out point

Every instance runs the same XREAD against the same Stream, so Redis — not any process — is the coordination point.

redis stream shared fan out
The Stream is the fan-out point, not the process

In-process fan-out (a Go channel, a Node EventEmitter) only reaches clients connected to that process, so it breaks the moment you run two instances behind a load balancer. Redis Streams sidestep this entirely: every instance runs the same XREAD BLOCK against the same Stream, so all of them see every event and forward it to their own SSE subscribers. There is no leader, no sticky sessions, no inter-node gossip — the realtime store is the coordination point. Write through instance A (:8080), connect a dashboard to instance B (:8081), and the tick still arrives. That is the property that lets this design scale from one box to many without changing a line of fan-out code.
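To make "forward it to their own SSE subscribers" concrete, here is a minimal sketch of the per-instance piece, assuming each process runs one reader loop over the Stream and hands every entry to a local hub. The Hub type and its methods are hypothetical names, and dropping a slow subscriber is one possible policy rather than a confirmed detail of this project.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans one instance's view of the Stream out to that instance's own
// SSE subscribers. Each server process would own exactly one Hub.
type Hub struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func NewHub() *Hub { return &Hub{subs: make(map[chan string]struct{})} }

// Subscribe registers a buffered channel for one connected dashboard.
func (h *Hub) Subscribe(buf int) chan string {
	ch := make(chan string, buf)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unsubscribe removes a subscriber, e.g. when its request context ends.
func (h *Hub) Unsubscribe(ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[ch]; ok {
		delete(h.subs, ch)
		close(ch)
	}
}

// Broadcast delivers one entry to every local subscriber. A subscriber whose
// buffer is full is dropped rather than allowed to stall the others; its
// client would reconnect and replay from its last id.
func (h *Hub) Broadcast(entry string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- entry:
		default:
			delete(h.subs, ch)
			close(ch)
		}
	}
}

func main() {
	// Two hubs stand in for instances A and B. In the real design each is fed
	// by its own XREAD loop over the same Stream; here one loop feeds both.
	a, b := NewHub(), NewHub()
	onA, onB := a.Subscribe(8), b.Subscribe(8)
	for _, id := range []string{"1718900003000-0"} {
		a.Broadcast(id)
		b.Broadcast(id)
	}
	fmt.Println(<-onA, <-onB)
}
```

The hub is still in-process state, but it is only a cache of what the Stream already holds. Losing it costs nothing, because every instance rebuilds the same view from Redis.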

Two instances, one Stream
Run these in your terminal / editor
# Two instances of the SAME server, same DATABASE_URL + REDIS_URL, different ports.
PORT=8080 ./ticker &   # instance A   (Go: PORT=8080 go run ./cmd/server)
PORT=8081 ./ticker &   # instance B

curl -N localhost:8081/stream &                                    # subscribe on B
curl -X POST localhost:8080/products/1/sell -d '{"qty":1}'         # write on A
# -> the sale event prints on B's stream within ~100ms, proving cross-instance delivery
What success looks like

The write hits instance A (:8080); the SSE frame surfaces on the subscriber connected to instance B (:8081) within ~100ms — because both tail the same warehouse:1 Stream with plain XREAD, every instance sees every event. No sticky sessions, no leader, no inter-node messaging: the Stream is the fan-out point, so adding instances does not change a line of fan-out code:

# on B's curl -N localhost:8081/stream :
id: 1718900003000-0
data: {"product_id":"1","price":"1499","stock":"49","version":"5"}

Bound the Stream: XADD MAXLEN ~ plus a snapshot

Advanced

Stop the Stream growing forever by capping it with MAXLEN ~ N, and lean on the products snapshot as the floor under that cap — a client whose id was trimmed away recovers from GET /products, so bounded memory never means a lost client.

New in this step
approximate trimming (`MAXLEN ~`)

The ~ lets Redis trim in efficient batches, so the Stream sits a little over the cap — far cheaper than exact trimming.

redis stream maxlen approximate trimming docs ↗
snapshot as baseline floor

The products table holds current state for every product, so a gap the Stream no longer retains is still recoverable from it.

event log snapshot plus deltas
snapshot-first recovery

Always load GET /products first, then tail the Stream; a reconnect replays the retained gap or falls back to a fresh snapshot via resync.

snapshot first then stream tail
Replay needs a floor and a snapshot

A Stream you never trim grows without bound and eventually costs more RAM than the data is worth. Cap it at append time with approximate trimming — XADD warehouse:1 MAXLEN ~ 10000 * … — where the ~ lets Redis trim in efficient batches (a little over the cap is fine, and much cheaper than exact trimming). Trimming raises a question: what about a client whose Last-Event-ID is older than the oldest retained entry? That gap is unrecoverable from the Stream, which is exactly why Postgres holds the snapshot — current price, stock, and version for every product. The protocol is snapshot-first: a client always loads GET /products (the baseline), then tails the Stream from live; a reconnect replays the gap if it is still retained, or falls back to a fresh snapshot via the resync event if it is not. Snapshot = baseline truth; Stream = recent deltas; together they bound memory and guarantee no gaps.
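The replay-or-resync decision is an id comparison, and it has to be numeric: compared as strings, "9-0" would sort after "10-0". A minimal sketch follows, assuming the server learns the oldest retained id from XINFO STREAM's first-entry. StreamID, ParseStreamID, and NeedsResync are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// StreamID is a parsed Redis Stream id of the form "<ms>-<seq>".
type StreamID struct{ Ms, Seq uint64 }

// ParseStreamID splits an id into its millisecond and sequence parts.
func ParseStreamID(s string) (StreamID, error) {
	ms, seq, ok := strings.Cut(s, "-")
	if !ok {
		return StreamID{}, fmt.Errorf("stream id %q: missing '-'", s)
	}
	m, err := strconv.ParseUint(ms, 10, 64)
	if err != nil {
		return StreamID{}, fmt.Errorf("stream id %q: %w", s, err)
	}
	q, err := strconv.ParseUint(seq, 10, 64)
	if err != nil {
		return StreamID{}, fmt.Errorf("stream id %q: %w", s, err)
	}
	return StreamID{Ms: m, Seq: q}, nil
}

// Less reports whether a sorts before b: milliseconds first, then sequence.
func (a StreamID) Less(b StreamID) bool {
	if a.Ms != b.Ms {
		return a.Ms < b.Ms
	}
	return a.Seq < b.Seq
}

// NeedsResync reports whether a reconnecting client's last id is older than
// the oldest entry the Stream still retains, i.e. part of its gap may have
// been trimmed and only a fresh snapshot can restore a complete view.
func NeedsResync(lastEventID, firstRetainedID string) (bool, error) {
	last, err := ParseStreamID(lastEventID)
	if err != nil {
		return false, err
	}
	first, err := ParseStreamID(firstRetainedID)
	if err != nil {
		return false, err
	}
	return last.Less(first), nil
}

func main() {
	resync, err := NeedsResync("1718900000000-3", "1718900000500-0")
	fmt.Println(resync, err)
}
```

A caller would likely treat a parse error on Last-Event-ID the same way as a trimmed id and send resync, since the snapshot is always a safe fallback.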

Bounded append + snapshot-first recovery
Run these in your terminal / editor
# Append with an approximate cap so the Stream self-trims:
XADD warehouse:1 MAXLEN ~ 10000 * product_id 1 price 1399 stock 50 version 7

# Recovery order for any client:
#   1) GET /products         -> snapshot baseline (current version per product)
#   2) open /stream          -> live tail; a reconnect replays the gap from Last-Event-ID
#   3) if the id was trimmed  -> server sends `resync` -> client repeats step 1
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior backend engineer in this repo (use the selected backend).
Context: Every write XADDs a change event; the products table holds current state; clients recover via GET /products then /stream.
Task: Bound the Stream with approximate trimming and confirm the snapshot path recovers a trimmed client.
Requirements:
- Add MAXLEN ~ 10000 to every XADD (go-redis: XAddArgs{MaxLen: 10000, Approx: true}; node-redis: { TRIM: { strategy: "MAXLEN", strategyModifier: "~", threshold: 10000 } }).
- Keep GET /products as the snapshot; ensure a reconnect whose Last-Event-ID is below the first retained id triggers a resync.
Tests / acceptance:
- Drive well over 10000 events; XLEN stays approximately at the cap, not unbounded.
- A client with a trimmed Last-Event-ID receives resync and recovers a correct, complete view via GET /products.
Output: a unified diff plus a one-paragraph note on why approximate (~) trimming is cheaper than exact.
What success looks like

Drive well over 10000 events and XLEN warehouse:1 settles approximately at the cap (a little over is the ~ trade-off — efficient batch trimming, not exact), instead of growing without bound. A client whose Last-Event-ID now falls below XINFO STREAM warehouse:1’s first-entry no longer replays a partial gap: it receives the resync frame and recovers a complete, correct view from GET /products. Snapshot is the floor under a trimmed Stream:

XLEN warehouse:1   -> (integer) 10000   # ~, may read a little above the cap

Consumer groups: the other half of Streams

Advanced

Meet the commands this design deliberately does not use — XGROUP CREATE, XREADGROUP, XACK, XPENDING, XAUTOCLAIM — and learn the judgement: a dashboard mirror wants plain XREAD (every reader sees every event), a worker pool wants a consumer group (each event processed once).

New in this step
consumer group

A named group over a Stream that distributes entries so each is processed once across a pool — the opposite of a mirror.

redis consumer group xgroup docs ↗
`XREADGROUP` (`>`)

Reads on behalf of a group; the > id means messages never yet delivered to this group, so different consumers get different entries.

redis xreadgroup command docs ↗
`XACK`

Confirms a consumer finished an entry, removing it from the group’s pending list — delivery is tracked, not fire-and-forget.

redis xack command docs ↗
Pending Entries List / `XPENDING`

The per-group set of delivered-but-not-yet-acked entries; XPENDING inspects what is still outstanding.

redis xpending pending entries list docs ↗
`XAUTOCLAIM`

Reassigns entries a crashed consumer grabbed but never acked, after a min-idle time — the reclaim half of group delivery.

redis xautoclaim command docs ↗
fan-out read vs group read

Plain XREAD gives every reader the whole feed (a mirror); a group splits the feed across workers (each job once) — Ticker needs the former.

redis xread vs xreadgroup
Fan-out read vs group read — and why this build picks fan-out

Streams answer two different questions, and Ticker has been using only the first. The fan-out read you built — XREAD against warehouse:1 — gives every reader every entry: run N server instances and each one tails the same Stream independently, so each connected dashboard sees the full ordered feed. That is exactly what a live mirror wants, and it is why the multi-instance step scaled for free.

A consumer group answers the opposite question: distribute the work so each entry is processed once across a pool. XGROUP CREATE warehouse:1 workers $ creates the group; each worker calls XREADGROUP GROUP workers <consumer> COUNT N STREAMS warehouse:1 > and Redis hands different entries to different consumers (the > means “messages never delivered to this group”). Delivery is tracked, not fire-and-forget: an entry stays in the group’s Pending Entries List until the worker confirms it with XACK warehouse:1 workers <id>. XPENDING warehouse:1 workers inspects what is still un-acked, and XAUTOCLAIM warehouse:1 workers <consumer> <min-idle> 0 reassigns entries a crashed worker grabbed but never acked — that ack-and-reclaim machinery is the whole point of a group, and it is pure overhead for a mirror.

So the choice is load-bearing, not accidental. If you put the SSE servers in a consumer group, each change event would be delivered to one server, and only the dashboards connected to that server would tick; the other windows would silently miss it. Plain XREAD is correct here precisely because it does not distribute: every instance must see every event for the fan-out to be complete. Reach for a group when you want competing workers (a re-embed worker, an email sender, an audit sink) to share a backlog so each job is handed to one worker; reach for plain XREAD when every reader needs the whole feed. (A consumer-group worker that processes each change once is a natural future add-on, out of scope here. Because XAUTOCLAIM can redeliver an un-acked entry, group delivery is at-least-once, so such a worker should be idempotent.)
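The difference between the two read styles can be shown with a toy model that involves no Redis at all. This is only an illustration of the delivery semantics: the feed type and both functions are made up for the sketch, and the round-robin split is a simplification, since Redis hands an entry to whichever consumer asks next.

```go
package main

import "fmt"

// feed stands in for the Stream: an append-only list of entry ids.
type feed []string

// mirrorRead models plain XREAD: every reader keeps its own cursor, so every
// reader receives every entry.
func mirrorRead(f feed, readers int) [][]string {
	out := make([][]string, readers)
	for r := range out {
		out[r] = append([]string(nil), f...) // each reader walks the whole feed
	}
	return out
}

// groupRead models a consumer group: the group shares one cursor, so each
// entry goes to a single consumer. Round-robin is used purely for clarity.
func groupRead(f feed, consumers int) [][]string {
	out := make([][]string, consumers)
	for i, id := range f {
		c := i % consumers
		out[c] = append(out[c], id)
	}
	return out
}

func main() {
	f := feed{"1-0", "2-0", "3-0", "4-0"}
	fmt.Println("mirror:", mirrorRead(f, 2)) // both readers hold all four ids
	fmt.Println("group: ", groupRead(f, 2))  // the four ids are split between them
}
```

If the two readers were SSE servers, the group output is exactly the failure described above: each server would tick only half of the changes.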

What a group would look like (contrast — not wired into this build)
Run these in your terminal / editor
# A consumer GROUP distributes entries (each processed once) — the opposite of the dashboard mirror.
redis-cli -u "$REDIS_URL" XGROUP CREATE warehouse:1 workers '$'      # create the group at the live tail
redis-cli -u "$REDIS_URL" XREADGROUP GROUP workers w1 COUNT 10 STREAMS warehouse:1 '>'  # w1 gets NEW entries
redis-cli -u "$REDIS_URL" XACK warehouse:1 workers 1718900000000-0   # confirm one entry is done
redis-cli -u "$REDIS_URL" XPENDING warehouse:1 workers               # what is delivered-but-not-acked
redis-cli -u "$REDIS_URL" XAUTOCLAIM warehouse:1 workers w2 60000 0  # w2 reclaims entries idle > 60s

# Ticker's fan-out uses the plain read instead — every instance sees EVERY entry, no group, no ack:
redis-cli -u "$REDIS_URL" XREAD BLOCK 15000 STREAMS warehouse:1 '$'

Integration test: live delivery + exact replay

Go Intermediate

Prove both guarantees in code with a Go integration test against the Compose stack — a change reaches a second subscriber within ~100ms, and a reconnect with Last-Event-ID receives exactly the missed events — because ordering, blocking reads, and replay exist only in a real server.

New in this step
integration test vs unit test

A unit test isolates one function; an integration test runs against real Postgres and Redis to prove behaviour that only emerges across the whole system.

integration test vs unit test docs ↗
why a fake Redis can't prove this

Ordering, blocking XREAD, and replay-from-id are real-server behaviours a mock cannot reproduce — so this test must point at the Compose stack.

why mock redis insufficient streams
`t.Skip`

Skips a test at runtime (here when DATABASE_URL/REDIS_URL are unset) so the suite still passes without infra instead of failing.

go testing t.skip docs ↗
resetting state between runs

Truncating products and clearing the Stream before each run keeps tests independent and repeatable.

reset database stream test isolation
Test the propagation, not just the write

Unit tests with a fake Redis can’t prove ordering, blocking reads, or replay — those exist only in a real server. Point the test at the Compose stack: start the fan-out handler, connect subscriber S1, make a change, and assert S1 receives it within ~100ms with the right version. Then for replay: connect S2, record its last id, disconnect it, make K changes, reconnect S2 with Last-Event-ID, and assert it receives exactly those K events (the replaying count equals K) and ends with state identical to a fresh GET /products. Skip cleanly if the env vars aren’t set, so the suite still runs without infra.
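A test like this needs a small SSE client of its own, because Go's standard library has no EventSource. The sketch below shows the two pieces using only the standard library: a frame parser that skips keepalive comments, and a request builder that resends Last-Event-ID. Frame, ReadFrames, framesFromString, and NewStreamRequest are hypothetical helper names. A real test would pass the response body of GET /stream to ReadFrames.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Frame is one parsed Server-Sent Events frame.
type Frame struct{ Event, ID, Data string }

// ReadFrames parses up to max frames from r and returns early at EOF.
// Comment lines (keepalives starting with ':') are skipped, and a frame is
// only dispatched if it carried at least one data field.
func ReadFrames(r io.Reader, max int) ([]Frame, error) {
	var frames []Frame
	var cur Frame
	var data []string
	sc := bufio.NewScanner(r)
	for len(frames) < max && sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // a blank line ends the current frame
			if len(data) > 0 {
				cur.Data = strings.Join(data, "\n")
				frames = append(frames, cur)
			}
			cur, data = Frame{}, nil
		case strings.HasPrefix(line, ":"):
			// comment or keepalive: nothing to record
		default:
			field, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch field {
			case "event":
				cur.Event = value
			case "id":
				cur.ID = value
			case "data":
				data = append(data, value)
			}
		}
	}
	return frames, sc.Err()
}

// framesFromString is a convenience wrapper for fixed input.
func framesFromString(s string, max int) []Frame {
	frames, _ := ReadFrames(strings.NewReader(s), max)
	return frames
}

// NewStreamRequest builds GET /stream, resending lastID on reconnect the way
// a browser's EventSource would. An empty lastID means a fresh connection.
func NewStreamRequest(baseURL, lastID string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/stream", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if lastID != "" {
		req.Header.Set("Last-Event-ID", lastID)
	}
	return req, nil
}

func main() {
	in := ": keepalive\n\nevent: replaying\ndata: 1\n\nid: 1718900003000-0\ndata: {\"version\":\"5\"}\n\n"
	for _, f := range framesFromString(in, 10) {
		fmt.Printf("%+v\n", f)
	}
}
```

For the replay assertion, the test would read the replaying frame first, compare its data to K, then read exactly K more frames and check their ids are strictly increasing.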

Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Go engineer in this repo.
Context: The fan-out server, write path, and schema exist; Postgres via DATABASE_URL and Redis via REDIS_URL (the Compose stack).
Task: Add integration tests for live delivery and reconnect-replay.
Requirements:
- Live: subscribe via an SSE client, make a price change, assert the event arrives within 100ms carrying the new version.
- Replay: record the last id, disconnect, make K=5 changes, reconnect with Last-Event-ID, assert exactly 5 events (a "replaying 5" frame) and final state equal to GET /products.
- t.Skip when DATABASE_URL or REDIS_URL is unset; reset the products table and the Stream between runs.
Tests / acceptance:
- `go test ./... -run TestRealtime` passes reliably across 10 runs against the Compose stack.
Output: a unified diff plus how the test isolates Redis and Postgres between runs.
What success looks like

go test ./... -run TestRealtime passes against the Compose stack and proves both guarantees in code: the live subscriber receives a change within ~100ms carrying the new version, and a subscriber that reconnects with Last-Event-ID after 5 changes gets a replaying 5 frame, exactly those 5 events, and a final state equal to GET /products. With env vars unset the test t.Skips instead of failing:

ok  	github.com/you/ticker	0.42s
--- (with no DATABASE_URL/REDIS_URL: SKIP: realtime infra not configured)

Integration test: same assertions in the TS stack

TypeScript Intermediate

Make parity a test, not a promise — assert the same two guarantees in TypeScript against the same Compose stack, so the project can honestly claim the realtime store, not the language, carries the weight.

New in this step
integration test

A test against real Postgres and Redis (not mocks) — the only way to prove ordering, blocking reads, and replay actually hold.

integration test real services
`bun test`

Bun’s built-in test runner (vitest works too); runs the TS assertions against the same Compose stack the Go suite uses.

bun test runner track ↗
parity testing

Asserting both backends produce identical behaviour, so a learner can swap languages and the dashboard cannot tell.

cross-implementation parity test
skip when infra unset

Skip the test when DATABASE_URL/REDIS_URL are absent, so the suite passes without infra rather than failing.

skip test when env missing
Parity is a test, not a promise

The TS suite asserts the identical behaviour with bun test (or vitest): an SSE client receives a change within ~100ms, and a reconnect with Last-Event-ID replays exactly the missed events, ending byte-identical to GET /products. Running both suites against the same Compose stack is what lets the project claim, honestly, that the realtime store — not the language — carries the weight.

Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior TypeScript engineer in this repo.
Context: The Hono fan-out server, write path, and schema exist; Postgres via DATABASE_URL and Redis via REDIS_URL.
Task: Add integration tests (bun test or vitest) for live delivery and reconnect-replay, matching the Go suite one-for-one.
Requirements:
- Live: an SSE/EventSource client receives a price change within 100ms with the new version.
- Replay: record the last id, disconnect, make 5 changes, reconnect with Last-Event-ID, assert exactly 5 events (a "replaying 5" frame) and final state equal to GET /products.
- Skip when DATABASE_URL or REDIS_URL is unset; reset products and the Stream between runs.
Tests / acceptance:
- `bun test` passes reliably; the assertions match the Go suite.
Output: a unified diff plus a note on any timing or flake mitigation you added.
What success looks like

bun test passes the same two assertions one-for-one against the same Compose stack: live delivery within ~100ms with the new version, and a reconnect that replays exactly 5 missed events (a replaying 5 frame) ending equal to GET /products. Both suites green against one Redis + Postgres is what lets the project claim, honestly, that the realtime store — not the language — carries the weight.

Optional: deploy online (Cloud Run + managed or self-hosted Redis)

Intermediate

Get a public URL if you want one by deploying the stateless server to Cloud Run against a real Redis and Postgres — entirely optional, since docker compose up is already the whole demo, and only the three connection strings change.

New in this step
Cloud Run

A serverless container host that scales to zero with a free monthly allotment — a fit for the stateless fan-out server.

google cloud run deploy container
stateless container

The server keeps no in-memory state (the Stream does), so any instance can serve any client and it deploys anywhere unchanged.

stateless service horizontal scale
blocking `XREAD` caveat

Some serverless Redis free tiers support Streams but not blocking XREAD; on those, switch to short non-blocking polling instead.

serverless redis blocking xread support
proxy response buffering

SSE is a long-lived response; a buffering CDN or proxy would withhold frames, so disable buffering on the /stream route.

disable proxy buffering sse
Optional, and free if you do it

Nothing about seeing the project work requires the cloud — docker compose up is the whole demo. If you want a public URL, the server is a stateless container, so Cloud Run fits: it scales to zero and has a generous free monthly allotment. Redis Streams need a real Redis, and this design relies on blocking XREAD BLOCK, so self-hosting redis:7 (a tiny VM or container host) keeps it unchanged. One honest constraint to know: some serverless Redis offerings — Upstash’s free tier among them — support Streams (XADD / XRANGE) but not the blocking form of XREAD; on those you’d switch the reader to short-interval non-blocking polling (XREAD with no BLOCK, on a timer) instead. Postgres can be Cloud SQL or any managed Postgres. Only the three connection strings change; the binary and the dashboard are identical to local.
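The non-blocking fallback can be sketched as a timer-driven loop around an injected read function. This is a minimal sketch, not the project's reader. ReadFunc stands in for a call such as XREAD COUNT n STREAMS warehouse:1 <afterID> with no BLOCK, and fakeReader exists only so the example runs without Redis. All names here are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Entry is one Stream entry as the reader sees it.
type Entry struct{ ID, Data string }

// ReadFunc returns the entries after afterID, or none if nothing is new.
type ReadFunc func(afterID string) ([]Entry, error)

// pollOnce performs one non-blocking read, emits what it found in order,
// and returns the advanced cursor.
func pollOnce(read ReadFunc, afterID string, emit func(Entry)) (string, error) {
	entries, err := read(afterID)
	if err != nil {
		return afterID, err
	}
	for _, e := range entries {
		emit(e)
		afterID = e.ID
	}
	return afterID, nil
}

// pollLoop repeats pollOnce on a fixed interval until ctx is cancelled.
// Delivery latency is bounded by the interval instead of being push-driven.
func pollLoop(ctx context.Context, read ReadFunc, afterID string, every time.Duration, emit func(Entry)) error {
	tick := time.NewTicker(every)
	defer tick.Stop()
	for {
		var err error
		if afterID, err = pollOnce(read, afterID, emit); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
		}
	}
}

// fakeReader serves entries from a fixed slice so the sketch needs no Redis.
// An unknown cursor (such as "0-0") returns the whole slice.
func fakeReader(feed []Entry) ReadFunc {
	return func(afterID string) ([]Entry, error) {
		for i, e := range feed {
			if e.ID == afterID {
				return feed[i+1:], nil
			}
		}
		return feed, nil
	}
}

func main() {
	read := fakeReader([]Entry{{"1-0", "a"}, {"2-0", "b"}, {"3-0", "c"}})
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := pollLoop(ctx, read, "0-0", 10*time.Millisecond, func(e Entry) {
		fmt.Println(e.ID, e.Data)
	})
	fmt.Println("stopped:", err)
}
```

The trade-off is visible in the interval: a shorter tick lowers latency but issues more commands against the Redis plan's quota, which matters on a free tier.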

Deploy (optional)
Run these in your terminal / editor
# Build + deploy the server container to Cloud Run (free monthly allotment eligible).
gcloud run deploy ticker \
  --source . \
  --set-env-vars DATABASE_URL="$DATABASE_URL",REDIS_URL="$REDIS_URL" \
  --region us-central1 --allow-unauthenticated
# REDIS_URL points at your own redis:7 (blocking XREAD works) or a managed Redis that supports it.

Summarise the change stream into an anomaly note with Gemini

Optional add-on Intermediate

Turn the change feed you already keep into a read-only advisory: read the most recent slice with XREVRANGE, ask Gemini for structured JSON flagging restock needs and price swings, and surface it for a human to act on, never auto-applied.

New in this step
structured JSON output

Asking the model to return typed JSON you can parse directly, instead of free text you must scrape.

llm structured json output
`responseMimeType: application/json`

The Gemini setting that forces the response to be valid JSON rather than prose.

gemini response mime type json
`responseSchema`

A schema the JSON must match (here { headline, restock[], price_anomalies[] }), so the output is typed and predictable.

gemini response schema structured output
read-only advisory endpoint

GET /insights only suggests a restock or price review — it never changes a price or stock; a human decides.

read-only recommendation endpoint
AI reads the feed; humans act

The change feed you already maintain is a perfect input for a cheap summary: pull the recent events with XREVRANGE warehouse:1 + - COUNT 200 (newest first, so reverse them for chronological order; XRANGE warehouse:1 - + COUNT 200 would return the oldest 200 retained entries instead), and ask Gemini to return structured JSON: products trending toward sold-out, prices that moved more than a threshold, a one-line headline. Use responseMimeType: "application/json" with a responseSchema so the output is typed JSON you can parse directly, not free text you have to scrape. Keep the key server-side and expose GET /insights that the dashboard can show as an advisory banner. Crucially this is read-only: it suggests a restock or a price review, it never changes a price or places an order. The platform shows the way, it doesn’t act. Costs nothing: a free Google AI Studio key covers this.
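Even with a response schema, the handler should validate what comes back before returning it. A minimal sketch of that check follows, assuming the { headline, restock[], price_anomalies[] } shape described in this step. The type names and ParseInsight are hypothetical, and the Gemini call itself is left out.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// The three types mirror the advisory shape this step asks the model for.
type Restock struct {
	ProductID int    `json:"product_id"`
	Reason    string `json:"reason"`
}

type PriceAnomaly struct {
	ProductID int     `json:"product_id"`
	DeltaPct  float64 `json:"delta_pct"`
}

type Insight struct {
	Headline       string         `json:"headline"`
	Restock        []Restock      `json:"restock"`
	PriceAnomalies []PriceAnomaly `json:"price_anomalies"`
}

// ParseInsight accepts raw model output only if it is a single JSON object
// with known fields, correct types, and a non-empty headline.
func ParseInsight(raw []byte) (Insight, error) {
	var in Insight
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields() // reject fields outside the agreed shape
	if err := dec.Decode(&in); err != nil {
		return Insight{}, fmt.Errorf("insight: %w", err)
	}
	// A second Decode must hit EOF; anything else is trailing content.
	if err := dec.Decode(&struct{}{}); err != io.EOF {
		return Insight{}, errors.New("insight: trailing data after JSON object")
	}
	if in.Headline == "" {
		return Insight{}, errors.New("insight: missing headline")
	}
	return in, nil
}

func main() {
	raw := []byte(`{"headline":"Product 1 sold out","restock":[{"product_id":1,"reason":"stock hit 0"}],"price_anomalies":[]}`)
	in, err := ParseInsight(raw)
	fmt.Println(in.Headline, len(in.Restock), err)
}
```

On a validation error the handler would return an error status rather than the raw model text, which keeps the advisory banner from ever showing unvetted output.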

Ask Gemini for a structured anomaly note
Run these in your terminal / editor
System: You are an inventory analyst. Given recent change events, return ONLY JSON.
User: <the last 200 events: product_id, price, stock, version>

responseSchema (conceptual):
{ "type": "object", "properties": {
    "headline": { "type": "string" },
    "restock": { "type": "array", "items": { "type": "object",
        "properties": { "product_id": {"type":"integer"}, "reason": {"type":"string"} } } },
    "price_anomalies": { "type": "array", "items": { "type": "object",
        "properties": { "product_id": {"type":"integer"}, "delta_pct": {"type":"number"} } } } } }
Chat prompt — paste into a chat to get the code
For a plain chat. It returns complete code; you paste it in yourself.
Role: Gemini integration engineer. The reader has no repo here — return complete code.
Context: Server-side handler in the user's selected backend; GEMINI_API_KEY in env; recent events available via XREVRANGE on the warehouse Stream.
Task: Implement GET /insights that reads the last 200 Stream events and returns Gemini's structured anomaly note { headline, restock[], price_anomalies[] }.
Requirements:
- Use responseMimeType="application/json" plus a responseSchema matching that shape; validate the JSON before returning.
- Read events with XREVRANGE warehouse:{id} + - COUNT 200 (newest first; reverse to chronological order before prompting); keep GEMINI_API_KEY server-side; time out after 20s.
- Read-only: the endpoint must NOT change any price or stock — it returns advice a human acts on.
- Link to the official structured-output docs rather than hardcoding a model name that may change.
Tests / acceptance (describe):
- A stream where one product hit stock 0 yields it in restock[]; a price jump beyond 25% appears in price_anomalies[].
- Malformed model output is rejected, not returned raw.
Output: the complete handler, no commentary.

A second client: native Jetpack Compose against the same backend

Optional add-on Beginner

Give the live board a native Android face by pointing a Jetpack Compose app at the unchanged backend — same /products snapshot, same /stream feed — to prove the realtime protocol is client-agnostic.

New in this step
Jetpack Compose

Android’s declarative UI toolkit; here it renders a live board that ticks per event, just like the web dashboard.

jetpack compose android ui track ↗ docs ↗
`10.0.2.2` emulator host

The Android emulator’s alias for the host machine’s localhost, so the app reaches the local backend at http://10.0.2.2:8080.

android emulator 10.0.2.2 localhost
client-agnostic protocol

Snapshot-first then an ordered SSE feed works the same for any client — a browser and an Android app reconcile against it identically.

protocol client agnostic
native SSE vs browser `EventSource`

A native SSE client does not auto-reconnect or auto-send Last-Event-ID the way the browser’s EventSource does — so Android owns that itself.

android sse no auto reconnect
One backend, two clients — and why a native one earns its keep

Nothing about the server changes here. The Android app points at the identical /products and /stream endpoints the bundled public/index.html already calls; run the backend with docker compose up and the emulator reaches it at http://10.0.2.2:8080 (the Android emulator’s alias for your host’s localhost). The lesson is that the realtime protocol — snapshot-first, then an ordered SSE feed whose every frame carries a Stream id — is client-agnostic: a browser and an Android app reconcile against the same feed the same way. The one honest difference, which the next steps lean on, is that the browser’s EventSource auto-reconnects and resends Last-Event-ID for you, whereas a native SSE client does not — so on Android you own the reconnect-and-replay discipline. Costs nothing: the backend is the same local Docker stack, and the Android emulator ships free with Android Studio — no device, no account, no paid service.

To learn the UI toolkit itself, see the Jetpack Compose track; for the language, Kotlin.

Consume the SSE stream with OkHttp's EventSource

Optional add-on Intermediate

Open GET /stream from Android with the okhttp-sse artifact, parsing each frame’s id, type, and data exactly as the web client does — and notice this client gives you no auto-reconnect, which is precisely why this module exists.

New in this step
`okhttp-sse` artifact

The SSE companion to core OkHttp (com.squareup.okhttp3:okhttp-sse) that does the SSE framing for you.

okhttp-sse eventsource
`EventSources.createFactory` / `newEventSource`

Build a factory from your OkHttp client, then factory.newEventSource(request, listener) opens the stream.

okhttp eventsources createfactory
`EventSourceListener.onEvent`

The callback handing you each frame’s id (Stream id), type (event name), and data (the JSON to parse).

okhttp eventsourcelistener onevent
no auto-reconnect / auto `Last-Event-ID`

Unlike the browser’s EventSource, OkHttp’s exposes only request() and cancel(), so you own reconnect and resending the last id.

okhttp sse manual reconnect
okhttp-sse gives you id + type + data per event — and nothing more

OkHttp ships SSE in a companion artifact, com.squareup.okhttp3:okhttp-sse, beside the core okhttp. You build an EventSource.Factory from your client with EventSources.createFactory(client), then factory.newEventSource(request, listener). The listener’s key callback is onEvent(eventSource, id: String?, type: String?, data: String). OkHttp has already done the SSE framing for you, so id is the Stream id the server stamped, type is the event name (null for an unnamed data frame, the kind a browser delivers as message, or the server’s custom replaying / resync), and data is the JSON body you parse into a change event. onOpen, onClosed, and onFailure(eventSource, t, response) round out the lifecycle.

One accuracy point that is the whole feature: unlike the browser’s EventSource, OkHttp’s is a low-level client — its interface is just request() and cancel(), with no automatic reconnect and no automatic Last-Event-ID. That is not a gap to apologise for; it is exactly why this module exists. You reconnect and resend the last id yourself (next step). If you would rather not hand-roll the retry loop, the documented higher-level LaunchDarkly okhttp-eventsource library wraps OkHttp and manages retry + Last-Event-ID for you; this module uses the plain okhttp-sse artifact so the discipline stays visible. Official artifact: https://square.github.io/okhttp/5.x/okhttp-sse/okhttp3.sse/

Add the dependencies
Run these in your terminal / editor
// app/build.gradle.kts (each coordinate still needs a :<version> suffix, or a BOM / version catalog entry)
dependencies {
    implementation("com.squareup.okhttp3:okhttp")          // take the latest from Maven Central
    implementation("com.squareup.okhttp3:okhttp-sse")      // SSE companion: EventSource + EventSourceListener
    implementation("androidx.lifecycle:lifecycle-runtime-compose") // collectAsStateWithLifecycle
}
Open the stream and parse each event
Run these in your terminal / editor
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import org.json.JSONObject

// A self-contained change event. Stream fields arrive as JSON strings, so coerce to numbers.
data class Tick(val productId: Long, val price: Long, val stock: Int, val version: Int)

fun parseTick(data: String): Tick {
    val j = JSONObject(data)
    // server sends product_id on stream frames, id on the snapshot — accept either
    val pid = if (j.has("product_id")) j.getString("product_id").toLong() else j.getLong("id")
    return Tick(pid, j.getString("price").toLong(), j.getString("stock").toInt(), j.getString("version").toInt())
}

fun openStream(client: OkHttpClient, baseUrl: String, lastId: String?, listener: EventSourceListener): EventSource {
    val builder = Request.Builder().url("$baseUrl/stream")
    if (lastId != null) builder.header("Last-Event-ID", lastId) // we resend it ourselves — OkHttp won't
    return EventSources.createFactory(client).newEventSource(builder.build(), listener)
}

// In the listener, onEvent hands you id, type, and the JSON payload directly:
val listener = object : EventSourceListener() {
    override fun onOpen(eventSource: EventSource, response: Response) { /* status: Live */ }
    override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
        when (type) {
            "replaying" -> { /* data = count; status: Replaying N */ }
            "resync"    -> { /* re-fetch GET /products, then keep streaming */ }
            else        -> { /* a data frame: applyTick(id, parseTick(data)) */ }
        }
    }
    override fun onClosed(eventSource: EventSource) { /* reconnect from lastId */ }
    override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) { /* status: Reconnecting */ }
}
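
The when (type) routing in the listener can also be lifted into a pure function, which makes it testable without a network or an emulator. What follows is a minimal sketch, not part of the module's code: Frame and classify are hypothetical names, the frame names (replaying, resync) come from this module's contract, and it assumes an untyped frame arrives with a null (or "message") type.

```kotlin
// Hypothetical names (Frame, classify): a pure routing step for what onEvent receives.
sealed interface Frame {
    data class Replaying(val count: Int) : Frame                // server announces N backlog events
    object Resync : Frame                                       // the client's id was trimmed: re-fetch the snapshot
    data class Data(val id: String?, val json: String) : Frame  // an ordinary change event
}

fun classify(id: String?, type: String?, data: String): Frame = when (type) {
    // The count arrives as text; fall back to 0 rather than crash on a malformed frame.
    "replaying" -> Frame.Replaying(data.trim().toIntOrNull() ?: 0)
    "resync" -> Frame.Resync
    // Anything else, including a null or "message" type, is treated as a data frame.
    else -> Frame.Data(id, data)
}
```

Inside onEvent you would call classify(id, type, data) and branch on the result; the Data case is the only one that should reach parseTick and applyTick.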

Hold the board in Compose state and render a live list

Optional add-on Intermediate

Make the board live by holding it in a ViewModel as a StateFlow<BoardUiState> and collecting it with collectAsStateWithLifecycle(). Each event produces a new immutable state, so Compose recomposes only the changed rows, and the status pill matches the one the web client shows.

New in this step
`ViewModel`

The lifecycle-scoped holder of the board’s truth; the SSE listener calls into it rather than touching the UI directly.

android viewmodel track ↗
`StateFlow`

An observable value holder updated with update { … }; each new immutable snapshot is what the UI collects.

kotlin stateflow docs ↗
`collectAsStateWithLifecycle`

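Collects a flow only while the screen is active, so a backgrounded screen does no UI work. This pauses collection only; the SSE connection itself stays open until you call cancel() on the EventSource.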

collectasstatewithlifecycle compose docs ↗
`LazyColumn`

A scrolling list that renders only visible rows; keyed by product id so Compose updates just the row that changed.

jetpack compose lazycolumn docs ↗
recomposition

Compose re-runs only the composables whose inputs changed, so a new state snapshot ticks one row, not the whole board.

jetpack compose recomposition
StateFlow in, collectAsStateWithLifecycle out — the board ticks per event

The ViewModel owns the truth: a StateFlow<BoardUiState> holding a Map<Long, Row> keyed by product id plus a status enum (Live, Reconnecting, Replaying). The SSE listener doesn’t touch the UI; it calls into the ViewModel, which updates the flow with update { … }. The Compose screen collects it with collectAsStateWithLifecycle() (from androidx.lifecycle:lifecycle-runtime-compose), which is lifecycle-aware: it stops collecting when the app is in the background and resumes on return, so a backgrounded screen does no UI work. Note that this pauses collection only; the OkHttp connection stays open until you call cancel() on the EventSource (for example in the ViewModel’s onCleared()). Render the map as a LazyColumn; because each event produces a new immutable snapshot and rows are keyed by product id, Compose recomposes only the rows that changed and the board ticks live. The status pill reads the same status field the web dashboard shows (green Live on open, amber Reconnecting… on failure, Replaying N while a backlog drains), so the two clients are visibly the same machine with different skins.

UI state, ViewModel flow, and the Compose screen
Run these in your terminal / editor
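import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row as LayoutRow   // aliased: this file also declares a Row data class
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.items
import androidx.compose.material3.Text                       // or androidx.compose.material.Text, depending on your theme
import androidx.compose.runtime.Composable
import androidx.compose.runtime.getValue                     // enables the `by` delegate below
import androidx.compose.ui.Modifier
import androidx.compose.ui.unit.dp
import androidx.lifecycle.ViewModel
import androidx.lifecycle.compose.collectAsStateWithLifecycle
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

enum class Status { Live, Reconnecting, Replaying }
data class Row(val id: Long, val name: String, val price: Long, val stock: Int, val version: Int)
data class BoardUiState(val rows: Map<Long, Row> = emptyMap(), val status: Status = Status.Reconnecting, val replayCount: Int = 0)

class BoardViewModel : ViewModel() {
    private val _state = MutableStateFlow(BoardUiState())
    val state = _state.asStateFlow()

    fun setStatus(s: Status, replayCount: Int = 0) = _state.update { it.copy(status = s, replayCount = replayCount) }
    // applyTick lives in the next step (idempotent by version); it calls _state.update { ... } too.
}

@Composable
fun BoardScreen(vm: BoardViewModel) {
    val ui by vm.state.collectAsStateWithLifecycle()      // lifecycle-aware collection
    Column {
        Text(when (ui.status) {                            // the same pill the web client shows
            Status.Live -> "● Live"
            Status.Reconnecting -> "● Reconnecting…"
            Status.Replaying -> "● Replaying ${ui.replayCount} events"
        })
        LazyColumn {
            // key = product id, so Compose can match each row across snapshots
            items(ui.rows.values.toList(), key = { it.id }) { r ->
                LayoutRow(Modifier.fillMaxWidth().padding(12.dp), horizontalArrangement = Arrangement.SpaceBetween) {
                    Text(r.name)
                    Text("$" + "%.2f".format(r.price / 100.0))   // format cents at the edge only
                    Text(if (r.stock > 0) "${r.stock} in stock" else "SOLD OUT")
                }
            }
        }
    }
}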
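
The pill's transitions can be written down as a small pure state machine, which is one way to keep the listener callbacks and the UI in agreement. This is a sketch under stated assumptions rather than the module's implementation: Pill, Signal, and next are hypothetical names, and the rule that the pill returns to Live once the announced N data frames have been applied is an assumption (the text only says Replaying N shows "while a backlog drains").

```kotlin
enum class Status { Live, Reconnecting, Replaying }

// Hypothetical: the pill's state, with how many replayed frames are still expected.
data class Pill(val status: Status = Status.Reconnecting, val remaining: Int = 0)

// Hypothetical: the inputs the listener would feed in.
sealed interface Signal {
    object Opened : Signal                                   // onOpen
    object Dropped : Signal                                  // onFailure / onClosed
    data class ReplayAnnounced(val count: Int) : Signal      // a "replaying" frame
    object DataApplied : Signal                              // a data frame went through the reducer
}

fun next(p: Pill, s: Signal): Pill = when (s) {
    Signal.Opened -> Pill(Status.Live)
    Signal.Dropped -> Pill(Status.Reconnecting)
    // An empty backlog needs no Replaying phase.
    is Signal.ReplayAnnounced -> if (s.count > 0) Pill(Status.Replaying, s.count) else Pill(Status.Live)
    Signal.DataApplied -> when {
        p.status != Status.Replaying -> p                    // live traffic leaves the pill alone
        p.remaining <= 1 -> Pill(Status.Live)                // assumed: last backlog frame flips to Live
        else -> p.copy(remaining = p.remaining - 1)
    }
}
```

In the ViewModel, the result of next would map onto setStatus(status, replayCount); keeping the transition pure means it can be unit-tested with plain values.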

Reconnect-and-replay: persist the last id, resend Last-Event-ID, apply idempotently

Optional add-on Advanced

Own on Android what the browser gets for free — remember the last applied Stream id, resend it as Last-Event-ID on every reconnect so the server replays exactly the gap, and apply each event idempotently so a duplicate id changes nothing.

New in this step
persist last id (`DataStore`)

Hold the last applied id in the ViewModel, and store it in DataStore if you want replay to survive the app being killed.

android datastore persist value track ↗
manual reconnect loop

On onFailure/onClosed, reopen /stream from the last id yourself — OkHttp will not, so the discipline is explicit.

okhttp sse manual reconnect loop
backoff

Wait briefly (and growing) between reconnect attempts so a flapping connection doesn’t hammer the server.

reconnect backoff retry
idempotent apply by version

Accept an event only if its version is strictly newer than the stored one — identical to the web client’s reducer, so re-delivered ids are ignored.

idempotent apply version dedupe
The same protocol the browser gets for free, made explicit

This is the heart of the module, and it is the same contract the web client follows — the Make replay idempotent and client offset protocol steps define it once for every client. Two responsibilities, both yours on Android because OkHttp won’t do them:

  1. Persist the last id. Each applied data frame’s id is your progress marker. Hold it in the ViewModel, and persist it (e.g. DataStore) if you want replay to survive the app being killed. On reconnect, pass it as the Last-Event-ID request header — the server’s XREAD-from-id returns just the gap (and a replaying N frame first, which drives the pill); a resync event means the id was trimmed, so re-fetch GET /products and continue.
  2. Apply idempotently by version. Replay and live delivery can overlap, so the apply must be a no-op for anything you’ve already seen. Hold the current version per product and accept an event only if its version is strictly newer — identical to the web client’s applyEvent reducer. This is what makes at-least-once replay safe: a re-delivered id is simply ignored.

The reconnect loop itself is plain: on onFailure / onClosed, set the pill to Reconnecting…, back off briefly, and call openStream(client, baseUrl, lastId, listener) again. Snapshot-first on cold start (GET /products), then the Stream from lastId — baseline plus deltas, no gaps.
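
The "back off briefly" part of that loop can be sketched as a delay calculator. This is one common shape (exponential growth with a cap and some jitter), not something the module prescribes: backoffMillis is a hypothetical helper, and the 500 ms base and 30 s ceiling are assumed values you would tune.

```kotlin
import kotlin.random.Random

// Hypothetical helper: delay before reconnect attempt number `attempt` (0-based).
// The window doubles from baseMs up to capMs; the result is a random point in the
// upper half of that window ("jitter") so many clients do not retry in lockstep.
fun backoffMillis(
    attempt: Int,
    baseMs: Long = 500,       // assumed starting delay
    capMs: Long = 30_000,     // assumed ceiling
    random: Random = Random.Default,
): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Clamp the shift so large attempt numbers cannot overflow with the default base.
    val exp = minOf(attempt, 20)
    val ceiling = minOf(capMs, baseMs shl exp)
    val floor = ceiling / 2
    return floor + random.nextLong(ceiling - floor + 1)
}
```

A caller would keep an attempt counter, wait backoffMillis(attempt) before calling openStream again, increment the counter on each failure, and reset it to 0 in onOpen so a healthy connection starts from the short delay next time.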

Idempotent apply + Last-Event-ID reconnect (ViewModel core)
Run these in your terminal / editor
// In BoardViewModel: the one reducer every event flows through.
// The Stream id of the newest processed event; resent on reconnect.
// @Volatile because OkHttp delivers onEvent on a background thread while reconnect may run elsewhere.
@Volatile private var lastId: String? = null

fun applyTick(eventId: String?, t: Tick) {
    _state.update { s ->
        val cur = s.rows[t.productId]
        if (cur != null && t.version <= cur.version) return@update s   // stale or duplicate: state unchanged
        val name = cur?.name ?: "Product ${t.productId}"
        s.copy(rows = s.rows + (t.productId to Row(t.productId, name, t.price, t.stock, t.version)))
    }
    // Advance progress only once the reducer has run. A deduped frame still counts as seen,
    // so its id moves the marker forward too.
    if (eventId != null) lastId = eventId
}

// Reconnect = reopen /stream from lastId; the server replays the gap, applyTick dedupes any overlap.
fun reconnect(client: OkHttpClient, baseUrl: String, listener: EventSourceListener) {
    setStatus(Status.Reconnecting)
    openStream(client, baseUrl, lastId, listener)   // resends Last-Event-ID; OkHttp will not do this for you
}
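
The dedupe rule can be checked without Android or a network by lifting it into a pure function over the row map. The sketch below assumes the Tick and Row shapes from this module (redeclared so the snippet stands alone); applyPure and replayAll are hypothetical names, not part of the module.

```kotlin
// Redeclared so the sketch stands alone; shapes match the module's Tick and Row.
data class Tick(val productId: Long, val price: Long, val stock: Int, val version: Int)
data class Row(val id: Long, val name: String, val price: Long, val stock: Int, val version: Int)

// Hypothetical pure form of the reducer: returns the same map instance when the tick is stale.
fun applyPure(rows: Map<Long, Row>, t: Tick): Map<Long, Row> {
    val cur = rows[t.productId]
    if (cur != null && t.version <= cur.version) return rows   // stale or duplicate: no change
    val name = cur?.name ?: "Product ${t.productId}"
    return rows + (t.productId to Row(t.productId, name, t.price, t.stock, t.version))
}

// Folds a delivery sequence, as overlapping replay and live delivery might produce it.
fun replayAll(ticks: List<Tick>): Map<Long, Row> =
    ticks.fold(emptyMap<Long, Row>()) { rows, t -> applyPure(rows, t) }
```

Feeding versions 1 and 2 for one product, then re-delivering either of them, should leave the board identical to the run without duplicates. That property is what lets the server replay at-least-once without the client double-applying anything.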
Agent prompt — paste into an agent with repo access
For Claude Code / Cursor / an agent that can read & edit this repo.
Role: Senior Android engineer (Kotlin, Jetpack Compose, coroutines) in this repo.
Context: The Ticker backend is unchanged and reachable at http://10.0.2.2:8080 from the emulator. It exposes GET /products (snapshot), GET /stream (SSE: data frames carry the Stream id and a JSON {product_id,price,stock,version}; custom "replaying" frame with data=count on reconnect; "resync" frame when the client's id was trimmed). Dependencies: okhttp, okhttp-sse, lifecycle-runtime-compose.
Task: Build BoardViewModel (StateFlow<BoardUiState>) plus an OkHttp SSE client that opens /stream, applies events idempotently, and reconnects resending Last-Event-ID.
Requirements:
- Open the stream with EventSources.createFactory(client).newEventSource(request, listener); set the Last-Event-ID request header to the last applied id (OkHttp does NOT auto-reconnect or auto-send it).
- applyTick(eventId, tick) is idempotent: ignore any event whose version is not strictly greater than the stored version for that product id; advance the stored lastId only after a successful apply.
- Map listener callbacks to status: onOpen -> Live; onFailure/onClosed -> Reconnecting then reconnect from lastId; type "replaying" -> Replaying(count); type "resync" -> re-fetch GET /products and continue.
- Cold start is snapshot-first: GET /products applied through the same reducer, then open the stream.
Tests / acceptance:
- A unit test feeds a FAKE EventSourceListener two events for the same product (versions 1 then 2): assert the board row ends at version 2 with the second event's price/stock.
- Re-deliver the version-2 event a second time (same id): assert the board state is unchanged (duplicate ignored) and the row count stays 1.
- Provide a fake EventSource so the test needs no network; assert lastId equals the id of the newest applied event.
Output: a unified diff plus a one-paragraph note on why the Android client must own reconnect/Last-Event-ID while the browser's EventSource does not.

Where to take it next

  • Go deep on the store that carries this whole build: the Redis track — Streams, pub/sub versus Streams, persistence, and SCAN.
  • Keep the durable truth honest with PostgreSQL — constraints, RETURNING, optimistic concurrency, and isolation levels.
  • Build the same fan-out in either language: Go or TypeScript (Hono on the edge).
  • Contrast the lessons: Aurora Commerce owns in-transaction correctness; Ticker owns after-commit propagation. See why on the Compare page, where Redis scores 5 here and a document store’s change-stream replay scores lower.