/** * PendingReplyStore 单元测试 * * 覆盖: * - add() 原子性 (race 防护) * - add() → resolveByConversation 流程 * - add() → cancelByConversation 流程 * - add() → timeout 流程 * - cancelAll (disconnect 场景) * - cleanupExpired (定时清理) * - start()/stop() lifecycle (interval 跟着 start 走) */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; import { PendingReplyStore } from '../pending-reply-store.js'; describe('PendingReplyStore', () => { let store: PendingReplyStore; beforeEach(() => { vi.useFakeTimers(); store = new PendingReplyStore({ cleanupIntervalMs: 1000, defaultTtlMs: 5000, }); }); afterEach(() => { store.stop(); vi.useRealTimers(); }); describe('add()', () => { it('注册一个 pending reply 并返回 promise', () => { const promise = store.add({ conversationId: 'c1', content: '?' }); expect(promise).toBeInstanceOf(Promise); expect(store.size).toBe(1); }); it('同 conversationId 第二次 add() 立即 reject (原子防护)', async () => { const p1 = store.add({ conversationId: 'c1', content: '?' }); expect(store.size).toBe(1); const p2 = store.add({ conversationId: 'c1', content: '?' }); await expect(p2).rejects.toThrow('该会话已有等待中的消息'); expect(store.size).toBe(1); // 第二个 add 没插入 // p1 仍然可以正常被 resolve store.resolveByConversation('c1', { reply: 'yes', message: {} }); await expect(p1).resolves.toEqual({ reply: 'yes', message: {} }); }); it('不同 conversationId 可以并行 add()', () => { store.add({ conversationId: 'c1', content: '?' }); store.add({ conversationId: 'c2', content: '?' }); expect(store.size).toBe(2); }); }); describe('resolveByConversation()', () => { it('解析匹配的 entry 并 resolve promise', async () => { const promise = store.add({ conversationId: 'c1', content: '?' }); const result = store.resolveByConversation('c1', { reply: 'yes', message: { foo: 1 } }); expect(result).toBe(true); expect(store.size).toBe(0); await expect(promise).resolves.toEqual({ reply: 'yes', message: { foo: 1 } }); }); it('无匹配时返回 false, 不抛错', () => { const result = store.resolveByConversation('unknown', { reply: 'x', message: {} }); expect(result).toBe(false); expect(store.size).toBe(0); }); it('只解析第一个匹配 (FIFO)', async () => { const p1 = store.add({ conversationId: 'c1', content: 'first' }); const p2 = store.add({ conversationId: 'c2', content: 'second' }); // 解析 c1 — p1 应该被 resolve store.resolveByConversation('c1', { reply: 'r1', message: {} }); await expect(p1).resolves.toEqual({ reply: 'r1', message: {} }); expect(store.size).toBe(1); // p2 还在等 store.resolveByConversation('c2', { reply: 'r2', message: {} }); await expect(p2).resolves.toEqual({ reply: 'r2', message: {} }); expect(store.size).toBe(0); }); }); describe('cancelByConversation()', () => { it('取消匹配的 entry 并 reject promise', async () => { const promise = store.add({ conversationId: 'c1', content: '?' }); const result = store.cancelByConversation('c1', '用户取消'); expect(result).toBe(true); expect(store.size).toBe(0); await expect(promise).rejects.toThrow('用户取消'); }); it('无匹配时返回 false', () => { const result = store.cancelByConversation('unknown', 'reason'); expect(result).toBe(false); }); it('取消后 store 不再持有该 entry', async () => { const promise = store.add({ conversationId: 'c1', content: '?' }); const promiseCaught = promise.catch(e => e); expect(store.size).toBe(1); store.cancelByConversation('c1', 'cancel'); expect(store.size).toBe(0); await promiseCaught; // swallow // 同一 conversationId 重新 add 应该被允许 const p2 = store.add({ conversationId: 'c1', content: '?' }); const p2Caught = p2.catch(e => e); expect(store.size).toBe(1); // 清理 store.cancelByConversation('c1', 'cleanup'); const err = await p2Caught; expect(err).toBeInstanceOf(Error); expect((err as Error).message).toBe('cleanup'); }); }); describe('cancelAll()', () => { it('取消所有 entry 并 reject 所有 promise', async () => { const p1 = store.add({ conversationId: 'c1', content: '?' }); const p2 = store.add({ conversationId: 'c2', content: '?' }); const p3 = store.add({ conversationId: 'c3', content: '?' }); expect(store.size).toBe(3); // attach handlers first const c1 = p1.catch(e => e); const c2 = p2.catch(e => e); const c3 = p3.catch(e => e); store.cancelAll('disconnect'); expect(store.size).toBe(0); expect((await c1).message).toBe('disconnect'); expect((await c2).message).toBe('disconnect'); expect((await c3).message).toBe('disconnect'); }); }); describe('timeout', () => { it('TTL 到了之后 promise reject', async () => { // 先 attach handler, 再 advance time — 避免 unhandled rejection const promise = store.add({ conversationId: 'c1', content: '?', ttlMs: 100 }); const caught = promise.catch(e => e); // 还没到时间 vi.advanceTimersByTime(99); expect(store.size).toBe(1); // 超时 vi.advanceTimersByTime(1); expect(store.size).toBe(0); const err = await caught; expect(err).toBeInstanceOf(Error); expect((err as Error).message).toMatch(/等待回复超时/); }); it('使用 defaultTtlMs 当未指定 ttlMs', async () => { const promise = store.add({ conversationId: 'c1', content: '?' }); // defaultTtlMs = 5000 const caught = promise.catch(e => e); vi.advanceTimersByTime(4999); expect(store.size).toBe(1); vi.advanceTimersByTime(1); expect(store.size).toBe(0); const err = await caught; expect(err).toBeInstanceOf(Error); expect((err as Error).message).toMatch(/等待回复超时/); }); it('resolve 后定时器不会再触发 (没有内存泄漏)', async () => { const promise = store.add({ conversationId: 'c1', content: '?', ttlMs: 100 }); store.resolveByConversation('c1', { reply: 'r', message: {} }); await promise; // 推进时间, 确认不会 reject vi.advanceTimersByTime(1000); // promise 仍 resolved await expect(promise).resolves.toEqual({ reply: 'r', message: {} }); }); }); describe('cleanupExpired()', () => { it('手动清理超过 ttlMs 的 entry', async () => { const promise = store.add({ conversationId: 'c1', content: '?', ttlMs: 100 }); const caught = promise.catch(e => e); vi.advanceTimersByTime(101); // 定时器已 fire, store 已被清空 expect(store.size).toBe(0); await caught; // swallow rejection }); it('start() 后定时器自动 cleanup', async () => { store.start(); const promise = store.add({ conversationId: 'c1', content: '?', ttlMs: 100 }); const caught = promise.catch(e => e); // 在 TTL 后触发 cleanup 间隔 (但 TTL 内的独立定时器先 fire) vi.advanceTimersByTime(150); // TTL 定时器 + cleanup 间隔 都没让 entry 留下 expect(store.size).toBe(0); await caught; }); }); describe('start()/stop() lifecycle', () => { it('stop 后定时器不再触发', () => { store.start(); store.stop(); // 推进时间超过 cleanup 间隔, 不应该抛错 vi.advanceTimersByTime(5000); // 如果 interval 没被清理, vitest fake timer 会警告未清理的定时器 // (断言只能间接验证 — 显式行为是 store.size 仍然准确) expect(store.size).toBe(0); }); it('start() 多次调用, 旧的 interval 被清理 (无重入)', () => { const startSpy = vi.spyOn(global, 'setInterval'); store.start(); const firstIntervalId = store['interval']; store.start(); const secondIntervalId = store['interval']; expect(startSpy).toHaveBeenCalledTimes(2); // 不同的 interval id expect(firstIntervalId).not.toBe(secondIntervalId); }); it('stop 后 cleanup 回调不应再操作 store', () => { store.start(); // 添加一个 entry store.add({ conversationId: 'c1', content: '?' }); expect(store.size).toBe(1); // stop store.stop(); // 即使 timer 已 fire, 也不应该 reject (generation check 应该让回调失效) // 这里我们通过直接调用 cleanup 来验证 — 不会有副作用 store.cleanupExpired(); expect(store.size).toBe(1); // 仍然在, 因为没过期 }); }); });