/**
 * Collector — Message deduplication and distribution
 *
 * Deduplicates incoming messages using StateStore (or in-memory fallback).
 * Agent's own messages are marked seen but NOT forwarded to aggregator.
 *
 * Authoritative writer for StateStore fields:
 * - last_visit: updated on ingest() success
 * - last_msg_seq: updated on ingest() when msg has sequence
 */
import { logger } from '../util/logger.js';
import type { StateStore } from '../state/store.js';
import type { WindowAggregator } from './aggregator.js';
import type { RawMessage } from './types.js';

/**
 * Deduplicates raw messages by id and forwards first-time, non-self messages
 * to a {@link WindowAggregator}.
 *
 * When a {@link StateStore} is supplied it is used for the seen-id bookkeeping;
 * otherwise an in-memory `Set` of message ids is used, which can be
 * periodically emptied via {@link Collector.startCleanup}.
 */
export class Collector {
  private readonly aggregator: WindowAggregator;
  private readonly store: StateStore | null;
  private readonly agentId: string;

  /** In-memory fallback for seen message ids; only used when `store` is null. */
  private readonly _memSeen = new Set<string>();

  /** Handle of the periodic in-memory cleanup timer, or null when not running. */
  private _memCleanupTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param aggregator Receives every message that passes deduplication and is
   *   not authored by the agent itself.
   * @param store Optional state store used for dedup and for recording
   *   last-visit / last-sequence data. When null, an in-memory set is used.
   * @param agentId Id of the agent; messages whose `from` equals this value are
   *   marked seen but not forwarded. An empty string disables the self check.
   */
  constructor(aggregator: WindowAggregator, store: StateStore | null = null, agentId = '') {
    this.aggregator = aggregator;
    this.store = store;
    this.agentId = agentId;
  }

  /**
   * Processes a single message: skips it if its id was already seen, otherwise
   * marks it seen, updates the store (if any) and forwards it to the aggregator
   * unless it was sent by the agent itself.
   *
   * @param rawMessage The incoming message.
   */
  ingest(rawMessage: RawMessage): void {
    if (this._hasSeen(rawMessage.id)) {
      logger.debug(`[collector] DEDUP skip id=${rawMessage.id}`);
      return;
    }

    // Mark as seen before anything else so a repeated delivery is skipped.
    this._markSeen(rawMessage.id, rawMessage.channelId);

    // Update StateStore: last_visit + last_msg_seq
    if (this.store) {
      this.store.updateLastVisit(rawMessage.channelId);
      // `!= null` deliberately matches both null and undefined; a seq of 0 is kept.
      if (rawMessage.seq != null) {
        this.store.updateLastMsgSeq(rawMessage.channelId, rawMessage.seq);
      }
    }

    // Agent's own messages: mark seen but don't trigger agent turns
    if (this.agentId && rawMessage.from === this.agentId) {
      logger.debug(`[collector] SELF msg id=${rawMessage.id} (marked seen, no turn)`);
      return;
    }

    logger.debug(`[collector] NEW msg id=${rawMessage.id} ch=${rawMessage.channelId} from=${rawMessage.nick}`);
    this.aggregator.append(rawMessage);
  }

  /**
   * Runs {@link Collector.ingest} on each message, in array order.
   *
   * @param rawMessages Messages to process; the array is not modified.
   */
  ingestBatch(rawMessages: readonly RawMessage[]): void {
    for (const msg of rawMessages) {
      this.ingest(msg);
    }
  }

  /** Returns whether the id is already recorded in the store or, without a store, in memory. */
  private _hasSeen(messageId: string): boolean {
    if (this.store) return this.store.hasSeen(messageId);
    return this._memSeen.has(messageId);
  }

  /** Records the id as seen in the store or, without a store, in memory. */
  private _markSeen(messageId: string, channelId: string): void {
    if (this.store) {
      this.store.markSeen(messageId, channelId);
    } else {
      this._memSeen.add(messageId);
    }
  }

  /**
   * Starts periodically emptying the in-memory seen set. Does nothing when a
   * StateStore is configured. Calling this again replaces the running timer
   * instead of stacking a second one.
   *
   * @param intervalMs Milliseconds between clears of the in-memory set.
   */
  startCleanup(intervalMs = 600_000): void {
    if (this.store) return; // StateStore handles its own cleanup

    // Cancel any previous timer first; otherwise its handle would be
    // overwritten and stop() could never clear it.
    this.stop();

    this._memCleanupTimer = setInterval(() => {
      // The whole set is dropped each tick, so ids seen before the tick can be
      // accepted again afterwards.
      this._memSeen.clear();
    }, intervalMs);
  }

  /** Stops the in-memory cleanup timer if one is running; safe to call repeatedly. */
  stop(): void {
    if (this._memCleanupTimer !== null) {
      clearInterval(this._memCleanupTimer);
      this._memCleanupTimer = null;
    }
  }
}