import type { ProviderName } from '../provider/index.ts'; import { publish } from '../../events/bus.ts'; import type { ToolApprovalMode } from '../tools/approval.ts'; export type RunOpts = { sessionId: string; assistantMessageId: string; agent: string; provider: ProviderName; model: string; projectRoot: string; oneShot?: boolean; userContext?: string; reasoningText?: boolean; abortSignal?: AbortSignal; isCompactCommand?: boolean; compactionContext?: string; toolApprovalMode?: ToolApprovalMode; compactionRetries?: number; }; export type QueuedMessage = { messageId: string; position: number; }; type RunnerState = { queue: RunOpts[]; running: boolean; currentMessageId: string | null; }; // Global state for session queues const runners = new Map(); // Track active abort controllers per MESSAGE (not session) const messageAbortControllers = new Map(); function publishQueueState(sessionId: string) { const state = runners.get(sessionId); if (!state) return; const queuedMessages: QueuedMessage[] = state.queue.map((opts, index) => ({ messageId: opts.assistantMessageId, position: index, })); publish({ type: 'queue.updated', sessionId, payload: { currentMessageId: state.currentMessageId, queuedMessages, queueLength: state.queue.length, }, }); } /** * Enqueues an assistant run for a given session. * Creates an abort controller per message. */ export function enqueueAssistantRun( opts: Omit, processQueueFn: (sessionId: string) => Promise, ) { const abortController = new AbortController(); messageAbortControllers.set(opts.assistantMessageId, abortController); const state = runners.get(opts.sessionId) ?? { queue: [], running: false, currentMessageId: null, }; state.queue.push({ ...opts, abortSignal: abortController.signal }); runners.set(opts.sessionId, state); publishQueueState(opts.sessionId); if (!state.running) void processQueueFn(opts.sessionId); } /** * Aborts the currently running message for a session. * Optionally clears the queue. */ export function abortSession(sessionId: string, clearQueue = false) { const state = runners.get(sessionId); if (!state) return; // Abort the currently running message if (state.currentMessageId) { const controller = messageAbortControllers.get(state.currentMessageId); if (controller) { controller.abort(); messageAbortControllers.delete(state.currentMessageId); } } // Optionally clear the queue and abort all queued messages if (clearQueue && state.queue.length > 0) { for (const opts of state.queue) { const controller = messageAbortControllers.get(opts.assistantMessageId); if (controller) { controller.abort(); messageAbortControllers.delete(opts.assistantMessageId); } } state.queue = []; publishQueueState(sessionId); } } /** * Aborts a specific message by its ID. * If it's currently running, aborts the stream. * If it's queued, removes it from the queue. */ export function abortMessage( sessionId: string, messageId: string, ): { removed: boolean; wasRunning: boolean } { const state = runners.get(sessionId); if (!state) return { removed: false, wasRunning: false }; // Check if this is the currently running message if (state.currentMessageId === messageId) { const controller = messageAbortControllers.get(messageId); if (controller) { controller.abort(); messageAbortControllers.delete(messageId); } return { removed: true, wasRunning: true }; } // Check if it's in the queue const index = state.queue.findIndex( (opts) => opts.assistantMessageId === messageId, ); if (index !== -1) { state.queue.splice(index, 1); const controller = messageAbortControllers.get(messageId); if (controller) { controller.abort(); messageAbortControllers.delete(messageId); } publishQueueState(sessionId); return { removed: true, wasRunning: false }; } return { removed: false, wasRunning: false }; } /** * Removes a queued message (not the currently running one). */ export function removeFromQueue(sessionId: string, messageId: string): boolean { const state = runners.get(sessionId); if (!state) return false; // Don't allow removing the currently running message via this function if (state.currentMessageId === messageId) { return false; } const index = state.queue.findIndex( (opts) => opts.assistantMessageId === messageId, ); if (index === -1) return false; state.queue.splice(index, 1); const controller = messageAbortControllers.get(messageId); if (controller) { controller.abort(); messageAbortControllers.delete(messageId); } publishQueueState(sessionId); return true; } /** * Gets the current queue state for a session. */ export function getQueueState(sessionId: string): { currentMessageId: string | null; queuedMessages: QueuedMessage[]; isRunning: boolean; } | null { const state = runners.get(sessionId); if (!state) return null; return { currentMessageId: state.currentMessageId, queuedMessages: state.queue.map((opts, index) => ({ messageId: opts.assistantMessageId, position: index, })), isRunning: state.running, }; } export function getRunnerState( sessionId: string, ): { queue: RunOpts[]; running: boolean } | undefined { return runners.get(sessionId); } export function setRunning(sessionId: string, running: boolean) { const state = runners.get(sessionId); if (state) state.running = running; } export function setCurrentMessage(sessionId: string, messageId: string | null) { const state = runners.get(sessionId); if (state) { state.currentMessageId = messageId; publishQueueState(sessionId); } } export function dequeueJob(sessionId: string): RunOpts | undefined { const state = runners.get(sessionId); const job = state?.queue.shift(); if (job && state) { state.currentMessageId = job.assistantMessageId; publishQueueState(sessionId); } return job; } export function cleanupSession(sessionId: string) { const state = runners.get(sessionId); if (state && state.queue.length === 0 && !state.running) { // Clean up any lingering abort controller for current message if (state.currentMessageId) { messageAbortControllers.delete(state.currentMessageId); } state.currentMessageId = null; runners.delete(sessionId); publishQueueState(sessionId); } }