/**
 * Regression test: a failing sendReply inside agent_end must not stall the queue.
 *
 * Bug #2 (found during architecture review):
 *   await sendReply(...)              // <- throws on a network error
 *   dingTalkSessions.delete(...)      // <- never reached
 *   progress cleanup / advance queue  // <- never reached
 *
 * Consequence: the queue for that conversationId is stuck and later messages
 * are never processed. The user has to run /dingtalkbot-disable + enable to
 * recover.
 *
 * Fix: wrap sendReply in try/catch so the cleanup logic is guaranteed to run.
 */

import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import * as os from 'node:os';

// Isolated homedir — avoids sharing ~/.pi/agent/dingtalk-bot/ with other test files.
let testHome: string;
// NOTE(review): never assigned or read in this file; homedir is overridden through
// the vi.mock('node:os') factory further down. Kept so the `os` import stays used.
let originalHomedir: typeof os.homedir;

// ============================================================================
// Module mocks
// ============================================================================

/** Shape of an event handler the extension registers through `pi.on`. */
type EventHandler = (payload: any) => any;

const handlers = new Map<string, Set<EventHandler>>();
const allEventHandlers: { event: string; handler: EventHandler }[] = [];
const registeredTools = new Map<string, any>();
const sentWebhookCalls: Array<{ url: string; body: any }> = [];

// Controls whether the agent_end reply webhook call is made to fail.
let sendReplyShouldThrow = false;

/**
 * Builds a stand-in for the `pi` extension API.
 *
 * `on` records handlers in `handlers` / `allEventHandlers`, `registerTool`
 * records tools in `registeredTools`; every other member is a bare `vi.fn`.
 */
function makeMockPi() {
  const on = vi.fn((event: string, handler: any) => {
    if (!handlers.has(event)) handlers.set(event, new Set());
    handlers.get(event)!.add(handler);
    allEventHandlers.push({ event, handler });
  });
  return {
    on,
    registerTool: vi.fn((tool: any) => {
      registeredTools.set(tool.name, tool);
    }),
    registerCommand: vi.fn(),
    registerShortcut: vi.fn(),
    registerFlag: vi.fn(),
    getFlag: vi.fn(() => undefined),
    getActiveTools: vi.fn(() => []),
    setActiveTools: vi.fn(),
    getAllTools: vi.fn(() => []),
    sendUserMessage: vi.fn(),
    sendMessage: vi.fn(),
    exec: vi.fn(),
  };
}

/**
 * Fires an event: calls the registered handlers in order and returns the
 * first result that is not `undefined` (remaining handlers are skipped).
 */
async function emitEvent(event: string, payload: any): Promise<any> {
  const hs = handlers.get(event);
  if (!hs) return undefined;
  for (const h of hs) {
    const result = await h(payload);
    if (result !== undefined) return result;
  }
  return undefined;
}

/** Fires an event on every registered handler, ignoring return values. */
async function emitEventAll(event: string, payload: any): Promise<void> {
  const hs = handlers.get(event);
  if (!hs) return;
  for (const h of hs) {
    await h(payload);
  }
}

vi.mock('dingtalk-stream', async () => {
  // A global is used to share the client instance between the mock factory and the tests.
  const g = globalThis as any;
  // Dynamic import instead of `require`: this file is an ES module.
  const { EventEmitter } = await import('node:events');
  return {
    DWClient: class MockDWClient extends EventEmitter {
      private _connected = false;
      // Callback registered by the extension; tests invoke it to inject messages.
      _cbHandler: any = null;

      constructor(public config: any) {
        super();
        g.__mockDingTalkClient = this;
      }

      registerCallbackListener(_topic: string, _handler: any) {
        this._cbHandler = _handler;
      }

      async connect() {
        this._connected = true;
        this.emit('connect');
      }

      disconnect() {
        this._connected = false;
      }
    },
    TOPIC_ROBOT: '/v1.0/im/bot/messages/get',
    EventAck: { SUCCESS: { status: 'SUCCESS', message: 'OK' } },
  };
});

vi.mock('@mariozechner/pi-ai', () => ({
  // Any `Type.X(...)` call yields a plain descriptor object.
  Type: new Proxy({}, {
    get: (_, prop: string) => (...args: any[]) => ({
      type: String(prop).toLowerCase(),
      args,
      _mockType: prop,
    }),
  }),
}));

vi.mock('@mariozechner/pi-coding-agent', () => ({
  Type: new Proxy({}, {
    get: (_, prop: string) => (...args: any[]) => ({
      type: String(prop).toLowerCase(),
      args,
      _mockType: prop,
    }),
  }),
}));

// ============================================================================
// Simulated DingTalk stream: inject webhook messages
// ============================================================================

let mockClient: any = null;

