import { AgentRunner, AgentRunnerConnectRequest, AgentRunnerIsRunningRequest, AgentRunnerRunRequest, type AgentRunnerStopRequest, } from "./agent-runner"; import { Observable, ReplaySubject } from "rxjs"; import { AbstractAgent, BaseEvent, EventType, MessagesSnapshotEvent, RunStartedEvent, compactEvents, } from "@ag-ui/client"; import { finalizeRunEvents } from "@copilotkit/shared"; interface HistoricRun { threadId: string; runId: string; parentRunId: string | null; events: BaseEvent[]; createdAt: number; } class InMemoryEventStore { constructor(public threadId: string) {} /** The subject that current consumers subscribe to. */ subject: ReplaySubject | null = null; /** True while a run is actively producing events. */ isRunning = false; /** Current run ID */ currentRunId: string | null = null; /** Historic completed runs */ historicRuns: HistoricRun[] = []; /** Currently running agent instance (if any). */ agent: AbstractAgent | null = null; /** Subject returned from run() while the run is active. */ runSubject: ReplaySubject | null = null; /** True once stop() has been requested but the run has not yet finalized. */ stopRequested = false; /** Reference to the events emitted in the current run. */ currentEvents: BaseEvent[] | null = null; } // Use a symbol key on globalThis to survive hot reloads in development const GLOBAL_STORE_KEY = Symbol.for("@copilotkit/runtime/in-memory-store"); interface GlobalStoreData { stores: Map; historicRunsBackup: Map; } function getGlobalStore(): Map { const globalAny = globalThis as unknown as Record; if (!globalAny[GLOBAL_STORE_KEY]) { globalAny[GLOBAL_STORE_KEY] = { stores: new Map(), historicRunsBackup: new Map(), }; } const data = globalAny[GLOBAL_STORE_KEY]; // Restore historic runs from backup after hot reload // (when stores map is empty but backup has data) if (data.stores.size === 0 && data.historicRunsBackup.size > 0) { for (const [threadId, historicRuns] of data.historicRunsBackup) { const store = new InMemoryEventStore(threadId); store.historicRuns = historicRuns; data.stores.set(threadId, store); } } return data.stores; } function backupHistoricRuns( threadId: string, historicRuns: HistoricRun[], ): void { const globalAny = globalThis as unknown as Record; if (globalAny[GLOBAL_STORE_KEY]) { globalAny[GLOBAL_STORE_KEY].historicRunsBackup.set(threadId, historicRuns); } } const GLOBAL_STORE = getGlobalStore(); export class InMemoryAgentRunner extends AgentRunner { run(request: AgentRunnerRunRequest): Observable { let existingStore = GLOBAL_STORE.get(request.threadId); if (!existingStore) { existingStore = new InMemoryEventStore(request.threadId); GLOBAL_STORE.set(request.threadId, existingStore); } const store = existingStore; // Now store is const and non-null if (store.isRunning) { throw new Error("Thread already running"); } store.isRunning = true; store.currentRunId = request.input.runId; store.agent = request.agent; store.stopRequested = false; // Track seen message IDs and current run events for this run const seenMessageIds = new Set(); const currentRunEvents: BaseEvent[] = []; store.currentEvents = currentRunEvents; // Get all previously seen message IDs from historic runs const historicMessageIds = new Set(); for (const run of store.historicRuns) { for (const event of run.events) { if ("messageId" in event && typeof event.messageId === "string") { historicMessageIds.add(event.messageId); } if (event.type === EventType.RUN_STARTED) { const runStarted = event as RunStartedEvent; const messages = runStarted.input?.messages ?? []; for (const message of messages) { historicMessageIds.add(message.id); } } } } const nextSubject = new ReplaySubject(Infinity); const prevSubject = store.subject; // Update the store's subject immediately store.subject = nextSubject; // Create a subject for run() return value const runSubject = new ReplaySubject(Infinity); store.runSubject = runSubject; // Helper function to run the agent and handle errors const runAgent = async () => { // Get parent run ID for chaining const lastRun = store.historicRuns[store.historicRuns.length - 1]; const parentRunId = lastRun?.runId ?? null; try { await request.agent.runAgent(request.input, { onEvent: ({ event }) => { let processedEvent: BaseEvent = event; if (event.type === EventType.RUN_STARTED) { const runStartedEvent = event as RunStartedEvent; if (!runStartedEvent.input) { const sanitizedMessages = request.input.messages ? request.input.messages.filter( (message) => !historicMessageIds.has(message.id), ) : undefined; const updatedInput = { ...request.input, ...(sanitizedMessages !== undefined ? { messages: sanitizedMessages } : {}), }; processedEvent = { ...runStartedEvent, input: updatedInput, } as RunStartedEvent; } } runSubject.next(processedEvent); // For run() return - only agent events nextSubject.next(processedEvent); // For connect() / store - all events currentRunEvents.push(processedEvent); // Accumulate for storage }, onNewMessage: ({ message }) => { // Called for each new message if (!seenMessageIds.has(message.id)) { seenMessageIds.add(message.id); } }, onRunStartedEvent: () => { // Mark any messages from the input as seen so they aren't emitted twice if (request.input.messages) { for (const message of request.input.messages) { if (!seenMessageIds.has(message.id)) { seenMessageIds.add(message.id); } } } }, }); const appendedEvents = finalizeRunEvents(currentRunEvents, { stopRequested: store.stopRequested, }); for (const event of appendedEvents) { runSubject.next(event); nextSubject.next(event); } // Store the completed run in memory with ONLY its events if (store.currentRunId) { // Compact the events before storing (like SQLite does) const compactedEvents = compactEvents(currentRunEvents); store.historicRuns.push({ threadId: request.threadId, runId: store.currentRunId, parentRunId, events: compactedEvents, createdAt: Date.now(), }); // Backup for hot reload survival backupHistoricRuns(request.threadId, store.historicRuns); } // Complete the run store.currentEvents = null; store.currentRunId = null; store.agent = null; store.runSubject = null; store.stopRequested = false; store.isRunning = false; runSubject.complete(); nextSubject.complete(); } catch (error) { const interruptionMessage = error instanceof Error ? error.message : String(error); const appendedEvents = finalizeRunEvents(currentRunEvents, { stopRequested: store.stopRequested, interruptionMessage, }); for (const event of appendedEvents) { runSubject.next(event); nextSubject.next(event); } // Store the run even if it failed (partial events) if (store.currentRunId && currentRunEvents.length > 0) { // Compact the events before storing (like SQLite does) const compactedEvents = compactEvents(currentRunEvents); store.historicRuns.push({ threadId: request.threadId, runId: store.currentRunId, parentRunId, events: compactedEvents, createdAt: Date.now(), }); // Backup for hot reload survival backupHistoricRuns(request.threadId, store.historicRuns); } // Complete the run store.currentEvents = null; store.currentRunId = null; store.agent = null; store.runSubject = null; store.stopRequested = false; store.isRunning = false; runSubject.complete(); nextSubject.complete(); } }; // Bridge previous events if they exist if (prevSubject) { prevSubject.subscribe({ next: (e) => nextSubject.next(e), error: (err) => nextSubject.error(err), complete: () => { // Don't complete nextSubject here - it needs to stay open for new events }, }); } // Start the agent execution immediately (not lazily) runAgent(); // Return the run subject (only agent events, no injected messages) return runSubject.asObservable(); } connect(request: AgentRunnerConnectRequest): Observable { const store = GLOBAL_STORE.get(request.threadId); const connectionSubject = new ReplaySubject(Infinity); if (!store) { // No store means no events connectionSubject.complete(); return connectionSubject.asObservable(); } // Collect all historic events from memory const allHistoricEvents: BaseEvent[] = []; for (const run of store.historicRuns) { allHistoricEvents.push(...run.events); } // Apply compaction to all historic events together (like SQLite) const compactedEvents = compactEvents(allHistoricEvents); // Emit compacted events and track message IDs const emittedMessageIds = new Set(); for (const event of compactedEvents) { connectionSubject.next(event); if ("messageId" in event && typeof event.messageId === "string") { emittedMessageIds.add(event.messageId); } } // Bridge active run to connection if exists if (store.subject && (store.isRunning || store.stopRequested)) { store.subject.subscribe({ next: (event) => { // Skip message events that we've already emitted from historic if ( "messageId" in event && typeof event.messageId === "string" && emittedMessageIds.has(event.messageId) ) { return; } connectionSubject.next(event); }, complete: () => connectionSubject.complete(), error: (err) => connectionSubject.error(err), }); } else { // No active run, complete after historic events connectionSubject.complete(); } return connectionSubject.asObservable(); } isRunning(request: AgentRunnerIsRunningRequest): Promise { const store = GLOBAL_STORE.get(request.threadId); return Promise.resolve(store?.isRunning ?? false); } stop(request: AgentRunnerStopRequest): Promise { const store = GLOBAL_STORE.get(request.threadId); if (!store || !store.isRunning) { return Promise.resolve(false); } if (store.stopRequested) { return Promise.resolve(false); } store.stopRequested = true; store.isRunning = false; const agent = store.agent; if (!agent) { store.stopRequested = false; store.isRunning = false; return Promise.resolve(false); } try { agent.abortRun(); return Promise.resolve(true); } catch (error) { console.error("Failed to abort agent run", error); store.stopRequested = false; store.isRunning = true; return Promise.resolve(false); } } }