/** * pi-dingtalkbot * * 钉钉智能机器人 Stream 长连接扩展 for pi * 支持多个机器人配置和快速切换 */ import { mkdir, readFile, stat, unlink, writeFile } from "node:fs/promises"; import { basename, dirname, join } from "node:path"; import { homedir } from "node:os"; import { DWClient, TOPIC_ROBOT, EventAck } from "dingtalk-stream"; import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent"; import { Type } from "@mariozechner/pi-ai"; import { PendingReplyStore } from "./pending-reply-store.js"; // ============================================================================ // Config // ============================================================================ interface BotConfig { clientId: string; clientSecret: string; name?: string; } interface GlobalConfig { bots: BotConfig[]; } interface SessionConfig { activeBotId?: string; enabled?: boolean; } // 钉钉会话上下文 - 跟踪每个用户的会话 interface DingTalkSession { messageId: string; // 消息ID senderNick: string; // 发送者昵称 sessionWebhook: string; // 会话 Webhook conversationId?: string; // 会话ID(用于关联主动发送的消息) timestamp: number; // 最后活跃时间 } // 等待回复的状态 → 已在 pending-reply-store.ts 中独立定义 // ============================================================================ // HTTP API // ============================================================================ async function sendMessage(sessionWebhook: string, msgtype: string, content: any): Promise { const res = await fetch(sessionWebhook, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ msgtype, [msgtype]: content }) }); if (!res.ok) throw new Error(`发送消息失败: HTTP ${res.status}`); } // 检测内容是否包含 markdown 语法 function containsMarkdown(text: string): boolean { const markdownPatterns = [ /^#{1,6}\s/m, // 标题 /\*\*.*?\*\*/, // 粗体 /\*.*?\*/, // 斜体 /`{1,3}[^`]+`{1,3}/, // 代码 /\[.*?\]\(.*?\)/, // 链接 /!\[.*?\]\(.*?\)/, // 图片 /^\s*[-*+]\s/m, // 列表 /^\s*\d+\.\s/m, // 有序列表 /^\s*>\s/m, // 引用 /\|.*\|.*\|/, // 表格 /-{3,}/, // 分割线 ]; return markdownPatterns.some(pattern => pattern.test(text)); } // 发送消息(自动检测类型) async function sendReply(sessionWebhook: string, text: string): Promise { const trimmedText = text.trim(); if (!trimmedText) return; if (containsMarkdown(trimmedText)) { await sendMessage(sessionWebhook, "markdown", { title: "消息", text: trimmedText }); } else { await sendMessage(sessionWebhook, "text", { content: trimmedText }); } } // ============================================================================ // Utils // ============================================================================ function getSessionId(): string { return process.env.PI_SESSION_ID || process.env.PI_INSTANCE_ID || `sess-${Date.now().toString(36)}-${Math.random().toString(36).substring(2, 6)}`; } // 路径结构 (v1.1+): // ~/.pi/agent/dingtalk-bot/ // ├── config.json — 全局配置 (机器人数组) // └── sessions/{id}.json — 会话配置 (activeBotId, enabled) // // 历史路径 (v1.0): // ~/.pi/agent/dingtalk-bot.json // ~/.pi/agent/dingtalk-bot-session-{id}.json // // loadXxx() 优先读新路径, 回退到旧路径 (向后兼容) // saveXxx() 写新路径, 成功后删除旧路径文件 (迁移) function getBaseDir(): string { return join(homedir(), ".pi", "agent", "dingtalk-bot"); } function getConfigPath(): string { return join(getBaseDir(), "config.json"); } function getLegacyConfigPath(): string { return join(homedir(), ".pi", "agent", "dingtalk-bot.json"); } function getSessionConfigPath(sessionId: string): string { return join(getBaseDir(), "sessions", `${sessionId}.json`); } function getLegacySessionConfigPath(sessionId: string): string { return join(homedir(), ".pi", "agent", `dingtalk-bot-session-${sessionId}.json`); } async function loadGlobalConfig(): Promise { // 优先新路径 try { const data = JSON.parse(await readFile(getConfigPath(), "utf8")); return { bots: data.bots || [] }; } catch {} // fallback: 旧路径 try { const data = JSON.parse(await readFile(getLegacyConfigPath(), "utf8")); return { bots: data.bots || [] }; } catch {} return { bots: [] }; } async function saveGlobalConfig(c: GlobalConfig) { await mkdir(dirname(getConfigPath()), { recursive: true }); await writeFile(getConfigPath(), JSON.stringify(c, null, "\t") + "\n"); // 迁移: 删除旧路径文件(如果有) try { await unlink(getLegacyConfigPath()); } catch {} } async function loadSessionConfig(): Promise { try { const data = JSON.parse(await readFile(getSessionConfigPath(SESSION_ID), "utf8")); return { activeBotId: data.activeBotId, enabled: data.enabled ?? true }; } catch {} try { const data = JSON.parse(await readFile(getLegacySessionConfigPath(SESSION_ID), "utf8")); return { activeBotId: data.activeBotId, enabled: data.enabled ?? true }; } catch {} return { enabled: true }; } async function saveSessionConfig(c: SessionConfig) { await mkdir(dirname(getSessionConfigPath(SESSION_ID)), { recursive: true }); await writeFile(getSessionConfigPath(SESSION_ID), JSON.stringify(c, null, "\t") + "\n"); try { await unlink(getLegacySessionConfigPath(SESSION_ID)); } catch {} } function getBotDisplayName(bot: BotConfig): string { return bot.name || bot.clientId; } const PROMPT = ` [dingtalkbot] 钉钉机器人已连接 - 收到 @机器人 的消息会自动处理 - 回复会自动发送到对应用户的会话 - 使用 dingtalkbot-attach 发送文件`; // ============================================================================ // Extension // ============================================================================ const SESSION_ID = getSessionId(); export default function (pi: ExtensionAPI) { let globalBots: BotConfig[] = []; let sessionCfg: SessionConfig = { enabled: true }; let activeBotConfig: BotConfig | null = null; let currentCtx: ExtensionContext | null = null; let client: DWClient | null = null; let connected = false; // 钉钉会话映射表 - 用于回复时找到对应的 webhook const dingTalkSessions = new Map(); // 【增强3】会话独立队列 - 按会话分组,不同用户独立处理 const sessionQueues = new Map>(); const sessionProcessing = new Map(); // 等待回复的状态存储 (独立模块) const pendingReplyStore = new PendingReplyStore({ cleanupIntervalMs: 60 * 1000, defaultTtlMs: 5 * 60 * 1000, }); // 消息进度跟踪(持续通知) const messageTimeouts = new Map(); // 【增强】持续进度通知时间点:5分钟、15分钟、30分钟、1小时 const PROGRESS_NOTIFY_POINTS = [ { delay: 5 * 60 * 1000, message: "⏳ 还在处理中,请耐心等待..." }, { delay: 15 * 60 * 1000, message: "⏳ 处理时间较长,请继续等待..." }, { delay: 30 * 60 * 1000, message: "⏳ 仍在处理中,可能需要较长时间..." }, { delay: 60 * 60 * 1000, message: "⏳ 已处理超过1小时,感谢您的耐心..." }, ]; const PROGRESS_NOTIFY_INTERVAL = 60 * 1000; // 1分钟后开始检查进度通知 // 已处理的消息ID(用于去重,防止同一消息被处理多次) const processedMessages = new Set(); const PROCESSED_CLEANUP_INTERVAL = 60 * 1000; // 1分钟清理一次 const PROCESSED_MESSAGE_TTL = 10 * 60 * 1000; // 10分钟后从记录中移除 // 当前正在处理的消息ID(用于等待处理完成) let currentProcessingMessageId: string | null = null; // 获取指定会话的队列长度(前面还有几条消息) function getSessionQueueLength(conversationId: string): number { return sessionQueues.get(conversationId)?.length || 0; } // 会话是否正在处理 function isSessionProcessing(conversationId: string): boolean { return sessionProcessing.get(conversationId) || false; } // 设置会话处理状态 function setSessionProcessing(conversationId: string, processing: boolean) { sessionProcessing.set(conversationId, processing); } // 生成唯一消息ID function generateMessageId(): string { return `msg-${Date.now().toString(36)}-${Math.random().toString(36).substring(2, 6)}`; } // 从 webhook 提取会话标识 function extractConversationId(webhook: string): string { // webhook 格式: https://oapi.dingtalk.com/robot/send?access_token=xxx // 使用 access_token 作为会话标识 const match = webhook.match(/access_token=([^&]+)/); return match?.[1] || webhook; } // 清理已处理消息记录 function cleanupProcessedMessages() { // 定期清理过期的已处理记录,防止内存泄漏 // 注意:这里只是简单记录,10分钟后自动过期 } // 检查消息是否已处理过 function isMessageProcessed(messageId: string): boolean { return processedMessages.has(messageId); } // 标记消息已处理 function markMessageProcessed(messageId: string) { processedMessages.add(messageId); // 10分钟后自动移除(简化处理,不使用额外定时器) setTimeout(() => { processedMessages.delete(messageId); }, PROCESSED_MESSAGE_TTL); } // 记录已发送的通知时间点(避免重复发送) const notifiedPoints = new Map>(); // 【增强】发送处理进度通知(持续通知) async function sendProgressNotification( messageId: string, sessionWebhook: string, senderNick: string, elapsedMs: number ): Promise { if (currentProcessingMessageId !== messageId) return; // 消息已被处理完 // 获取该消息已发送的通知时间点 if (!notifiedPoints.has(messageId)) { notifiedPoints.set(messageId, new Set()); } const sent = notifiedPoints.get(messageId)!; // 检查并发送符合条件的进度通知 for (const point of PROGRESS_NOTIFY_POINTS) { if (!sent.has(point.delay) && elapsedMs >= point.delay) { sent.add(point.delay); await sendReply(sessionWebhook, point.message); break; // 每次只发送一个通知 } } } // 【增强】启动持续进度通知检查 function startProgressNotifier( messageId: string, sessionWebhook: string, senderNick: string, startTime: number ) { // 定期检查是否需要发送进度通知 const checkInterval = setInterval(() => { // 检查消息是否还在处理 if (currentProcessingMessageId !== messageId) { clearInterval(checkInterval); notifiedPoints.delete(messageId); return; } const elapsed = Date.now() - startTime; sendProgressNotification(messageId, sessionWebhook, senderNick, elapsed); // 如果所有通知都已发送,且消息仍在处理,可以考虑清理 const sent = notifiedPoints.get(messageId); if (sent && sent.size >= PROGRESS_NOTIFY_POINTS.length) { // 所有通知都已发送,不再检查 clearInterval(checkInterval); } }, PROGRESS_NOTIFY_INTERVAL); messageTimeouts.set(messageId + '_progress', checkInterval); } // 【增强3】处理队列中下一条消息(会话独立版) async function processNextForSession(conversationId: string): Promise { const queue = sessionQueues.get(conversationId); if (!queue || queue.length === 0) return; // 该会话是否正在处理 if (isSessionProcessing(conversationId)) return; setSessionProcessing(conversationId, true); const msg = queue.shift()!; const { messageId, senderNick, sessionWebhook, content, botName } = msg; currentProcessingMessageId = messageId; try { // 存储会话上下文 dingTalkSessions.set(messageId, { messageId, senderNick, sessionWebhook, conversationId, timestamp: Date.now() }); // 发送给 pi 处理 const messageText = `[dingtalkbot] [${botName}] [${senderNick}] [${messageId}]\n${content}`; try { // @ts-ignore await pi.sendUserMessage([{ type: "text", text: messageText }], { deliverAs: "steer" }); // 【增强】启动持续进度通知 startProgressNotifier(messageId, sessionWebhook, senderNick, Date.now()); } catch (err) { console.error('[dingtalkbot] 发送给 pi 失败:', err); currentProcessingMessageId = null; setSessionProcessing(conversationId, false); processNextForSession(conversationId); } } catch (err) { console.error('[dingtalkbot] 处理消息失败:', err); currentProcessingMessageId = null; setSessionProcessing(conversationId, false); processNextForSession(conversationId); } } // 发送消息并等待回复 — 委托给 PendingReplyStore // (store.add 原子检查 conversationId 重复, 避免 race) async function sendAndWait( sessionWebhook: string, content: string, options?: { timeout?: number } ): Promise<{ reply: string; message: any }> { const conversationId = extractConversationId(sessionWebhook); // 先发送消息(与 store.add 之间存在理论 race: 用户极快回复会被丢进普通队列) await sendReply(sessionWebhook, content); return pendingReplyStore.add({ conversationId, content, ttlMs: (options?.timeout || 300) * 1000, }); } // ============================================================================ // Connection // ============================================================================ async function connect(ctx: ExtensionContext, bot: BotConfig): Promise { try { disconnect(); currentCtx = ctx; activeBotConfig = bot; // @ts-ignore client = new DWClient({ clientId: bot.clientId, clientSecret: bot.clientSecret }); client.registerCallbackListener(TOPIC_ROBOT, async (res) => { try { const message = JSON.parse(res.data); const content = message?.text?.content; if (!content) return { status: EventAck.SUCCESS }; const messageId = message.msgId || generateMessageId(); const senderNick = message.senderNick || "未知用户"; const sessionWebhook = message.sessionWebhook || ""; const conversationId = message.conversationId || extractConversationId(sessionWebhook); const botName = getBotDisplayName(bot); // 检查是否有等待该会话回复的消息(send-and-wait 路径) if (pendingReplyStore.resolveByConversation(conversationId, { reply: content, message })) { return { status: EventAck.SUCCESS }; } // 【去重检查】防止同一消息被重复处理 if (isMessageProcessed(messageId)) { return { status: EventAck.SUCCESS }; } markMessageProcessed(messageId); // 【增强1&3】使用会话独立队列 if (!sessionQueues.has(conversationId)) { sessionQueues.set(conversationId, []); } const queue = sessionQueues.get(conversationId)!; const queueLength = queue.length; // 消息入队到会话队列 queue.push({ messageId, senderNick, sessionWebhook, content, botName, timestamp: Date.now() }); // 【增强1】显示队列位置,让用户知道前面还有多少消息 // queueLength 是当前队列中已有的消息数量(不包括刚入队的这条) // 如果 queueLength > 0,说明前面有消息在等待 let ackMessage = "👋 收到"; if (queueLength > 0) { // 队列中有等待的消息,显示位置(当前是第 queueLength+1 位,前面有 queueLength 条) ackMessage = `👋 收到,你是第 ${queueLength + 1} 位,前面还有 ${queueLength} 条消息...`; } else if (isSessionProcessing(conversationId)) { // 队列为空但正在处理,说明当前这条消息正在被处理 ackMessage = `👋 收到,正在处理中...`; } else { // 队列为空且没有处理中,这是第一条消息 ackMessage = `👋 收到,正在思考中...`; } // ack 是 UX 优化, 不应阻断消息处理 // (webhook 报错会让该消息卡在 queue 里, 用户必须重发) try { await sendReply(sessionWebhook, ackMessage); } catch (err) { console.error(`[dingtalkbot] ack 发送失败 [${messageId}]:`, err); } // 【增强3】启动该会话的处理(会话独立,不会阻塞其他会话) processNextForSession(conversationId); return { status: EventAck.SUCCESS }; } catch (err) { console.error('[dingtalkbot] 解析消息失败:', err); return { status: EventAck.SUCCESS }; } }); client.on("connect", () => { connected = true; setStatus(); }); client.on("disconnect", (reason: any) => { const wasConnected = connected; connected = false; dingTalkSessions.clear(); const reasonStr = typeof reason === 'string' ? reason : JSON.stringify(reason); const isKicked = reasonStr?.includes("kick") || reasonStr?.includes("replaced") || reasonStr?.includes("403"); console.log(`[dingtalkbot] ❌ ${getBotDisplayName(bot)} ${isKicked ? "被踢" : "断开"}${reasonStr ? `: ${reasonStr}` : ""}`); setStatus(wasConnected && isKicked ? `被踢 (${SESSION_ID.slice(0, 4)})` : undefined); }); client.on("error", (err: any) => { const errMsg = String(err); connected = false; console.log(`[dingtalkbot] ❌ ${getBotDisplayName(bot)}`, err); if (errMsg.includes("already connected") || errMsg.includes("403")) { setStatus("连接被占用"); currentCtx?.ui.notify(`❌ ${getBotDisplayName(bot)} 已被其他会话连接`, "error"); } else { setStatus(errMsg); } }); await client.connect(); connected = true; // 启动 pending reply store 的 cleanup interval — 跟着连接走 pendingReplyStore.start(); setStatus(); return true; } catch (err) { console.error(`[dingtalkbot] 连接异常:`, err); connected = false; setStatus("连接异常"); return false; } } function disconnect() { // 取消所有等待中的回复(promises 会 reject)并停止 cleanup interval pendingReplyStore.stop(); pendingReplyStore.cancelAll("机器人已断开连接"); dingTalkSessions.clear(); if (client) { try { client.disconnect(); } catch {} client = null; } connected = false; activeBotConfig = null; } function setStatus(msg?: string) { if (!currentCtx) return; const active = globalBots.find(b => b.clientId === sessionCfg.activeBotId) || globalBots[0]; if (!active || !connected) { currentCtx.ui.setStatus("dingtalkbot", ""); return; } const botName = getBotDisplayName(active); currentCtx.ui.setStatus("dingtalkbot", msg ? `${botName} 🔴 ${msg}` : `${botName} ✅` ); } // ============================================================================ // Tools // ============================================================================ function getLatestSession(): DingTalkSession | null { let latest: DingTalkSession | null = null; for (const session of dingTalkSessions.values()) { if (!latest || session.timestamp > latest.timestamp) { latest = session; } } return latest; } pi.registerTool({ name: "dingtalkbot-attach", label: "发送文件", description: "发送本地文件到钉钉(转为链接形式)", parameters: Type.Object({ paths: Type.Array(Type.String(), { minItems: 1, maxItems: 10 }), messageId: Type.Optional(Type.String()), }), async execute(_id, p) { if (!client || !connected) return { content: [{ type: "text", text: "机器人未连接" }], details: {} }; const targetMsgId = p.messageId; const session = targetMsgId ? dingTalkSessions.get(targetMsgId) : getLatestSession(); if (!session) return { content: [{ type: "text", text: "无活跃会话" }], details: {} }; const files: string[] = []; for (const fp of p.paths) { try { if ((await stat(fp)).isFile()) files.push(basename(fp)); } catch {} } if (files.length === 0) return { content: [{ type: "text", text: "没有有效的文件" }], details: {} }; const text = `📎 文件列表:\n${files.map(f => `- ${f}`).join("\n")}\n\n(钉钉机器人暂不支持直接发送文件附件)`; await sendMessage(session.sessionWebhook, "text", { content: text }); return { content: [{ type: "text", text: `已发送 ${files.length} 个文件信息` }], details: {} }; }, }); pi.registerTool({ name: "dingtalkbot-send", label: "发送消息", description: "发送消息到钉钉", parameters: Type.Object({ message: Type.String(), format: Type.Optional(Type.Union([Type.Literal("text"), Type.Literal("markdown")], { default: "text" })), messageId: Type.Optional(Type.String()), }), async execute(_id, p) { if (!client || !connected) return { content: [{ type: "text", text: "机器人未连接" }], details: {} }; const targetMsgId = p.messageId; const session = targetMsgId ? dingTalkSessions.get(targetMsgId) : getLatestSession(); if (!session) return { content: [{ type: "text", text: "无活跃会话" }], details: {} }; if (p.format === "markdown") { await sendMessage(session.sessionWebhook, "markdown", { title: "消息", text: p.message }); } else { await sendMessage(session.sessionWebhook, "text", { content: p.message }); } return { content: [{ type: "text", text: "✅ 已发送" }], details: {} }; }, }); pi.registerTool({ name: "dingtalkbot-send-and-wait", label: "发送并等待回复", description: "发送消息到钉钉并等待用户回复,超时后自动取消", parameters: Type.Object({ message: Type.String({ description: "要发送的消息内容" }), timeout: Type.Optional(Type.Number({ default: 300, description: "等待超时时间(秒),默认300秒(5分钟)" })), format: Type.Optional(Type.Union([Type.Literal("text"), Type.Literal("markdown")], { default: "text" })), messageId: Type.Optional(Type.String({ description: "指定会话的消息ID,不传则发送到最新会话" })), }), async execute(_id, p) { if (!client || !connected) return { content: [{ type: "text", text: "机器人未连接" }], details: {} }; const targetMsgId = p.messageId; const session = targetMsgId ? dingTalkSessions.get(targetMsgId) : getLatestSession(); if (!session) return { content: [{ type: "text", text: "无活跃会话" }], details: {} }; const sendContent = p.message; // 发送消息并等待回复 const result = await sendAndWait(session.sessionWebhook, sendContent, { timeout: p.timeout }); // 提取用户回复的关键信息 const replyPreview = result.reply.slice(0, 100); const replyText = result.reply.length > 100 ? replyPreview + "..." : replyPreview; return { content: [{ type: "text", text: `📨 已收到回复 (${Math.round((Date.now() - (result.message as any).createTime) / 1000)}秒):\n${replyText}` }], details: { reply: result.reply, message: result.message, senderNick: result.message.senderNick, sendContent: sendContent, } }; }, }); pi.registerTool({ name: "dingtalkbot-cancel-wait", label: "取消等待", description: "取消指定会话的等待状态", parameters: Type.Object({ messageId: Type.Optional(Type.String({ description: "会话的消息ID,不传则取消最新会话的等待" })), }), async execute(_id, p) { if (!client || !connected) return { content: [{ type: "text", text: "机器人未连接" }], details: {} }; const targetMsgId = p.messageId; const session = targetMsgId ? dingTalkSessions.get(targetMsgId) : getLatestSession(); if (!session) return { content: [{ type: "text", text: "无活跃会话" }], details: {} }; const conversationId = extractConversationId(session.sessionWebhook); const cancelled = pendingReplyStore.cancelByConversation(conversationId, "等待被取消"); return { content: [{ type: "text", text: cancelled ? "✅ 已取消等待" : "ℹ️ 该会话没有等待中的消息" }], details: { cancelled, conversationId } }; }, }); // ============================================================================ // Commands // ============================================================================ pi.registerCommand("dingtalkbot-add", { description: "添加机器人", handler: async (_args, ctx) => { const name = await ctx.ui.input("机器人名称(可选)", ""); const clientId = await ctx.ui.input("ClientID", "dingxxxxxxxxxxxxxxxx"); if (!clientId) return; const clientSecret = await ctx.ui.input("ClientSecret", ""); if (!clientSecret) return; const globalCfg = await loadGlobalConfig(); if (globalCfg.bots.find(b => b.clientId === clientId.trim())) { ctx.ui.notify("❌ 该机器人已存在", "error"); return; } const newBot: BotConfig = { clientId: clientId.trim(), clientSecret: clientSecret.trim(), name: name?.trim() || undefined }; globalCfg.bots.push(newBot); await saveGlobalConfig(globalCfg); globalBots = globalCfg.bots; if (!sessionCfg.activeBotId) { sessionCfg.activeBotId = clientId.trim(); await saveSessionConfig(sessionCfg); } ctx.ui.notify(`✅ 已添加 ${name || getBotDisplayName(newBot)}`, "info"); await connect(ctx, newBot); }, }); pi.registerCommand("dingtalkbot-list", { description: "列出所有机器人", handler: async (_args, ctx) => { globalBots = (await loadGlobalConfig()).bots; if (globalBots.length === 0) { ctx.ui.notify("暂无配置的机器人", "info"); return; } const list = globalBots.map(b => { const isActive = b.clientId === sessionCfg.activeBotId; return `${isActive ? "▶" : "○"} ${getBotDisplayName(b)}`; }).join("\n"); const activeBot = globalBots.find(b => b.clientId === sessionCfg.activeBotId); ctx.ui.notify(`机器人列表:\n${list}`, "info"); }, }); pi.registerCommand("dingtalkbot-use", { description: "切换机器人", handler: async (_args, ctx) => { globalBots = (await loadGlobalConfig()).bots; if (globalBots.length === 0) { ctx.ui.notify("暂无配置的机器人", "warning"); return; } const options = globalBots.map(b => `${b.clientId === sessionCfg.activeBotId ? "▶ " : "○ "}${getBotDisplayName(b)}`); const selected = await ctx.ui.select("选择机器人", options); if (!selected) return; const label = selected.replace(/^[▶○] /, ""); const bot = globalBots.find(b => getBotDisplayName(b) === label || b.clientId === label); if (!bot) return; sessionCfg.activeBotId = bot.clientId; await saveSessionConfig(sessionCfg); ctx.ui.notify(`✅ 已切换到 ${getBotDisplayName(bot)}`, "info"); await connect(ctx, bot); }, }); pi.registerCommand("dingtalkbot-remove", { description: "删除机器人", handler: async (_args, ctx) => { globalBots = (await loadGlobalConfig()).bots; if (globalBots.length === 0) { ctx.ui.notify("暂无配置的机器人", "warning"); return; } const name = await ctx.ui.input("输入要删除的 ClientID 或名称", ""); if (!name) return; const idx = globalBots.findIndex(b => b.clientId === name || b.name === name); if (idx === -1) { ctx.ui.notify("❌ 机器人不存在", "error"); return; } const removed = globalBots.splice(idx, 1)[0]; await saveGlobalConfig({ bots: globalBots }); if (sessionCfg.activeBotId === removed.clientId) { disconnect(); const nextBot = globalBots[0]; sessionCfg.activeBotId = nextBot?.clientId; await saveSessionConfig(sessionCfg); if (nextBot) { ctx.ui.notify(`✅ 已删除,切换到 ${getBotDisplayName(nextBot)}`, "info"); await connect(ctx, nextBot); } else { ctx.ui.notify(`✅ 已删除`, "info"); } } else { ctx.ui.notify(`✅ 已删除 ${getBotDisplayName(removed)}`, "info"); } }, }); pi.registerCommand("dingtalkbot-status", { description: "查看机器人状态", handler: async (_args, ctx) => { globalBots = (await loadGlobalConfig()).bots; const active = globalBots.find(b => b.clientId === sessionCfg.activeBotId); if (!active) { ctx.ui.notify(`机器人: ${globalBots.length} 个\n状态: 未选择`, "info"); return; } const status = !sessionCfg.enabled ? "🔴 禁用" : connected ? "✅ 已连接" : "❌ 已断开"; ctx.ui.notify(`${status}\n机器人: ${getBotDisplayName(active)}`, "info"); }, }); pi.registerCommand("dingtalkbot-enable", { description: "启用机器人", handler: async (_args, ctx) => { sessionCfg.enabled = true; await saveSessionConfig(sessionCfg); const bot = globalBots.find(b => b.clientId === sessionCfg.activeBotId) || globalBots[0]; if (bot) { await connect(ctx, bot); ctx.ui.notify(`✅ 已启用 ${getBotDisplayName(bot)}`, "info"); } }, }); pi.registerCommand("dingtalkbot-disable", { description: "禁用机器人", handler: async (_args, ctx) => { sessionCfg.enabled = false; await saveSessionConfig(sessionCfg); disconnect(); ctx.ui.notify("🔌 已禁用", "info"); }, }); // ============================================================================ // Events // ============================================================================ pi.on("session_start", async (_e, ctx) => { try { globalBots = (await loadGlobalConfig()).bots; sessionCfg = await loadSessionConfig(); if (sessionCfg.enabled && sessionCfg.activeBotId) { const bot = globalBots.find(b => b.clientId === sessionCfg.activeBotId); if (bot) await connect(ctx, bot); } setStatus(); } catch (err) { console.error(`[dingtalkbot] session_start 异常:`, err); } }); pi.on("session_shutdown", () => disconnect()); // 阻止未连接时调用钉钉工具,避免在 pi 会话日志中产生噪声 pi.on("tool_call", async (event) => { if (!event.toolName.startsWith("dingtalkbot-")) return; if (connected) return; return { block: true, reason: "钉钉机器人未连接,无法调用此工具。请先使用 /dingtalkbot-add 或 /dingtalkbot-use 启用机器人。", }; }); pi.on("before_agent_start", async (e) => { // 只有机器人连接后才添加提示 if (!connected) return {}; return { systemPrompt: e.systemPrompt + PROMPT }; }); pi.on("agent_end", async (e) => { // 只有机器人连接后才处理回复 if (!connected) return; setStatus(); // 多个用户的消息可能通过 steer/followUp 进入同一个 agent run, // e.messages 包含多对 (user, assistant[, tool_call, tool_result, ...]) // 每对都需要单独回复到对应用户。 // // 历史 bug: 之前从 txt (assistant 响应) 正则匹配 [dingtalkbot] [...] 前缀找 messageId, // 但 assistant 响应不含该前缀 → regex 永远不匹配 → fallback 到 // getLatestSession() 返回最新入队的 session, 多用户并发时发错用户。 // // 修复: 遍历消息找 (user, assistant...) 对, 从 user 消息提取 messageId, // 从后续 assistant 消息提取响应文本。 const messages = e.messages as any[]; for (let i = 0; i < messages.length; i++) { const userMsg = messages[i]; if (userMsg.role !== "user") continue; // 从 user 消息文本提取 messageId // 格式: [dingtalkbot] [bot] [nick] [messageId]\n{content} const userText = (userMsg.content as any[])?.find((b: any) => b.type === "text")?.text; if (!userText) continue; const match = userText.match(/\[dingtalkbot\] \[.*?\] \[.*?\] \[(.+?)\]\n/); if (!match) continue; // 不是钉钉注入的消息 const messageId = match[1]; const session = dingTalkSessions.get(messageId); if (!session) continue; // session 丢失 (可能 disconnect) // 收集该 user 之后所有 assistant 文本(处理含 tool call 的回合), // 直到下一个 user 消息或数组末尾 const textParts: string[] = []; for (let j = i + 1; j < messages.length; j++) { const m = messages[j]; if (m.role === "user") break; // 进入下一轮 if (m.role === "assistant") { const t = (m.content as any[])?.find((b: any) => b.type === "text")?.text; if (t) textParts.push(t); } } const content = textParts.join("").trim(); if (!content) continue; // 发送回复(无论成功与否都清理 + 推进队列, 避免 webhook 报错卡住队列) try { await sendReply(session.sessionWebhook, content); } catch (err) { console.error(`[dingtalkbot] 发送回复失败 [${session.messageId}]:`, err); } dingTalkSessions.delete(session.messageId); // 清除进度通知 const progressInterval = messageTimeouts.get(messageId + "_progress"); if (progressInterval) { clearInterval(progressInterval); messageTimeouts.delete(messageId + "_progress"); } notifiedPoints.delete(messageId); // 继续处理该会话的下一条消息 if (session.conversationId) { const conversationId = session.conversationId; if (currentProcessingMessageId === messageId) { currentProcessingMessageId = null; } setSessionProcessing(conversationId, false); processNextForSession(conversationId); } } currentProcessingMessageId = null; }); }