/** * SSE Events Route * * GET /events - Server-Sent Events stream for real-time state updates. * Uses JSON Patch (RFC 6902) for efficient delta updates. */ import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; import { compare } from "fast-json-patch"; import type { Runtime, Action } from "@gizmo-ai/runtime"; import type { EventEmitter } from "events"; import type { SSEEvent, StateUpdateEvent, InitialStateEvent, HeartbeatEvent, ExecutionStatusEvent, CustomEvent, } from "../types.ts"; import { onStateChanged, onExecutionLifecycle } from "../lib/event-bus.ts"; export interface EventsRouteConfig< S extends Record, A extends Action > { runtime: Runtime; bus: EventEmitter; slices?: string[]; /** Function to get resolved slices (accounts for auto-filtering) */ resolvedSlices?: (state: S) => string[]; heartbeatInterval: number; serializer: (value: unknown) => string; } /** * Create the SSE events route */ export function createEventsRoute< S extends Record, A extends Action >(config: EventsRouteConfig): Hono { const { runtime, bus, slices, resolvedSlices, heartbeatInterval, serializer } = config; const app = new Hono(); app.get("/", async (c) => { return streamSSE(c, async (stream) => { let eventId = 0; // Helper to send events with incrementing ID const sendEvent = async (event: SSEEvent) => { eventId++; await stream.writeSSE({ id: String(eventId), event: event.type, data: serializer(event), }); }; // 1. Send initial state snapshot const currentState = runtime.getState(); // Use resolvedSlices for consistency with middleware filtering const sliceKeys = resolvedSlices ? resolvedSlices(currentState) : (slices ?? Object.keys(currentState)); const initialSlices: Record = {}; for (const key of sliceKeys) { initialSlices[key] = currentState[key as keyof S]; } const initialEvent: InitialStateEvent = { type: "state.initial", slices: initialSlices, timestamp: Date.now(), }; await sendEvent(initialEvent); // Track previous state for computing patches const previousSliceState: Record = { ...initialSlices }; // 2. Set up state change listener with JSON Patch deltas const unsubscribeState = onStateChanged(bus, async (change) => { if (slices && !slices.includes(change.slice)) { return; // Skip slices we're not tracking } const previousState = previousSliceState[change.slice]; const currentState = change.after; // Detect if we need full replacement instead of patches const needsFullReplacement = previousState === undefined || // First update previousState === null || // Null is a special case (typeof null === "object") currentState === null || // Transitioning to/from null typeof previousState !== typeof currentState || // Type changed Array.isArray(previousState) !== Array.isArray(currentState); // Array ↔ non-array let updateEvent: StateUpdateEvent; if (needsFullReplacement) { // Send full state value for type changes or first update updateEvent = { type: "state.update", slice: change.slice, value: currentState, cause: { type: change.causingAction.type, id: getActionId(change.causingAction), payload: change.causingAction.payload, }, timestamp: Date.now(), }; } else { // Compute JSON Patch operations for incremental update // At this point, we've verified previousState is not undefined/null and types match const patches = compare(previousState as object, currentState as object); // Only send if there are actual changes if (patches.length === 0) { return; } updateEvent = { type: "state.update", slice: change.slice, patches, cause: { type: change.causingAction.type, id: getActionId(change.causingAction), payload: change.causingAction.payload, }, timestamp: Date.now(), }; } // Update tracked state previousSliceState[change.slice] = currentState; try { await sendEvent(updateEvent); } catch { // Stream closed, cleanup will happen } }); // 3. Set up execution lifecycle listener const unsubscribeLifecycle = onExecutionLifecycle(bus, async (lifecycle) => { const statusEvent: ExecutionStatusEvent = { type: "execution.status", status: lifecycle.status, executionId: lifecycle.executionId, turnId: lifecycle.turnId, error: lifecycle.error?.message, timestamp: Date.now(), }; try { await sendEvent(statusEvent); } catch { // Stream closed } }); // 4. Set up custom event listener (for coordinator events, etc.) const handleCustomEvent = async (event: unknown) => { try { await sendEvent({ type: "custom-event", data: event, timestamp: Date.now(), }); } catch { // Stream closed } }; bus.on("custom-event", handleCustomEvent); // 5. Set up heartbeat const heartbeatTimer = setInterval(async () => { const heartbeat: HeartbeatEvent = { type: "heartbeat", timestamp: Date.now(), }; try { await sendEvent(heartbeat); } catch { // Stream closed } }, heartbeatInterval); // 6. Cleanup on disconnect stream.onAbort(() => { unsubscribeState(); unsubscribeLifecycle(); bus.off("custom-event", handleCustomEvent); clearInterval(heartbeatTimer); }); // Keep the stream open indefinitely // The stream will close when the client disconnects await new Promise(() => {}); // Never resolves }); }); return app; } /** * Extract action ID from action metadata if present */ function getActionId(action: Action): string | undefined { const meta = (action as { meta?: { __ellie?: { actionId?: string } } }).meta; return meta?.__ellie?.actionId; }