/** * PendingReplyStore — 跟踪"主动发送并等待用户回复"的 in-flight 消息 * * 解决的问题: * - 之前 5 个 caller 直接操作 Map + 自己处理定时器/resolve/reject * - check-then-add 是 race condition * - cleanupInterval 生命周期跟 extension 走, disconnect 后失效 * * 设计: * - 5 个 method 对应 5 个 caller 类型 (无 seam 泄漏) * - add() 原子: 同 conversationId 已 pending 时直接 reject * - start()/stop() 封装 setInterval, 跟 connection 走 * * 不负责: * - 发送消息 (sendReply) — caller 在 add 之前自己发 * - 已知遗留 bug: send 与 add 之间的 race window (用户在极短时间内回复会被丢进普通队列) */ export type ReplyPayload = { reply: string; message: any }; export interface AddEntry { conversationId: string; content: string; /** TTL in ms; 默认 300_000 (5 分钟) */ ttlMs?: number; } interface InternalEntry { messageId: string; conversationId: string; content: string; timestamp: number; timeout: NodeJS.Timeout; resolve: (value: ReplyPayload) => void; reject: (reason: Error) => void; } export interface PendingReplyStoreOptions { /** cleanup 间隔; 默认 60_000 (1 分钟) */ cleanupIntervalMs?: number; /** add() 没指定 ttlMs 时使用; 默认 300_000 (5 分钟) */ defaultTtlMs?: number; } const DEFAULT_CLEANUP_INTERVAL_MS = 60 * 1000; const DEFAULT_TTL_MS = 5 * 60 * 1000; export class PendingReplyStore { private readonly entries = new Map(); private interval: NodeJS.Timeout | null = null; private readonly cleanupIntervalMs: number; private readonly defaultTtlMs: number; private cleanupGeneration = 0; constructor(opts?: PendingReplyStoreOptions) { this.cleanupIntervalMs = opts?.cleanupIntervalMs ?? DEFAULT_CLEANUP_INTERVAL_MS; this.defaultTtlMs = opts?.defaultTtlMs ?? DEFAULT_TTL_MS; } /** * 注册一个 pending reply。 * 原子: 同 conversationId 已存在 pending 时, 直接 reject (无 race)。 * * Returns a Promise that resolves on user reply (via resolveByConversation) * or rejects on timeout / cancel. */ add(entry: AddEntry): Promise { const ttl = entry.ttlMs ?? this.defaultTtlMs; // 原子检查 — 找到 conversationId 已 pending 就拒绝 for (const existing of this.entries.values()) { if (existing.conversationId === entry.conversationId) { return Promise.reject( new Error("该会话已有等待中的消息,请等待用户回复或取消等待") ); } } const messageId = `pending-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 6)}`; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { // 超时: 删除并 reject this.entries.delete(messageId); reject(new Error(`等待回复超时(${Math.round(ttl / 1000)}秒)`)); }, ttl); this.entries.set(messageId, { messageId, conversationId: entry.conversationId, content: entry.content, timestamp: Date.now(), timeout, resolve, reject, }); }); } /** * 解析第一个匹配 conversationId 的 pending reply。 * Returns true if a reply was resolved, false if no match. */ resolveByConversation(conversationId: string, payload: ReplyPayload): boolean { for (const [id, entry] of this.entries) { if (entry.conversationId === conversationId) { clearTimeout(entry.timeout); this.entries.delete(id); entry.resolve(payload); return true; } } return false; } /** * 取消第一个匹配 conversationId 的 pending reply。 * Returns true if cancelled, false if no match. */ cancelByConversation(conversationId: string, reason: string): boolean { for (const [id, entry] of this.entries) { if (entry.conversationId === conversationId) { clearTimeout(entry.timeout); this.entries.delete(id); entry.reject(new Error(reason)); return true; } } return false; } /** * 取消所有 pending reply (e.g. disconnect 时调用)。 */ cancelAll(reason: string): void { for (const entry of this.entries.values()) { clearTimeout(entry.timeout); entry.reject(new Error(reason)); } this.entries.clear(); } /** * 清理过期 entry (timestamp + ttlMs < now)。 * 由 start() 启动的定时器自动调用, 也可手动调用。 */ cleanupExpired(ttlMs: number = this.defaultTtlMs): number { const now = Date.now(); let cleaned = 0; for (const [id, entry] of this.entries) { if (now - entry.timestamp > ttlMs) { clearTimeout(entry.timeout); this.entries.delete(id); entry.reject(new Error("等待回复超时")); cleaned++; } } return cleaned; } /** * 启动定期清理。Lifecycle: connect() 时调用。 * 如果已经启动, 先停掉旧的再启新的 (防重入)。 */ start(): void { this.stop(); // 使用 generation 防止 stop 后回调还在跑 const myGen = ++this.cleanupGeneration; this.interval = setInterval(() => { if (this.cleanupGeneration !== myGen) return; // 已被新的 start 取代 this.cleanupExpired(); }, this.cleanupIntervalMs); } /** * 停止定期清理。Lifecycle: disconnect() 时调用。 */ stop(): void { this.cleanupGeneration++; // 让任何在跑的回调失效 if (this.interval) { clearInterval(this.interval); this.interval = null; } } get size(): number { return this.entries.size; } }