/**
 * Stream Actions Route
 *
 * GET /stream/actions - Server-Sent Events stream for execution lifecycle and action events.
 * Ideal for debugging, observability, and action-focused tooling.
 */

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import type { Runtime, Action } from "@gizmo-ai/runtime";
import type { EventEmitter } from "events";
import type {
  ExecutionStatusEvent,
  HeartbeatEvent,
} from "../types.ts";
import { onExecutionLifecycle } from "../lib/event-bus.ts";

/**
 * Dependencies required to build the actions stream route.
 */
export interface StreamActionsRouteConfig<
  S extends Record<string, unknown>,
  A extends Action
> {
  /** Runtime instance. Not read by this route itself; part of the shared config shape. */
  runtime: Runtime<S, A>;
  /** Event bus that emits execution lifecycle events and "custom-event" events. */
  bus: EventEmitter;
  /** Delay between heartbeat events, in milliseconds (passed straight to `setInterval`). */
  heartbeatInterval: number;
  /** Converts an event payload into the string placed in the SSE `data` field. */
  serializer: (value: unknown) => string;
}

/**
 * Create the actions stream route.
 *
 * The returned app exposes `GET /`, which opens a Server-Sent Events stream and
 * forwards three kinds of events to the client:
 *
 * - `execution.status` - one per execution lifecycle notification from the bus
 * - `custom-event`     - every "custom-event" emitted on the bus, wrapped with a timestamp
 * - `heartbeat`        - emitted every `heartbeatInterval` milliseconds
 *
 * Each event written to the stream carries an incrementing numeric `id`, starting at 1
 * for every new connection.
 *
 * @param config - Bus, heartbeat interval and serializer used by the stream.
 * @returns A Hono app with the SSE endpoint mounted at `/`.
 */
export function createStreamActionsRoute<
  S extends Record<string, unknown>,
  A extends Action
>(config: StreamActionsRouteConfig<S, A>): Hono {
  const { bus, heartbeatInterval, serializer } = config;
  const app = new Hono();

  app.get("/", async (c) => {
    return streamSSE(c, async (stream) => {
      let eventId = 0;
      // Set once the client has disconnected; guards against late writes and double teardown.
      let closed = false;

      // Single write path for every event kind. Never rejects, so it is safe to
      // call from event-emitter and timer callbacks without an extra try/catch.
      const writeEvent = async (
        eventName: string,
        payload: unknown
      ): Promise<void> => {
        if (closed) return;

        let data: string;
        try {
          data = serializer(payload);
        } catch {
          // Payload could not be serialized: drop only this event and do not
          // consume an id, so the id sequence seen by the client stays gap-free.
          return;
        }

        eventId++;
        try {
          await stream.writeSSE({
            id: String(eventId),
            event: eventName,
            data,
          });
        } catch {
          // Stream closed while writing; teardown happens in the abort handler.
        }
      };

      // Typed helper for the events whose SSE event name is their own `type`.
      const sendEvent = (
        event: ExecutionStatusEvent | HeartbeatEvent
      ): Promise<void> => writeEvent(event.type, event);

      // 1. Set up execution lifecycle listener
      const unsubscribeLifecycle = onExecutionLifecycle(bus, async (lifecycle) => {
        const statusEvent: ExecutionStatusEvent = {
          type: "execution.status",
          status: lifecycle.status,
          executionId: lifecycle.executionId,
          turnId: lifecycle.turnId,
          error: lifecycle.error?.message,
          timestamp: Date.now(),
        };
        await sendEvent(statusEvent);
      });

      // 2. Set up custom event listener (for coordinator events, tool calls, etc.)
      const handleCustomEvent = (event: unknown): void => {
        void writeEvent("custom-event", {
          type: "custom-event",
          data: event,
          timestamp: Date.now(),
        });
      };
      bus.on("custom-event", handleCustomEvent);

      // 3. Set up heartbeat
      const heartbeatTimer = setInterval(() => {
        const heartbeat: HeartbeatEvent = {
          type: "heartbeat",
          timestamp: Date.now(),
        };
        void sendEvent(heartbeat);
      }, heartbeatInterval);

      // 4. Keep the stream open until the client disconnects, then clean up.
      //    Resolving here (instead of awaiting a promise that never settles)
      //    lets this callback return so streamSSE can finish the response.
      await new Promise<void>((resolve) => {
        stream.onAbort(() => {
          if (closed) return;
          closed = true;
          unsubscribeLifecycle();
          bus.off("custom-event", handleCustomEvent);
          clearInterval(heartbeatTimer);
          resolve();
        });
      });
    });
  });

  return app;
}