/**
 * Regression test: agent_end no longer delivers a reply to the wrong user (Bug #5).
 *
 * Bug (found during architecture review):
 *   agent_end tried to regex-match the `[dingtalkbot] [...]` prefix inside `txt`
 *   (the assistant response) in order to extract the messageId. But `txt` is the
 *   assistant's final response and never contains that prefix, so the regex never
 *   matched
 *   -> it fell back to getLatestSession(), which returns the most recently
 *      enqueued session
 *   -> with several users in flight, agent_end sent user A's reply to user B.
 *
 * Fix: use the `currentProcessingMessageId` closure variable set by
 *   processNextForSession to look up the matching session (it points exactly at
 *   the message currently being processed).
 *
 * NOTE(review): the doc comment on `triggerAgentEndForUser` below describes the
 * fix as extracting the messageId from the *user* message prefix instead.
 * Confirm against the implementation which mechanism is actually used.
 */
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { mkdtempSync, rmSync } from 'node:fs';
import { mkdir, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

/** Callback registered by the extension through `pi.on(...)`. */
type EventHandler = (payload: any) => unknown;

/** One recorded call to the stubbed `fetch` (i.e. one outgoing webhook request). */
interface WebhookCall {
  url: string;
  body: any;
}

/** Session id exported via PI_SESSION_ID; also the basename of the session file. */
const SESSION_ID = 'test-agent-end-routing';

const ALICE_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token=tokenA';
const BOB_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token=tokenB';

/** Per-test temporary directory that stands in for the user's home directory. */
let testHome: string;
/** Value returned by the mocked `os.homedir()`; read lazily on every call. */
let mockHomedirValue = '';

// Redirect os.homedir() to the temp dir so files are written under `testHome`.
vi.mock('node:os', async () => {
  const actual = await vi.importActual<typeof import('node:os')>('node:os');
  return { ...actual, homedir: () => mockHomedirValue };
});

/** Handlers registered through the mock `pi.on`, grouped by event name. */
const handlers = new Map<string, Set<EventHandler>>();
/** Every `pi.on` registration in call order. */
const allEventHandlers: { event: string; handler: EventHandler }[] = [];
/** Tools registered through the mock `pi.registerTool`, keyed by tool name. */
const registeredTools = new Map<string, unknown>();
/** Every request that went through the stubbed `fetch`. */
const sentWebhookCalls: WebhookCall[] = [];

/**
 * Builds a mock `pi` extension API object.
 *
 * `on` and `registerTool` record their arguments in the module-level
 * collections above; every other method is a bare `vi.fn()` spy.
 */
function makeMockPi() {
  return {
    on: vi.fn((event: string, handler: EventHandler) => {
      let registered = handlers.get(event);
      if (!registered) {
        registered = new Set();
        handlers.set(event, registered);
      }
      registered.add(handler);
      allEventHandlers.push({ event, handler });
    }),
    registerTool: vi.fn((tool: any) => {
      registeredTools.set(tool.name, tool);
    }),
    registerCommand: vi.fn(),
    registerShortcut: vi.fn(),
    registerFlag: vi.fn(),
    getFlag: vi.fn(() => undefined),
    getActiveTools: vi.fn(() => []),
    setActiveTools: vi.fn(),
    getAllTools: vi.fn(() => []),
    sendUserMessage: vi.fn(),
    sendMessage: vi.fn(),
    exec: vi.fn(),
  };
}

/**
 * Invokes every handler registered for `event`, one after another, awaiting
 * each before calling the next. Does nothing when no handler is registered.
 */
async function emitEventAll(event: string, payload: any): Promise<void> {
  const registered = handlers.get(event);
  if (!registered) return;
  for (const handler of registered) {
    await handler(payload);
  }
}

/** The most recently constructed mock DingTalk client (set by `loadExtension`). */
let mockClient: any = null;

// The factory is hoisted by Vitest, so it publishes the client instance through
// `globalThis` rather than through a module-level variable of this file.
vi.mock('dingtalk-stream', async () => {
  const g = globalThis as any;
  const { EventEmitter } = await import('node:events');
  return {
    DWClient: class MockDWClient extends EventEmitter {
      /** Callback stored by `registerCallbackListener`; tests call it directly. */
      _cbHandler: any = null;
      private _connected = false;

      constructor(public config: any) {
        super();
        g.__mockDingTalkClient = this;
      }

      registerCallbackListener(_topic: string, _handler: any) {
        this._cbHandler = _handler;
      }

      async connect() {
        this._connected = true;
        this.emit('connect');
      }

      disconnect() {
        this._connected = false;
      }
    },
    TOPIC_ROBOT: '/v1.0/im/bot/messages/get',
    EventAck: { SUCCESS: { status: 'SUCCESS', message: 'OK' } },
  };
});

// `Type.<anything>(...)` returns a plain descriptor object. The two factories
// below are intentionally duplicated: vi.mock factories are hoisted, so sharing
// a module-level helper between them is avoided.
vi.mock('@mariozechner/pi-ai', () => ({
  Type: new Proxy({}, {
    get: (_, prop: string) => (...args: any[]) => ({
      type: String(prop).toLowerCase(),
      args,
      _mockType: prop,
    }),
  }),
}));

vi.mock('@mariozechner/pi-coding-agent', () => ({
  Type: new Proxy({}, {
    get: (_, prop: string) => (...args: any[]) => ({
      type: String(prop).toLowerCase(),
      args,
      _mockType: prop,
    }),
  }),
}));

const realFetch = globalThis.fetch;

// Replace fetch with a stub that records the URL and the JSON-decoded body of
// every request and answers with a minimal successful response.
beforeEach(() => {
  globalThis.fetch = vi.fn(async (url: string, opts: any) => {
    const body = JSON.parse(opts.body);
    sentWebhookCalls.push({ url, body });
    return { ok: true, status: 200, statusText: 'OK', json: async () => ({}) } as any;
  }) as any;
});

afterEach(() => {
  globalThis.fetch = realFetch;
});

/**
 * Returns the recorded webhook calls whose text payload equals `content`
 * (in either `body.text.content` or `body.text.text`).
 *
 * @param content    Exact reply text to look for.
 * @param webhookUrl When given, only calls sent to this URL are considered.
 */
function findReplies(content: string, webhookUrl?: string): WebhookCall[] {
  return sentWebhookCalls.filter((call) => {
    if (webhookUrl !== undefined && call.url !== webhookUrl) return false;
    return call.body.text?.content === content || call.body.text?.text === content;
  });
}

/**
 * Feeds an incoming robot message to the callback the extension registered on
 * the mock DingTalk client.
 *
 * The `conversationId` is derived from the `access_token` query parameter of
 * `sessionWebhook`. Returns without doing anything when no client or no
 * callback is available.
 */
async function injectMessage(payload: {
  msgId: string;
  senderNick: string;
  sessionWebhook: string;
  text: string;
}): Promise<void> {
  if (!mockClient || !mockClient._cbHandler) return;
  const message = {
    msgId: payload.msgId,
    senderNick: payload.senderNick,
    sessionWebhook: payload.sessionWebhook,
    conversationId: payload.sessionWebhook.match(/access_token=([^&]+)/)?.[1],
    text: { content: payload.text },
  };
  await mockClient._cbHandler({ data: JSON.stringify(message) });
}

/**
 * Resets all recorded state, writes the bot config and session files under
 * `testHome`, loads a fresh copy of the extension and emits `session_start`.
 *
 * @returns The mock `pi` object the extension was initialised with.
 */
async function loadExtension(): Promise<ReturnType<typeof makeMockPi>> {
  handlers.clear();
  registeredTools.clear();
  allEventHandlers.length = 0;
  sentWebhookCalls.length = 0;
  mockClient = null;
  (globalThis as any).__mockDingTalkClient = null;
  process.env.PI_SESSION_ID = SESSION_ID;

  const baseDir = join(testHome, '.pi', 'agent', 'dingtalk-bot');
  await mkdir(join(baseDir, 'sessions'), { recursive: true });
  await writeFile(
    join(baseDir, 'config.json'),
    JSON.stringify({
      bots: [{ clientId: 'cid', clientSecret: 'sec', name: 'bot' }],
    }, null, '\t') + '\n'
  );
  await writeFile(
    join(baseDir, 'sessions', `${SESSION_ID}.json`),
    JSON.stringify({ activeBotId: 'cid', enabled: true }, null, '\t') + '\n'
  );

  const mockPi = makeMockPi();
  // Drop the module cache so the extension is re-evaluated for every test.
  vi.resetModules();
  const extModule = await import('../index.js');
  await extModule.default(mockPi as any);
  await emitEventAll('session_start', { type: 'session_start' });
  // Brief pause before picking up the client instance created by the extension.
  await new Promise(r => setTimeout(r, 10));
  mockClient = (globalThis as any).__mockDingTalkClient;
  return mockPi;
}

/**
 * Simulates pi firing `agent_end` after it finished processing one message.
 *
 * - userMessageId: messageId of the user message handled in this turn
 * - senderNick:    nickname of the user this turn belongs to
 * - content:       text of the assistant response
 *
 * After the fix the messageId is extracted from the user message text
 * `[dingtalkbot] [...] [nick] [messageId]`.
 */
async function triggerAgentEndForUser(opts: {
  userMessageId: string;
  senderNick: string;
  content: string;
}): Promise<void> {
  const userText = `[dingtalkbot] [bot] [${opts.senderNick}] [${opts.userMessageId}]\nuser input`;
  await emitEventAll('agent_end', {
    type: 'agent_end',
    messages: [
      { role: 'user', content: [{ type: 'text', text: userText }] },
      { role: 'assistant', content: [{ type: 'text', text: opts.content }] },
    ],
  });
}

// ============================================================================

describe('agent_end 路由到正确的会话 (Bug #5)', () => {
  let mockPi: ReturnType<typeof makeMockPi>;

  beforeEach(async () => {
    testHome = mkdtempSync(join(tmpdir(), 'pi-dingtalkbot-test-'));
    mockHomedirValue = testHome;
    mockPi = await loadExtension();
  });

  afterEach(() => {
    delete process.env.PI_SESSION_ID;
    // Best-effort cleanup: a leftover temp dir must not fail the test.
    try {
      rmSync(testHome, { recursive: true, force: true });
    } catch {}
  });

  it('单用户场景: agent_end 的回复发到正确的 webhook', async () => {
    await injectMessage({
      msgId: 'msgA',
      senderNick: 'Alice',
      sessionWebhook: ALICE_WEBHOOK,
      text: 'hi',
    });
    expect(mockPi.sendUserMessage).toHaveBeenCalled();

    await triggerAgentEndForUser({ userMessageId: 'msgA', senderNick: 'Alice', content: 'reply-A' });

    // Exactly one webhook call carrying the reply should go to Alice (the ack is not counted).
    expect(findReplies('reply-A', ALICE_WEBHOOK)).toHaveLength(1);
  });

  it('REGRESSION (核心 bug): 多用户并发, agent_end 的回复发到对应用户而非最新入队的', async () => {
    // Inject the two users' messages in order: user A arrives first, user B
    // later (B has the newer timestamp). Simulates pi handling A's turn first
    // and B's turn afterwards.
    await injectMessage({
      msgId: 'msgA',
      senderNick: 'Alice',
      sessionWebhook: ALICE_WEBHOOK,
      text: 'A-msg',
    });
    // Wait until A is done and its session is removed, then inject B.
    // (In practice pi handles each turn's agent_end serially; this reproduces that.)
    await triggerAgentEndForUser({ userMessageId: 'msgA', senderNick: 'Alice', content: 'reply-A' });
    sentWebhookCalls.length = 0; // discard A's ack + reply

    // Now inject B.
    await injectMessage({
      msgId: 'msgB',
      senderNick: 'Bob',
      sessionWebhook: BOB_WEBHOOK,
      text: 'B-msg',
    });
    await triggerAgentEndForUser({ userMessageId: 'msgB', senderNick: 'Bob', content: 'reply-B' });

    // Key assertion: B's reply must go to B's webhook.
    expect(findReplies('reply-B', BOB_WEBHOOK)).toHaveLength(1);
    // No "reply-B" may be sent to Alice's webhook.
    expect(findReplies('reply-B', ALICE_WEBHOOK)).toHaveLength(0);
  });

  it('多用户并发(不清理就 trigger agent_end): 不发错用户', async () => {
    // Inject both users' messages back to back, without clearing dingTalkSessions.
    await injectMessage({
      msgId: 'msgA',
      senderNick: 'Alice',
      sessionWebhook: ALICE_WEBHOOK,
      text: 'A-msg',
    });
    await injectMessage({
      msgId: 'msgB',
      senderNick: 'Bob',
      sessionWebhook: BOB_WEBHOOK,
      text: 'B-msg',
    });

    // Both sessions are now in dingTalkSessions; A arrived first (smaller timestamp).
    // Before the fix: getLatestSession returned B, so A's reply went to B.
    // After the fix: the messageId is taken from the user message text and the
    // reply goes to the matching user.
    await triggerAgentEndForUser({ userMessageId: 'msgA', senderNick: 'Alice', content: 'reply-A' });

    expect(findReplies('reply-A', ALICE_WEBHOOK)).toHaveLength(1);
    // B must not receive A's reply.
    expect(findReplies('reply-A', BOB_WEBHOOK)).toHaveLength(0);
  });

  it('无 currentProcessingMessageId 时 (用户直接在 pi 输入), 不发送任何回复', async () => {
    // Fire agent_end directly without injecting any message
    // (currentProcessingMessageId is null). The user message in the event
    // carries no [dingtalkbot] prefix.
    await emitEventAll('agent_end', {
      type: 'agent_end',
      messages: [
        { role: 'user', content: [{ type: 'text', text: 'plain user input without prefix' }] },
        { role: 'assistant', content: [{ type: 'text', text: 'reply' }] },
      ],
    });

    // The reply must not be sent to any webhook.
    expect(findReplies('reply')).toHaveLength(0);
  });

  it('disconnect 后 agent_end 不发送 (即使有遗留的 currentProcessingMessageId)', async () => {
    await injectMessage({
      msgId: 'msgA',
      senderNick: 'Alice',
      sessionWebhook: ALICE_WEBHOOK,
      text: 'hi',
    });

    // Simulate a disconnect.
    await emitEventAll('session_shutdown', { type: 'session_shutdown' });
    sentWebhookCalls.length = 0;

    // connected is now false, so the agent_end handler should return early.
    await triggerAgentEndForUser({
      userMessageId: 'msgA',
      senderNick: 'Alice',
      content: 'reply-after-disconnect',
    });

    expect(findReplies('reply-after-disconnect')).toHaveLength(0);
  });
});