/**
 * Delivers an inbound robot message to the callback the extension registered
 * on the mock client. Does nothing when no client/callback is available.
 * When `conversationId` is omitted it falls back to the `access_token` query
 * value of `sessionWebhook`.
 */
async function injectMessage(payload: {
  msgId: string;
  senderNick: string;
  sessionWebhook: string;
  text: string;
  conversationId?: string;
}): Promise<void> {
  if (!mockClient || !mockClient._cbHandler) return;
  const message = {
    msgId: payload.msgId,
    senderNick: payload.senderNick,
    sessionWebhook: payload.sessionWebhook,
    conversationId:
      payload.conversationId || payload.sessionWebhook.match(/access_token=([^&]+)/)?.[1],
    text: { content: payload.text },
  };
  await mockClient._cbHandler({ data: JSON.stringify(message) });
}

// ============================================================================
// Make sendReply fail in a controlled way
// ============================================================================

// Only the "agent_end reply" must be forced to fail in these tests; the
// business ack message ("👋 收到 ...") still has to succeed, otherwise the
// inbound message would never be processed at all.
const FAIL_TEXTS = new Set(['reply', 'first-reply', 'reply-text']);

// The webhook call is controlled by mocking fetch.
const realFetch = global.fetch;

beforeEach(() => {
  // Default: fetch succeeds.
  global.fetch = vi.fn(async (url: string, opts: any) => {
    const body = JSON.parse(opts.body);
    sentWebhookCalls.push({ url, body });
    // Anything whose text is not in FAIL_TEXTS (e.g. the ack) always succeeds.
    const text = body.text?.content || body.text?.text || '';
    if (sendReplyShouldThrow && FAIL_TEXTS.has(text)) {
      return {
        ok: false,
        status: 500,
        statusText: 'Internal Server Error',
      } as any;
    }
    return {
      ok: true,
      status: 200,
      statusText: 'OK',
      json: async () => ({}),
    } as any;
  }) as any;
});

afterEach(() => {
  global.fetch = realFetch;
});

// ============================================================================
// Load the extension and establish the connected state
// ============================================================================

/**
 * Resets all shared test state, writes a bot config and an enabled session
 * file under the isolated home directory, loads a fresh copy of the
 * extension and fires `session_start`.
 *
 * @returns the mock `pi` object handed to the extension.
 */
async function loadExtensionAndConnect(): Promise<ReturnType<typeof makeMockPi>> {
  handlers.clear();
  registeredTools.clear();
  allEventHandlers.length = 0;
  sentWebhookCalls.length = 0;
  sendReplyShouldThrow = false;
  mockClient = null;
  (globalThis as any).__mockDingTalkClient = null;

  process.env.PI_SESSION_ID = 'test-agent-end-error';

  // Use the isolated temporary homedir to avoid clashing with other test files.
  const fs = await import('node:fs/promises');
  const path = await import('node:path');
  const baseDir = path.join(testHome, '.pi', 'agent', 'dingtalk-bot');
  await fs.mkdir(path.join(baseDir, 'sessions'), { recursive: true });
  await fs.writeFile(
    path.join(baseDir, 'config.json'),
    JSON.stringify({
      bots: [{
        clientId: 'test-client-id',
        clientSecret: 'test-secret',
        name: 'test-bot',
      }],
    }, null, '\t') + '\n'
  );
  await fs.writeFile(
    path.join(baseDir, 'sessions', 'test-agent-end-error.json'),
    JSON.stringify({
      activeBotId: 'test-client-id',
      enabled: true,
    }, null, '\t') + '\n'
  );

  const mockPi = makeMockPi();
  // Drop the module cache so the extension is re-evaluated for every test.
  vi.resetModules();
  const extModule = await import('../index.js');
  await extModule.default(mockPi as any);

  // Fire session_start so the extension connects the bot.
  await emitEventAll('session_start', { type: 'session_start' });

  // Grab the client asynchronously (connect is async and may not have
  // finished right after session_start).
  await new Promise(r => setTimeout(r, 10));
  mockClient = (globalThis as any).__mockDingTalkClient;

  return mockPi;
}

/**
 * Fires `agent_end`, i.e. the turn for one DingTalk message has finished.
 * The user message carries the `[dingtalkbot] [bot] [nick] [msgId]` header
 * line and `opts.content` is the assistant's reply text.
 */
async function triggerAgentEnd(opts: {
  userMessageId: string;
  senderNick: string;
  content: string;
}): Promise<void> {
  const userText = `[dingtalkbot] [bot] [${opts.senderNick}] [${opts.userMessageId}]\nuser input`;
  await emitEventAll('agent_end', {
    type: 'agent_end',
    messages: [
      { role: 'user', content: [{ type: 'text', text: userText }] },
      { role: 'assistant', content: [{ type: 'text', text: opts.content }] },
    ],
  });
}

