/** * AG-UI SSE Events Route * * GET /events/ag-ui - Server-Sent Events stream for AG-UI protocol clients. * * Provides AG-UI protocol compatibility while preserving Gizmo's observability. * Events are generated by the ag-ui-adapter-plugin and emitted via EventEmitter. */ import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; import type { Runtime, Action } from "@gizmo-ai/runtime"; import type { EventEmitter } from "events"; import type { AGUIEventType, StateSnapshotEvent } from "@gizmo-ai/ag-ui-adapter-plugin"; /** * Helper to create AG-UI events with automatic timestamp * Inlined to avoid runtime dependency on @gizmo-ai/ag-ui-adapter-plugin */ function createAGUIEvent( event: Omit ): T { return { ...event, timestamp: Date.now(), } as T; } export interface AGUIEventsRouteConfig< S extends Record, A extends Action > { /** * Runtime instance */ runtime: Runtime; /** * EventEmitter from ag-ui-adapter-plugin * Should be the same bus passed to agUIAdapterPlugin() */ bus: EventEmitter; /** * Which state slices to include in STATE_SNAPSHOT * Default: ['agent'] (only agent slice) */ slices?: string[]; /** * Heartbeat interval in milliseconds * Default: 30000 (30 seconds) */ heartbeatInterval?: number; /** * JSON serializer function */ serializer: (value: unknown) => string; } /** * Create the AG-UI SSE events route * * This route provides AG-UI protocol compatibility for frontend clients. * It streams AG-UI events (RunStarted, TextMessageContent, ToolCallStart, etc.) * and STATE_DELTA events with RFC 6902 JSON Patch operations. * * @example * ```typescript * import { createAGUIEventsRoute } from "@gizmo-ai/server"; * import { agUIAdapterPlugin } from "@gizmo-ai/ag-ui-adapter-plugin"; * * const adapter = agUIAdapterPlugin(); * const aguiRoute = createAGUIEventsRoute({ * runtime, * bus: adapter.bus, * serializer: JSON.stringify, * }); * * app.route("/events/ag-ui", aguiRoute); * ``` */ export function createAGUIEventsRoute< S extends Record, A extends Action >(config: AGUIEventsRouteConfig): Hono { const { runtime, bus, slices = ["agent"], heartbeatInterval = 30000, serializer, } = config; const app = new Hono(); app.get("/", async (c) => { return streamSSE(c, async (stream) => { let eventId = 0; // Helper to send AG-UI events with incrementing ID const sendEvent = async (event: AGUIEventType) => { eventId++; await stream.writeSSE({ id: String(eventId), event: event.type, data: serializer(event), }); }; // 1. Send STATE_SNAPSHOT on connection const currentState = runtime.getState(); const trackedState: Record = {}; // Extract only tracked slices for (const slice of slices) { if (slice in currentState) { trackedState[slice] = currentState[slice as keyof S]; } } const snapshotEvent = createAGUIEvent({ type: "STATE_SNAPSHOT", runId: null, state: trackedState, metadata: { slices, connection: "initial", }, }); await sendEvent(snapshotEvent); // 2. Set up AG-UI event listener // The ag-ui-adapter-plugin emits events via bus.emit("ag-ui.event", event) const handleAGUIEvent = async (event: AGUIEventType) => { try { await sendEvent(event); } catch { // Stream closed, cleanup will happen } }; bus.on("ag-ui.event", handleAGUIEvent); // 3. Set up heartbeat (optional in AG-UI spec) const heartbeatTimer = setInterval(async () => { const heartbeat: AGUIEventType = { type: "Heartbeat", runId: null, timestamp: Date.now(), }; try { await sendEvent(heartbeat); } catch { // Stream closed } }, heartbeatInterval); // 4. Cleanup on disconnect stream.onAbort(() => { bus.off("ag-ui.event", handleAGUIEvent); clearInterval(heartbeatTimer); }); // Keep the stream open indefinitely // The stream will close when the client disconnects await new Promise(() => {}); // Never resolves }); }); return app; } /** * Helper to check if AG-UI adapter plugin is configured * * @param runtime - Runtime instance * @returns True if ag-ui-adapter-plugin is present */ export function hasAGUIAdapter>( runtime: Runtime ): boolean { const state = runtime.getState(); return "aguiAdapter" in state; }