/** * Regression test: 入站消息的 ack sendReply 失败时不能卡队列 * * Bug (相邻, 与 #2 同型但触发点不同): * client.registerCallbackListener 内部: * queue.push(...) // 已入队 * await sendReply(...) // 抛 * processNextForSession(...) // 永远到不了 * * 后果: 入站消息卡在 sessionQueues 里, 永远不会被发送给 pi 处理 * 用户在钉钉侧发了消息但 AI 不响应 * * 修复: ack 包在 try/catch, 保证 processNextForSession 总是被调用 */ import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import { mkdtempSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; // ============================================================================ // 隔离 homedir // ============================================================================ let testHome: string; let mockHomedirValue = ''; vi.mock('node:os', async () => { const actual = await vi.importActual('node:os'); return { ...actual, homedir: () => mockHomedirValue }; }); // ============================================================================ // 测试状态 // ============================================================================ const handlers = new Map>(); const allEventHandlers: { event: string; handler: Function }[] = []; const registeredTools = new Map(); const sentWebhookCalls: Array<{ url: string; body: any }> = []; // 控制 ack 是否抛错(业务 ack = "👋 收到...") let ackShouldThrow = false; let failOnlyAcks = false; const ACK_MARKER = '👋'; function isAckBody(body: any): boolean { const text = body?.text?.content || body?.text?.text || ''; return text.startsWith(ACK_MARKER); } const realFetch = global.fetch; beforeEach(() => { global.fetch = vi.fn(async (url: string, opts: any) => { const body = JSON.parse(opts.body); sentWebhookCalls.push({ url, body }); const shouldFail = failOnlyAcks ? ackShouldThrow && isAckBody(body) : ackShouldThrow; if (shouldFail) { return { ok: false, status: 500, statusText: 'Internal Server Error' } as any; } return { ok: true, status: 200, statusText: 'OK', json: async () => ({}) } as any; }) as any; }); afterEach(() => { global.fetch = realFetch; }); function makeMockPi() { return { on: vi.fn((event: string, handler: any) => { if (!handlers.has(event)) handlers.set(event, new Set()); handlers.get(event)!.add(handler); allEventHandlers.push({ event, handler }); }), registerTool: vi.fn((tool: any) => { registeredTools.set(tool.name, tool); }), registerCommand: vi.fn(), registerShortcut: vi.fn(), registerFlag: vi.fn(), getFlag: vi.fn(() => undefined), getActiveTools: vi.fn(() => []), setActiveTools: vi.fn(), getAllTools: vi.fn(() => []), sendUserMessage: vi.fn(), sendMessage: vi.fn(), exec: vi.fn(), }; } async function emitEventAll(event: string, payload: any): Promise { const hs = handlers.get(event); if (!hs) return; for (const h of hs) await h(payload); } let mockClient: any = null; vi.mock('dingtalk-stream', () => { const g = globalThis as any; const { EventEmitter } = require('node:events'); return { DWClient: class MockDWClient extends EventEmitter { private _connected = false; constructor(public config: any) { super(); g.__mockDingTalkClient = this; } registerCallbackListener(_topic: string, _handler: any) { this._cbHandler = _handler; } _cbHandler: any = null; async connect() { this._connected = true; this.emit('connect'); } disconnect() { this._connected = false; } }, TOPIC_ROBOT: '/v1.0/im/bot/messages/get', EventAck: { SUCCESS: { status: 'SUCCESS', message: 'OK' } }, }; }); vi.mock('@mariozechner/pi-ai', () => ({ Type: new Proxy({}, { get: (_, prop: string) => (...args: any[]) => ({ type: String(prop).toLowerCase(), args, _mockType: prop, }), }), })); vi.mock('@mariozechner/pi-coding-agent', () => ({ Type: new Proxy({}, { get: (_, prop: string) => (...args: any[]) => ({ type: String(prop).toLowerCase(), args, _mockType: prop, }), }), })); // ============================================================================ // 注入 webhook 消息 // ============================================================================ async function injectMessage(payload: { msgId: string; senderNick: string; sessionWebhook: string; text: string; conversationId?: string; }): Promise { if (!mockClient || !mockClient._cbHandler) return; const message = { msgId: payload.msgId, senderNick: payload.senderNick, sessionWebhook: payload.sessionWebhook, conversationId: payload.conversationId || payload.sessionWebhook.match(/access_token=([^&]+)/)?.[1], text: { content: payload.text }, }; await mockClient._cbHandler({ data: JSON.stringify(message) }); } // ============================================================================ // 加载扩展 // ============================================================================ async function loadExtensionAndConnect(): Promise { handlers.clear(); registeredTools.clear(); allEventHandlers.length = 0; sentWebhookCalls.length = 0; ackShouldThrow = false; failOnlyAcks = false; mockClient = null; (globalThis as any).__mockDingTalkClient = null; process.env.PI_SESSION_ID = 'test-ack-error'; const fs = await import('node:fs/promises'); const path = await import('node:path'); const baseDir = path.join(testHome, '.pi', 'agent', 'dingtalk-bot'); await fs.mkdir(path.join(baseDir, 'sessions'), { recursive: true }); await fs.writeFile( path.join(baseDir, 'config.json'), JSON.stringify({ bots: [{ clientId: 'cid', clientSecret: 'sec', name: 'bot' }], }, null, '\t') + '\n' ); await fs.writeFile( path.join(baseDir, 'sessions', 'test-ack-error.json'), JSON.stringify({ activeBotId: 'cid', enabled: true }, null, '\t') + '\n' ); const mockPi = makeMockPi(); vi.resetModules(); const extModule = await import('../index.js'); await extModule.default(mockPi as any); await emitEventAll('session_start', { type: 'session_start' }); await new Promise(r => setTimeout(r, 10)); mockClient = (globalThis as any).__mockDingTalkClient; return mockPi; } // ============================================================================ // 测试 // ============================================================================ describe('入站消息 ack 失败的健壮性', () => { let mockPi: any; beforeEach(async () => { testHome = mkdtempSync(join(tmpdir(), 'pi-dingtalkbot-test-')); mockHomedirValue = testHome; mockPi = await loadExtensionAndConnect(); }); afterEach(() => { delete process.env.PI_SESSION_ID; try { rmSync(testHome, { recursive: true, force: true }); } catch {} }); it('REGRESSION: ack sendReply 失败时, 入站消息仍能被处理', async () => { // 让 ack 失败但其他发送成功 ackShouldThrow = true; failOnlyAcks = true; await injectMessage({ msgId: 'm1', senderNick: 'Alice', sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA', text: 'hello', }); // ⭐ 核心断言: sendUserMessage 应该被调用(消息被处理) // 修复前 ack 失败导致 processNextForSession 跳过, sendUserMessage 0 次调用 expect(mockPi.sendUserMessage).toHaveBeenCalled(); // 抓出实际发给 pi 的文本 const calls = mockPi.sendUserMessage.mock.calls; const texts = calls.map((c: any[]) => c[0]?.[0]?.text || (typeof c[0] === 'string' ? c[0] : '') ); expect(texts.some((t: string) => t.includes('m1') && t.includes('hello'))).toBe(true); }); it('ack 失败时, 至少有一条 webhook 调用被记录(ack 真的发出去了)', async () => { ackShouldThrow = true; failOnlyAcks = true; await injectMessage({ msgId: 'm1', senderNick: 'Alice', sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA', text: 'hello', }); // 应该有 ack 调用(即使它失败了) const ackCalls = sentWebhookCalls.filter(c => isAckBody(c.body)); expect(ackCalls.length).toBeGreaterThanOrEqual(1); }); it('控制组: ack 成功时, 消息正常被处理', async () => { ackShouldThrow = false; await injectMessage({ msgId: 'm1', senderNick: 'Alice', sessionWebhook: 'https://oapi.dingtalk.com/robot/send?access_token=tokenA', text: 'hello', }); expect(mockPi.sendUserMessage).toHaveBeenCalled(); const calls = mockPi.sendUserMessage.mock.calls; const texts = calls.map((c: any[]) => c[0]?.[0]?.text || (typeof c[0] === 'string' ? c[0] : '') ); expect(texts.some((t: string) => t.includes('m1'))).toBe(true); }); });