/**
 * Scheduler — Serial inbound queue with priority ordering
 *
 * Mention-containing deltas are pushed to the front of the queue.
 * Processing is serial to avoid concurrent agent turns.
 */

import { logger } from '../util/logger.js';
import type { ChannelDelta } from './types.js';
import type { ActionExecutor } from './executor.js';

/**
 * Serial work queue that feeds channel deltas to an {@link ActionExecutor}
 * one at a time.
 *
 * Deltas flagged with `hasMention` are inserted at the head of the queue and
 * all others at the tail. Because every mention delta is placed at the head,
 * several pending mention deltas are handled newest-first.
 */
export class Scheduler {
  /** Executor that handles each dequeued delta. */
  private readonly executor: ActionExecutor;

  /** Pending deltas; index 0 is the next one to be processed. */
  private readonly queue: ChannelDelta[] = [];

  /** True while a delta is being handled; guards against overlapping runs. */
  private _processing = false;

  /**
   * @param executor - Receives each delta through `handleDelta`.
   */
  constructor(executor: ActionExecutor) {
    this.executor = executor;
  }

  /**
   * Adds a delta to the queue and kicks off processing if nothing is running.
   *
   * @param delta - Delta to schedule. Placed at the front when
   *   `delta.hasMention` is truthy, otherwise at the back.
   */
  enqueue(delta: ChannelDelta): void {
    if (delta.hasMention) {
      this.queue.unshift(delta);
    } else {
      this.queue.push(delta);
    }

    logger.debug(
      `[scheduler] ENQUEUE ch=${delta.channelId} pos=${delta.hasMention ? 'FRONT' : 'BACK'} queue=${this.queue.length}`,
    );

    // Fire-and-forget: _processNext catches and logs handler errors itself.
    void this._processNext();
  }

  /**
   * Dequeues and handles a single delta, then schedules the next one.
   *
   * Returns immediately when a delta is already being handled or the queue is
   * empty. Errors raised while handling a delta are logged and not rethrown,
   * so one failing delta does not stop the remaining ones from being handled.
   */
  private async _processNext(): Promise<void> {
    if (this._processing) return;

    const delta = this.queue.shift();
    if (delta === undefined) return;

    this._processing = true;
    try {
      logger.debug(`[scheduler] PROCESS ch=${delta.channelId} msgs=${delta.messageCount} remaining=${this.queue.length}`);
      await this.executor.handleDelta(delta);
    } catch (err: unknown) {
      // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
      const reason = err instanceof Error ? err.message : String(err);
      logger.error(`[scheduler] ERROR processing delta: ${reason}`);
    } finally {
      this._processing = false;
      if (this.queue.length > 0) {
        // Continue on a fresh timer tick rather than recursing directly
        // (presumably to let other pending work run between deltas).
        setTimeout(() => void this._processNext(), 0);
      }
    }
  }

  /** Number of deltas waiting in the queue, excluding the one being handled. */
  get queueSize(): number {
    return this.queue.length;
  }
}