import { describe, expect, test, vi } from "vitest"; import type { WecomBotInboundMessage as WecomInboundMessage } from "../types/index.js"; import type { WecomWebhookTarget } from "./types.js"; import { StreamStore } from "./state.js"; describe("wecom StreamStore queue", () => { test("does not merge into active batch; flushes queued batch after active finishes", async () => { vi.useFakeTimers(); try { const store = new StreamStore(); const flushed: string[] = []; store.setFlushHandler((pending) => flushed.push(pending.streamId)); const target = { account: {} as any, config: {} as any, runtime: {}, core: {} as any, path: "/wecom", } satisfies WecomWebhookTarget; const conversationKey = "wecom:default:U:C"; const msg1 = { msgid: "M1" } satisfies WecomInboundMessage; const msg2 = { msgid: "M2" } satisfies WecomInboundMessage; const r1 = store.addPendingMessage({ conversationKey, target, msg: msg1, msgContent: "1", nonce: "n", timestamp: "t", debounceMs: 10, }); const r2 = store.addPendingMessage({ conversationKey, target, msg: msg2, msgContent: "2", nonce: "n", timestamp: "t", debounceMs: 10, }); expect(r1.status).toBe("active_new"); // 初始批次不接收合并:第二条进入 queued expect(r2.status).toBe("queued_new"); expect(r2.streamId).not.toBe(r1.streamId); // Follow-ups within queued should merge into queued (status queued_merged). const r3 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M3" } as any, msgContent: "3", nonce: "n", timestamp: "t", debounceMs: 10, }); expect(r3.status).toBe("queued_merged"); expect(r3.streamId).toBe(r2.streamId); // Active batch flushes at debounce time. await vi.advanceTimersByTimeAsync(11); expect(flushed).toEqual([r1.streamId]); // Queued batch timer also fires, but cannot flush until active finishes. await vi.advanceTimersByTimeAsync(11); expect(flushed).toEqual([r1.streamId]); // Once the active stream finishes, queued batch is promoted and flushes immediately. store.onStreamFinished(r1.streamId); expect(flushed).toEqual([r1.streamId, r2.streamId]); } finally { vi.useRealTimers(); } }); test("merges into active batch when it has not started yet (even after promotion)", async () => { vi.useFakeTimers(); try { const store = new StreamStore(); const flushed: string[] = []; store.setFlushHandler((pending) => flushed.push(pending.streamId)); const target = { account: {} as any, config: {} as any, runtime: {}, core: {} as any, path: "/wecom", } satisfies WecomWebhookTarget; const conversationKey = "wecom:default:U:C2"; // 1 becomes active and flushes; mark as started to simulate "processing started". const r1 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M1" } as any, msgContent: "1", nonce: "n", timestamp: "t", debounceMs: 10, }); store.markStarted(r1.streamId); await vi.advanceTimersByTimeAsync(11); expect(flushed).toEqual([r1.streamId]); // 2 enters queued with a longer debounce; it should NOT become readyToFlush yet. const r2 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M2" } as any, msgContent: "2", nonce: "n", timestamp: "t", debounceMs: 100, }); expect(flushed).toEqual([r1.streamId]); // Finish 1, promote 2 to active (but do NOT flush immediately since it's not readyToFlush). store.onStreamFinished(r1.streamId); expect(flushed).toEqual([r1.streamId]); // Now 2 is active, but (in real monitor) it may still be in debounce before markStarted. // We simulate that by NOT calling markStarted. Follow-up should merge into active (same streamId). const r3 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M3" } as any, msgContent: "3", nonce: "n", timestamp: "t", debounceMs: 10, }); expect(r3.streamId).toBe(r2.streamId); expect(r3.status).toBe("active_merged"); } finally { vi.useRealTimers(); } }); test("clears conversation state when idle so next message becomes active", async () => { const store = new StreamStore(); store.setFlushHandler(() => { }); const target = { account: {} as any, config: {} as any, runtime: {}, core: {} as any, path: "/wecom", } satisfies WecomWebhookTarget; const conversationKey = "wecom:default:U:idle"; const r1 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M1" } as any, msgContent: "1", nonce: "n", timestamp: "t", debounceMs: 10, }); store.markStarted(r1.streamId); store.markFinished(r1.streamId); store.onStreamFinished(r1.streamId); const r2 = store.addPendingMessage({ conversationKey, target, msg: { msgid: "M2" } as any, msgContent: "2", nonce: "n", timestamp: "t", debounceMs: 10, }); expect(r2.status).toBe("active_new"); expect(r2.streamId).not.toBe(r1.streamId); }); });