/** * Server Plugin * * Creates a RuntimePlugin that broadcasts state changes to an internal event bus. * The event bus is then consumed by SSE routes to stream updates to clients. */ import type { Middleware, Action, MiddlewareAPI, RuntimePlugin, ExecutionStartedAction, ExecutionCompletedAction, ExecutionInterruptedAction, ExecutionFailedAction, ExecutionAbortedAction, ExecutionAction, } from "@gizmo-ai/runtime"; import { isExecutionLifecycleAction } from "@gizmo-ai/runtime"; import type { EventEmitter } from "events"; import type { ServerPluginConfig, ServerPluginInstance, ExecutionLifecycleEvent, StoredRun, } from "./types.ts"; import { createEventBus, emitStateChanged, emitExecutionLifecycle } from "./lib/event-bus.ts"; import { sanitizeAction, isSerializable } from "./lib/serialization.ts"; import { createPersistenceStore } from "./persistence/index.ts"; import type { PersistenceStore, RunRecord, ActionRecord, StateRecord } from "./persistence/index.ts"; /** Streaming delta actions — ephemeral, high-volume noise excluded from JSONL persistence */ const STREAMING_PREFIX = "AGENT_MODEL_STREAM_"; /** * Compute SHA-256 digest of run bundle for tamper evidence */ async function computeRunDigest(bundle: { executionId: string; actions: unknown[]; finalState: unknown; startTime: number; endTime: number; }): Promise { const data = JSON.stringify(bundle); const encoder = new TextEncoder(); const hashBuffer = await crypto.subtle.digest("SHA-256", encoder.encode(data)); const hashArray = Array.from(new Uint8Array(hashBuffer)); return hashArray.map(b => b.toString(16).padStart(2, "0")).join(""); } /** * Create the server plugin * * This creates a RuntimePlugin that: * 1. Intercepts all actions and checks for state slice changes * 2. Emits state.changed events to the internal bus when slices change * 3. Emits execution.lifecycle events for RUNTIME_EXECUTION_* actions * * Usage: * ```typescript * const serverPlugin = createServerPlugin({ slices: ["execution", "agent"] }); * const runtime = createRuntime({ plugins: [agent, serverPlugin.plugin] }); * const server = createServer({ runtime, plugin: serverPlugin }); * ``` */ export function createServerPlugin< S extends Record = Record, A extends Action = Action >(config: ServerPluginConfig = {}): ServerPluginInstance { const { slices, historySize = 10, persistence } = config; const bus = createEventBus(); // Track whether we've warned about non-serializable slices let hasWarnedAboutSkipped = false; // Run history storage (ring buffer for in-memory) const runHistory = new Map(); const runHistoryOrder: string[] = []; // Persistence store (if enabled) const persistenceBaseDir = persistence?.baseDir ?? ".gizmo"; const store: PersistenceStore | null = persistence?.enabled ? createPersistenceStore({ baseDir: persistenceBaseDir, enabled: true, }) : null; // Action sequence counter per execution (all actions, for in-memory tracking) const actionSeq = new Map(); // Persisted action sequence counter (dense, no gaps from skipped streaming actions) const persistedSeqMap = new Map(); // Initialize persistence store if (store) { store.init().catch((err) => { console.error("[Server Plugin] Failed to initialize persistence:", err); }); } const middleware: Middleware = (api: MiddlewareAPI) => (next) => (action: A) => { // Capture state BEFORE the action const stateBefore = api.getState(); // Let the action flow through the pipeline const result = next(action); // Capture state AFTER the action const stateAfter = api.getState(); // Determine which slices to check const allSlices = slices ?? Object.keys(stateAfter); // If slices not explicitly configured, filter out non-serializable ones let slicesToCheck: string[]; const skippedSlices: string[] = []; if (!slices) { // Auto-filter non-serializable slices slicesToCheck = []; for (const sliceKey of allSlices) { const sliceState = stateAfter[sliceKey as keyof S]; if (isSerializable(sliceState)) { slicesToCheck.push(sliceKey); } else { skippedSlices.push(sliceKey); } } // Warn once about skipped slices if (skippedSlices.length > 0 && !hasWarnedAboutSkipped) { console.warn( `[Server Plugin] Skipping non-serializable slices: ${skippedSlices.join(', ')}. ` + `To silence this warning, explicitly set 'slices' in ServerPluginConfig.` ); hasWarnedAboutSkipped = true; } } else { // Use explicitly configured slices (user's choice, even if non-serializable) slicesToCheck = allSlices; } // Check each slice for changes (reference equality) for (const sliceKey of slicesToCheck) { const before = stateBefore[sliceKey as keyof S]; const after = stateAfter[sliceKey as keyof S]; // Reference equality check (Redux convention: new state = new reference) if (before !== after) { emitStateChanged(bus, { slice: sliceKey, before, after, causingAction: action, }); } } // Emit execution lifecycle events for convenience const lifecycleEvent = mapActionToLifecycleEvent(action); if (lifecycleEvent) { emitExecutionLifecycle(bus, lifecycleEvent); } // Track run history if enabled (in-memory) or persistence is enabled if (historySize > 0 || store !== null) { trackRunHistory(action, stateAfter); } return result; }; /** * Track run history for observability */ function trackRunHistory(action: A, state: S): void { // Handle execution started - create new run if (action.type === "RUNTIME_EXECUTION_STARTED") { const a = action as unknown as ExecutionStartedAction; const { executionId } = a.payload; const startTime = Date.now(); // In-memory storage (only when historySize > 0) if (historySize > 0) { runHistory.set(executionId, { executionId, startTime, status: "running", actions: [sanitizeAction(action)], }); runHistoryOrder.push(executionId); // Enforce ring buffer limit for in-memory if (runHistoryOrder.length > historySize) { const oldestId = runHistoryOrder.shift()!; runHistory.delete(oldestId); actionSeq.delete(oldestId); persistedSeqMap.delete(oldestId); } } // Initialize action sequence counters actionSeq.set(executionId, 0); persistedSeqMap.set(executionId, 0); // Persist to JSONL if (store) { const runRecord: RunRecord = { executionId, startTime, status: "running", actionCount: 1, logFile: `${persistenceBaseDir}/logs/${executionId}.jsonl`, }; store.appendRun(runRecord).catch((err) => { console.error("[Server Plugin] Failed to persist run:", err); }); const actionRecord: ActionRecord = { seq: 0, timestamp: startTime, type: action.type, payload: sanitizeAction(action), }; store.appendAction(executionId, actionRecord).catch((err) => { console.error("[Server Plugin] Failed to persist action:", err); }); } return; } // Get current execution ID from state const executionState = state.execution as { id?: string; currentExecutionId?: string } | undefined; const currentExecutionId = executionState?.id || executionState?.currentExecutionId; if (!currentExecutionId) { return; } // Get run from in-memory storage (may not exist if historySize=0) const run = runHistory.get(currentExecutionId); const seq = (actionSeq.get(currentExecutionId) ?? 0) + 1; actionSeq.set(currentExecutionId, seq); // Add action to current run (in-memory, if exists) if (run) { run.actions.push(sanitizeAction(action)); } // Persist action to JSONL (skip streaming deltas — ephemeral, high-volume noise) if (store && !action.type.startsWith(STREAMING_PREFIX)) { const persistedSeq = (persistedSeqMap.get(currentExecutionId) ?? 0) + 1; persistedSeqMap.set(currentExecutionId, persistedSeq); const actionRecord: ActionRecord = { seq: persistedSeq, timestamp: Date.now(), type: action.type, payload: sanitizeAction(action), }; store.appendAction(currentExecutionId, actionRecord).catch((err) => { console.error("[Server Plugin] Failed to persist action:", err); }); } // Update run status on lifecycle events if (action.type === "RUNTIME_EXECUTION_COMPLETED") { const endTime = Date.now(); if (run) { run.endTime = endTime; run.status = "completed"; } finalizeRunPersistence(currentExecutionId, run, state, "completed", endTime); } else if (action.type === "RUNTIME_EXECUTION_FAILED") { const a = action as unknown as ExecutionFailedAction; const endTime = Date.now(); const error = a.payload.error?.message ?? String(a.payload.error); if (run) { run.endTime = endTime; run.status = "failed"; run.error = error; } finalizeRunPersistence(currentExecutionId, run, state, "failed", endTime, error); } else if (action.type === "RUNTIME_EXECUTION_ABORTED") { const endTime = Date.now(); if (run) { run.endTime = endTime; run.status = "aborted"; } finalizeRunPersistence(currentExecutionId, run, state, "aborted", endTime); } else if (action.type === "RUNTIME_EXECUTION_INTERRUPTED") { const endTime = Date.now(); if (run) { run.endTime = endTime; run.status = "completed"; // Treat interrupted as completed } finalizeRunPersistence(currentExecutionId, run, state, "completed", endTime); } } /** * Finalize a run: compute digest and persist update * @param run - The in-memory run (may be undefined in persistence-only mode) * @param endTime - End timestamp (required when run is undefined) */ function finalizeRunPersistence( executionId: string, run: StoredRun | undefined, state: S, status: "completed" | "failed" | "aborted", endTime: number, error?: string ): void { // Persist state first persistState(state, executionId); // Skip persistence update if no store if (!store) return; // When run exists, compute digest with full action history if (run) { computeRunDigest({ executionId, actions: run.actions, finalState: state, startTime: run.startTime, endTime, }) .then((digest) => { const update: Partial = { endTime, status, actionCount: run.actions.length, digest, }; if (error) update.error = error; return store?.updateRun(executionId, update); }) .catch((err) => { console.error("[Server Plugin] Failed to finalize run:", err); // Fall back to update without digest const update: Partial = { endTime, status, actionCount: run.actions.length, }; if (error) update.error = error; store?.updateRun(executionId, update); }); } else { // Persistence-only mode: update without digest (actions not tracked in memory) const actionCount = (persistedSeqMap.get(executionId) ?? 0) + 1; const update: Partial = { endTime, status, actionCount, }; if (error) update.error = error; store.updateRun(executionId, update).catch((err) => { console.error("[Server Plugin] Failed to update persisted run:", err); }); } } /** * Persist state snapshot to JSONL */ function persistState(state: S, executionId?: string): void { if (store) { const stateRecord: StateRecord = { timestamp: Date.now(), executionId, snapshot: state as Record, }; store.appendState(stateRecord).catch((err) => { console.error("[Server Plugin] Failed to persist state:", err); }); } } // The plugin doesn't need its own state slice, just middleware const plugin: RuntimePlugin<"__server", object, A, S> = { middleware, }; /** * Get the resolved list of slices being tracked * This accounts for auto-filtering of non-serializable slices */ const resolvedSlices = (state: S): string[] => { const allSlices = slices ?? Object.keys(state); // If slices explicitly configured, return as-is if (slices) { return allSlices; } // Auto-filter non-serializable slices const filtered: string[] = []; for (const sliceKey of allSlices) { const sliceState = state[sliceKey as keyof S]; if (isSerializable(sliceState)) { filtered.push(sliceKey); } } return filtered; }; return { plugin, bus, slices, resolvedSlices, historySize, getRunHistory: () => runHistory, recordAction: trackRunHistory, persistence: store, }; } /** * Map execution actions to lifecycle events * * Uses the runtime's type guard to properly narrow action types. */ function mapActionToLifecycleEvent(action: Action): ExecutionLifecycleEvent | null { if (!isExecutionLifecycleAction(action)) { return null; } // Now action is narrowed to ExecutionAction lifecycle types // All actions are PayloadAction types with properties under .payload switch (action.type) { case "RUNTIME_EXECUTION_STARTED": { const a = action as ExecutionStartedAction; return { status: "started", executionId: a.payload.executionId, turnId: a.payload.turnId, }; } case "RUNTIME_EXECUTION_COMPLETED": { const a = action as ExecutionCompletedAction; return { status: "completed", executionId: a.payload.executionId, }; } case "RUNTIME_EXECUTION_FAILED": { const a = action as ExecutionFailedAction; return { status: "failed", executionId: a.payload.executionId, error: a.payload.error, }; } case "RUNTIME_EXECUTION_ABORTED": { const a = action as ExecutionAbortedAction; return { status: "aborted", executionId: a.payload.executionId, }; } case "RUNTIME_EXECUTION_INTERRUPTED": { // ExecutionInterruptedAction has payload with executionId and interruptedTurnId const a = action as ExecutionInterruptedAction; return { status: "interrupted", executionId: a.payload.executionId, turnId: a.payload.interruptedTurnId, }; } case "RUNTIME_EXECUTION_INPUT": { // Input is a new turn, not a status change we need to broadcast return null; } default: return null; } }