/**
 * WindowAggregator — Message batching and ChannelDelta builder
 *
 * Buffers messages per channel with three flush triggers:
 *   1. @mention    → immediate flush
 *   2. Buffer full → flush at maxBatchSize
 *   3. Timer       → flush after batchWindowMs
 */

import { logger } from '../util/logger.js';
import type { RawMessage, ChannelDelta } from './types.js';

/** Tunables controlling when a channel's buffered messages are flushed. */
export interface AggregatorConfig {
  /** Delay (ms) between the first buffered message and the timer flush. */
  batchWindowMs: number;
  /** Number of buffered messages that triggers an immediate flush. */
  maxBatchSize: number;
}

/** Per-channel buffer plus the pending flush timer, if one is running. */
interface ChannelWindow {
  channelId: string;
  messages: RawMessage[];
  timer: ReturnType<typeof setTimeout> | null;
}

/**
 * Collects raw messages per channel and emits one ChannelDelta per flush
 * through the `onDelta` callback.
 */
export class WindowAggregator {
  /** Channels that currently hold buffered messages, keyed by channel id. */
  private windows = new Map<string, ChannelWindow>();
  private config: AggregatorConfig;

  /** Receiver for flushed batches; when null, flushed batches are dropped. */
  public onDelta: ((delta: ChannelDelta) => void) | null = null;

  /**
   * @param config Optional overrides; missing fields fall back to a 60 s
   *   window and a batch size of 20.
   */
  constructor(config: Partial<AggregatorConfig> = {}) {
    // `||` (not `??`) is deliberate: a 0 / NaN setting also falls back to the
    // default instead of producing a zero-length window or zero-size batch.
    this.config = {
      batchWindowMs: config.batchWindowMs || 60_000,
      maxBatchSize: config.maxBatchSize || 20,
    };
  }

  /**
   * Merges new settings into the live configuration.
   *
   * Fields that are absent or explicitly `undefined` are ignored so that a
   * partial update can never erase an existing setting. Timers that are
   * already running keep the delay they were started with.
   *
   * @param newConfig Settings to override.
   */
  updateConfig(newConfig: Partial<AggregatorConfig>): void {
    if (newConfig.batchWindowMs !== undefined) {
      this.config.batchWindowMs = newConfig.batchWindowMs;
    }
    if (newConfig.maxBatchSize !== undefined) {
      this.config.maxBatchSize = newConfig.maxBatchSize;
    }
  }

  /**
   * Buffers a message for its channel and flushes if a trigger fires.
   *
   * @param rawMessage Incoming message; `channelId` selects the buffer and
   *   `mentionsMe` forces an immediate flush.
   */
  append(rawMessage: RawMessage): void {
    const channelId = rawMessage.channelId;
    const win = this._getOrCreate(channelId);
    win.messages.push(rawMessage);

    // Immediate flush on @mention
    if (rawMessage.mentionsMe) {
      this.flush(channelId, 'mention');
      return;
    }

    // Buffer full
    if (win.messages.length >= this.config.maxBatchSize) {
      this.flush(channelId, 'buffer_full');
      return;
    }

    // Start timer if not running
    if (!win.timer) {
      win.timer = setTimeout(
        () => this.flush(channelId, 'timeout'),
        this.config.batchWindowMs,
      );
    }
  }

  /**
   * Emits everything buffered for a channel as a single ChannelDelta.
   *
   * Does nothing when the channel has no buffered messages.
   *
   * @param channelId Channel whose buffer should be drained.
   * @param reason Label recorded as `flushReason` on the delta and in the log.
   */
  flush(channelId: string, reason: string): void {
    const win = this.windows.get(channelId);
    if (!win || win.messages.length === 0) return;

    const messages = win.messages.splice(0);
    if (win.timer) {
      clearTimeout(win.timer);
      win.timer = null;
    }

    // Drop the drained window so idle channels do not accumulate in the map.
    // Done before invoking onDelta so that a re-entrant append() for this
    // channel simply starts a fresh window.
    this.windows.delete(channelId);

    const hasMention = messages.some(m => m.mentionsMe);
    const participants = [...new Set(messages.map(m => m.nick || m.from))];
    const source = messages[0]?.source || 'push';

    const delta: ChannelDelta = {
      channelId,
      // No separate display name is tracked here; the id doubles as the name.
      channelName: channelId,
      messageCount: messages.length,
      participants,
      hasMention,
      // A mention is maximally urgent; otherwise 0.1 per message, capped at 0.9.
      urgency: hasMention ? 1.0 : Math.min(0.1 * messages.length, 0.9),
      messages,
      source,
      flushReason: reason,
    };

    logger.debug(`[aggregator] FLUSH ch=${channelId} reason=${reason} msgs=${messages.length} mention=${hasMention}`);

    if (this.onDelta) {
      this.onDelta(delta);
    }
  }

  /** Flushes every channel that has buffered messages, with reason 'shutdown'. */
  flushAll(): void {
    // Iterate over a snapshot: flush() removes entries, and an onDelta handler
    // may append new messages while this loop is running.
    for (const channelId of [...this.windows.keys()]) {
      this.flush(channelId, 'shutdown');
    }
  }

  /**
   * Cancels all pending timers and discards buffered messages without
   * emitting them. Call flushAll() first if they must be delivered.
   */
  stop(): void {
    for (const win of this.windows.values()) {
      if (win.timer) clearTimeout(win.timer);
    }
    this.windows.clear();
  }

  /** Returns the channel's window, creating an empty one on first use. */
  private _getOrCreate(channelId: string): ChannelWindow {
    let win = this.windows.get(channelId);
    if (!win) {
      win = { channelId, messages: [], timer: null };
      this.windows.set(channelId, win);
    }
    return win;
  }
}