Pick your backend (Go or TypeScript) above; the steps adapt, and the bundled web dashboard is the same for both. Watch the spotlight: by the fan-out step, one operator’s price change is committed once to Postgres, appended to a Redis Stream, and pushed to every open browser within ~100ms — and a window that was briefly offline replays exactly the events it missed instead of refetching everything. The offline-replay demo near the end is where you’ll see a window catch up — that’s the marquee moment to build toward.
How this differs from Aurora Commerce. Same domain (price and stock),
orthogonal lesson. Aurora is about in-transaction correctness — checkout as one BEGIN … COMMIT, where
the oversell guard is the whole story. Ticker reduces that guard to a single deliberate atomic line and
spends the rest of the build on what Aurora never touches: propagating a committed change, live and in
order, to many watchers, with reconnect-replay. The server is the single arbiter of each row — there is no
merge (that’s Concord’s CRDT lesson), and clients reconcile against a shared feed by
Stream offset, not one device’s offline edits (that’s Vitals).
Stand up Postgres + Redis with one docker compose
Beginner
Start Postgres and Redis together with one Docker Compose file and export their two connection strings, so the durable store and the realtime store both run locally and your server knows where to find each.
New in this step
Docker Compose: A YAML file that defines and runs containers together (here one Postgres and one Redis), so every learner gets the identical, throwaway pair with one command.
DATABASE_URL: The Postgres connection string in an environment variable; reading it from the env means the same build runs locally, in CI, and in the cloud.
REDIS_URL: The Redis address in an environment variable, shaped redis://host:port, so the realtime store is configured the same way as the database.
redis:// connection string: The single-line address of a Redis server (redis://localhost:6379) that the client parses to connect.
Why one compose file — and it costs nothing
Both services are free, official open-source images — postgres:16 and redis:7 — so the whole backend
runs locally with no account, quota, or card. Compose gives every learner the same versions and a clean
reset (docker compose down -v). Your server reads DATABASE_URL and REDIS_URL from the environment, so
the identical build runs locally, in CI, and in the cloud — only the connection strings change.
Costs nothing. redis:7 and postgres:16 are open-source images; XADD, XREAD, and consumer groups
are core Redis, not a paid tier.
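Reading both connection strings from the environment can be sketched in Go as follows. This is a minimal illustration, not part of the build: the `Config` type and the `loadConfig` helper are hypothetical names, and the scheme check is an assumption about what you want to reject early.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"os"
)

// Config is a hypothetical holder for the two connection strings.
type Config struct {
	DatabaseURL string
	RedisURL    string
}

// loadConfig reads both variables through getenv (os.Getenv in production,
// a fake in tests) and rejects missing values or an unexpected Redis scheme.
func loadConfig(getenv func(string) string) (Config, error) {
	cfg := Config{DatabaseURL: getenv("DATABASE_URL"), RedisURL: getenv("REDIS_URL")}
	if cfg.DatabaseURL == "" || cfg.RedisURL == "" {
		return Config{}, errors.New("DATABASE_URL and REDIS_URL must both be set")
	}
	u, err := url.Parse(cfg.RedisURL)
	if err != nil {
		return Config{}, fmt.Errorf("REDIS_URL: %w", err)
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return Config{}, fmt.Errorf("REDIS_URL: unexpected scheme %q", u.Scheme)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("redis at", cfg.RedisURL)
}
```

Passing `getenv` as a parameter keeps the helper testable without touching the real process environment.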
docker-compose.yml
# docker-compose.yml
services:
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: dev
      POSTGRES_DB: ticker
    ports: ["5432:5432"]
  redis:
    image: redis:7
    ports: ["6379:6379"]
Run it
docker compose up -d
export DATABASE_URL="postgres://postgres:dev@localhost:5432/ticker?sslmode=disable"
export REDIS_URL="redis://localhost:6379"
redis-cli -u "$REDIS_URL" ping # -> PONG
What success looks like
Both containers are up and reachable. docker compose ps shows db and redis as running, and the redis-cli ... ping prints PONG. The two env vars are now exported in this shell, so the server will read them.
Why a realtime DB is a change feed and fan-out, not a faster query
Beginner
Before any code, lock in the core idea: realtime is a change feed that fans out and can be replayed, not a faster query. That one distinction is why pub/sub and Streams are not interchangeable and why Streams is the spotlight here.
New in this step
change feed: A running list of what changed (not the current value) so watchers can react to each event as it happens.
fan-out: Delivering one change to many subscribers at once, instead of each one re-querying for it.
replay: Re-reading past events from a chosen point so a client that was away can apply exactly what it missed.
Redis pub/sub: Fire-and-forget, at-most-once messaging with no memory, so a subscriber that was offline never learns of the change.
Redis Streams: An append-only log with stable ordered ids and read-from-any-id replay; the one primitive that gives change feed, fan-out, and replay together.
stream id `<ms>-<seq>`: Every Stream entry’s id is a millisecond timestamp plus a sequence counter, monotonic and ordered; it is the marker you replay from.
Pub/sub forgets; a Stream remembers
A dashboard that shows live prices needs three things from its data layer: a change feed (tell me when
something changed), fan-out (tell everyone at once), and replay (tell me what I missed while I was
away). Redis pub/sub gives you the first two and not the third: it is fire-and-forget and at-most-once,
so a tab that was asleep when a price changed never learns about it. Redis Streams are an append-only
log — every entry gets a stable, ordered id (<ms>-<seq>), entries persist until you trim them, and XREAD
can start reading from any id to replay the gap. That third property is the whole project: it is what lets a
briefly-offline client catch up exactly instead of refetching everything.
Postgres is the durable truth, but a request/response store cannot push or replay, which is precisely why a
realtime store sits in front of it. The Redis track introduces pub/sub from redis-cli
and mentions Streams; here we make Streams load-bearing.
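The difference between forgetting and remembering can be modelled in a few lines of Go. The sketch below is a toy in-memory log, not Redis: the `Log` type, its `Append` and `ReadAfter` methods, and the plain integer ids (standing in for `<ms>-<seq>`) are all assumptions made for illustration.

```go
package main

import "fmt"

// Entry is one change in the log: an id plus string fields, like a Stream entry.
type Entry struct {
	ID     int // simplified: a counter stands in for Redis's <ms>-<seq> id
	Fields map[string]string
}

// Log is a toy append-only log. It only models the semantics discussed above.
type Log struct {
	entries []Entry
}

// Append stores the entry and returns its id (the role XADD plays).
func (l *Log) Append(fields map[string]string) int {
	id := len(l.entries) + 1
	l.entries = append(l.entries, Entry{ID: id, Fields: fields})
	return id
}

// ReadAfter returns every entry with an id greater than lastSeen, in order
// (the role that XREAD from an explicit id plays on reconnect).
func (l *Log) ReadAfter(lastSeen int) []Entry {
	if lastSeen < 0 {
		lastSeen = 0
	}
	if lastSeen >= len(l.entries) {
		return nil
	}
	// ids are 1..n, so the entry with id k sits at index k-1.
	return l.entries[lastSeen:]
}

func main() {
	feed := &Log{}
	first := feed.Append(map[string]string{"price": "1399"})
	// The client goes offline here, having seen only the first entry.
	feed.Append(map[string]string{"price": "1350"})
	feed.Append(map[string]string{"price": "1300"})
	// On reconnect it asks for everything after the last id it saw.
	for _, e := range feed.ReadAfter(first) {
		fmt.Println(e.ID, e.Fields["price"])
	}
}
```

A pub/sub channel has no equivalent of `ReadAfter`: once a message has been delivered to whoever was listening, there is nothing left to ask for.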
Feel fan-out and replay in raw redis-cli first
Beginner
Prove the spotlight to yourself before any server code: in two terminals, watch XREAD BLOCK park and wake on a new XADD, then reconnect from an explicit id and watch it replay a gap. The later server steps then re-implement something you have already seen rather than asking for an act of faith.
New in this step
`XADD`: Appends one entry to a Stream and returns its assigned ordered id; * lets Redis pick the next id.
`XREAD`: Reads Stream entries after a given id; the live tail and the replay both come from this one command.
`XREAD BLOCK 0`: Parks the connection waiting for the next entry instead of returning empty; 0 means wait forever (we move to a bounded block in the server).
`$` live-tail id: A special id meaning only entries appended from now on; start here to watch live, not from history.
replay from an explicit id: Calling XREAD with a real id you noted earlier returns every entry after it immediately; that is the reconnect path.
See it work in two windows before you build it
The whole project rests on two Stream behaviours; feel them by hand now so the later server steps are a
re-implementation of something you have already seen, not an act of faith. In window A, XREAD BLOCK 0
parks — it returns nothing until window B appends an entry, then wakes with that entry and its id. That is
the live fan-out path. Then reconnect: call XREAD again but start from an explicit id you noted earlier,
and Redis hands back every entry after it immediately — that is replay-from-id, the reconnect path. Two
commands, both halves of the build, in your terminal in under a minute.
Two terminals against the Compose Redis
# Terminal A — park on the live tail ("$" = only entries appended from now on):
redis-cli -u "$REDIS_URL" XREAD BLOCK 0 STREAMS warehouse:1 '$'
# (it hangs here, waiting)
# Terminal B — append one event; note the id it prints, e.g. 1718900000000-0:
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1399 stock 50 version 1
# -> back in Terminal A, the entry appears instantly with its id
# Terminal B — append two more, then REPLAY from the first id (in Terminal A, Ctrl-C first):
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1350 stock 49 version 2
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1300 stock 48 version 3
redis-cli -u "$REDIS_URL" XREAD STREAMS warehouse:1 1718900000000-0 # use YOUR first id
# -> returns the two entries AFTER that id, immediately: that is reconnect-replay
What success looks like
Terminal A’s XREAD BLOCK 0 hangs (the park), then prints the entry the instant Terminal B XADDs it — that is live fan-out. The replay XREAD STREAMS warehouse:1 <your-first-id> returns the two later entries immediately, in id order, and nothing before that id:
1) 1) "warehouse:1"
   2) 1) 1) "1718900001000-0"
         2) 1) "product_id" 2) "1" 3) "price" 4) "1350" ...
      2) 1) "1718900002000-0"
         2) 1) "product_id" 2) "1" 3) "price" 4) "1300" ...
Schema: products with price, stock, and a version
Beginner
Create a products table whose version column and CHECK constraints make every later guarantee possible: the version seeds optimistic concurrency and idempotent replay, and the CHECKs keep stock and price non-negative no matter which client writes.
New in this step
version column: An integer you bump on every write; later it rejects stale writes and lets a replayed event apply at-most-once by comparison.
CHECK constraint: A rule the row must satisfy or the write is rejected, so CHECK (stock >= 0) makes negative stock impossible from any client.
BIGINT GENERATED ALWAYS AS IDENTITY: The modern auto-incrementing 64-bit primary key (the successor to serial).
money as BIGINT cents: Integers can’t drift the way floats do; store 1499, not 14.99.
TIMESTAMPTZ: A timestamp stored as a UTC instant, so updated_at is unambiguous across time zones.
The version column is the seed of optimistic concurrency
Money is BIGINT cents, never float. CHECK (stock >= 0) and CHECK (price >= 0) make impossible data
unwritable by any client. The new idea versus a plain catalog is version: an integer you bump on every
write. It lets two things work later — a stale write is rejected by a conditional UPDATE (optimistic
concurrency), and a replayed event is applied idempotently by comparing versions. updated_at gives every
change a wall-clock for the dashboard.
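The two jobs of the version column can be sketched in plain Go before any SQL exists. This is an in-memory model under stated assumptions: `Row`, `setPrice`, and `applyEvent` are hypothetical names, and the real build performs the first check inside a conditional UPDATE rather than in application code.

```go
package main

import "fmt"

// Row mirrors the columns that matter here: price, stock, version.
type Row struct {
	Price   int64 // cents
	Stock   int
	Version int
}

// setPrice models UPDATE ... WHERE version = expected: it changes the row and
// bumps version only if the caller's expected version still matches.
func setPrice(r *Row, newPrice int64, expected int) bool {
	if r.Version != expected {
		return false // stale write: someone else updated first
	}
	r.Price = newPrice
	r.Version++
	return true
}

// applyEvent models a client applying a change event: it is a no-op unless the
// event is newer than what the client already holds, so duplicates are harmless.
func applyEvent(local *Row, ev Row) bool {
	if ev.Version <= local.Version {
		return false
	}
	*local = ev
	return true
}

func main() {
	row := Row{Price: 1499, Stock: 50, Version: 0}
	fmt.Println(setPrice(&row, 1399, 0)) // true: version 0 matched, row is now version 1
	fmt.Println(setPrice(&row, 1299, 0)) // false: version 0 is stale

	view := Row{Price: 1499, Stock: 50, Version: 0}
	fmt.Println(applyEvent(&view, row)) // true: version 1 is newer than 0
	fmt.Println(applyEvent(&view, row)) // false: the replayed duplicate is ignored
}
```

Both checks are a single integer comparison, which is why one column is enough to support stale-write rejection on the server and idempotent replay on the client.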
db/schema.sql
CREATE TABLE products (
  id         BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  name       TEXT NOT NULL,
  price      BIGINT NOT NULL CHECK (price >= 0), -- cents
  stock      INTEGER NOT NULL CHECK (stock >= 0),
  version    INTEGER NOT NULL DEFAULT 0,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
INSERT INTO products (name, price, stock) VALUES
  ('Aurora Mug', 1499, 50),
  ('Aurora Tee', 2999, 12),
  ('Sticker Pack', 499, 200);
Load the schema and seed rows into Postgres
Beginner
Run db/schema.sql against the running Postgres yourself (Compose loads nothing automatically) so the products table and its three seed rows exist before any code queries them.
New in this step
`psql`: Postgres’s command-line client; you point it at DATABASE_URL to run SQL against the running container.
`psql -f`: Runs every statement in a .sql file in order; here it creates the table and inserts the seed rows in one shot.
`docker compose exec`: Runs a command inside a running container, so you can pipe the schema through the db container when you have no local psql.
postgres default superuser: The image’s built-in postgres user, used by DATABASE_URL because Compose set only the password and db name (not POSTGRES_USER).
Why this is its own step — nothing auto-loads it
The Compose file mounts no init script, so the table does not appear by magic — you load db/schema.sql
yourself once, after docker compose up. The connection string’s user is postgres (the image’s default
superuser, because Compose set only POSTGRES_PASSWORD and POSTGRES_DB); if you ever set POSTGRES_USER,
update DATABASE_URL to match. After this load, GET /products will return exactly the three seed products
the rest of the build assumes.
Load the schema (either form works)
# If you have a local psql client:
psql "$DATABASE_URL" -f db/schema.sql
# No local psql? Pipe it through the db container instead:
docker compose exec -T db psql -U postgres -d ticker < db/schema.sql
# Confirm the three seed rows landed:
psql "$DATABASE_URL" -c "SELECT id, name, price, stock, version FROM products ORDER BY id;"
What success looks like
The SELECT returns exactly three rows in id order, each with version 0:
 id |     name     | price | stock | version
----+--------------+-------+-------+---------
  1 | Aurora Mug   |  1499 |    50 |       0
  2 | Aurora Tee   |  2999 |    12 |       0
  3 | Sticker Pack |   499 |   200 |       0
(3 rows)
Now GET /products has something to return. Re-running schema.sql would error on the existing table; for a clean reload, drop the volume (docker compose down -v), bring the stack back up, and load the schema again.
Model the change event you XADD to a Stream
Intermediate
Design the change event before writing code: a self-contained { product_id, price, stock, version } appended to one fixed Stream key, so every later step (write path, fan-out, replay, idempotency) shares the same shape and key. This build commits to one warehouse, warehouse:1.
New in this step
change event: The four fields a mutation produces (product_id, price, stock, version); enough state to apply it without reading anything else.
`warehouse:1` fixed key: The single Stream key this build commits to everywhere: write routes append to it, the feed tails it, the dashboard reads it.
Stream field values are strings: Redis stores every Stream field as text, so numbers are encoded as decimal strings on XADD and parsed back on read.
`*` auto-id: Passing * to XADD tells Redis to assign the next monotonic <ms>-<seq> id and return it.
`XLEN`: Reports how many entries a Stream currently holds; a quick way to confirm an event was appended.
`XRANGE`: Lists entries between two ids (- to + is oldest to newest); COUNT caps how many you get back.
Design the event before the code
Every mutation produces exactly one event, and the event carries enough state to apply it without reading
anything else: { product_id, price, stock, version }. You append it with XADD warehouse:1 * ...; Redis
assigns the ordered id <ms>-<seq> and returns it.
Keying a Stream per warehouse (warehouse:{id}) is the general design — it keeps each feed small and lets a
dashboard subscribe to just the warehouse it watches. To keep this build focused, we commit to a single
warehouse: warehouse:1 is the fixed key everywhere — the write routes XADD to it, GET /stream tails
it, and the dashboard reads it. (Generalising to a request-supplied warehouse — GET /stream?warehouse=N,
write routes keyed by the product’s warehouse — is a clean exercise; if you do it, make every step use the
same id, or a dashboard keyed to warehouse:2 will silently see nothing.)
Because the event is self-contained and the version is monotonic, a client can apply events from anywhere — fresh, replayed, or duplicated — and still converge (you lean on this in the idempotency step). One Redis fact to internalise now: Stream field values are strings, so encode numbers as decimal text and parse them on the way out.
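Encoding numbers as decimal text and parsing them back might look like the following Go sketch. The `Event` type and the `Fields` and `ParseEvent` helpers are hypothetical names introduced here; only the four field names come from the event design above.

```go
package main

import (
	"fmt"
	"strconv"
)

// Event is the self-contained change event: enough state to apply on its own.
type Event struct {
	ProductID int64
	Price     int64 // cents
	Stock     int
	Version   int
}

// Fields encodes the event as the string field/value pairs a Stream stores.
func (e Event) Fields() map[string]string {
	return map[string]string{
		"product_id": strconv.FormatInt(e.ProductID, 10),
		"price":      strconv.FormatInt(e.Price, 10),
		"stock":      strconv.Itoa(e.Stock),
		"version":    strconv.Itoa(e.Version),
	}
}

// ParseEvent decodes the string fields read back from a Stream entry.
// A missing or non-numeric field is an error rather than a silent zero.
func ParseEvent(f map[string]string) (Event, error) {
	var e Event
	var err error
	if e.ProductID, err = strconv.ParseInt(f["product_id"], 10, 64); err != nil {
		return Event{}, fmt.Errorf("product_id: %w", err)
	}
	if e.Price, err = strconv.ParseInt(f["price"], 10, 64); err != nil {
		return Event{}, fmt.Errorf("price: %w", err)
	}
	if e.Stock, err = strconv.Atoi(f["stock"]); err != nil {
		return Event{}, fmt.Errorf("stock: %w", err)
	}
	if e.Version, err = strconv.Atoi(f["version"]); err != nil {
		return Event{}, fmt.Errorf("version: %w", err)
	}
	return e, nil
}

func main() {
	ev := Event{ProductID: 1, Price: 1399, Stock: 50, Version: 1}
	back, err := ParseEvent(ev.Fields())
	fmt.Println(back == ev, err) // true <nil>
}
```

Keeping the encode and decode in one place means the write path, the feed, and any replay code cannot drift apart on field names.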
The event, by hand in redis-cli
# Append a change event; * = let Redis assign the next ordered id
redis-cli -u "$REDIS_URL" XADD warehouse:1 '*' product_id 1 price 1399 stock 50 version 1
# -> "1718900000000-0" (the stable id, shaped <ms>-<seq>)
redis-cli -u "$REDIS_URL" XLEN warehouse:1 # how many events are buffered
redis-cli -u "$REDIS_URL" XRANGE warehouse:1 - + COUNT 5 # oldest 5 entries (- is the min id, + is the max id)
What success looks like
XADD returns the assigned id shaped <ms>-<seq> (e.g. 1718900000000-0), XLEN warehouse:1 reports the count of buffered events, and XRANGE lists each entry with its four fields — every value a string, since Stream field values are always strings:
1) 1) "1718900000000-0"
   2) 1) "product_id" 2) "1" 3) "price" 4) "1399" 5) "stock" 6) "50" 7) "version" 8) "1"
Scaffold the Go fan-out server
Go · Beginner
Stand up one Go binary that opens a pgxpool and a go-redis client and serves both the dashboard and a GET /products snapshot; it is the single process every later route hangs off.
New in this step
pgx and `pgxpool`: The standard Postgres driver for Go; pgxpool is its connection pool, opened once at startup and reused per request.
go-redis: The standard Go Redis client (github.com/redis/go-redis/v9); built from redis.ParseURL(REDIS_URL).
`http.ServeMux` (1.22 method patterns): Go’s built-in router; since 1.22 a pattern can include the method and path, e.g. GET /products.
`http.FileServer`: Serves files from a directory and auto-returns index.html at /, so one binary also serves the dashboard.
context (`ctx`): A per-request value that carries cancellation and deadlines; pass r.Context() to every DB and Redis call.
`PORT` env var: The listen port read from the environment (default 8080) so the later multi-instance step can run a second copy.
Why pgx + go-redis, and one binary that serves the UI
pgx is the standard Postgres driver for Go and github.com/redis/go-redis/v9 the standard Redis client;
both take a context on every call. The server is deliberately one process doing three jobs — REST writes,
an SSE stream, and serving the static public/index.html — so a learner runs one binary and opens one URL.
Always pass r.Context() and use parameters ($1); never string-concatenate SQL.
Set up the module
go mod init github.com/you/ticker
go get github.com/jackc/pgx/v5
go get github.com/redis/go-redis/v9
Pool, client, snapshot endpoint
// cmd/server/main.go (essentials)
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/redis/go-redis/v9"
)

type Product struct {
	ID      int64  `json:"id"`
	Name    string `json:"name"`
	Price   int64  `json:"price"` // cents
	Stock   int    `json:"stock"`
	Version int    `json:"version"`
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
	opt, err := redis.ParseURL(os.Getenv("REDIS_URL"))
	if err != nil {
		log.Fatal(err)
	}
	rdb := redis.NewClient(opt)
	defer rdb.Close() // the write path and the feed use rdb in later steps

	mux := http.NewServeMux()
	mux.Handle("GET /", http.FileServer(http.Dir("public"))) // serves public/index.html
	mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
		rows, err := pool.Query(r.Context(),
			`SELECT id, name, price, stock, version FROM products ORDER BY id`)
		if err != nil {
			http.Error(w, "server_error", http.StatusInternalServerError)
			return
		}
		defer rows.Close()
		out := []Product{}
		for rows.Next() {
			var p Product
			if err := rows.Scan(&p.ID, &p.Name, &p.Price, &p.Stock, &p.Version); err != nil {
				http.Error(w, "server_error", http.StatusInternalServerError)
				return
			}
			out = append(out, p)
		}
		if err := rows.Err(); err != nil { // surfaces errors hit mid-iteration
			http.Error(w, "server_error", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(out)
	})
	port := os.Getenv("PORT") // honor PORT so the multi-instance step can run two copies
	if port == "" {
		port = "8080"
	}
	log.Fatal(http.ListenAndServe(":"+port, mux))
}
Agent prompt: paste into an agent with repo access
Role: Senior Go engineer in this repo.
Context: Postgres via env DATABASE_URL, Redis via REDIS_URL; schema loaded from db/schema.sql; github.com/jackc/pgx/v5 and github.com/redis/go-redis/v9.
Task: Scaffold cmd/server with a pgxpool, a go-redis client, a GET /products snapshot, and a static file server for ./public.
Requirements:
- Pool and client created once at startup, closed on shutdown; every call takes r.Context().
- Money is int64 cents; parameterised queries only; the redis client is built from redis.ParseURL(REDIS_URL).
- GET /products returns 200 with a JSON array ordered by id; GET / serves public/index.html via http.FileServer(http.Dir("public")) (it serves index.html at / automatically). The listen port reads PORT, default 8080.
Tests / acceptance:
- `go build ./...` passes; `curl -s localhost:8080/` returns the dashboard HTML; after loading the schema, `curl -s localhost:8080/products | jq length` returns 3.
Output: a unified diff plus a one-line note on pool sizing.
What success looks like
go build ./... is clean and the binary starts on :8080. curl -s localhost:8080/ returns the dashboard HTML, and once the schema is loaded curl -s localhost:8080/products | jq length prints 3 — the snapshot as JSON numbers (price, stock, version), ordered by id:
[{"id":1,"name":"Aurora Mug","price":1499,"stock":50,"version":0}, ...]
Scaffold the TypeScript edge server
TypeScript · Beginner
Stand up one Hono app on Bun that connects node-postgres and node-redis and serves both the dashboard and a GET /products snapshot: the TypeScript twin of the Go server, matching it at the contract.
New in this step
Hono: A tiny web framework built on the Web Request/Response API, so the same app runs on Bun, Node, and the edge.
Bun: A fast all-in-one JavaScript runtime whose built-in server reads the app’s default export.
node-postgres `Pool`: The pg package’s Postgres connection pool; the TypeScript counterpart to Go’s pgxpool.
node-redis (`redis@^5`): The redis package, pinned to v5 because its Stream and blocking-read behaviour is version-sensitive.
`serveStatic` path fallback: Hono’s bun static handler maps a path to a file, so a bare / doesn’t auto-serve index.html; the path: catch-all does.
Bun default-export port + fetch: Bun starts a server from export default { port, fetch }; reading process.env.PORT lets a second instance run elsewhere.
Why Hono + Bun — one app, two runtimes
Hono is a tiny web framework that speaks the Web Request/Response API, so the same app runs on Bun,
Node, and the edge. Bun is a fast, batteries-included runtime whose built-in server reads the app’s default
export. You’ll use pg (node-postgres) for the durable store and the redis package (node-redis, pinned to
v5) for Streams. Keep request handlers free of Node-only APIs so the app stays portable — exactly the
discipline the TypeScript track builds.
One static-serving gotcha worth getting right now: Hono’s bun serveStatic({ root }) maps the request path
to a file under root, so a bare GET / resolves to the directory and does not reliably auto-serve
index.html the way Go’s http.FileServer does. The documented SPA pattern is a catch-all that points at
the file directly — serveStatic({ path: "./public/index.html" }) — registered after your API routes. That
keeps GET / returning the dashboard, so the page works identically on both backends.
Set up the project
bun init -y
bun add hono pg redis@^5 # pin node-redis v5 — its Stream/blocking-read behaviour is version-sensitive
# Node alternative: npm add hono pg redis@^5 @hono/node-server
App, clients, snapshot endpoint
// src/server.ts
import { Hono } from "hono";
import { serveStatic } from "hono/bun";
import { Pool, types } from "pg";
import { createClient } from "redis";

// node-postgres returns BIGINT (int8, type OID 20) columns as strings by default. Parse them to numbers
// so id and price serialise as JSON numbers like the Go server's. Cents stay far below 2^53, so this is safe here.
types.setTypeParser(20, (value: string) => Number(value));

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

const app = new Hono();
app.get("/products", async (c) => {
  const { rows } = await pool.query(
    "SELECT id, name, price, stock, version FROM products ORDER BY id",
  );
  return c.json(rows);
});

// Serve the single dashboard file. Hono's bun serveStatic maps a request path to a file under `root`,
// so a request for "/" resolves to the *directory*, not index.html. Use the `path:` form as the SPA
// fallback so GET / returns the dashboard. (Register this AFTER your API routes so it only catches the rest.)
app.get("*", serveStatic({ path: "./public/index.html" }));

const port = Number(process.env.PORT) || 8080; // honor PORT for the multi-instance step
export default { port, fetch: app.fetch }; // Bun reads port + fetch from the default export
Agent prompt: paste into an agent with repo access
Role: Senior TypeScript engineer in this repo building an edge-portable server.
Context: Bun runtime; hono, pg (node-postgres), and redis (node-redis, pinned redis@^5) installed; Postgres via DATABASE_URL, Redis via REDIS_URL; schema loaded from db/schema.sql.
Task: Build src/server.ts — a Hono app with GET /products (snapshot) and static serving of ./public, exported as a Bun server whose port reads PORT (default 8080).
Requirements:
- Create the pg Pool and the node-redis client once; await redis.connect() at startup.
- GET /products returns the rows as JSON ordered by id; prices stay integer cents.
- Serve public/index.html at / using the catch-all path fallback: app.get("*", serveStatic({ path: "./public/index.html" })), registered AFTER the API routes; no Node-only APIs in request handlers so the app stays portable.
- The exported port reads process.env.PORT (Number(process.env.PORT) || 8080) so a second instance can run on another port.
Tests / acceptance:
- `bun run src/server.ts` starts; `curl -s localhost:8080/` returns the dashboard HTML (not a 404); after loading the schema, `curl -s localhost:8080/products | jq length` returns 3.
Output: a unified diff plus the one-line Node entrypoint variant using @hono/node-server's serve({ fetch: app.fetch }).
What success looks like
bun run src/server.ts starts on :8080. curl -s localhost:8080/ returns the dashboard HTML (not a 404, because the path: catch-all serves it), and curl -s localhost:8080/products | jq length prints 3. The body should match the Go server’s snapshot: the same id order and the same fields as JSON numbers. node-postgres returns BIGINT columns as strings unless you parse or cast them, so check that id and price come back unquoted. That parity is what the rest of the build leans on.
★ The write path: commit to Postgres, then XADD the event
Intermediate
Make every write two ordered steps (commit to Postgres first, then XADD the post-write state to warehouse:1) so the Stream never advertises a change that did not actually happen. This is the spotlight’s spine; the next step wires it to HTTP routes.
New in this step
commit-before-publish: Persist to the durable store first, then publish the event, so the feed can never advertise a change that did not commit.
conditional UPDATE: An UPDATE ... WHERE that only changes a row when the predicate still holds (matching version, or stock >= qty); zero rows means it did not.
`UPDATE ... RETURNING`: Hands back columns from the rows the UPDATE touched, so you get the post-write price/stock/version in one round trip to serialise into the XADD.
expectedVersion: The version the client last saw, passed into the WHERE; if it no longer matches, someone else won the race and zero rows update.
`XADD MAXLEN ~ 10000`: Appends the event while capping the Stream to ~10000 entries; ~ trims in efficient batches (a little over is fine).
Commit first, then publish — and why that order
This is the spine of the project. A write does two things: it updates the row in Postgres (the durable
truth) and appends an event to the Stream (the thing watchers read). Do them in that order — Postgres
commits first, so the Stream never advertises a change that didn’t actually happen. The
UPDATE ... RETURNING price, stock, version hands you the post-write state in one round trip; you serialise
exactly that into the XADD. Because the event id is assigned by Redis and is monotonic, the Stream is now
the canonical order of changes — which is what makes fan-out and replay agree across every client and
every server instance.
This step gives you the two store operations — setPrice(id, newPrice, expectedVersion) and
sell(id, qty) — as the SQL + commit-then-XADD order. The HTTP routes that call them
(PUT /products/{id}/price, POST /products/{id}/sell) are the very next step, so the dashboard has real
endpoints to hit.
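The ordering rule can be sketched in Go with in-memory stand-ins for both stores. Everything named here is an assumption for illustration: the `Store` and `Publisher` interfaces, `memStore`, `memFeed`, and `ErrNoChange` are hypothetical, and the real operations talk to Postgres and Redis rather than to maps and slices.

```go
package main

import (
	"errors"
	"fmt"
)

type Row struct {
	ID, Price      int64
	Stock, Version int
}

// Store commits a price change; ok is true only if exactly one row was updated.
type Store interface {
	UpdatePrice(id, newPrice int64, expected int) (row Row, ok bool, err error)
}

// Publisher appends the post-write state to the feed and returns the event id.
type Publisher interface {
	Publish(r Row) (eventID string, err error)
}

var ErrNoChange = errors.New("no row updated")

// setPrice commits first and publishes second; a zero-row result never publishes.
func setPrice(s Store, p Publisher, id, newPrice int64, expected int) (string, error) {
	row, ok, err := s.UpdatePrice(id, newPrice, expected)
	if err != nil {
		return "", err
	}
	if !ok {
		return "", ErrNoChange // nothing committed, so there is nothing to advertise
	}
	return p.Publish(row) // reached only after the commit succeeded
}

// memStore and memFeed are in-memory fakes standing in for Postgres and Redis.
type memStore struct{ rows map[int64]*Row }

func (m *memStore) UpdatePrice(id, newPrice int64, expected int) (Row, bool, error) {
	r, found := m.rows[id]
	if !found || r.Version != expected {
		return Row{}, false, nil
	}
	r.Price = newPrice
	r.Version++
	return *r, true, nil
}

type memFeed struct{ events []Row }

func (f *memFeed) Publish(r Row) (string, error) {
	f.events = append(f.events, r)
	return fmt.Sprintf("%d-0", len(f.events)), nil // fake id, not a real Stream id
}

func main() {
	store := &memStore{rows: map[int64]*Row{1: {ID: 1, Price: 1499, Stock: 50}}}
	feed := &memFeed{}
	_, err := setPrice(store, feed, 1, 1399, 0)
	fmt.Println(err, len(feed.events)) // <nil> 1
	_, err = setPrice(store, feed, 1, 1299, 0)
	fmt.Println(err, len(feed.events)) // no row updated 1
}
```

One consequence of this order is worth knowing: if the process stops after the commit but before the publish, the row has changed and no event exists. The sketch does not address that gap.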
Commit, then XADD (language-agnostic order)
1. BEGIN
2. UPDATE products SET price = $new, version = version + 1, updated_at = now()
   WHERE id = $id AND version = $expected
   RETURNING price, stock, version
3. COMMIT
4. XADD warehouse:1 MAXLEN ~ 10000 * product_id $id price $price stock $stock version $version
   -> returns the event id, e.g. 1718900000000-0 (MAXLEN ~ bounds the Stream; see the bounding step)
Agent prompt: paste into an agent with repo access
Role: Senior backend engineer in this repo (use the selected backend).
Context: Postgres has products(id, name, price, stock, version); a Redis client is connected. This build uses one fixed Stream key, "warehouse:1".
Task: Implement the two write-path store operations — setPrice(id, newPrice, expectedVersion) and sell(id, qty) — that commit to Postgres, then XADD the change event. (The HTTP routes that call these are the next step.)
Requirements:
- setPrice updates Postgres first: `... SET price=$1, version=version+1, updated_at=now() WHERE id=$2 AND version=$3 RETURNING price, stock, version`; only on a committed (one-row) result do you XADD.
- sell updates atomically: `... SET stock=stock-$1, version=version+1, updated_at=now() WHERE id=$2 AND stock>=$1 RETURNING price, stock, version`; XADD only on a one-row result.
- XADD to "warehouse:1" with auto-id * and fields product_id, price, stock, version (values as decimal strings); capture the returned event id. Add MAXLEN ~ 10000.
- Never XADD before the commit succeeds; a zero-row result is NOT an event.
- A zero-row UPDATE has two causes — stale version / insufficient stock (row exists) or unknown id (no row) — and the conditional UPDATE alone cannot tell them apart. On zero rows, re-SELECT the row by id: if present it is a conflict / out-of-stock; if absent the id is unknown.
- Return enough to the caller to form the HTTP response: on success the post-write {id,name,price,stock,version}; on a zero-row failure both a found signal (the route maps not-found to 404) and, when found, the current row (the route maps it to 409).
Tests / acceptance:
- After a successful price change, XLEN warehouse:1 increases by exactly 1 and XRANGE shows the new price and version.
- A failed (zero-row) setPrice or sell appends no event (XLEN unchanged).
Output: a unified diff plus a one-paragraph note on why commit-before-XADD is the correct order.
What success looks like
A successful setPrice/sell commits the row and appends exactly one event: XLEN warehouse:1 grows by 1 and XREVRANGE warehouse:1 + - COUNT 1 shows the post-write price/stock/version. A zero-row write (stale version, insufficient stock, or unknown id) commits nothing and appends nothing — XLEN is unchanged, proving the Stream never advertises a change that did not happen:
# after one successful price change of product 1:
XLEN warehouse:1 -> (integer) 1
XADD ... returns -> "1718900000000-0" # the new event's id
# after a stale-version PUT: XLEN warehouse:1 -> (integer) 1 (no new event)
Wire the write routes: PUT price and POST sell
Go · Intermediate
Give the dashboard real endpoints by exposing the write path as PUT /products/{id}/price and POST /products/{id}/sell. Each maps the store result to the contract (200+row, 409+current row, 404 unknown id) and fans out via the write path’s XADD on success.
New in this step
`r.PathValue`: Reads a named path segment from a 1.22 route pattern, e.g. the {id} in PUT /products/{id}/price.
`json.NewDecoder`: Decodes the request body’s JSON into a struct; a malformed body is your 400 branch.
`pgx.ErrNoRows`: The error a RETURNING query gives when the conditional UPDATE matched zero rows; the signal you must disambiguate.
re-SELECT disambiguation: On zero rows, re-SELECT the row by id: present ⇒ stale/out-of-stock (409 + current row); absent ⇒ unknown id (404).
status codes 200 / 409 / 404: Success returns 200 + the new row, a version/stock conflict returns 409 + the live row to rebase to, an unknown id returns 404.
The routes are the deliverable — and the zero-row case is two different errors
The dashboard’s slider PUTs {price, version} and its Sell button POSTs {qty}; without these routes
both 404. Each handler is thin: parse the JSON body and the {id} path value (Go 1.22’s r.PathValue("id")),
call the matching store op from the write path, and translate the result to the HTTP contract. The XADD
already lives inside the write path, so a successful route automatically fans the change out.
The subtle part is the zero-row case. The conditional UPDATE ... WHERE id=$id AND version=$expected
(or ... AND stock>=$qty) returns no row for two different reasons: the version was stale / the stock was
insufficient (the row exists), or there is no such id at all. pgx.ErrNoRows from the UPDATE’s
RETURNING cannot tell them apart — so on zero rows you re-SELECT the row by id alone. If that
re-SELECT finds the row, it is a real conflict / out-of-stock: return 409 with the current
{price, stock, version} so the client rebases to the live value. If the re-SELECT finds nothing, the id
is unknown: return 404. One extra indexed PK lookup, only on the cold failure path, is what makes the two
errors distinguishable — a route that always returned 409 could never surface a genuine bad id.
The two write routes (Go, net/http 1.22 path values)
type priceReq struct {
Price int64 `json:"price"`
Version int `json:"version"`
}
type sellReq struct {
Qty int `json:"qty"`
}
// PUT /products/{id}/price — body {price, version}
mux.HandleFunc("PUT /products/{id}/price", func(w http.ResponseWriter, r *http.Request) {
id, _ := strconv.ParseInt(r.PathValue("id"), 10, 64)
var req priceReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "bad_request", http.StatusBadRequest)
return
}
// conditional UPDATE ... WHERE id=$ AND version=$ RETURNING price, stock, version
// found=false => the re-SELECT found no such id; conflict=true => row exists but version was stale.
p, conflict, found, err := setPrice(r.Context(), pool, rdb, id, req.Price, req.Version)
writeResult(w, p, conflict, found, err)
})
// POST /products/{id}/sell — body {qty}
mux.HandleFunc("POST /products/{id}/sell", func(w http.ResponseWriter, r *http.Request) {
id, _ := strconv.ParseInt(r.PathValue("id"), 10, 64)
var req sellReq
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Qty < 1 {
http.Error(w, "bad_request", http.StatusBadRequest)
return
}
// atomic UPDATE ... WHERE id=$ AND stock>=$qty RETURNING price, stock, version
p, outOfStock, found, err := sell(r.Context(), pool, rdb, id, req.Qty)
writeResult(w, p, outOfStock, found, err)
})
// writeResult maps the store result to the contract:
// 404 unknown id | 409 + current row on conflict/out-of-stock | 200 + product on success.
func writeResult(w http.ResponseWriter, p Product, failed bool, found bool, err error) {
w.Header().Set("Content-Type", "application/json")
if err != nil {
http.Error(w, "server_error", http.StatusInternalServerError)
return
}
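if !found { // the re-SELECT found no row for this id
// JSON body, not http.Error (which forces text/plain), so the 404 matches the TypeScript server.
w.WriteHeader(http.StatusNotFound)
_ = json.NewEncoder(w).Encode(map[string]string{"error": "not_found"})
return
}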
if failed {
w.WriteHeader(http.StatusConflict) // 409; body is the current row (+ "error" for sell)
}
_ = json.NewEncoder(w).Encode(p)
}
Agent prompt — paste into an agent with repo access
Role: Senior Go engineer in this repo.
Context: cmd/server already has the pool, the go-redis client (rdb), GET /products, and the write-path store ops setPrice(ctx,pool,rdb,id,price,version) and sell(ctx,pool,rdb,id,qty) that commit then XADD to "warehouse:1". Go 1.22 net/http path values.
Task: Add PUT /products/{id}/price and POST /products/{id}/sell that call the write path and return the contract.
Requirements:
- Parse {id} via r.PathValue("id"); parse the JSON body ({price,version} for price, {qty} for sell); a malformed body or qty < 1 is 400.
- On a one-row update: 200 with the post-write product JSON {id,name,price,stock,version}. The store op XADDs on success — do not XADD in the handler.
- On a zero-row update, disambiguate by re-SELECTing the row by id: if the row exists it is a conflict / out-of-stock -> 409 with the CURRENT row JSON (for sell include an "error":"out_of_stock" field); if the re-SELECT finds nothing the id is unknown -> 404. The store op returns both signals (e.g. a found bool plus a conflict/outOfStock bool). Emit no event on either failure.
- price/stock/version are JSON numbers in these bodies (cents for price).
Tests / acceptance:
- curl -X PUT localhost:8080/products/1/price -d '{"price":1399,"version":0}' returns 200 and {"version":1,...}; repeating with the stale version 0 returns 409 with the current row.
- curl -X POST localhost:8080/products/1/sell -d '{"qty":1}' returns 200 with stock-1; selling more than stock returns 409 {"error":"out_of_stock",...}.
- curl -X PUT localhost:8080/products/9999/price -d '{"price":1,"version":0}' (and the same for /sell) returns 404, not 409 — the id does not exist.
Output: a unified diff plus a one-paragraph note on why a re-SELECT on the zero-row path is what separates 404 from 409.
What success looks like
The three branches of the contract are reachable. A first PUT /products/1/price with {"price":1399,"version":0} returns 200 with version bumped to 1; repeating it with the now-stale version:0 returns 409 carrying the current row (so the client can rebase). Selling more than stock returns 409 with {"error":"out_of_stock",...}, and a write to a missing id returns 404, not 409 — the re-SELECT is what makes that distinction real:
PUT /products/1/price {"price":1399,"version":0} -> 200 {"id":1,...,"version":1}
PUT /products/1/price {"price":1500,"version":0} -> 409 {"id":1,...,"version":1}
PUT /products/9999/price {...} -> 404
Wire the write routes: PUT price and POST sell
TypeScript Intermediate
Give the dashboard real endpoints by exposing the write path as two Hono routes (PUT /products/:id/price, POST /products/:id/sell) whose 200/409/404 bodies are byte-identical to the Go server, which is the parity the project claims.
New in this step
`c.req.param` Reads a named route parameter, e.g. the :id in PUT /products/:id/price.
`c.req.json` Parses the request body as JSON into a typed shape; a non-integer field or qty < 1 is your 400 branch.
`result.rowCount` node-postgres’s count of rows the query affected; 0 means the conditional UPDATE matched nothing — the case to disambiguate.
`c.json(body, status)` Returns a JSON response with an explicit status, e.g. c.json(row, 409) for a conflict carrying the current row.
re-SELECT disambiguation On a zero-row result, re-SELECT the row by id: present ⇒ 409 + current row; absent ⇒ 404 — identical to the Go server.
Thin handlers over the write path — and the zero-row case is two errors
The dashboard’s slider PUTs {price, version} and its Sell button POSTs {qty}; without these routes
both 404. Each Hono handler reads the :id param (c.req.param("id")) and the JSON body
(await c.req.json()), calls the matching write-path op, and maps the result to the contract. The XADD
lives inside the write path, so a successful route fans the change out automatically.
The zero-row case carries two distinct errors, exactly as in the Go server. The conditional UPDATE matches
no row when the version is stale / stock is insufficient (the row exists) or when there is no such id —
result.rowCount === 0 (node-postgres) cannot tell which. So on a zero-row result, re-SELECT the row by
id: if it comes back, return c.json(currentRow, 409) (a real conflict or out-of-stock, the client rebases
to the live value); if it comes back empty, return c.json({ error: "not_found" }, 404). The write-path op
returns both signals (e.g. { row, conflict, found }) so the handler stays a thin mapper — and the two
backends return byte-identical 404/409/200 responses.
The two write routes (Hono + node-postgres)
// PUT /products/:id/price — body {price, version}
app.put("/products/:id/price", async (c) => {
const id = Number(c.req.param("id"));
const { price, version } = await c.req.json<{ price: number; version: number }>();
if (!Number.isInteger(price) || !Number.isInteger(version)) {
return c.json({ error: "bad_request" }, 400);
}
// found=false => the re-SELECT found no such id; conflict=true => row exists but version was stale.
const { row, conflict, found } = await setPrice(pool, redis, id, price, version); // commits, then XADD
if (!found) return c.json({ error: "not_found" }, 404);
return conflict ? c.json(row, 409) : c.json(row, 200);
});
// POST /products/:id/sell — body {qty}
app.post("/products/:id/sell", async (c) => {
const id = Number(c.req.param("id"));
const { qty } = await c.req.json<{ qty: number }>();
if (!Number.isInteger(qty) || qty < 1) {
return c.json({ error: "bad_request" }, 400);
}
const { row, outOfStock, found } = await sell(pool, redis, id, qty); // atomic decrement, then XADD
if (!found) return c.json({ error: "not_found" }, 404);
return outOfStock ? c.json({ error: "out_of_stock", ...row }, 409) : c.json(row, 200);
});
Agent prompt — paste into an agent with repo access
Role: Senior TypeScript engineer in this repo.
Context: src/server.ts has the pg Pool, the node-redis client, GET /products, and the write-path ops setPrice(pool,redis,id,price,version) and sell(pool,redis,id,qty) that commit then XADD to "warehouse:1". Hono on Bun.
Task: Add PUT /products/:id/price and POST /products/:id/sell that call the write path and return the contract, matching the Go server exactly.
Requirements:
- Read :id via c.req.param; parse the JSON body ({price,version} or {qty}); a non-integer field or qty < 1 is 400.
- On a one-row update: 200 with the post-write product {id,name,price,stock,version}. The write-path op XADDs on success — do not XADD in the handler.
- On a zero-row update, disambiguate by re-SELECTing the row by id: if the row exists -> 409 with the CURRENT row (for sell add "error":"out_of_stock"); if the re-SELECT finds nothing -> 404 {"error":"not_found"}. The write-path op returns both signals ({ row, conflict|outOfStock, found }). Emit no event on either failure.
- price/stock/version are JSON numbers (cents for price); register these routes BEFORE the serveStatic catch-all.
Tests / acceptance:
- PUT /products/1/price {"price":1399,"version":0} -> 200 {"version":1,...}; repeat with version 0 -> 409 current row.
- POST /products/1/sell {"qty":1} -> 200 stock-1; over-sell -> 409 {"error":"out_of_stock",...}.
- PUT /products/9999/price and POST /products/9999/sell -> 404, not 409 (no such id). Behaviour byte-identical to the Go server.
Output: a unified diff plus a one-paragraph note on keeping the 404/409/200 body shapes identical across backends.
What success looks like
Byte-identical to the Go server. The same PUT/POST/over-sell/missing-id sequence returns the same 200/409/409 out_of_stock/404 codes and the same JSON bodies (the 409 carries the current row, with "error":"out_of_stock" added for sell). A learner can swap backends mid-build and the dashboard cannot tell — that is the parity the project claims.
Optimistic concurrency: conditional UPDATE, stale write returns 409
Intermediate
Let writers race without locks and detect the loser: the conditional UPDATE succeeds only if the client’s version still matches, and a mismatch returns 409 + the current row so the client rebases in one retry.
New in this step
optimistic concurrency Assume writes rarely collide: let them race, then reject any whose version changed underneath — no lock held across the edit.
pessimistic locking The opposite approach — lock the row before editing so others wait; cheaper to reason about, but slower when conflicts are rare.
rebase on conflict Because the 409 body has the same shape as a change event, the client adopts the fresh value and retries once, the same way it would on fan-out.
Optimistic concurrency without locks
Many operators may edit the same product. Rather than lock rows, you let writers race and detect the loser:
the write carries the version the client last saw, and UPDATE ... WHERE id = $id AND version = $expected
bumps the version only if nothing changed underneath. Zero rows means someone else won the version race; the
write-path step already covers how a zero-row result re-SELECTs by id to return 409 (row
present) versus 404 (no such id). What this step adds is the rebase: because the 409 body has the same
shape as a change event, the client reconciles to the fresh value the same way whether it learns of it by
conflict or by fan-out — so a stale write costs one retry, not a lock held across the edit. That makes
optimistic concurrency cheaper than pessimistic locking exactly when conflicts are rare.
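The rebase-and-retry-once flow can be sketched as a small client loop. The dashboard itself is a browser, so this Go client is purely illustrative: Row, putFunc, setPriceWithRebase, and fakeServer are hypothetical names, and the fake server only imitates the version check of the conditional UPDATE.

```go
package main

import "fmt"

// Row is the {price, stock, version} shape shared by 200 and 409 bodies (field names assumed).
type Row struct {
	Price   int64
	Stock   int
	Version int
}

// putFunc stands in for PUT /products/{id}/price: it returns the HTTP status
// and the row carried in the response body.
type putFunc func(price int64, version int) (status int, body Row)

// setPriceWithRebase sends the write with the version the client last saw.
// On 409 it adopts the server's current row and retries exactly once.
func setPriceWithRebase(put putFunc, local Row, price int64) (Row, int) {
	status, body := put(price, local.Version)
	if status == 409 {
		local = body // rebase: adopt the live row carried by the 409
		status, body = put(price, local.Version)
	}
	if status == 200 || status == 409 {
		local = body // either way the body is the server's current row
	}
	return local, status
}

// fakeServer is a hypothetical in-memory server holding one row.
func fakeServer(current *Row) putFunc {
	return func(price int64, version int) (int, Row) {
		if version != current.Version {
			return 409, *current // stale: reply with the live row
		}
		current.Price = price
		current.Version++
		return 200, *current
	}
}

func main() {
	server := Row{Price: 1299, Stock: 50, Version: 3} // another operator already wrote
	stale := Row{Price: 1199, Stock: 50, Version: 2}  // what this client last saw
	got, status := setPriceWithRebase(fakeServer(&server), stale, 1399)
	fmt.Println(status, got.Price, got.Version)
}
```

Capping the retry at one attempt is a design choice for the sketch: a second 409 means the row is genuinely contended, and the caller can decide whether to show the live value or try again.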
The conditional UPDATE
UPDATE products
SET price = $1, version = version + 1, updated_at = now()
WHERE id = $2 AND version = $3
RETURNING price, stock, version;
-- 0 rows -> re-SELECT the row by id: found = version conflict (HTTP 409 + that row); not found = unknown id (HTTP 404)
-- 1 row -> success: XADD the returned state as a change event
The oversell guard in one atomic line
Intermediate
Make overselling impossible with a single atomic statement (... WHERE stock >= qty), so two clients racing for the last unit serialise and exactly one wins; this one line is the whole concurrency story here, unlike Aurora’s full transaction.
New in this step
oversell guard The WHERE stock >= qty predicate that lets the decrement match a row only when enough stock remains — no row, no sale.
atomic statement A single UPDATE runs as one indivisible operation, so the check and the decrement can’t be split by another writer.
row lock Postgres locks the matched row for the duration of the statement, so two clients racing for the last unit serialise — one wins, the other matches none.
One line here; a whole transaction there
A sale decrements stock and, like every write, bumps the version and emits an event. The safety is one atomic
statement: UPDATE products SET stock = stock - $qty, version = version + 1 WHERE id = $id AND stock >= $qty.
The stock >= $qty predicate is the guard: Postgres row-locks the matched row for the duration of the
statement, so two clients racing for the last unit serialise; exactly one affects a row, the other matches
none. The zero-row outcome then flows through the same disambiguation the write-path step defines (re-SELECT
by id → 409 out_of_stock if the row exists, 404 if it does not). This is deliberately the same primitive
Aurora uses, but here it is a single line, not the lesson: Aurora Commerce spends a whole BEGIN … COMMIT
making a multi-row checkout atomic; Ticker needs only this one guard and spends its energy on propagating
the result of the sale to every watcher. Same guarantee, opposite emphasis.
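To see the guard hold under contention without Postgres, here is a minimal simulation. It is only an analogy, and every name in it (inventory, sell) is hypothetical: a mutex stands in for the row lock, so the stock check and the decrement happen as one indivisible step, which is the property the single UPDATE provides.

```go
package main

import (
	"fmt"
	"sync"
)

// inventory is a hypothetical in-memory stand-in for one products row.
// The mutex plays the part of the Postgres row lock.
type inventory struct {
	mu      sync.Mutex
	stock   int
	version int
}

// sell imitates UPDATE ... SET stock = stock - qty WHERE stock >= qty.
// It reports whether a row was matched (true) or the guard rejected the sale (false).
func (inv *inventory) sell(qty int) bool {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if inv.stock < qty { // guard fails: zero rows matched, nothing changes
		return false
	}
	inv.stock -= qty
	inv.version++
	return true
}

func main() {
	inv := &inventory{stock: 1} // one unit left
	const buyers = 20
	results := make([]bool, buyers)
	var wg sync.WaitGroup
	for i := 0; i < buyers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = inv.sell(1) // all buyers race for the last unit
		}(i)
	}
	wg.Wait()
	wins := 0
	for _, ok := range results {
		if ok {
			wins++
		}
	}
	fmt.Println("wins:", wins, "stock:", inv.stock, "version:", inv.version)
}
```

However the goroutines interleave, exactly one sale succeeds and stock never goes negative. Splitting the check and the decrement into two separately locked steps would reintroduce the race the guard exists to prevent.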
Atomic decrement = the oversell guard
UPDATE products
SET stock = stock - $1, version = version + 1, updated_at = now()
WHERE id = $2 AND stock >= $1
RETURNING price, stock, version;
-- 0 rows -> re-SELECT by id: found = insufficient stock (409 "out_of_stock"); not found = unknown id (404). No event either way.
-- 1 row -> sale committed: XADD the new {price, stock, version} to the Stream
★ Tail the Stream and fan out over SSE
Go Advanced
Turn GET /stream into a live fan-out: loop on a bounded XREAD BLOCK and write one SSE id:/data: frame per event (plus a keepalive on each idle timeout), so every connected dashboard, across every instance, tails the same Stream.
New in this step
Server-Sent Events (`text/event-stream`) A long-lived HTTP response that streams text events to the browser one-way — exactly the live feed shape.
`id:` / `data:` frames Each SSE event is id: <stream-id> then data: <json> then a blank line; id: is what the browser echoes back on reconnect.
`http.Flusher` The interface whose Flush() pushes buffered bytes to the client immediately — without it the browser sees nothing until a buffer fills.
bounded `XREAD BLOCK` XREAD BLOCK 15000 parks for at most 15s, so a closed tab frees its reader instead of blocking forever like BLOCK 0.
`redis.Nil` on timeout The error go-redis returns when the bounded block elapsed with no entries — your cue to heartbeat and re-check the connection.
`:keepalive` heartbeat A comment line (:-prefixed) EventSource ignores but idle proxies don’t — it keeps the connection from being killed.
One reader, many subscribers — and N instances act as one
Server-Sent Events is just a long-lived HTTP response with Content-Type: text/event-stream; you write
id: <stream-id> then data: <json> then a blank line per event, and flush. The source of those events is
XREAD BLOCK 15000 STREAMS warehouse:1 $, which parks the connection until the next entry is appended (or
the block elapses), then returns it with its id. Because every server instance tails the same Stream, you
can run N servers and they all deliver the same ordered events — the Stream, not the process, is the fan-out
point, so horizontal scaling is free. Pass the id the client last saw (the next step wires Last-Event-ID)
instead of $ to begin with replay. Flush after every write (http.Flusher) or the browser sees nothing
until a buffer fills.
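The framing itself is small enough to isolate. The sketch below is illustrative only: writeFrame, writeKeepalive, and render are hypothetical helpers that write to any io.Writer, so the byte layout can be inspected without Redis or a browser. In a real handler the writer would be the http.ResponseWriter and each write would be followed by Flush().

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// writeFrame writes one SSE event: an id: line, a data: line, then a blank line.
// values has the same type as go-redis's XMessage.Values.
func writeFrame(w io.Writer, id string, values map[string]interface{}) error {
	payload, err := json.Marshal(values) // map keys are emitted in sorted order
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(w, "id: %s\ndata: %s\n\n", id, payload)
	return err
}

// writeKeepalive writes an SSE comment line, which EventSource ignores.
func writeKeepalive(w io.Writer) error {
	_, err := io.WriteString(w, ":keepalive\n\n")
	return err
}

// render is a demo helper: one event frame followed by one keepalive, as a string.
func render(id string, values map[string]interface{}) string {
	var buf bytes.Buffer
	_ = writeFrame(&buf, id, values)
	_ = writeKeepalive(&buf)
	return buf.String()
}

func main() {
	fmt.Print(render("1718900000000-0", map[string]interface{}{
		"product_id": "1", "price": "1399", "stock": "50", "version": "4",
	}))
}
```

Because json.Marshal sorts map keys, the data: line lists price before product_id; clients should read fields by name and not rely on position.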
SSE fan-out (go-redis + http.Flusher)
// GET /stream — Server-Sent Events fan-out from a Redis Stream
mux.HandleFunc("GET /stream", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok { http.Error(w, "streaming unsupported", 500); return }
stream := "warehouse:1"
lastID := r.Header.Get("Last-Event-ID") // empty on the first connect
if lastID == "" { lastID = "$" } // "$" = only events appended from now on
for {
// Bounded block, NOT Block: 0 — a 15s read deadline lets a closed tab
// (cancelled r.Context()) free this reader instead of parking forever.
res, err := rdb.XRead(r.Context(), &redis.XReadArgs{
Streams: []string{stream, lastID},
Block: 15 * time.Second,
Count: 500,
}).Result() // XRead returns *XStreamSliceCmd; .Result() gives ([]XStream, error)
if err == redis.Nil { // block elapsed with no entries
fmt.Fprint(w, ":keepalive\n\n") // comment line; EventSource ignores it, proxies stay alive
flusher.Flush()
if r.Context().Err() != nil { return } // tab closed during the idle window
continue
}
if err != nil { return } // client gone or context cancelled
for _, msg := range res[0].Messages {
lastID = msg.ID
payload, _ := json.Marshal(msg.Values)
fmt.Fprintf(w, "id: %s\ndata: %s\n\n", msg.ID, payload)
}
flusher.Flush()
}
})
Agent prompt — paste into an agent with repo access
Before you write it: when a data frame goes out on /stream, which of price, stock, version are JSON numbers and which are strings — and why does the wire format differ from GET /products?
Role: Senior Go engineer in this repo.
Context: go-redis v9 client at rdb; Stream key "warehouse:1"; the server already serves /products and the dashboard.
Task: Add GET /stream that fans out the Redis Stream to the client over Server-Sent Events.
Requirements:
- Set Content-Type: text/event-stream and Cache-Control: no-cache; assert http.Flusher and Flush() after writing.
- Read the start id from the Last-Event-ID header; default to "$" (only-new) when absent.
- Loop on rdb.XRead with XReadArgs{Streams: []string{stream, lastID}, Block: 15 * time.Second, Count: 500}. Do NOT use Block: 0 — in go-redis v9 a cancelled context does not interrupt a parked Block: 0 read, leaking a connection per closed tab.
- On a timeout (err == redis.Nil) write a ":keepalive\n\n" comment, Flush(), and return if r.Context().Err() != nil so a closed tab is released within ~15s; otherwise continue.
- For each message write "id: <id>\ndata: <json>\n\n", advance lastID to its id, then Flush(); return on any other error.
Tests / acceptance:
- curl -N localhost:8080/stream stays open; an XADD to warehouse:1 (or a price change) appears as an SSE frame within ~100ms; an idle connection still receives a ":keepalive" comment about every 15s.
- Reconnecting with -H "Last-Event-ID: <id>" replays only events after that id.
- Closing the client (Ctrl-C) frees the server-side reader within ~15s (the goroutine returns on the next timeout).
Output: a unified diff plus a one-paragraph note on why a bounded block beats Block: 0 for connection cleanup.
What success looks like
curl -N localhost:8080/stream stays open. A price change (or a raw XADD) appears within ~100ms as a data frame — an id: line carrying the Stream id and a data: line whose JSON fields are strings (forwarded msg.Values), terminated by a blank line. On an idle connection you see a :keepalive comment about every 15s (the bounded-block timeout), which EventSource ignores but proxies don’t:
id: 1718900000000-0
data: {"price":"1399","product_id":"1","stock":"50","version":"4"}
:keepalive
★ Tail the Stream and fan out over SSE
TypeScript Advanced
Build the same live fan-out in Hono (streamSSE over a dedicated Redis connection with a bounded xRead BLOCK, writeSSE per event, keepalive on idle) so the wire output is contract-identical to the Go server.
New in this step
`streamSSE` / `writeSSE` Hono’s helper that turns a handler into a text/event-stream response; writeSSE({ id, data }) emits one frame.
dedicated `redis.duplicate()` connection A blocking read ties up its connection, so run it on a duplicated client opened per stream — never the shared one your REST routes use.
bounded `xRead BLOCK` xRead(..., { BLOCK: 15000 }) parks at most 15s; node-redis also documents a BLOCK 0 foot-gun that can fail to resolve on a duplicated connection.
`null` on timeout node-redis returns null when the bounded block elapsed with no entries — your cue to heartbeat and re-check the loop guard.
`sse.closed` / `sse.aborted` Flags Hono sets when the client disconnects; re-checking them each loop frees the duplicated reader within ~15s.
Async iteration models an SSE stream cleanly
Hono’s streamSSE helper turns a handler into a text/event-stream response and gives you a stream
object with writeSSE({ id, data }). The events come from node-redis xRead. One rule matters: a blocking
xRead ties up its connection, so you must not run it on your shared client. node-redis documents
createClientPool() for isolating blocking commands; for a long-lived per-client stream the simplest correct
choice is a dedicated connection via redis.duplicate() that you open on connect and close on disconnect
— never the shared one your REST handlers use, or the whole app stalls behind the blocked read. Each connected
client gets its own reader, loops awaiting the next event, advances its last id, and writes a frame. The id
you start from is the Last-Event-ID header (replay) or $ (only-new).
SSE fan-out (Hono streamSSE + node-redis)
import { streamSSE } from "hono/streaming";
app.get("/stream", (c) => {
const stream = "warehouse:1";
let lastID = c.req.header("Last-Event-ID") ?? "$"; // "$" = only new events
return streamSSE(c, async (sse) => {
const reader = redis.duplicate(); // blocking reads need their own connection
await reader.connect();
try {
while (!sse.closed && !sse.aborted) {
// Bounded block, NOT BLOCK: 0 — node-redis can fail to resolve a BLOCK 0 xRead on a
// duplicated connection (#2258), and a parked BLOCK 0 never re-checks sse.closed, leaking
// the reader when the tab closes. A 15s block returns null on timeout so we re-check below.
const res = await reader.xRead(
{ key: stream, id: lastID },
{ BLOCK: 15000, COUNT: 500 },
);
if (!res) {
await sse.write(":keepalive\n\n"); // comment line; EventSource ignores it, proxies stay alive
continue; // loop guard re-checks sse.closed/aborted before re-blocking
}
for (const msg of res[0].messages) {
lastID = msg.id;
await sse.writeSSE({ id: msg.id, data: JSON.stringify(msg.message) });
}
}
} finally {
await reader.quit(); // free the dedicated connection once the client is gone
}
});
});
Agent prompt — paste into an agent with repo access
Before you write it: a learner closes the browser tab during an idle connection. With a bounded 15s BLOCK, what is the longest the dedicated reader can stay alive before the loop notices and quits it — and what would BLOCK: 0 do instead?
Role: Senior TypeScript engineer in this repo.
Context: Hono app; node-redis client at redis (already connected); Stream key "warehouse:1"; streamSSE from "hono/streaming".
Task: Add GET /stream that fans out the Redis Stream to the client over SSE using a per-client async loop.
Requirements:
- Use streamSSE(c, async (sse) => { ... }); start id from the Last-Event-ID header, default "$".
- Run the blocking xRead on a DEDICATED connection, not the shared client: open redis.duplicate() (node-redis also documents createClientPool() for this), await connect(), and reader.quit() in finally.
- Loop while (!sse.closed && !sse.aborted) awaiting reader.xRead({ key: stream, id: lastID }, { BLOCK: 15000, COUNT: 500 }). Do NOT use BLOCK: 0 — node-redis can fail to resolve a BLOCK 0 xRead on a duplicated connection (#2258), and a parked BLOCK 0 never re-checks sse.closed, leaking the reader when the tab closes.
- On a timeout (xRead returns null) await sse.write(":keepalive\n\n") then continue, so the loop guard re-checks sse.closed/aborted and a closed tab is released within ~15s. For each message writeSSE({ id, data }) and advance lastID.
Tests / acceptance:
- curl -N localhost:8080/stream stays open; a price change appears as an SSE frame within ~100ms (parity with the Go server); an idle connection still receives a ":keepalive" comment about every 15s.
- Reconnecting with -H "Last-Event-ID: <id>" replays only events after that id.
- Closing the client frees the duplicated connection within ~15s (the finally block runs reader.quit()).
Output: a unified diff plus a one-paragraph note on why a bounded block beats BLOCK 0 for connection cleanup, and why a blocking xRead needs its own connection.
What success looks like
Contract-identical wire output to the Go server: the same id: / data: frames, the same four string-valued fields. curl -N localhost:8080/stream delivers a data: frame within ~100ms of a change and the same :keepalive comment about every 15s on an idle connection — the data: fields are strings on both backends, since each forwards Redis’s string-valued msg.message verbatim. Key order is not part of the contract — Go’s json.Marshal over a map sorts keys alphabetically while node-redis preserves insertion order, so the same frame can serialise its fields in a different order; clients read by name and coerce with Number(...). The shared SSE contract is exactly why the dashboard never knows which backend it is talking to.
Client offset protocol: every event carries its Stream id
Intermediate
Pin down the one rule that makes replay possible: the unit of progress is the Stream id. The server stamps it on every SSE frame and the client remembers the last id it applied, so a reconnect resumes exactly where it left off.
New in this step
`EventSource` The browser’s built-in SSE client — it auto-reconnects and remembers the last id: for you, so the live path needs no bespoke bookkeeping.
`Last-Event-ID` The header EventSource resends on auto-reconnect, carrying the last id it saw, so the server knows where to resume.
opaque, monotonic id Treat the Stream id as a black-box marker that only ever increases — compare and store it, never compute the next one yourself.
`localStorage` Browser key-value storage; persist the last id here when you want replay to survive a full page reload, then send it explicitly.
The id is the whole protocol
Reconnect-and-replay works because of one shared rule — the unit of progress is the Stream id. The server
already writes it as the SSE id: field; the browser’s EventSource automatically remembers the last id:
it saw and, on an auto-reconnect, sends it back as the Last-Event-ID request header. So the client needs
no bespoke bookkeeping for the live path — the platform does it. For a catch-up that must survive a full
page reload, persist the last id yourself (e.g. localStorage) and send it explicitly. Either way the
server’s job is identical: start reading just after the given id. Treat the id as opaque and monotonic;
never try to compute the next one yourself.
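To make the bookkeeping concrete, here is a simplified sketch of what an SSE client tracks. The browser's EventSource does this natively, so the code is illustrative only; resumePoint is a hypothetical helper that handles just the id:, data:, and comment lines used in this project, not the full SSE grammar.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// resumePoint scans raw SSE text and returns the id to send back as
// Last-Event-ID, plus the number of complete data events seen.
// An id is committed only when the blank line that ends its event arrives.
func resumePoint(stream string) (lastID string, events int) {
	sc := bufio.NewScanner(strings.NewReader(stream))
	var idBuf string // most recent id: value, kept across events
	var hasData bool
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // a blank line dispatches the pending event
			lastID = idBuf
			if hasData {
				events++
			}
			hasData = false
		case strings.HasPrefix(line, ":"):
			// comment line such as :keepalive, ignored
		case strings.HasPrefix(line, "id:"):
			idBuf = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
		case strings.HasPrefix(line, "data:"):
			hasData = true
		}
	}
	return lastID, events
}

func main() {
	// Two complete events, one keepalive, and a third event cut off mid-frame.
	feed := "id: 5-1\ndata: {}\n\n:keepalive\n\nid: 5-2\ndata: {}\n\nid: 5-3\ndata: {}\n"
	last, n := resumePoint(feed)
	fmt.Println(last, n)
}
```

The event cut off before its terminating blank line is not counted, so a reconnect would resume after 5-2 and receive 5-3 again in full. The id is stored and echoed back but never parsed or incremented, which is what treating it as opaque means in practice.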
Reconnect-and-replay: Last-Event-ID drives XREAD from that id
Go Advanced
Make reconnects replay the gap with no second code path: when Last-Event-ID is a real id the same XREAD loop returns the missed events first, so the client catches up before live resumes, and a trimmed id falls back to a fresh snapshot.
New in this step
`replaying N` frame A one-off event: replaying SSE frame whose data is the count of missed events, guarded on > 0 so a clean reconnect never flashes Replaying 0.
`resync` frame A one-off event: resync frame telling the client to re-GET /products when its gap is no longer in the Stream.
`XINFO STREAM` / `first-entry` XINFO STREAM warehouse:1 reports stream metadata; its first-entry id is the oldest entry still retained.
trimmed id A Last-Event-ID older than first-entry — the gap was trimmed away, so it is unrecoverable from the Stream and needs a snapshot.
component-wise id comparison Split each <ms>-<seq> on - and compare ms then seq as integers — a string compare wrongly orders 5-2 after 5-10.
Replay is the same loop, started from a different id
You already pass Last-Event-ID into XREAD, and the elegant part is that no second code path is needed.
XREAD BLOCK 15000 STREAMS warehouse:1 1718900000000-0 returns every entry after that id immediately
(Redis ranges are id-ordered — the bounded block only matters once you have caught up to the live tail), so a
client that was offline for ten seconds receives its seven missed events back-to-back, then the loop blocks
~15s at a time for the next live one. Two refinements make it production-honest:
- Count the gap. The first XREAD batch after a reconnect is the missed set; emit a one-off event: replaying frame with its length so the dashboard can show Replaying N events. Guard it on a real (non-null) batch so a clean reconnect that lands on an idle timeout never flashes a count.
- Handle a trimmed id. If the client’s Last-Event-ID is older than the Stream’s first retained entry (XINFO STREAM warehouse:1 exposes first-entry), the gap is unrecoverable from the Stream, so send a one-off event: resync telling the client to re-GET /products for a fresh snapshot, then continue live. Replay restores deltas; the snapshot restores a baseline when the deltas are gone.
The block stays bounded here too (Block: 15 * time.Second, never Block: 0): once the backlog drains,
this is the live tail again, so the same redis.Nil-on-timeout → keepalive → context-recheck cleanup applies.
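The replay-versus-resync decision can be modelled on a plain slice before wiring it to Redis. This is a sketch under assumptions: parseID, before, and plan are hypothetical helpers, and the retained slice stands in for the entries the Stream still holds, oldest first.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseID splits "<ms>-<seq>" into its two integer parts.
func parseID(id string) (ms, seq int64) {
	head, tail, _ := strings.Cut(id, "-")
	ms, _ = strconv.ParseInt(head, 10, 64)
	seq, _ = strconv.ParseInt(tail, 10, 64)
	return
}

// before reports whether id a sorts before id b, comparing ms then seq as integers.
func before(a, b string) bool {
	ams, aseq := parseID(a)
	bms, bseq := parseID(b)
	if ams != bms {
		return ams < bms
	}
	return aseq < bseq
}

// plan decides what a reconnect gets from the retained, id-ordered entries.
// It returns "resync" when lastID predates the oldest retained entry,
// otherwise "replay" together with every retained id after lastID.
func plan(retained []string, lastID string) (string, []string) {
	if len(retained) > 0 && before(lastID, retained[0]) {
		return "resync", nil
	}
	var missed []string
	for _, id := range retained {
		if before(lastID, id) {
			missed = append(missed, id)
		}
	}
	return "replay", missed
}

func main() {
	retained := []string{"5-2", "5-10", "6-0"}
	fmt.Println(plan(retained, "5-2")) // client saw 5-2; 5-10 and 6-0 are the gap
	fmt.Println(plan(retained, "4-0")) // client's id predates the oldest retained entry
}
```

The slice deliberately includes 5-2 and 5-10: an integer comparison keeps 5-10 in the replay set, whereas a string comparison would treat it as older than 5-2 and drop it.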
Replay the gap, then go live
// Stream ids are <ms>-<seq> integer pairs — compare component-wise, NOT lexically
// ("5-2" > "5-10" is true as strings but false as ids). idOlder reports a < b.
func idOlder(a, b string) bool {
ams, aseq := splitID(a)
bms, bseq := splitID(b)
if ams != bms {
return ams < bms
}
return aseq < bseq
}
func splitID(id string) (ms, seq int64) {
if i := strings.IndexByte(id, '-'); i >= 0 {
ms, _ = strconv.ParseInt(id[:i], 10, 64)
seq, _ = strconv.ParseInt(id[i+1:], 10, 64)
}
return
}
// Extend the fan-out loop: check for a trimmed id, then the first real batch is the missed set.
if lastID != "$" {
info, err := rdb.XInfoStream(r.Context(), stream).Result()
// resync when the oldest retained entry is NEWER than the client's last id (the gap was trimmed).
if err == nil && idOlder(lastID, info.FirstEntry.ID) {
fmt.Fprint(w, "event: resync\ndata: 1\n\n")
flusher.Flush()
lastID = "$" // skip the unrecoverable gap and jump to the live tail
}
}
first := true
for {
res, err := rdb.XRead(r.Context(), &redis.XReadArgs{
Streams: []string{stream, lastID}, Block: 15 * time.Second, Count: 500,
}).Result() // .Result() unwraps the *XStreamSliceCmd into ([]XStream, error)
if err == redis.Nil { // idle timeout, no entries — keepalive and re-check the tab
fmt.Fprint(w, ":keepalive\n\n")
flusher.Flush()
if r.Context().Err() != nil { return }
continue
}
if err != nil { return }
msgs := res[0].Messages
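if first && len(msgs) > 0 {
if lastID != "$" { // a real start id: this first batch is the missed set
fmt.Fprintf(w, "event: replaying\ndata: %d\n\n", len(msgs)) // -> pill: "Replaying N" (guarded >0)
}
first = false // clear even for a "$" start, or the next live batch would be reported as a replay
}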
for _, msg := range msgs {
lastID = msg.ID
payload, _ := json.Marshal(msg.Values)
fmt.Fprintf(w, "id: %s\ndata: %s\n\n", msg.ID, payload)
}
flusher.Flush()
}
Agent prompt — paste into an agent with repo access
A client reconnects with a Last-Event-ID that has already been trimmed from the Stream. What single frame does the server send before any live events resume, and what does the client do in response?
Role: Senior Go engineer in this repo.
Context: GET /stream already tails the Stream with a bounded XREAD (Block: 15 * time.Second) from a start id, keepaliving on each redis.Nil timeout; XINFO STREAM exposes the first retained entry id.
Task: Extend /stream so a reconnect with Last-Event-ID replays exactly the missed events, with a status signal and a trimmed-id fallback.
Requirements:
- When lastID is a real id, the first non-empty XREAD batch is the missed events; emit one "event: replaying" frame with that count, guarded on len(msgs) > 0 so a clean reconnect never flashes "Replaying 0".
- Keep the block bounded (Block: 15 * time.Second, never Block: 0); the redis.Nil timeout path still writes ":keepalive\n\n" and returns when r.Context() is done.
- Advance lastID per message so live events resume seamlessly after the backlog drains.
- If lastID is older than XINFO STREAM's first-entry id, emit "event: resync\ndata: 1\n\n" (the client re-fetches GET /products) instead of a partial replay, and reset lastID to "$" to resume the live tail. Compare the two ids COMPONENT-WISE, never lexically: split each "<ms>-<seq>" on "-", compare ms as an int64, then seq as an int64 — a string compare wrongly orders "5-2" after "5-10" and falsely resyncs once ≥10 events share one millisecond.
Tests / acceptance:
- Connect, note the last id, disconnect, make 5 changes, reconnect with Last-Event-ID: the client receives "replaying 5", then exactly those 5 events, ending identical to a fresh snapshot.
- A Last-Event-ID older than the trim window yields a resync event with data: 1, not missing data.
Output: a unified diff plus a one-paragraph note on how the snapshot + Stream together guarantee no gaps.
What success looks like
Reconnect with a real Last-Event-ID and the first batch is the gap: one event: replaying frame whose data is the missed count (guarded > 0, so a clean reconnect never flashes Replaying 0), then exactly those missed ids in order, then live frames resume. Reconnect with an id older than the Stream’s first retained entry and you instead get one event: resync frame, telling the client to re-GET /products — no partial replay:
# reconnect after missing 5 events:
event: replaying
data: 5
id: 1718900001000-0
data: {"product_id":"1",...}
...
# reconnect with a trimmed id:
event: resync
data: 1
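To make the frame order above concrete, here is a minimal sketch of how that wire text splits into frames. The `parseFrames` helper and the `Frame` type are hypothetical names used only for illustration: in the browser, EventSource does this parsing itself, so the dashboard never needs it. The sketch assumes frames separated by a blank line and `field: value` lines, as in the samples, and it is a simplified reading of the SSE format rather than a full parser.

```typescript
// Hypothetical helper, for illustration only: EventSource parses SSE for you in the browser.
type Frame = { event: string; id: string | undefined; data: string };

function parseFrames(wire: string): Frame[] {
  const frames: Frame[] = [];
  for (const block of wire.split("\n\n")) {
    let event = "message"; // default event type when no "event:" line is present
    let id: string | undefined;
    const data: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment (":keepalive")
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // one leading space is not part of the value
      if (field === "event") event = value;
      else if (field === "id") id = value;
      else if (field === "data") data.push(value);
    }
    if (data.length > 0) frames.push({ event, id, data: data.join("\n") });
  }
  return frames;
}

// A keepalive comment, then a replay of two missed events (sample payloads, not real output).
const wire =
  ":keepalive\n\n" +
  "event: replaying\ndata: 2\n\n" +
  'id: 1718900001000-0\ndata: {"product_id":"1","version":"4"}\n\n' +
  'id: 1718900001000-1\ndata: {"product_id":"1","version":"5"}\n\n';

const frames = parseFrames(wire);
// frames[0] is the named "replaying" frame; frames[1] and frames[2] are default "message" frames with ids.
```

The keepalive comment produces no frame, which is why it can be sent on every idle timeout without the dashboard noticing.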
Reconnect-and-replay with Last-Event-ID (parity)
TypeScript Advanced
Mirror the replay logic in TypeScript so both servers behave identically — the first xRead batch after a reconnect is the missed set: emit replaying N, resume live, and fall back to resync on a trimmed id.
New in this step
`replaying N` frame A one-off { event: replaying, data: String(count) } SSE frame, guarded on length > 0 so a clean reconnect never flashes Replaying 0.
`resync` frame A one-off { event: resync, data: "1" } frame telling the client to re-GET /products when its gap is no longer retained.
`xInfoStream` + `first-entry` key node-redis v5 returns the oldest retained id under the hyphenated key info["first-entry"].id, not info.firstEntry (which is undefined and would dead-end the resync).
trimmed id A Last-Event-ID older than the oldest retained entry — the gap is gone from the Stream, so the client needs a fresh snapshot.
component-wise id comparison Split each <ms>-<seq> on - and compare ms then seq as numbers — a string compare wrongly orders 5-2 after 5-10.
Same protocol, async-iterator shape
The TypeScript path is the Go path in different syntax. The dedicated reader’s first non-empty xRead from a
real Last-Event-ID returns the missed batch; you writeSSE({ event: "replaying", data: String(batch.length) }), stream the frames, advance lastID, and the loop then blocks ~15s at a time
for live events. The trimmed-id case uses node-redis xInfoStream(stream) to read the oldest retained id —
which node-redis v5 returns under the hyphenated key info["first-entry"].id, not info.firstEntry
(that property is undefined, which would silently disable the whole resync branch). If the client’s id is
older, write a resync event and let it re-GET /products. Compare the two ids component-wise on
<ms>-<seq> (split on -, the ms as a number, then the seq as a number) — a string compare wrongly orders
5-2 after 5-10 and falsely resyncs once ten events share one millisecond. Keeping the two servers
behaviourally identical is the point — a learner can swap backends and the dashboard cannot tell, including
the same bounded block (BLOCK: 15000, never BLOCK: 0), the null-on-timeout keepalive, and the
sse.closed/sse.aborted re-check that frees the duplicated connection when the tab closes.
Replay then live (node-redis)
// Stream ids are <ms>-<seq> integer pairs — compare component-wise, NOT lexically
// ("5-2" > "5-10" is true as strings but false as ids). idOlder(a, b) reports a < b.
function idOlder(a: string, b: string): boolean {
const [ams, aseq] = a.split("-").map(Number);
const [bms, bseq] = b.split("-").map(Number);
return ams !== bms ? ams < bms : aseq < bseq;
}
if (lastID !== "$") {
const info = await reader.xInfoStream(stream).catch(() => null);
// node-redis v5 returns the oldest entry under the HYPHENATED key "first-entry",
// not info.firstEntry (that is undefined, which would silently kill the resync branch).
const firstEntry = info?.["first-entry"];
if (firstEntry && idOlder(lastID, firstEntry.id)) {
await sse.writeSSE({ event: "resync", data: "1" });
lastID = "$"; // skip the unrecoverable gap and jump to the live tail
}
}
let first = true;
while (!sse.closed && !sse.aborted) {
const res = await reader.xRead(
{ key: stream, id: lastID },
{ BLOCK: 15000, COUNT: 500 }, // bounded, never BLOCK: 0 — see the fan-out step
);
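if (!res) {
await sse.write(":keepalive\n\n"); // idle timeout: keepalive, then the guard re-checks closed/aborted
first = false; // an idle timeout proves there was no backlog, so whatever arrives next is live
continue;
}
const msgs = res[0].messages;
if (first && msgs.length > 0) {
if (lastID !== "$") { // only a real Last-Event-ID makes the first batch a backlog
await sse.writeSSE({ event: "replaying", data: String(msgs.length) }); // guarded >0
}
first = false; // clear unconditionally: lastID becomes a real id below, so a fresh "$" client would otherwise flag its second live batch as a replay
}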
for (const msg of msgs) {
lastID = msg.id;
await sse.writeSSE({ id: msg.id, data: JSON.stringify(msg.message) });
}
}
Agent prompt — paste into an agent with repo access
A client reconnects with a Last-Event-ID for 5 missed events. In what order do the frames arrive — replaying, the 5 data frames, then live — and which value would a lexical (string) id-compare get wrong once ten of those events share one millisecond?
Role: Senior TypeScript engineer in this repo.
Context: GET /stream tails the Stream on a dedicated node-redis connection with a bounded xRead ({ BLOCK: 15000 }), keepaliving on each null timeout and re-checking sse.closed/sse.aborted; node-redis v5 xInfoStream returns the oldest retained id under the HYPHENATED key info["first-entry"].id (NOT info.firstEntry, which is undefined).
Task: Extend /stream so a reconnect with Last-Event-ID replays exactly the missed events, matching the Go server.
Requirements:
- The first non-empty xRead batch from a real lastID is the missed events; writeSSE an { event: "replaying", data: String(count) } before the data frames, guarded on msgs.length > 0 so a clean reconnect never flashes "Replaying 0".
- Keep the block bounded ({ BLOCK: 15000 }, never BLOCK: 0); on the null timeout await sse.write(":keepalive\n\n") and continue so the loop guard re-checks sse.closed/sse.aborted.
- Advance lastID per message; resume the live bounded-BLOCK loop afterwards.
- If lastID is older than the oldest retained id, writeSSE an { event: "resync", data: "1" } so the client re-fetches GET /products, and reset lastID to "$" to resume the live tail. Read that id as info["first-entry"].id (node-redis v5's hyphenated key — info.firstEntry is undefined and would dead-end the resync). Compare COMPONENT-WISE, never lexically: split each "<ms>-<seq>" on "-", compare ms as a number, then seq as a number — a string compare wrongly orders "5-2" after "5-10" and falsely resyncs once ≥10 events share one millisecond.
Tests / acceptance:
- Disconnect, make 5 changes, reconnect with Last-Event-ID: receive "replaying 5", then those 5 events; final state equals a fresh snapshot (parity with Go).
- An id older than the trim window yields a resync event with data: 1.
Output: a unified diff plus a one-paragraph note on keeping Go and TS replay behaviour identical.
What success looks like
Identical replay frames to the Go server. A reconnect after K missed events sends event: replaying with data: K, then those K ids in order; a trimmed id sends event: resync with data: 1. Because the trimmed check compares the ids component-wise on <ms>-<seq> (and reads the oldest entry via the hyphenated first-entry key), it does not falsely resync once ten events share one millisecond — the foot-gun a lexical compare would hit.
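The trimmed-id branch can also be pulled out as a pure function, which makes the component-wise rule easy to unit-test without Redis. This is a minimal sketch: `startPoint` and its return shape are hypothetical names, and `firstRetainedId` stands in for the id the server reads from XINFO STREAM.

```typescript
// Stream ids are "<ms>-<seq>"; compare numerically per component, never as strings.
function idOlder(a: string, b: string): boolean {
  const [ams = 0, aseq = 0] = a.split("-").map(Number);
  const [bms = 0, bseq = 0] = b.split("-").map(Number);
  return ams !== bms ? ams < bms : aseq < bseq;
}

// Hypothetical helper: decide where a (re)connecting client starts reading.
// firstRetainedId is null when the Stream is empty or XINFO STREAM failed.
function startPoint(
  lastEventId: string | null,
  firstRetainedId: string | null,
): { resync: boolean; from: string } {
  if (lastEventId === null) return { resync: false, from: "$" }; // fresh client: live tail only
  if (firstRetainedId !== null && idOlder(lastEventId, firstRetainedId)) {
    return { resync: true, from: "$" }; // gap was trimmed: client must re-GET /products
  }
  return { resync: false, from: lastEventId }; // gap still retained: replay from here
}

// The case a string compare gets wrong: the client is at 5-10, the oldest retained entry is 5-2.
const last: string = "5-10";
const oldest: string = "5-2";
const lexicalSaysOlder = last < oldest;     // true as strings, which would trigger a false resync
const decision = startPoint(last, oldest);  // resync is false, from is "5-10"
```

Keeping the decision separate from the read loop also means both backends can be checked against the same table of id pairs.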
Make replay idempotent: apply by (id, version)
Intermediate
Funnel every event — live, replayed, or duplicated — through one reducer keyed by product id and version, accepting only strictly-newer versions, so at-least-once delivery is safe without the cost of exactly-once.
New in this step
idempotency Applying the same event twice has the same effect as once — the property that makes a re-delivered or duplicated frame harmless.
at-least-once vs exactly-once Replay may re-deliver events (at-least-once); guaranteeing exactly-once is expensive, so you make apply idempotent instead.
reducer keyed by `(id, version)` One function holding the current version per product that accepts an event only if its version is strictly newer — collapsing duplicates and out-of-order arrivals.
Idempotency is what makes at-least-once safe
Replay and live delivery can overlap — a reconnect might re-deliver an event you already applied, or two
servers might both forward the same change. That’s fine if applying an event is idempotent. The rule: hold
the current version per product, and when an event arrives apply it only if it is newer than the version
you already show; otherwise drop it. Because version is monotonic per row, this collapses duplicates,
tolerates out-of-order arrivals during the replay-to-live handoff, and means you never have to guarantee
exactly-once delivery (which is expensive and, here, unnecessary). The same predicate works for snapshots: a
snapshot is just a set of (id, version) baselines you apply the same way.
Idempotent apply (the client's one reducer)
// state: Map<productId, {price, stock, version}>
function applyEvent(state, e) {
const cur = state.get(e.product_id);
if (cur && e.version <= cur.version) return false; // stale or duplicate — ignore
state.set(e.product_id, { price: e.price, stock: e.stock, version: e.version });
return true; // changed — re-render this row
}
What success looks like
The reducer is a clean dedupe. Feed it the same product at version 1 then 2: the first returns true (row updates), the second returns true (newer wins). Re-feed version 2 (a duplicate id from an overlapping replay) and it returns false — state unchanged, no re-render. Feed version 1 after 2 (out-of-order) and it also returns false. That is what makes at-least-once replay safe without exactly-once delivery.
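The sequence described above can be run directly. This is a typed sketch of the same reducer; the `ChangeEvent` and `Row` type names and the sample prices are assumptions made for the example.

```typescript
type ChangeEvent = { product_id: number; price: number; stock: number; version: number };
type Row = { price: number; stock: number; version: number };

// Accept an event only if its version is strictly newer than the one already held.
function applyEvent(state: Map<number, Row>, e: ChangeEvent): boolean {
  const cur = state.get(e.product_id);
  if (cur && e.version <= cur.version) return false; // stale or duplicate: ignore
  state.set(e.product_id, { price: e.price, stock: e.stock, version: e.version });
  return true; // changed: re-render this row
}

const state = new Map<number, Row>();
const v1: ChangeEvent = { product_id: 1, price: 1399, stock: 50, version: 1 };
const v2: ChangeEvent = { product_id: 1, price: 1499, stock: 49, version: 2 };

// Order of arrival: v1, v2, v2 again (duplicate), then v1 late (out of order).
const results = [v1, v2, v2, v1].map((e) => applyEvent(state, e));
// results is [true, true, false, false]; the row still holds v2's price and stock.
```

Because the function returns whether anything changed, the caller can skip the re-render for the two ignored events.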
Build the live dashboard (single index.html)
Web dashboard Beginner
Make the payoff something you can see — one zero-build public/index.html that fetches the snapshot, opens an EventSource to /stream, and renders each product with a slider, a Sell button, and a Sold-Out badge that ticks live across windows.
New in this step
`fetch` The browser’s built-in HTTP call — used for the initial GET /products snapshot and for the PUT/POST writes.
`EventSource.onmessage` Fires for each default data: frame; you JSON.parse(ev.data) and feed it to the same idempotent reducer.
zero-build static file One self-contained index.html the backend serves at / — no framework, no bundler, nothing to install.
coercing with `Number()` Stream data: fields arrive as strings, so wrap price/stock/version in Number(...) before comparing or rendering.
Zero-install on purpose
The payoff has to be something you see, so the dashboard is one static file the backend already serves at
/. It uses two browser built-ins: fetch for the snapshot and writes, and EventSource for the live feed
(it handles reconnect and Last-Event-ID for you). The render is a tiny reducer over the applyEvent rule
from the last step. Open the file in two browser windows: drag the price slider in window A and window
B’s number ticks within ~100ms; click Sell 1 and stock drops in both; when stock hits 0 the Sold-Out
badge flips and a second window trying to sell the last unit gets a live out of stock toast — that’s the
atomic guard, visible.
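One detail is easy to miss: Stream fields arrive as strings, and strings compare character by character. The short sketch below shows why the dashboard wraps version in Number(...) before the newer-than check. `isStale` is a hypothetical name for that check, not a function in the build.

```typescript
// Compare versions as numbers; as strings, "10" sorts before "9".
function isStale(incoming: string | number, current: string | number): boolean {
  return Number(incoming) <= Number(current);
}

const incoming: string = "10"; // version field as it arrives from the Stream
const current: string = "9";   // version the row already shows

const asStrings = incoming <= current;        // true: a lexical compare would drop the newer event
const asNumbers = isStale(incoming, current); // false: 10 is newer than 9, so the event applies
```

The bug only appears once a product passes version 9, which is why it tends to survive a quick manual test.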
public/index.html
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Ticker — live control room</title>
<style>
body { font: 15px system-ui, sans-serif; margin: 24px; background: #0b0f17; color: #e8edf5; }
.row { display: flex; align-items: center; gap: 16px; padding: 12px; border: 1px solid #243049; border-radius: 10px; margin: 8px 0; }
.name { font-weight: 700; min-width: 130px; }
.price { font-variant-numeric: tabular-nums; min-width: 84px; transition: color .1s; }
.flash { color: #f5b14c; }
.sold-out { color: #ff6b6b; font-weight: 700; }
button { font: inherit; cursor: pointer; border-radius: 8px; border: 1px solid #355; background: #16203a; color: inherit; padding: 6px 12px; }
#toast { position: fixed; bottom: 20px; right: 20px; background: #3a1620; border: 1px solid #a44; padding: 10px 14px; border-radius: 8px; opacity: 0; transition: opacity .2s; }
#toast.show { opacity: 1; }
</style>
</head>
<body>
<h1>Ticker <span id="pill">connecting…</span></h1>
<div id="list"></div>
<div id="toast"></div>
<script>
const API = location.origin; // the backend serves this file, so same origin
const state = new Map(); // id -> { id, name, price, stock, version }
// Idempotent: only a strictly-newer version wins. Stream fields arrive as strings, so coerce.
function applyEvent(raw) {
const id = Number(raw.product_id ?? raw.id);
const version = Number(raw.version);
const cur = state.get(id);
if (cur && version <= cur.version) return;
state.set(id, { id, name: raw.name ?? cur?.name ?? ("Product " + id), price: Number(raw.price), stock: Number(raw.stock), version });
renderRow(id);
}
const fmt = (cents) => "$" + (cents / 100).toFixed(2);
function renderRow(id) {
const p = state.get(id);
let el = document.getElementById("p" + id);
if (!el) {
el = document.createElement("div");
el.className = "row";
el.id = "p" + id;
el.innerHTML =
'<span class="name"></span>' +
'<input type="range" min="0" max="10000" step="1" />' +
'<span class="price"></span><span class="stock"></span>' +
'<button>Sell 1</button>';
el.querySelector("input").addEventListener("change", (ev) => setPrice(id, Number(ev.target.value)));
el.querySelector("button").addEventListener("click", () => sell(id));
document.getElementById("list").appendChild(el);
}
el.querySelector(".name").textContent = p.name;
el.querySelector("input").value = p.price;
const price = el.querySelector(".price");
price.textContent = fmt(p.price);
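price.classList.remove("flash"); void price.offsetWidth; price.classList.add("flash");
setTimeout(() => price.classList.remove("flash"), 300); // let the highlight fade back instead of staying amber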
el.querySelector(".stock").innerHTML =
p.stock > 0 ? (p.stock + " in stock") : '<span class="sold-out">SOLD OUT</span>';
}
async function setPrice(id, price) {
const p = state.get(id);
const r = await fetch(API + "/products/" + id + "/price", {
method: "PUT", headers: { "Content-Type": "application/json" },
body: JSON.stringify({ price, version: p.version }),
});
if (r.ok || r.status === 409) applyEvent(await r.json()); // 200 confirms, 409 rebases to the current row; any other error body is not a row
}
async function sell(id) {
const r = await fetch(API + "/products/" + id + "/sell", {
method: "POST", headers: { "Content-Type": "application/json" },
body: JSON.stringify({ qty: 1 }),
});
if (r.status === 409) { toast("Out of stock"); return; }
if (r.ok) applyEvent(await r.json()); // only a 2xx body is a product row
}
function toast(msg) {
const t = document.getElementById("toast");
t.textContent = msg; t.classList.add("show");
setTimeout(() => t.classList.remove("show"), 2000);
}
async function start() {
const snapshot = await (await fetch(API + "/products")).json();
for (const p of snapshot) applyEvent(p);
const es = new EventSource(API + "/stream");
es.onmessage = (ev) => applyEvent(JSON.parse(ev.data));
window._es = es; // the next step adds the status-pill listeners
}
start();
</script>
</body>
</html>
Agent prompt — paste into an agent with repo access
Role: Front-end engineer in this repo (vanilla JS, no framework, no build step).
Context: The backend serves this file at / and exposes GET /products (snapshot), PUT /products/:id/price {price,version}, POST /products/:id/sell {qty}, and GET /stream (SSE). Stream event fields arrive as strings.
Task: Write public/index.html — a single self-contained file that renders each product with a price slider, a Sell button, a Sold-Out badge, and live updates from /stream.
Requirements:
- One idempotent applyEvent(raw) keyed by product id that ignores any event whose version is not newer; coerce string fields to numbers.
- fetch the snapshot first, then open new EventSource("/stream") and apply each message; format cents as currency for display only.
- A slider change PUTs {price, version}; Sell POSTs {qty:1}; a 409 on sell shows an "Out of stock" toast; a 409 on price rebases to the returned current value.
Tests / acceptance:
- Open in two windows: a slider drag in one updates the other within ~100ms; selling the last unit flips both to SOLD OUT and toasts the loser.
Output: the complete public/index.html, no commentary.
What success looks like
Open http://localhost:8080/ in two windows. Dragging the price slider in window A makes window B’s number tick within ~100ms — the change went Postgres → XADD → SSE → both browsers. Clicking Sell 1 drops stock in both; selling the last unit flips both to SOLD OUT, and the second window’s attempt to sell it gets a live Out of stock toast (the 409 from the atomic guard). No polling, no refresh — the board is genuinely live across windows.
The Live / Reconnecting / Replaying status pill + the offline demo
Web dashboard Intermediate
Surface the durable-realtime machinery with a status pill — green Live, amber Reconnecting…, Replaying N while a backlog drains — then break the network on purpose so you can watch a window catch up.
New in this step
`onopen` / `onerror` EventSource fires onopen when connected and onerror when the connection drops (then it auto-reconnects) — your Live and Reconnecting cues.
`addEventListener` for custom events Named server frames (event: replaying, event: resync) arrive via addEventListener("replaying", …), not onmessage.
DevTools offline throttle The browser’s network panel can force a tab Offline, so you can simulate a dropped connection and watch the reconnect-replay.
Make the machinery visible, then break the network on purpose
The whole point of durable realtime is invisible when everything is online — so surface it. EventSource
fires onopen when connected and onerror when the connection drops (it then auto-reconnects, re-sending
Last-Event-ID); listen to those for Live and Reconnecting…. Your server emits a custom replaying
event with a count at the start of a reconnect, so addEventListener("replaying", …) shows Replaying N
events while the gap drains, and the next live message returns you to Live. A resync event tells the
dashboard to re-GET /products.
Now the money demo: open two windows, in DevTools set window B’s network to Offline, change prices and sell in window A for ten seconds, then bring B back Online — the pill reads Reconnecting…, then Replaying 7 events, the backlog fast-forwards, and B ends byte-identical to A. No gaps, no full refetch.
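The pill is easier to reason about as a small state machine than as scattered handlers. The following is a sketch under stated assumptions: `nextPill`, `Pill`, and `PillEvent` are hypothetical names, and the model assumes the pill returns to Live once the announced number of replayed frames has been applied (a plain live frame also sets it to Live).

```typescript
type PillEvent =
  | { type: "open" }
  | { type: "error" }
  | { type: "replaying"; count: number }
  | { type: "message" };

type Pill = { text: string; replayLeft: number };

// Pure transition function: current pill + one EventSource event -> next pill.
function nextPill(p: Pill, ev: PillEvent): Pill {
  switch (ev.type) {
    case "open":
      return { text: "Live", replayLeft: 0 };
    case "error":
      return { text: "Reconnecting…", replayLeft: 0 };
    case "replaying":
      return { text: `Replaying ${ev.count} events`, replayLeft: ev.count };
    case "message": {
      const left = Math.max(0, p.replayLeft - 1); // one replayed frame drained, if any were expected
      return left === 0 ? { text: "Live", replayLeft: 0 } : { text: p.text, replayLeft: left };
    }
  }
}

// The offline demo in miniature: connect, drop, reconnect, replay 3, then live.
const demo: PillEvent[] = [
  { type: "open" },
  { type: "error" },
  { type: "open" },
  { type: "replaying", count: 3 },
  { type: "message" },
  { type: "message" },
  { type: "message" },
];
const seen: string[] = [];
let pill: Pill = { text: "connecting…", replayLeft: 0 };
for (const ev of demo) {
  pill = nextPill(pill, ev);
  seen.push(pill.text);
}
// seen ends with "Replaying 3 events" held for two frames, then "Live" on the third.
```

A pure function like this can be tested without a browser, and the real handlers then only need to call it and write the resulting text into the pill element.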
The status-pill additions to index.html
// Replace the EventSource setup inside start() with connect(); keep applyEvent/renderRow as-is.
function setPill(text, color) {
const pill = document.getElementById("pill");
pill.textContent = text;
pill.style.color = color;
}
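function connect() {
const es = new EventSource(API + "/stream");
let replayLeft = 0; // replayed frames still expected after a "replaying" event
es.onopen = () => { replayLeft = 0; setPill("● Live", "#3FD27A"); };
es.onerror = () => setPill("● Reconnecting…", "#f5b14c"); // EventSource auto-retries for us
es.onmessage = (ev) => {
applyEvent(JSON.parse(ev.data));
if (replayLeft > 0) replayLeft--; // one replayed frame drained
if (replayLeft === 0) setPill("● Live", "#3FD27A"); // backlog drained, or a plain live frame: back to Live
};
es.addEventListener("replaying", (ev) => {
replayLeft = Number(ev.data);
setPill("● Replaying " + ev.data + " events", "#f5b14c");
});
es.addEventListener("resync", async () => {
const snap = await (await fetch(API + "/products")).json();
for (const p of snap) applyEvent(p);
});
return es;
}
Agent prompt — paste into an agent with repo access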
Role: Front-end engineer in this repo (vanilla JS).
Context: public/index.html already renders products and applies SSE events. The server sends data frames, a custom "replaying" event (data = count) on reconnect, and a "resync" event when the client's id was trimmed.
Task: Add a status pill reflecting Live / Reconnecting… / Replaying N, and handle resync.
Requirements:
- EventSource onopen -> "Live"; onerror -> "Reconnecting…" (do NOT manually reconnect — the browser does, re-sending Last-Event-ID).
- addEventListener("replaying", e => show "Replaying {e.data} events"); addEventListener("resync", refetch GET /products).
- Keep applyEvent idempotent so replayed events that overlap live ones are ignored.
Tests / acceptance:
- DevTools offline demo: take the tab offline, make 7 changes elsewhere, go online -> the pill shows Reconnecting…, then Replaying 7 events, and the tab ends identical to a freshly loaded one.
Output: the updated index.html (or a unified diff), no commentary.
What success looks like
The machinery becomes visible. On connect the pill goes green ● Live; pull window B offline in DevTools and it flips amber ● Reconnecting…. Make 7 changes in window A, then bring B back online: B’s EventSource auto-reconnects with its Last-Event-ID, the server replays the gap, the pill briefly reads ● Replaying 7 events, the rows fast-forward, and the next live frame returns it to ● Live — B is now byte-identical to A, with no full refetch.
Multi-instance fan-out: two servers, one Stream
Advanced
Prove the design scales by running a second instance on another port — because both tail the same Redis Stream, a write through A surfaces on a dashboard connected to B, so fan-out grows horizontally without one line of fan-out code changing.
New in this step
horizontal scaling Adding more identical instances to handle load, rather than making one bigger — only possible if instances need no shared in-memory state.
in-process fan-out Pushing changes through a Go channel or a Node EventEmitter reaches only clients on that process — it breaks the moment you run two.
sticky sessions Pinning a client to one instance; not needed here because every instance sees every event, so any instance can serve any dashboard.
the Stream as fan-out point Every instance runs the same XREAD against the same Stream, so Redis — not any process — is the coordination point.
The Stream is the fan-out point, not the process
In-process fan-out (a Go channel, a Node EventEmitter) only reaches clients connected to that process,
so it breaks the moment you run two instances behind a load balancer. Redis Streams sidestep this entirely:
every instance runs the same XREAD BLOCK against the same Stream, so all of them see every event and
forward it to their own SSE subscribers. There is no leader, no sticky sessions, no inter-node gossip — the
realtime store is the coordination point. Write through instance A (:8080), connect a dashboard to instance
B (:8081), and the tick still arrives. That is the property that lets this design scale from one box to
many without changing a line of fan-out code.
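The property can be modelled in a few lines without Redis. In this sketch `SharedLog` is an in-memory stand-in for the Stream and `Instance` is a hypothetical model of one server process; neither is part of the build. The point is only that each instance keeps its own read offset into one shared log.

```typescript
// In-memory stand-in for a Redis Stream: an append-only log that readers tail by offset.
// This is NOT Redis; it only models "every reader sees every entry".
class SharedLog {
  private entries: string[] = [];
  append(e: string): void {
    this.entries.push(e);
  }
  readFrom(offset: number): string[] {
    return this.entries.slice(offset);
  }
}

// Hypothetical model of one server instance: its own cursor, its own connected dashboards.
class Instance {
  private log: SharedLog;
  private offset = 0;
  readonly delivered: string[] = []; // what this instance pushed to its SSE clients
  constructor(log: SharedLog) {
    this.log = log;
  }
  poll(): void {
    const batch = this.log.readFrom(this.offset);
    this.offset += batch.length; // advance only this instance's cursor
    this.delivered.push(...batch);
  }
}

const log = new SharedLog();
const a = new Instance(log); // think :8080
const b = new Instance(log); // think :8081

log.append("sell product 1"); // the write arrives via A's HTTP handler but lands in the shared log
a.poll();
b.poll();
// Both a.delivered and b.delivered now contain "sell product 1".
```

If the write had instead been pushed through a channel or emitter owned by `a`, `b.delivered` would stay empty, which is the failure mode described above.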
Two instances, one Stream
# Two instances of the SAME server, same DATABASE_URL + REDIS_URL, different ports.
PORT=8080 ./ticker & # instance A (Go: PORT=8080 go run ./cmd/server)
PORT=8081 ./ticker & # instance B
curl -N localhost:8081/stream & # subscribe on B
curl -X POST localhost:8080/products/1/sell -H 'Content-Type: application/json' -d '{"qty":1}' # write on A
# -> the sale event prints on B's stream within ~100ms, proving cross-instance delivery
What success looks like
The write hits instance A (:8080); the SSE frame surfaces on the subscriber connected to instance B (:8081) within ~100ms — because both tail the same warehouse:1 Stream with plain XREAD, every instance sees every event. No sticky sessions, no leader, no inter-node messaging: the Stream is the fan-out point, so adding instances does not change a line of fan-out code:
# on B's curl -N localhost:8081/stream :
id: 1718900003000-0
data: {"product_id":"1","price":"1499","stock":"49","version":"5"}
Bound the Stream: XADD MAXLEN ~ plus a snapshot
Advanced
Stop the Stream growing forever by capping it with MAXLEN ~ N, and lean on the products snapshot as the floor under that cap — a client whose id was trimmed away recovers from GET /products, so bounded memory never means a lost client.
New in this step
approximate trimming (`MAXLEN ~`) The ~ lets Redis trim in efficient batches, so the Stream sits a little over the cap — far cheaper than exact trimming.
snapshot as baseline floor The products table holds current state for every product, so a gap the Stream no longer retains is still recoverable from it.
snapshot-first recovery Always load GET /products first, then tail the Stream; a reconnect replays the retained gap or falls back to a fresh snapshot via resync.
Replay needs a floor and a snapshot
A Stream you never trim grows without bound and eventually costs more RAM than the data is worth. Cap it at
append time with approximate trimming — XADD warehouse:1 MAXLEN ~ 10000 * … — where the ~ lets Redis
trim in efficient batches (a little over the cap is fine, and much cheaper than exact trimming). Trimming
raises a question: what about a client whose Last-Event-ID is older than the oldest retained entry? That
gap is unrecoverable from the Stream, which is exactly why Postgres holds the snapshot — current price,
stock, and version for every product. The protocol is snapshot-first: a client always loads GET /products
(the baseline), then tails the Stream from live; a reconnect replays the gap if it is still retained, or
falls back to a fresh snapshot via the resync event if it is not. Snapshot = baseline truth; Stream =
recent deltas; together they bound memory and guarantee no gaps.
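A toy model makes the floor visible. This is an in-memory sketch, not Redis: `CappedLog` is a hypothetical class, ids are plain integers for brevity, and the trim is exact where Redis with `~` trims lazily in batches. The resync rule mirrors the one in the text: a last-seen id older than the first retained entry cannot be replayed.

```typescript
// In-memory stand-in for a capped Stream; integer ids instead of "<ms>-<seq>".
type Entry = { id: number; productId: number; price: number; version: number };

class CappedLog {
  private entries: Entry[] = [];
  private nextId = 1;
  private maxLen: number;
  constructor(maxLen: number) {
    this.maxLen = maxLen;
  }

  append(productId: number, price: number, version: number): number {
    const id = this.nextId++;
    this.entries.push({ id, productId, price, version });
    if (this.entries.length > this.maxLen) this.entries.shift(); // exact trim, for simplicity
    return id;
  }

  // Returns the retained entries after lastId, or "resync" when lastId predates the first retained entry.
  readAfter(lastId: number): Entry[] | "resync" {
    const first = this.entries[0];
    if (first !== undefined && lastId < first.id) return "resync";
    return this.entries.filter((e) => e.id > lastId);
  }
}

const log = new CappedLog(10);
for (let v = 1; v <= 15; v++) log.append(1, 1000 + v, v); // ids 1..15; only 6..15 are retained

const stale = log.readAfter(3);   // "resync": ids 4 and 5 are gone, so the client needs a snapshot
const recent = log.readAfter(12); // the entries with ids 13, 14, 15
```

The stale client loses nothing, because the snapshot it re-fetches already reflects every trimmed change.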
Bounded append + snapshot-first recovery
# Append with an approximate cap so the Stream self-trims:
XADD warehouse:1 MAXLEN ~ 10000 * product_id 1 price 1399 stock 50 version 7
# Recovery order for any client:
# 1) GET /products -> snapshot baseline (current version per product)
# 2) open /stream -> live tail; a reconnect replays the gap from Last-Event-ID
# 3) if the id was trimmed -> server sends `resync` -> client repeats step 1
Agent prompt — paste into an agent with repo access
Role: Senior backend engineer in this repo (use the selected backend).
Context: Every write XADDs a change event; the products table holds current state; clients recover via GET /products then /stream.
Task: Bound the Stream with approximate trimming and confirm the snapshot path recovers a trimmed client.
Requirements:
- Add MAXLEN ~ 10000 to every XADD (go-redis: XAddArgs{MaxLen: 10000, Approx: true}; node-redis: { TRIM: { strategy: "MAXLEN", strategyModifier: "~", threshold: 10000 } }).
- Keep GET /products as the snapshot; ensure a reconnect whose Last-Event-ID is below the first retained id triggers a resync.
Tests / acceptance:
- Drive well over 10000 events; XLEN stays approximately at the cap, not unbounded.
- A client with a trimmed Last-Event-ID receives resync and recovers a correct, complete view via GET /products.
Output: a unified diff plus a one-paragraph note on why approximate (~) trimming is cheaper than exact.
What success looks like
Drive well over 10000 events and XLEN warehouse:1 settles approximately at the cap (a little over is the ~ trade-off — efficient batch trimming, not exact), instead of growing without bound. A client whose Last-Event-ID now falls below XINFO STREAM warehouse:1’s first-entry no longer replays a partial gap: it receives the resync frame and recovers a complete, correct view from GET /products. Snapshot is the floor under a trimmed Stream:
XLEN warehouse:1 -> (integer) 10000 # ~, may read a little above the cap
Consumer groups: the other half of Streams
Advanced
Meet the commands this design deliberately does not use — XGROUP CREATE, XREADGROUP, XACK, XPENDING, XAUTOCLAIM — and learn the judgement: a dashboard mirror wants plain XREAD (every reader sees every event), a worker pool wants a consumer group (each event processed once).
New in this step
consumer group A named group over a Stream that distributes entries so each is processed once across a pool — the opposite of a mirror.
`XREADGROUP` (`>`) Reads on behalf of a group; the > id means messages never yet delivered to this group, so different consumers get different entries.
`XACK` Confirms a consumer finished an entry, removing it from the group’s pending list — delivery is tracked, not fire-and-forget.
Pending Entries List / `XPENDING` The per-group set of delivered-but-not-yet-acked entries; XPENDING inspects what is still outstanding.
`XAUTOCLAIM` Reassigns entries a crashed consumer grabbed but never acked, after a min-idle time — the reclaim half of group delivery.
fan-out read vs group read Plain XREAD gives every reader the whole feed (a mirror); a group splits the feed across workers (each job once) — Ticker needs the former.
Fan-out read vs group read — and why this build picks fan-out
Streams answer two different questions, and Ticker has been using only the first. The fan-out read you built
— XREAD against warehouse:1 — gives every reader every entry: run N server instances and each one
tails the same Stream independently, so each connected dashboard sees the full ordered feed. That is exactly
what a live mirror wants, and it is why the multi-instance step scaled for free.
A consumer group answers the opposite question: distribute the work so each entry is processed once
across a pool. XGROUP CREATE warehouse:1 workers $ creates the group; each worker calls
XREADGROUP GROUP workers <consumer> COUNT N STREAMS warehouse:1 > and Redis hands different entries to
different consumers (the > means “messages never delivered to this group”). Delivery is tracked, not
fire-and-forget: an entry stays in the group’s Pending Entries List until the worker confirms it with
XACK warehouse:1 workers <id>. XPENDING warehouse:1 workers inspects what is still un-acked, and
XAUTOCLAIM warehouse:1 workers <consumer> <min-idle> 0 reassigns entries a crashed worker grabbed but
never acked — that ack-and-reclaim machinery is the whole point of a group, and it is pure overhead for a
mirror.
So the choice is load-bearing, not accidental. If you put the SSE servers in a consumer group, each change
event would be delivered to one server, and only the dashboards connected to that server would tick —
the other windows would silently miss it. Plain XREAD is correct here precisely because it does not
distribute: every instance must see every event for the fan-out to be complete. Reach for a group when you
want competing workers (a re-embed worker, an email sender, an audit sink) to share a backlog so each job
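runs once; reach for plain XREAD when every reader needs the whole feed. (A consumer-group worker that
claims each change for one consumer at a time is a natural future add-on, but out of scope here. Groups give at-least-once processing, since XAUTOCLAIM can hand an un-acked entry to a second worker, so such a job should itself be idempotent.)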
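The two read styles can be contrasted in a toy model. This is an in-memory sketch only: `ToyStream` and its methods are hypothetical and merely imitate the shape of XREAD, XREADGROUP, and XACK; real groups live in Redis and track far more state.

```typescript
// In-memory model only; real groups live in Redis (XREADGROUP / XACK / XAUTOCLAIM).
class ToyStream {
  private entries: string[] = [];
  private groupCursor = 0;                     // one cursor shared by the whole group
  private pending = new Map<number, string>(); // entry index -> consumer that holds it

  add(e: string): void {
    this.entries.push(e);
  }

  // Fan-out read: the caller owns its offset, so every reader sees every entry.
  read(offset: number): string[] {
    return this.entries.slice(offset);
  }

  // Group read: hands out the next undelivered entries and records them as pending.
  readGroup(consumer: string, count: number): Array<{ idx: number; value: string }> {
    const out: Array<{ idx: number; value: string }> = [];
    while (out.length < count && this.groupCursor < this.entries.length) {
      const idx = this.groupCursor++;
      this.pending.set(idx, consumer);
      out.push({ idx, value: this.entries[idx] as string });
    }
    return out;
  }

  ack(idx: number): void {
    this.pending.delete(idx);
  }
  pendingCount(): number {
    return this.pending.size;
  }
}

const s = new ToyStream();
["e1", "e2", "e3", "e4"].forEach((e) => s.add(e));

const mirrorA = s.read(0);       // all four entries
const mirrorB = s.read(0);       // all four entries again: a mirror
const w1 = s.readGroup("w1", 2); // e1, e2
const w2 = s.readGroup("w2", 2); // e3, e4: different entries, the work is split
w1.forEach((m) => s.ack(m.idx)); // w2 never acks, so its two entries stay pending
```

If the SSE servers read like `w1` and `w2`, a dashboard connected to the first would never see e3 or e4, which is the silent miss described above.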
What a group would look like (contrast — not wired into this build)
# A consumer GROUP distributes entries (each processed once) — the opposite of the dashboard mirror.
redis-cli -u "$REDIS_URL" XGROUP CREATE warehouse:1 workers '$' # create the group at the live tail
redis-cli -u "$REDIS_URL" XREADGROUP GROUP workers w1 COUNT 10 STREAMS warehouse:1 '>' # w1 gets NEW entries
redis-cli -u "$REDIS_URL" XACK warehouse:1 workers 1718900000000-0 # confirm one entry is done
redis-cli -u "$REDIS_URL" XPENDING warehouse:1 workers # what is delivered-but-not-acked
redis-cli -u "$REDIS_URL" XAUTOCLAIM warehouse:1 workers w2 60000 0 # w2 reclaims entries idle > 60s
# Ticker's fan-out uses the plain read instead — every instance sees EVERY entry, no group, no ack:
redis-cli -u "$REDIS_URL" XREAD BLOCK 15000 STREAMS warehouse:1 '$'
Integration test: live delivery + exact replay
Go Intermediate
Prove both guarantees in code with a Go integration test against the Compose stack — a change reaches a second subscriber within ~100ms, and a reconnect with Last-Event-ID receives exactly the missed events — because ordering, blocking reads, and replay exist only in a real server.
New in this step
integration test vs unit test A unit test isolates one function; an integration test runs against real Postgres and Redis to prove behaviour that only emerges across the whole system.
why a fake Redis can't prove this Ordering, blocking XREAD, and replay-from-id are real-server behaviours a mock cannot reproduce — so this test must point at the Compose stack.
`t.Skip` Skips a test at runtime (here when DATABASE_URL/REDIS_URL are unset) so the suite still passes without infra instead of failing.
resetting state between runs Truncating products and clearing the Stream before each run keeps tests independent and repeatable.
Test the propagation, not just the write
Unit tests with a fake Redis can’t prove ordering, blocking reads, or replay — those exist only in a real
server. Point the test at the Compose stack: start the fan-out handler, connect subscriber S1, make a
change, and assert S1 receives it within ~100ms with the right version. Then for replay: connect S2, record
its last id, disconnect it, make K changes, reconnect S2 with Last-Event-ID, and assert it receives
exactly those K events (the replaying count equals K) and ends with state identical to a fresh
GET /products. Skip cleanly if the env vars aren’t set, so the suite still runs without infra.
Agent prompt — paste into an agent with repo access
Role: Senior Go engineer in this repo.
Context: The fan-out server, write path, and schema exist; Postgres via DATABASE_URL and Redis via REDIS_URL (the Compose stack).
Task: Add integration tests for live delivery and reconnect-replay.
Requirements:
- Live: subscribe via an SSE client, make a price change, assert the event arrives within 100ms carrying the new version.
- Replay: record the last id, disconnect, make K=5 changes, reconnect with Last-Event-ID, assert exactly 5 events (a "replaying 5" frame) and final state equal to GET /products.
- t.Skip when DATABASE_URL or REDIS_URL is unset; reset the products table and the Stream between runs.
Tests / acceptance:
- `go test ./... -run TestRealtime -count=10` passes against the Compose stack (-count=10 runs the tests ten times and bypasses Go's test cache).
Output: a unified diff plus how the test isolates Redis and Postgres between runs.
What success looks like
go test ./... -run TestRealtime passes against the Compose stack and proves both guarantees in code: the live subscriber receives a change within ~100ms carrying the new version, and a subscriber that reconnects with Last-Event-ID after 5 changes gets a replaying 5 frame, exactly those 5 events, and a final state equal to GET /products. With env vars unset the test t.Skips instead of failing:
ok github.com/you/ticker 0.42s
--- (with no DATABASE_URL/REDIS_URL: SKIP: realtime infra not configured)
Integration test: same assertions in the TS stack
TypeScript Intermediate
Make parity a test, not a promise — assert the same two guarantees in TypeScript against the same Compose stack, so the project can honestly claim the realtime store, not the language, carries the weight.
New in this step
integration test A test against real Postgres and Redis (not mocks) — the only way to prove ordering, blocking reads, and replay actually hold.
`bun test` Bun’s built-in test runner (vitest works too); runs the TS assertions against the same Compose stack the Go suite uses.
parity testing Asserting both backends produce identical behaviour, so a learner can swap languages and the dashboard cannot tell.
skip when infra unset Skip the test when DATABASE_URL/REDIS_URL are absent, so the suite passes without infra rather than failing.
Parity is a test, not a promise
The TS suite asserts the identical behaviour with bun test (or vitest): an SSE client receives a change
within ~100ms, and a reconnect with Last-Event-ID replays exactly the missed events, ending with state equal
to GET /products. Running both suites against the same Compose stack is what lets the project claim,
honestly, that the realtime store — not the language — carries the weight.
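Both guarantees can be checked as pure functions over captured SSE text, which keeps the network code and the assertions apart. The sketch below is one possible shape, not the project's test code: parseSseFrames, compareStreamIds, and checkReplay are hypothetical helper names, and it assumes data frames carry an id: field with no event: field, while the replaying frame carries the count in its data.

```typescript
// Hypothetical test helpers; the names and the exact frame layout are assumptions.
interface SseFrame {
  id?: string;    // Stream id from the "id:" field, e.g. "1718900000000-0"
  event?: string; // custom event name from "event:"; undefined for plain data frames
  data: string;   // "data:" lines joined with "\n"
}

// Parse raw SSE text into frames. Frames are separated by a blank line.
function parseSseFrames(raw: string): SseFrame[] {
  const parsed: SseFrame[] = [];
  for (const block of raw.replace(/\r\n/g, "\n").split("\n\n")) {
    const frame: SseFrame = { data: "" };
    const dataLines: string[] = [];
    let sawField = false;
    for (const line of block.split("\n")) {
      if (line === "" || line.charAt(0) === ":") continue; // blank line or comment/heartbeat
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.charAt(0) === " ") value = value.slice(1); // strip one leading space
      if (field === "id") frame.id = value;
      else if (field === "event") frame.event = value;
      else if (field === "data") dataLines.push(value);
      else continue; // unknown field: ignore
      sawField = true;
    }
    if (!sawField) continue;
    frame.data = dataLines.join("\n");
    parsed.push(frame);
  }
  return parsed;
}

// Split "<ms>-<seq>" into its two numeric parts.
function splitStreamId(id: string): [number, number] {
  const dash = id.indexOf("-");
  return [Number(id.slice(0, dash)), Number(id.slice(dash + 1))];
}

// Negative when a < b, zero when equal, positive when a > b.
function compareStreamIds(a: string, b: string): number {
  const left = splitStreamId(a);
  const right = splitStreamId(b);
  return left[0] !== right[0] ? left[0] - right[0] : left[1] - right[1];
}

// Returns a list of problems; an empty list means the replay matched expectations.
function checkReplay(captured: SseFrame[], lastSeenId: string, k: number): string[] {
  const problems: string[] = [];
  const announce = captured.filter((f) => f.event === "replaying")[0];
  if (announce === undefined || Number(announce.data) !== k) {
    problems.push(`expected a "replaying ${k}" frame`);
  }
  const dataFrames = captured.filter((f) => f.event === undefined && f.id !== undefined);
  if (dataFrames.length !== k) problems.push(`expected ${k} data frames, got ${dataFrames.length}`);
  let previous = lastSeenId;
  for (const f of dataFrames) {
    const id = f.id as string;
    if (compareStreamIds(id, previous) <= 0) problems.push(`id ${id} is not after ${previous}`);
    previous = id;
  }
  return problems;
}
```

In a test, the body read from /stream after the reconnect would be passed to parseSseFrames, and the suite would assert that checkReplay(frames, lastId, 5) returns an empty list.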
Agent prompt — paste into an agent with repo access
Role: Senior TypeScript engineer in this repo.
Context: The Hono fan-out server, write path, and schema exist; Postgres via DATABASE_URL and Redis via REDIS_URL.
Task: Add integration tests (bun test or vitest) for live delivery and reconnect-replay, matching the Go suite one-for-one.
Requirements:
- Live: an SSE/EventSource client receives a price change within 100ms with the new version.
- Replay: record the last id, disconnect, make 5 changes, reconnect with Last-Event-ID, assert exactly 5 events (a "replaying 5" frame) and final state equal to GET /products.
- Skip when DATABASE_URL or REDIS_URL is unset; reset products and the Stream between runs.
Tests / acceptance:
- `bun test` passes reliably; the assertions match the Go suite.
Output: a unified diff plus a note on any timing or flake mitigation you added.
What success looks like
bun test passes the same two assertions one-for-one against the same Compose stack: live delivery within ~100ms with the new version, and a reconnect that replays exactly 5 missed events (a replaying 5 frame) ending equal to GET /products. Both suites green against one Redis + Postgres is what lets the project claim, honestly, that the realtime store — not the language — carries the weight.
Optional: deploy online (Cloud Run + managed or self-hosted Redis)
Intermediate
Get a public URL if you want one by deploying the stateless server to Cloud Run against a real Redis and Postgres — entirely optional, since docker compose up is already the whole demo, and only the two connection strings change.
New in this step
Cloud Run A serverless container host that scales to zero with a free monthly allotment — a fit for the stateless fan-out server.
stateless container The server keeps no in-memory state (the Stream does), so any instance can serve any client and it deploys anywhere unchanged.
blocking `XREAD` caveat Some serverless Redis free tiers support Streams but not blocking XREAD; on those, switch to short non-blocking polling instead.
proxy response buffering SSE is a long-lived response; a buffering CDN or proxy would withhold frames, so disable buffering on the /stream route.
Optional, and free if you do it
Nothing about seeing the project work requires the cloud — docker compose up is the whole demo. If you
want a public URL, the server is a stateless container, so Cloud Run fits: it scales to zero and has a
generous free monthly allotment. Redis Streams need a real Redis, and this design relies on blocking reads
(XREAD BLOCK), so self-hosting redis:7 (a tiny VM or container host) keeps it unchanged. One honest
constraint to know: some serverless Redis offerings — Upstash’s free tier among them — support Streams
(XADD / XRANGE) but not the blocking form of XREAD; on those you’d switch the reader to
short-interval non-blocking polling (XREAD with no BLOCK, on a timer) instead. Postgres can be Cloud SQL
or any managed Postgres. Only the two connection strings (DATABASE_URL and REDIS_URL) change; the binary and the dashboard are
identical to local.
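The polling fallback has one trap worth spelling out: '$' only makes sense on a blocking read. A non-blocking XREAD from '$' asks for entries newer than the newest one and returns nothing on every call, so a polling reader has to carry a concrete cursor forward. A minimal sketch of that difference, assuming hypothetical helpers buildXreadArgs and nextCursor that only assemble command arguments (the COUNT of 100 in the usage is an arbitrary choice, and the initial cursor could be seeded from the newest id, for example with XREVRANGE warehouse:1 + - COUNT 1):

```typescript
// Hypothetical helpers: they build XREAD argument lists, they do not talk to Redis.
type ReaderMode =
  | { kind: "blocking"; blockMs: number } // self-hosted redis:7 and similar
  | { kind: "polling"; count: number };   // providers without blocking XREAD

function buildXreadArgs(mode: ReaderMode, streamKey: string, cursor: string): string[] {
  if (mode.kind === "blocking") {
    // '$' is fine for the first blocking call: wait for whatever arrives next.
    return ["XREAD", "BLOCK", String(mode.blockMs), "STREAMS", streamKey, cursor];
  }
  // Non-blocking: '$' would match nothing on every poll, so insist on a real id.
  if (cursor === "$") {
    throw new Error("polling needs a concrete last-seen id, not '$'");
  }
  return ["XREAD", "COUNT", String(mode.count), "STREAMS", streamKey, cursor];
}

// After each read, move the cursor to the last id returned; keep it when the read was empty.
function nextCursor(cursor: string, returnedIds: string[]): string {
  const last = returnedIds[returnedIds.length - 1];
  return last === undefined ? cursor : last;
}
```

A polling reader would call buildXreadArgs on a short timer, pass the result to its Redis client, and feed the returned ids through nextCursor, so the fan-out logic downstream stays the same in both modes.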
Deploy (optional)
# Build + deploy the server container to Cloud Run (free monthly allotment eligible).
gcloud run deploy ticker \
  --source . \
  --set-env-vars DATABASE_URL="$DATABASE_URL",REDIS_URL="$REDIS_URL" \
  --region us-central1 --allow-unauthenticated
# REDIS_URL points at your own redis:7 (blocking XREAD works) or a managed Redis that supports it.
Summarise the change stream into an anomaly note with Gemini
Optional add-on Intermediate
Turn the change feed you already keep into a read-only advisory — read the most recent slice with XREVRANGE, ask Gemini for structured JSON flagging restock needs and price swings, and surface it for a human to act on, never auto-applied.
New in this step
structured JSON output Asking the model to return typed JSON you can parse directly, instead of free text you must scrape.
`responseMimeType: application/json` The Gemini setting that forces the response to be valid JSON rather than prose.
`responseSchema` A schema the JSON must match (here { headline, restock[], price_anomalies[] }), so the output is typed and predictable.
read-only advisory endpoint GET /insights only suggests a restock or price review — it never changes a price or stock; a human decides.
AI reads the feed; humans act
The change feed you already maintain is a perfect input for a cheap summary: pull the most recent events with
XREVRANGE warehouse:1 + - COUNT 200 (newest first; XRANGE warehouse:1 - + COUNT 200 would return the oldest
200 instead), and ask Gemini to return structured JSON: products trending toward
sold-out, prices that moved more than a threshold, a one-line headline. Use
responseMimeType: "application/json" with a responseSchema so the output is typed, not free text you
have to scrape. Keep the key server-side and expose GET /insights that the dashboard can show as an
advisory banner. Crucially this is read-only: it suggests a restock or a price review, it never changes a
price or places an order — the platform shows the way, it doesn’t act. Costs nothing: a free Google AI
Studio key covers this.
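Because the note ends up in front of a human, it helps to reject anything that does not match the expected shape before it reaches the dashboard. The sketch below shows one way to do that check by hand; validateInsight, isRecord, and isWholeNumber are hypothetical names, and a schema library would serve equally well. It assumes the model's reply has already been read as a string.

```typescript
// Hypothetical validator for the { headline, restock[], price_anomalies[] } note.
interface RestockItem { product_id: number; reason: string }
interface PriceAnomaly { product_id: number; delta_pct: number }
interface InsightNote { headline: string; restock: RestockItem[]; price_anomalies: PriceAnomaly[] }

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isWholeNumber(value: unknown): value is number {
  return typeof value === "number" && isFinite(value) && Math.floor(value) === value;
}

// Returns the typed note, or null when the model output is malformed.
function validateInsight(raw: string): InsightNote | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (_err) {
    return null; // not JSON at all
  }
  if (!isRecord(parsed)) return null;
  const headline = parsed["headline"];
  const restockRaw = parsed["restock"];
  const anomaliesRaw = parsed["price_anomalies"];
  if (typeof headline !== "string" || !Array.isArray(restockRaw) || !Array.isArray(anomaliesRaw)) {
    return null;
  }

  const restock: RestockItem[] = [];
  for (const item of restockRaw) {
    if (!isRecord(item)) return null;
    const productId = item["product_id"];
    const reason = item["reason"];
    if (!isWholeNumber(productId) || typeof reason !== "string") return null;
    restock.push({ product_id: productId, reason: reason });
  }

  const priceAnomalies: PriceAnomaly[] = [];
  for (const item of anomaliesRaw) {
    if (!isRecord(item)) return null;
    const productId = item["product_id"];
    const deltaPct = item["delta_pct"];
    if (!isWholeNumber(productId) || typeof deltaPct !== "number" || !isFinite(deltaPct)) return null;
    priceAnomalies.push({ product_id: productId, delta_pct: deltaPct });
  }

  return { headline: headline, restock: restock, price_anomalies: priceAnomalies };
}
```

A GET /insights handler could return the validated note on success and an error status when validateInsight returns null, so malformed model output never reaches the banner.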
Ask Gemini for a structured anomaly note
System: You are an inventory analyst. Given recent change events, return ONLY JSON.
User: <the last 200 events: product_id, price, stock, version>
responseSchema (conceptual):
{ "type": "object", "properties": {
    "headline": { "type": "string" },
    "restock": { "type": "array", "items": { "type": "object",
      "properties": { "product_id": {"type":"integer"}, "reason": {"type":"string"} } } },
    "price_anomalies": { "type": "array", "items": { "type": "object",
      "properties": { "product_id": {"type":"integer"}, "delta_pct": {"type":"number"} } } } } }
Chat prompt — paste into a chat to get the code
Role: Gemini integration engineer. The reader has no repo here — return complete code.
Context: Server-side handler in the user's selected backend; GEMINI_API_KEY in env; recent events available via XREVRANGE on the warehouse Stream.
Task: Implement GET /insights that reads the last 200 Stream events and returns Gemini's structured anomaly note { headline, restock[], price_anomalies[] }.
Requirements:
- Use responseMimeType="application/json" plus a responseSchema matching that shape; validate the JSON before returning.
- Read events with XREVRANGE warehouse:{id} + - COUNT 200 (newest first; reverse them for chronological order); keep GEMINI_API_KEY server-side; time out after 20s.
- Read-only: the endpoint must NOT change any price or stock — it returns advice a human acts on.
- Link to the official structured-output docs rather than hardcoding a model name that may change.
Tests / acceptance (describe):
- A stream where one product hit stock 0 yields it in restock[]; a price jump beyond 25% appears in price_anomalies[].
- Malformed model output is rejected, not returned raw.
Output: the complete handler, no commentary.
A second client: native Jetpack Compose against the same backend
Optional add-on Beginner
Give the live board a native Android face by pointing a Jetpack Compose app at the unchanged backend — same /products snapshot, same /stream feed — to prove the realtime protocol is client-agnostic.
New in this step
Jetpack Compose Android’s declarative UI toolkit; here it renders a live board that ticks per event, just like the web dashboard.
`10.0.2.2` emulator host The Android emulator’s alias for the host machine’s localhost, so the app reaches the local backend at http://10.0.2.2:8080.
client-agnostic protocol Snapshot-first then an ordered SSE feed works the same for any client — a browser and an Android app reconcile against it identically.
native SSE vs browser `EventSource` A native SSE client does not auto-reconnect or auto-send Last-Event-ID the way the browser’s EventSource does — so Android owns that itself.
One backend, two clients — and why a native one earns its keep
Nothing about the server changes here. The Android app points at the identical /products and /stream
endpoints the bundled public/index.html already calls; run the backend with docker compose up and the
emulator reaches it at http://10.0.2.2:8080 (the Android emulator’s alias for your host’s localhost).
The lesson is that the realtime protocol — snapshot-first, then an ordered SSE feed whose every frame
carries a Stream id — is client-agnostic: a browser and an Android app reconcile against the same feed the
same way. The one honest difference, which the next steps lean on, is that the browser’s EventSource
auto-reconnects and resends Last-Event-ID for you, whereas a native SSE client does not — so on Android
you own the reconnect-and-replay discipline. Costs nothing: the backend is the same local Docker
stack, and the Android emulator ships free with Android Studio — no device, no account, no paid service.
To learn the UI toolkit itself, see the Jetpack Compose track; for the language, Kotlin.
Consume the SSE stream with OkHttp's EventSource
Optional add-on Intermediate
Open GET /stream from Android with the okhttp-sse artifact, parsing each frame’s id, type, and data exactly as the web client does — and notice this client gives you no auto-reconnect, which is precisely why this module exists.
New in this step
`okhttp-sse` artifact The SSE companion to core OkHttp (com.squareup.okhttp3:okhttp-sse) that does the SSE framing for you.
`EventSources.createFactory` / `newEventSource` Build a factory from your OkHttp client, then factory.newEventSource(request, listener) opens the stream.
`EventSourceListener.onEvent` The callback handing you each frame’s id (Stream id), type (event name), and data (the JSON to parse).
no auto-reconnect / auto `Last-Event-ID` Unlike the browser’s EventSource, OkHttp’s only does request()/cancel() — you own reconnect and resending the last id.
okhttp-sse gives you id + type + data per event — and nothing more
OkHttp ships SSE in a companion artifact, com.squareup.okhttp3:okhttp-sse, beside the core okhttp. You
build an EventSource.Factory from your client with EventSources.createFactory(client), then
factory.newEventSource(request, listener). The listener’s key callback is
onEvent(eventSource, id: String?, type: String?, data: String) — OkHttp has already done the SSE framing
for you, so id is the Stream id the server stamped, type is the event name (null when the frame carries no
event: field, where a browser would dispatch message; otherwise the server’s custom replaying / resync), and
data is the JSON body you parse into a change event. onOpen,
onClosed, and onFailure(eventSource, t, response) round out the lifecycle.
One accuracy point that is the whole feature: unlike the browser’s EventSource, OkHttp’s is a low-level
client — its interface is just request() and cancel(), with no automatic reconnect and no
automatic Last-Event-ID. That is not a gap to apologise for; it is exactly why this module exists. You
reconnect and resend the last id yourself (next step). If you would rather not hand-roll the retry loop, the
documented higher-level LaunchDarkly okhttp-eventsource
library wraps OkHttp and manages retry + Last-Event-ID for you; this module uses the plain okhttp-sse
artifact so the discipline stays visible. Official artifact:
https://square.github.io/okhttp/5.x/okhttp-sse/okhttp3.sse/
Add the dependencies
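// app/build.gradle.kts
// No versions are pinned here: append ":<version>" (the latest from Maven Central) to each coordinate,
// or let a BOM / version catalog supply them; a bare group:name does not resolve on its own.
dependencies {
    implementation("com.squareup.okhttp3:okhttp")      // core HTTP client
    implementation("com.squareup.okhttp3:okhttp-sse")  // SSE companion: EventSource + EventSourceListener
    implementation("androidx.lifecycle:lifecycle-runtime-compose") // collectAsStateWithLifecycle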
}
Open the stream and parse each event
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import org.json.JSONObject

// A self-contained change event. Stream fields arrive as JSON strings, so coerce to numbers.
data class Tick(val productId: Long, val price: Long, val stock: Int, val version: Int)

fun parseTick(data: String): Tick {
    val j = JSONObject(data)
    // server sends product_id on stream frames, id on the snapshot — accept either
    val pid = if (j.has("product_id")) j.getString("product_id").toLong() else j.getLong("id")
    return Tick(pid, j.getString("price").toLong(), j.getString("stock").toInt(), j.getString("version").toInt())
}

fun openStream(client: OkHttpClient, baseUrl: String, lastId: String?, listener: EventSourceListener): EventSource {
    val builder = Request.Builder().url("$baseUrl/stream")
    if (lastId != null) builder.header("Last-Event-ID", lastId) // we resend it ourselves — OkHttp won't
    return EventSources.createFactory(client).newEventSource(builder.build(), listener)
}

// In the listener, onEvent hands you id, type, and the JSON payload directly:
val listener = object : EventSourceListener() {
    override fun onOpen(eventSource: EventSource, response: Response) { /* status: Live */ }
    override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
        when (type) {
            "replaying" -> { /* data = count; status: Replaying N */ }
            "resync" -> { /* re-fetch GET /products, then keep streaming */ }
            else -> { /* a data frame: applyTick(id, parseTick(data)) */ }
        }
    }
    override fun onClosed(eventSource: EventSource) { /* reconnect from lastId */ }
    override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) { /* status: Reconnecting */ }
}
Hold the board in Compose state and render a live list
Optional add-on Intermediate
Make the board live by holding it in a ViewModel as a StateFlow<BoardUiState> and collecting it with collectAsStateWithLifecycle() — each event produces a new immutable state, so Compose recomposes only the changed rows and the same status pill the web client shows.
New in this step
`ViewModel` The lifecycle-scoped holder of the board’s truth; the SSE listener calls into it rather than touching the UI directly.
`StateFlow` An observable value holder updated with update { … }; each new immutable snapshot is what the UI collects.
`collectAsStateWithLifecycle` Collects a flow only while the screen is active, so a backgrounded screen doesn’t leak the stream.
`LazyColumn` A scrolling list that renders only visible rows; keyed by product id so Compose updates just the row that changed.
recomposition Compose re-runs only the composables whose inputs changed, so a new state snapshot ticks one row, not the whole board.
StateFlow in, collectAsStateWithLifecycle out — the board ticks per event
The ViewModel owns the truth: a StateFlow<BoardUiState> holding a Map<Long, Row> keyed by product id
plus a status enum (Live, Reconnecting, Replaying). The SSE listener doesn’t touch the UI — it calls
into the ViewModel, which updates the flow with update { … }. The Compose screen collects it with
collectAsStateWithLifecycle() (from androidx.lifecycle:lifecycle-runtime-compose), which is
lifecycle-aware: it stops collecting when the app is in the background and resumes on return, so you don’t
leak the stream behind a backgrounded screen. Render the map as a LazyColumn; because the state is a new
immutable snapshot per event, Compose recomposes only the rows that changed and the board ticks live. The
status pill reads the same status field the web dashboard shows — green Live on open, amber
Reconnecting… on failure, Replaying N while a backlog drains — so the two clients are visibly the
same machine with different skins.
UI state, ViewModel flow, and the Compose screen
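import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row as LayoutRow // aliased: this file's data class is also named Row
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.items
import androidx.compose.material3.Text // assumes Material 3; androidx.compose.material.Text also works
import androidx.compose.runtime.Composable
import androidx.compose.runtime.getValue
import androidx.compose.ui.Modifier
import androidx.compose.ui.unit.dp
import androidx.lifecycle.ViewModel
import androidx.lifecycle.compose.collectAsStateWithLifecycle
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

enum class Status { Live, Reconnecting, Replaying }
data class Row(val id: Long, val name: String, val price: Long, val stock: Int, val version: Int)
data class BoardUiState(val rows: Map<Long, Row> = emptyMap(), val status: Status = Status.Reconnecting, val replayCount: Int = 0)

class BoardViewModel : ViewModel() {
    private val _state = MutableStateFlow(BoardUiState())
    val state = _state.asStateFlow()
    fun setStatus(s: Status, replayCount: Int = 0) = _state.update { it.copy(status = s, replayCount = replayCount) }
    // applyTick lives in the next step (idempotent by version); it calls _state.update { ... } too.
}

@Composable
fun BoardScreen(vm: BoardViewModel) {
    val ui by vm.state.collectAsStateWithLifecycle() // lifecycle-aware collection
    Column {
        Text(when (ui.status) { // the same pill the web client shows
            Status.Live -> "● Live"
            Status.Reconnecting -> "● Reconnecting…"
            Status.Replaying -> "● Replaying ${ui.replayCount} events"
        })
        LazyColumn {
            items(ui.rows.values.toList(), key = { it.id }) { r ->
                // LayoutRow is Compose's Row layout, aliased above to avoid clashing with the Row data class
                LayoutRow(Modifier.fillMaxWidth().padding(12.dp), horizontalArrangement = Arrangement.SpaceBetween) {
                    Text(r.name)
                    Text("$" + "%.2f".format(r.price / 100.0)) // format cents at the edge only
                    Text(if (r.stock > 0) "${r.stock} in stock" else "SOLD OUT")
                }
            }
        }
    }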
}
Reconnect-and-replay: persist the last id, resend Last-Event-ID, apply idempotently
Optional add-on Advanced
Own on Android what the browser gets for free — remember the last applied Stream id, resend it as Last-Event-ID on every reconnect so the server replays exactly the gap, and apply each event idempotently so a duplicate id changes nothing.
New in this step
persist last id (`DataStore`) Hold the last applied id in the ViewModel, and store it in DataStore if you want replay to survive the app being killed.
manual reconnect loop On onFailure/onClosed, reopen /stream from the last id yourself — OkHttp will not, so the discipline is explicit.
backoff Wait briefly (and growing) between reconnect attempts so a flapping connection doesn’t hammer the server.
idempotent apply by version Accept an event only if its version is strictly newer than the stored one — identical to the web client’s reducer, so re-delivered ids are ignored.
The same protocol the browser gets for free, made explicit
This is the heart of the module, and it is the same contract the web client follows — the Make replay idempotent and client offset protocol steps define it once for every client. Two responsibilities, both yours on Android because OkHttp won’t do them:
- Persist the last id. Each applied data frame’s id is your progress marker. Hold it in the ViewModel, and
  persist it (e.g. DataStore) if you want replay to survive the app being killed. On reconnect, pass it as
  the Last-Event-ID request header — the server’s XREAD-from-id returns just the gap (and a replaying N
  frame first, which drives the pill); a resync event means the id was trimmed, so re-fetch GET /products
  and continue.
- Apply idempotently by version. Replay and live delivery can overlap, so the apply must be a no-op for
  anything you’ve already seen. Hold the current version per product and accept an event only if its
  version is strictly newer — identical to the web client’s applyEvent reducer. This is what makes
  at-least-once replay safe: a re-delivered id is simply ignored.
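The two responsibilities above reduce to one small pure function. The sketch below is written in TypeScript as a guess at the shape of the web client's applyEvent reducer, not the bundled dashboard's actual code; BoardState, BoardRow, and TickEvent are hypothetical names. It returns the same state object when an event is stale or duplicated, so callers can detect a no-op by identity.

```typescript
// Hypothetical types; field names mirror the change event described in this module.
interface BoardRow { id: number; price: number; stock: number; version: number }
interface TickEvent { productId: number; price: number; stock: number; version: number }
interface BoardState {
  rows: { readonly [productId: number]: BoardRow };
  lastId: string | null; // Stream id of the newest applied event
}

// Apply one event. Stale or duplicate events return the input state unchanged.
function applyEvent(state: BoardState, eventId: string, ev: TickEvent): BoardState {
  const current: BoardRow | undefined = state.rows[ev.productId];
  if (current !== undefined && ev.version <= current.version) {
    return state; // already seen: replay and live delivery overlapped
  }
  const row: BoardRow = { id: ev.productId, price: ev.price, stock: ev.stock, version: ev.version };
  // A new object per applied event; lastId advances only on a real apply.
  return { rows: { ...state.rows, [ev.productId]: row }, lastId: eventId };
}
```

Folding a replayed backlog through this function in arrival order gives the same board whether each event is delivered once or several times, which is the property the replay tests rely on.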
The reconnect loop itself is plain: on onFailure / onClosed, set the pill to Reconnecting…, back off
briefly, and call openStream(client, baseUrl, lastId, listener) again. Snapshot-first on cold start
(GET /products), then the Stream from lastId — baseline plus deltas, no gaps.
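The "back off briefly" part of that loop can be sketched as a small policy object. This is an illustration in TypeScript rather than the Android code: backoffCeilingMs, backoffDelayMs, and ReconnectPolicy are hypothetical names, the 250 ms base and 8 s cap are arbitrary assumptions, and the jitter (a random delay between half the ceiling and the ceiling) is an addition the text above does not prescribe.

```typescript
// Upper bound for the wait before reconnect attempt N (0-based): base * 2^N, capped.
function backoffCeilingMs(attempt: number, baseMs: number = 250, capMs: number = 8000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}

// "Equal jitter": a delay in [ceiling / 2, ceiling), so reconnecting clients spread out.
function backoffDelayMs(attempt: number, random: () => number = Math.random): number {
  const ceiling = backoffCeilingMs(attempt);
  return Math.floor(ceiling / 2 + random() * (ceiling / 2));
}

// Tracks the attempt counter: grow on each failure, reset once the stream opens.
class ReconnectPolicy {
  private attempt = 0;
  private readonly random: () => number;

  constructor(random: () => number = Math.random) {
    this.random = random;
  }

  // Call from the failure/closed handler; returns how long to wait before reopening.
  nextDelayMs(): number {
    const delay = backoffDelayMs(this.attempt, this.random);
    this.attempt += 1;
    return delay;
  }

  // Call from the open handler so the next outage starts from the short delay again.
  reset(): void {
    this.attempt = 0;
  }
}
```

A client would wait nextDelayMs() before reopening /stream from its last id and call reset() when the connection opens; the random source is injectable so a test can make the delays deterministic.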
Idempotent apply + Last-Event-ID reconnect (ViewModel core)
// In BoardViewModel — the one reducer every event flows through.
@Volatile private var lastId: String? = null // the Stream id of the newest applied event; resent on reconnect

fun applyTick(eventId: String?, t: Tick) {
    var applied = false
    _state.update { s ->
        val cur = s.rows[t.productId]
        applied = cur == null || t.version > cur.version // recomputed if update retries the lambda
        if (!applied) return@update s // stale or duplicate — ignore
        val name = cur?.name ?: "Product ${t.productId}"
        s.copy(rows = s.rows + (t.productId to Row(t.productId, name, t.price, t.stock, t.version)))
    }
    if (applied && eventId != null) lastId = eventId // advance progress only after a successful apply
}

// Reconnect = reopen /stream from lastId; the server replays the gap, applyTick dedupes any overlap.
fun reconnect(client: OkHttpClient, baseUrl: String, listener: EventSourceListener) {
    setStatus(Status.Reconnecting)
    openStream(client, baseUrl, lastId, listener) // resends Last-Event-ID — OkHttp will not do this for you
}
Agent prompt — paste into an agent with repo access
Role: Senior Android engineer (Kotlin, Jetpack Compose, coroutines) in this repo.
Context: The Ticker backend is unchanged and reachable at http://10.0.2.2:8080 from the emulator. It exposes GET /products (snapshot), GET /stream (SSE: data frames carry the Stream id and a JSON {product_id,price,stock,version}; custom "replaying" frame with data=count on reconnect; "resync" frame when the client's id was trimmed). Dependencies: okhttp, okhttp-sse, lifecycle-runtime-compose.
Task: Build BoardViewModel (StateFlow<BoardUiState>) plus an OkHttp SSE client that opens /stream, applies events idempotently, and reconnects resending Last-Event-ID.
Requirements:
- Open the stream with EventSources.createFactory(client).newEventSource(request, listener); set the Last-Event-ID request header to the last applied id (OkHttp does NOT auto-reconnect or auto-send it).
- applyTick(eventId, tick) is idempotent: ignore any event whose version is not strictly greater than the stored version for that product id; advance the stored lastId only after a successful apply.
- Map listener callbacks to status: onOpen -> Live; onFailure/onClosed -> Reconnecting then reconnect from lastId; type "replaying" -> Replaying(count); type "resync" -> re-fetch GET /products and continue.
- Cold start is snapshot-first: GET /products applied through the same reducer, then open the stream.
Tests / acceptance:
- A unit test drives the real listener through a fake EventSource with two events for the same product (versions 1 then 2): assert the board row ends at version 2 with the second event's price/stock.
- Re-deliver the version-2 event a second time (same id): assert the board state is unchanged (duplicate ignored) and the row count stays 1.
- Provide a fake EventSource so the test needs no network; assert lastId equals the id of the newest applied event.
Output: a unified diff plus a one-paragraph note on why the Android client must own reconnect/Last-Event-ID while the browser's EventSource does not.
Where to take it next
- Go deep on the store that carries this whole build: the Redis track — Streams,
  pub/sub versus Streams, persistence, and SCAN.
- Keep the durable truth honest with PostgreSQL — constraints, RETURNING, optimistic concurrency,
  and isolation levels.
- Build the same fan-out in either language: Go or TypeScript (Hono on the edge).
- Contrast the lessons: Aurora Commerce owns in-transaction correctness; Ticker owns after-commit propagation. See why on the Compare page, where Redis scores 5 here and a document store’s change-stream replay scores lower.