/**
 * Stream State Route
 *
 * GET /stream/state - Server-Sent Events stream for state updates only.
 * Uses JSON Patch (RFC 6902) for efficient delta updates.
 */

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { compare } from "fast-json-patch";
import type { Runtime, Action } from "@gizmo-ai/runtime";
import type { EventEmitter } from "events";
import type {
  StateUpdateEvent,
  InitialStateEvent,
  HeartbeatEvent,
} from "../types.ts";
import { onStateChanged } from "../lib/event-bus.ts";

/**
 * Configuration for the state stream route.
 */
export interface StreamStateRouteConfig<
  S extends Record<string, unknown>,
  A extends Action
> {
  /** Runtime whose `getState()` supplies the snapshot sent on connect. */
  runtime: Runtime<S, A>;
  /** Event bus that `onStateChanged` subscribes to for slice changes. */
  bus: EventEmitter;
  /** Optional allow-list of slice names; used when `resolvedSlices` is absent. */
  slices?: string[];
  /** Function to get resolved slices (accounts for auto-filtering) */
  resolvedSlices?: (state: S) => string[];
  /**
   * Delay between heartbeat events, in milliseconds (passed to `setInterval`).
   * A value that is not a positive finite number disables the heartbeat.
   */
  heartbeatInterval: number;
  /** Turns an event object into the string written to the SSE `data` field. */
  serializer: (value: unknown) => string;
}

/**
 * Create the state stream route.
 *
 * Each connection receives, in order:
 *  1. a `state.initial` event with a snapshot of the selected slices,
 *  2. a `state.update` event per relevant change, carrying either JSON Patch
 *     operations (object/array slices) or a full replacement `value`,
 *  3. periodic `heartbeat` events.
 *
 * Every event gets an incrementing SSE `id`, starting at 1 per connection.
 *
 * @param config - Runtime, bus and streaming options for the route.
 * @returns A Hono app exposing the stream on `GET /`.
 */
export function createStreamStateRoute<
  S extends Record<string, unknown>,
  A extends Action
>(config: StreamStateRouteConfig<S, A>): Hono {
  const { runtime, bus, slices, resolvedSlices, heartbeatInterval, serializer } =
    config;

  // Same precedence as the initial snapshot: the resolver wins over the static
  // allow-list. With neither configured, every slice change is streamed.
  const isTrackedSlice = (slice: string): boolean => {
    if (resolvedSlices) {
      return resolvedSlices(runtime.getState()).includes(slice);
    }
    return slices ? slices.includes(slice) : true;
  };

  const app = new Hono();

  app.get("/", async (c) => {
    return streamSSE(c, async (stream) => {
      let eventId = 0;
      let closed = false;
      let unsubscribeState: ReturnType<typeof onStateChanged> | undefined;
      let heartbeatTimer: ReturnType<typeof setInterval> | undefined;

      // Register the abort handler before the first await so that a client
      // disconnecting during the initial write is still observed. The promise
      // also lets this callback settle once the client has gone away.
      const aborted = new Promise<void>((resolve) => {
        stream.onAbort(() => {
          closed = true;
          unsubscribeState?.();
          if (heartbeatTimer !== undefined) {
            clearInterval(heartbeatTimer);
          }
          resolve();
        });
      });

      // Helper to send events with incrementing ID
      const sendEvent = async (
        event: InitialStateEvent | StateUpdateEvent | HeartbeatEvent
      ): Promise<void> => {
        eventId++;
        await stream.writeSSE({
          id: String(eventId),
          event: event.type,
          data: serializer(event),
        });
      };

      // 1. Send initial state snapshot
      const snapshot = runtime.getState();
      const sliceKeys = resolvedSlices
        ? resolvedSlices(snapshot)
        : slices ?? Object.keys(snapshot);

      const initialSlices: Record<string, unknown> = {};
      for (const key of sliceKeys) {
        initialSlices[key] = snapshot[key as keyof S];
      }

      const initialEvent: InitialStateEvent = {
        type: "state.initial",
        slices: initialSlices,
        timestamp: Date.now(),
      };
      await sendEvent(initialEvent);

      // The client left while the snapshot was being written: install nothing.
      if (closed) {
        return;
      }

      // Last value sent per slice; the baseline for computing patches.
      const previousBySlice = new Map<string, unknown>(
        Object.entries(initialSlices)
      );

      // 2. Set up state change listener with JSON Patch deltas
      unsubscribeState = onStateChanged(bus, async (change) => {
        if (closed || !isTrackedSlice(change.slice)) {
          return;
        }

        const hadPrevious = previousBySlice.has(change.slice);
        const previous = previousBySlice.get(change.slice);
        const next = change.after;

        let updateEvent: StateUpdateEvent;

        if (
          isDiffable(previous) &&
          isDiffable(next) &&
          Array.isArray(previous) === Array.isArray(next)
        ) {
          const patches = compare(previous, next);
          if (patches.length === 0) {
            return; // Nothing changed structurally
          }
          updateEvent = {
            type: "state.update",
            slice: change.slice,
            patches,
            cause: describeCause(change.causingAction),
            timestamp: Date.now(),
          };
        } else {
          // Primitives, null/undefined, unseen slices and object<->array
          // switches cannot be expressed as a patch against the previous
          // value, so the whole value is sent instead.
          if (hadPrevious && Object.is(previous, next)) {
            return; // Identical non-object value: nothing to report
          }
          updateEvent = {
            type: "state.update",
            slice: change.slice,
            value: next,
            cause: describeCause(change.causingAction),
            timestamp: Date.now(),
          };
        }

        previousBySlice.set(change.slice, next);

        try {
          await sendEvent(updateEvent);
        } catch {
          // Stream closed
        }
      });

      // 3. Set up heartbeat (skipped for non-positive or non-finite intervals,
      // which would otherwise make the timer fire continuously)
      if (Number.isFinite(heartbeatInterval) && heartbeatInterval > 0) {
        heartbeatTimer = setInterval(() => {
          if (closed) {
            return;
          }
          const heartbeat: HeartbeatEvent = {
            type: "heartbeat",
            timestamp: Date.now(),
          };
          void sendEvent(heartbeat).catch(() => {
            // Stream closed
          });
        }, heartbeatInterval);
      }

      // 4. Keep the stream open until the client disconnects; cleanup runs in
      // the abort handler registered above.
      await aborted;
    });
  });

  return app;
}

/**
 * True when the value is a non-null object (including arrays), i.e. something
 * a JSON Patch diff can be computed over.
 */
function isDiffable(value: unknown): value is object {
  return typeof value === "object" && value !== null;
}

/**
 * Build the `cause` descriptor of a state update from the action behind it.
 */
function describeCause(action: Action): StateUpdateEvent["cause"] {
  return {
    type: action.type,
    id: getActionId(action),
    payload: action.payload,
  };
}

/**
 * Extract action ID from action metadata if present
 *
 * @returns `meta.__ellie.actionId`, or `undefined` when any part of that path
 * is missing.
 */
function getActionId(action: Action): string | undefined {
  const meta = (action as { meta?: { __ellie?: { actionId?: string } } }).meta;
  return meta?.__ellie?.actionId;
}