// ============================================================================
// Tests
// ============================================================================

// vi.mock is used to isolate the value returned by os.homedir.
let mockHomedirValue = '';
vi.mock('node:os', async () => {
  const actual = await vi.importActual<typeof import('node:os')>('node:os');
  return {
    ...actual,
    homedir: () => mockHomedirValue,
  };
});

describe('agent_end 中 sendReply 失败的健壮性', () => {
  let mockPi: any;

  beforeEach(async () => {
    testHome = mkdtempSync(join(tmpdir(), 'pi-dingtalkbot-test-'));
    mockHomedirValue = testHome;
    mockPi = await loadExtensionAndConnect();
  });

  afterEach(() => {
    delete process.env.PI_SESSION_ID;
    // Best-effort cleanup of the temporary home directory.
    try { rmSync(testHome, { recursive: true, force: true }); } catch {}
  });

  it('sendReply 失败时,agent_end handler 不应该抛异常', async () => {
    sendReplyShouldThrow = true;

    // Inject one inbound message: the extension enqueues it and calls pi.
    await injectMessage({
      msgId: 'm1',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'hi',
    });

    // Give pi a reply (fires agent_end). The reply text must be one of
    // FAIL_TEXTS, otherwise the fetch mock never fails and this test would
    // pass without exercising the failure path.
    // sendReply fails, but the handler must not throw.
    await expect(triggerAgentEnd({
      userMessageId: 'm1',
      senderNick: 'Alice',
      content: 'reply',
    })).resolves.toBeUndefined();
  });

  it('sendReply 失败后,扩展应继续处理下一条消息(不卡队列)', async () => {
    sendReplyShouldThrow = true;

    // Inject two inbound messages.
    await injectMessage({
      msgId: 'm1',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'first',
    });
    await injectMessage({
      msgId: 'm2',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'second',
    });

    // agent_end for the first message: sendReply fails.
    await triggerAgentEnd({
      userMessageId: 'm1',
      senderNick: 'Alice',
      content: 'first-reply',
    });

    // The second message must be processed: sendUserMessage is called again.
    const calls = mockPi.sendUserMessage.mock.calls;
    expect(calls.length).toBeGreaterThanOrEqual(2);
    const texts = calls.map((c: any[]) =>
      c[0]?.[0]?.text || (typeof c[0] === 'string' ? c[0] : '')
    );
    expect(texts.some((t: string) => t.includes('m2'))).toBe(true);
  });

  it('sendReply 失败时,会话状态应被清理(dingTalkSessions 删除)', async () => {
    sendReplyShouldThrow = true;

    await injectMessage({
      msgId: 'm1',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'hi',
    });

    await triggerAgentEnd({
      userMessageId: 'm1',
      senderNick: 'Alice',
      content: 'reply',
    });

    // Calling the tool now should no longer find a session for m1;
    // verified indirectly through the tool's execute function.
    const sendTool = registeredTools.get('dingtalkbot-send');
    expect(sendTool).toBeDefined();

    // The tool reports "no active session" instead of hanging or throwing.
    const result = await sendTool.execute('call-1', {
      message: 'after',
      messageId: 'm1',
    });
    expect(result.content[0].text).toBe('无活跃会话');
  });

  it('sendReply 成功时,行为不变(控制组)', async () => {
    sendReplyShouldThrow = false;

    await injectMessage({
      msgId: 'm1',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'hi',
    });

    await triggerAgentEnd({
      userMessageId: 'm1',
      senderNick: 'Alice',
      content: 'reply-text',
    });

    // The webhook is called once for the ack and once more at agent_end
    // with "reply-text"; only the latter is counted here.
    const replyCalls = sentWebhookCalls.filter(c =>
      c.body && (c.body.text?.content === 'reply-text' || c.body.text?.text === 'reply-text')
    );
    expect(replyCalls.length).toBe(1);
  });

  it('sendReply 成功时 agent_end 后队列也能推进(控制组)', async () => {
    sendReplyShouldThrow = false;

    await injectMessage({
      msgId: 'm1',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'first',
    });
    await injectMessage({
      msgId: 'm2',
      senderNick: 'Alice',
      sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA',
      text: 'second',
    });

    await triggerAgentEnd({
      userMessageId: 'm1',
      senderNick: 'Alice',
      content: 'first-reply',
    });

    const calls = mockPi.sendUserMessage.mock.calls;
    expect(calls.length).toBeGreaterThanOrEqual(2);
    const texts = calls.map((c: any[]) =>
      c[0]?.[0]?.text || (typeof c[0] === 'string' ? c[0] : '')
    );
    expect(texts.some((t: string) => t.includes('m2'))).toBe(true);
  });
});