/** * MCP Streamable HTTP 传输层模块 * * 负责: * - MCP JSON-RPC over HTTP 通信(发送请求、解析响应) * - Streamable HTTP session 生命周期管理(initialize 握手 → Mcp-Session-Id 维护 → 失效重建) * - 自动检测无状态 Server:如果 initialize 响应未返回 Mcp-Session-Id, * 则标记为无状态模式,后续请求跳过握手和 session 管理 * - SSE 流式响应解析 * - MCP 配置运行时缓存(通过 WSClient 拉取 URL 并缓存在内存中) */ import { readFileSync } from "node:fs"; import { dirname, resolve } from "node:path"; import { fileURLToPath } from "node:url"; import { generateReqId } from "@wecom/aibot-node-sdk"; import { DEFAULT_ACCOUNT_ID } from "../compat/plugin-sdk-shim.js"; import { getWsClient } from "../ws-adapter.js"; import { withTimeout } from "../timeout.js"; // ============================================================================ // 常量 // ============================================================================ /** 获取 MCP 配置的 WebSocket 命令 */ const MCP_GET_CONFIG_CMD = "aibot_get_mcp_config"; /** MCP 配置拉取超时时间(毫秒) */ const MCP_CONFIG_FETCH_TIMEOUT_MS = 15_000; /** 从 package.json 读取插件版本号 */ const getPluginVersion = (): string => { try { const currentDir = dirname(fileURLToPath(import.meta.url)); const pkgPath = resolve(currentDir, "..", "..", "package.json"); const pkg = JSON.parse(readFileSync(pkgPath, "utf-8")) as { version?: string }; return pkg.version ?? ""; } catch { return ""; } }; const PLUGIN_VERSION = getPluginVersion(); // ============================================================================ // 类型定义 // ============================================================================ /** MCP JSON-RPC 请求体 */ interface JsonRpcRequest { jsonrpc: "2.0"; id?: string; method: string; params?: Record; } /** MCP JSON-RPC 响应体 */ interface JsonRpcResponse { jsonrpc: "2.0"; id: number | string; result?: unknown; error?: { code: number; message: string; data?: unknown; }; } /** * Streamable HTTP 会话信息 * * 每个 MCP Server category 维护一个独立的会话,包含: * - sessionId: 服务端通过 Mcp-Session-Id 响应头返回的会话标识 * - initialized: 是否已完成 initialize 握手 * - stateless: 服务端未返回 Mcp-Session-Id 时标记为无状态模式,后续请求跳过 session 管理 */ interface McpSession { sessionId: string | null; initialized: boolean; stateless: boolean; } // ============================================================================ // 内部状态 // ============================================================================ /** HTTP 请求超时时间(毫秒) */ const HTTP_REQUEST_TIMEOUT_MS = 30_000; /** 日志前缀 */ const LOG_TAG = "[mcp]"; /** * MCP JSON-RPC 错误 * * 携带服务端返回的 JSON-RPC error.code, * 用于上层按错误码进行差异化处理(如特定错误码触发缓存清理)。 */ export class McpRpcError extends Error { constructor( public readonly code: number, message: string, public readonly data?: unknown, ) { super(message); this.name = "McpRpcError"; } } /** * MCP HTTP 错误 * * 携带 HTTP 状态码,用于精确判断 session 失效(404)等场景, * 避免通过字符串匹配 "404" 导致的误判。 */ export class McpHttpError extends Error { constructor( public readonly statusCode: number, message: string, ) { super(message); this.name = "McpHttpError"; } } /** * 需要清理缓存的 JSON-RPC 错误码集合 * * 当 MCP Server 返回以下错误码时,说明服务端状态已发生变化(如配置变更、 * 服务重启等),需要清理对应 category 的全部缓存,确保下次请求重新 * 拉取配置并重建会话。 * * - -32001: 服务不可用(Server Unavailable) * - -32002: 配置已变更(Config Changed) * - -32003: 认证失败(Auth Failed) */ const CACHE_CLEAR_ERROR_CODES = new Set([-32001, -32002, -32003]); /** MCP 配置缓存:category → response.body(完整配置) */ const mcpConfigCache = new Map>(); /** Streamable HTTP 会话缓存:category → session */ const mcpSessionCache = new Map(); /** 已确认为无状态的 MCP Server 品类集合(跳过后续握手) */ const statelessCategories = new Set(); /** 正在进行中的 initialize 请求(防止并发重复初始化),key 为 category */ const inflightInitRequests = new Map>(); // ============================================================================ // MCP 配置拉取与缓存 // ============================================================================ /** * 通过 WSClient 拉取指定 category 的 MCP 完整配置 * * @param category - MCP 品类名称,如 doc、contact * @returns 完整的 response.body 配置对象(至少包含 url 字段) */ async function fetchMcpConfig(category: string): Promise> { const wsClient = getWsClient(DEFAULT_ACCOUNT_ID); if (!wsClient) { throw new Error("WSClient 未连接,无法拉取 MCP 配置"); } const reqId = generateReqId("mcp_config"); const response = await withTimeout( wsClient.reply( { headers: { req_id: reqId } }, { biz_type: category, plugin_version: PLUGIN_VERSION }, MCP_GET_CONFIG_CMD, ), MCP_CONFIG_FETCH_TIMEOUT_MS, `MCP config fetch for "${category}" timed out after ${MCP_CONFIG_FETCH_TIMEOUT_MS}ms`, ); if (response.errcode !== undefined && response.errcode !== 0) { const errMsg = `MCP 配置请求失败: errcode=${response.errcode}, errmsg=${response.errmsg ?? "unknown"}`; console.error(`${LOG_TAG} ${errMsg}`); throw new Error(errMsg); } const body = response.body as { url?: string } | undefined; if (!body?.url) { throw new Error( `MCP 配置响应缺少 url 字段 (category="${category}")`, ); } console.log(`${LOG_TAG} 配置拉取成功 (category="${category}")`); return body as Record; } /** * 获取指定品类的 MCP Server URL * * 优先从内存缓存中读取,未命中时通过 WSClient 拉取并缓存。 * * @param category - MCP 品类名称 * @returns MCP Server URL */ async function getMcpUrl(category: string): Promise { // 查内存缓存 const cached = mcpConfigCache.get(category); if (cached) return cached.url as string; // 缓存未命中,通过 WSClient 拉取 const body = await fetchMcpConfig(category); // 写入缓存 mcpConfigCache.set(category, body); console.log(`${LOG_TAG} getMcpUrl ${category}: ${body.url}`); return body.url as string; } // ============================================================================ // HTTP 底层通信 // ============================================================================ /** * 发送原始 HTTP 请求到 MCP Server(底层方法) * * 自动携带 Mcp-Session-Id 请求头(如果有), * 并从响应头中更新 sessionId。 */ async function sendRawJsonRpc( url: string, session: McpSession, body: JsonRpcRequest, ): Promise<{ response: Response; rpcResult: unknown; newSessionId: string | null }> { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), HTTP_REQUEST_TIMEOUT_MS); const headers: Record = { "Content-Type": "application/json", Accept: "application/json, text/event-stream", }; // Streamable HTTP:携带会话 ID if (session.sessionId) { headers["Mcp-Session-Id"] = session.sessionId; } let response: Response; try { response = await fetch(url, { method: "POST", headers, body: JSON.stringify(body), signal: controller.signal, }); } catch (err) { if (err instanceof DOMException && err.name === "AbortError") { throw new Error(`MCP 请求超时 (${HTTP_REQUEST_TIMEOUT_MS}ms)`); } throw new Error(`MCP 网络请求失败: ${err instanceof Error ? err.message : String(err)}`); } finally { clearTimeout(timeoutId); } // 从响应头提取新的 sessionId(不直接修改入参,由调用方决定如何更新) const newSessionId = response.headers.get("mcp-session-id"); if (!response.ok) { throw new McpHttpError( response.status, `MCP HTTP 请求失败: ${response.status} ${response.statusText}`, ); } // Streamable HTTP:notification 响应可能无响应体(204 或 content-length: 0) const contentLength = response.headers.get("content-length"); if (response.status === 204 || contentLength === "0") { return { response, rpcResult: undefined, newSessionId }; } const contentType = response.headers.get("content-type") ?? ""; // 处理 SSE 流式响应 if (contentType.includes("text/event-stream")) { return { response, rpcResult: await parseSseResponse(response), newSessionId }; } // 普通 JSON 响应 — 先读取文本,防止空内容导致 JSON.parse 报错 const text = await response.text(); if (!text.trim()) { return { response, rpcResult: undefined, newSessionId }; } const rpc = JSON.parse(text) as JsonRpcResponse; if (rpc.error) { throw new McpRpcError( rpc.error.code, `MCP 调用错误 [${rpc.error.code}]: ${rpc.error.message}`, rpc.error.data, ); } return { response, rpcResult: rpc.result, newSessionId }; } // ============================================================================ // Session 管理 // ============================================================================ /** * 对指定 URL 执行 Streamable HTTP 的 initialize 握手 * * 发送 initialize → 接收 serverInfo → 发送 initialized 通知。 * 如果服务端未返回 Mcp-Session-Id,则标记为无状态模式,后续请求跳过 session 管理。 */ async function initializeSession(url: string, category: string): Promise { const session: McpSession = { sessionId: null, initialized: false, stateless: false }; console.log(`${LOG_TAG} 开始 initialize 握手 (category="${category}")`); // 1. 发送 initialize 请求 const initBody: JsonRpcRequest = { jsonrpc: "2.0", id: generateReqId("mcp_init"), method: "initialize", params: { protocolVersion: "2025-03-26", capabilities: {}, clientInfo: { name: "wecom_mcp", version: "1.0.0" }, }, }; const { newSessionId: initSessionId } = await sendRawJsonRpc(url, session, initBody); // 用返回的 newSessionId 更新 session(不再依赖副作用修改) if (initSessionId) { session.sessionId = initSessionId; } // 检查服务端是否返回了 Mcp-Session-Id // 如果没有返回,说明该 Server 是无状态实现,无需维护 session if (!session.sessionId) { session.stateless = true; session.initialized = true; statelessCategories.add(category); mcpSessionCache.set(category, session); console.log(`${LOG_TAG} 无状态 Server 确认 (category="${category}")`); return session; } // 2. 发送 initialized 通知(JSON-RPC notification 不带 id 字段) const notifyBody: JsonRpcRequest = { jsonrpc: "2.0", method: "notifications/initialized", }; // initialized 通知不需要等待响应,但 Streamable HTTP 要求通过 POST 发送 const { newSessionId: notifySessionId } = await sendRawJsonRpc(url, session, notifyBody); // 如果 initialized 通知的响应也携带了 sessionId,以最新的为准 if (notifySessionId) { session.sessionId = notifySessionId; } session.initialized = true; mcpSessionCache.set(category, session); console.log(`${LOG_TAG} 有状态 Session 建立成功 (category="${category}", sessionId="${session.sessionId}")`); return session; } /** * 获取或创建指定 URL 的 MCP 会话 * * - 已确认无状态的 category:直接返回空 session,跳过握手 * - 已有可用有状态会话:直接返回缓存 * - 其他情况:执行 initialize 握手,并发请求会被合并 */ async function getOrCreateSession(url: string, category: string): Promise { // 已确认为无状态的 Server,直接返回空 session 跳过握手 if (statelessCategories.has(category)) { const cached = mcpSessionCache.get(category); if (cached) return cached; // 首次发现被清除(理论上不会走到这里),重新走握手探测 } const cached = mcpSessionCache.get(category); if (cached?.initialized) return cached; // 防止并发重复初始化 const inflight = inflightInitRequests.get(category); if (inflight) return inflight; const promise = initializeSession(url, category).finally(() => { inflightInitRequests.delete(category); }); inflightInitRequests.set(category, promise); return promise; } // ============================================================================ // SSE 解析 // ============================================================================ /** * 解析 SSE 流式响应,提取最终的 JSON-RPC result * * 按照 SSE 规范,同一事件中的多个 `data:` 行会用换行符拼接。 * 空行分隔不同事件,取最后一个完整事件的数据。 */ async function parseSseResponse(response: Response): Promise { const text = await response.text(); const lines = text.split("\n"); // 按 SSE 规范解析:空行分隔事件,同一事件内的 data 行用换行拼接 let currentDataParts: string[] = []; let lastEventData = ""; for (const line of lines) { if (line.startsWith("data: ")) { currentDataParts.push(line.slice(6)); } else if (line.startsWith("data:")) { // data: 后无空格时,值为空字符串 currentDataParts.push(line.slice(5)); } else if (line.trim() === "" && currentDataParts.length > 0) { // 空行表示事件结束,拼接所有 data 行 lastEventData = currentDataParts.join("\n").trim(); currentDataParts = []; } } // 处理最后一个未以空行结尾的事件 if (currentDataParts.length > 0) { lastEventData = currentDataParts.join("\n").trim(); } if (!lastEventData) { throw new Error("SSE 响应中未包含有效数据"); } try { const rpc = JSON.parse(lastEventData) as JsonRpcResponse; if (rpc.error) { throw new McpRpcError( rpc.error.code, `MCP 调用错误 [${rpc.error.code}]: ${rpc.error.message}`, rpc.error.data, ); } return rpc.result; } catch (err) { if (err instanceof SyntaxError) { throw new Error(`SSE 响应解析失败: ${lastEventData.slice(0, 200)}`); } throw err; } } // ============================================================================ // 公共 API // ============================================================================ /** * 清理指定品类的所有 MCP 缓存(配置、会话、无状态标记) * * 当 MCP Server 返回特定错误码时调用,确保下次请求重新拉取配置并重建会话。 * * @param category - MCP 品类名称 */ export function clearCategoryCache(category: string): void { console.log(`${LOG_TAG} 清理缓存 (category="${category}")`); mcpConfigCache.delete(category); mcpSessionCache.delete(category); statelessCategories.delete(category); inflightInitRequests.delete(category); } /** tools/list 返回的工具描述 */ export interface McpToolInfo { name: string; description?: string; inputSchema?: Record; } /** * 发送 JSON-RPC 请求到 MCP Server(Streamable HTTP 协议) * * 自动管理 session 生命周期: * - 无状态 Server:跳过 session 管理,直接发送请求 * - 有状态 Server:首次调用先执行 initialize 握手,session 失效(404)时自动重建并重试 * * @param category - MCP 品类名称 * @param method - JSON-RPC 方法名 * @param params - JSON-RPC 参数 * @returns JSON-RPC result */ export async function sendJsonRpc( category: string, method: string, params?: Record, ): Promise { const url = await getMcpUrl(category); const body: JsonRpcRequest = { jsonrpc: "2.0", id: generateReqId("mcp_rpc"), method, ...(params !== undefined ? { params } : {}), }; let session = await getOrCreateSession(url, category); try { const { rpcResult, newSessionId } = await sendRawJsonRpc(url, session, body); // 用最新的 sessionId 更新 session if (newSessionId) { session.sessionId = newSessionId; } return rpcResult; } catch (err) { // 特定 JSON-RPC 错误码触发缓存清理(统一在传输层处理,上层无需关心) if (err instanceof McpRpcError && CACHE_CLEAR_ERROR_CODES.has(err.code)) { clearCategoryCache(category); } // 无状态 Server 不存在 session 失效问题,直接抛出错误 if (session.stateless) throw err; // 有状态 Server:session 失效时服务端返回 404,需要重新初始化并重试一次 // 使用 McpHttpError.statusCode 精确匹配,避免字符串匹配 "404" 导致误判 if (err instanceof McpHttpError && err.statusCode === 404) { console.log(`${LOG_TAG} Session 失效 (category="${category}"),开始重建...`); mcpSessionCache.delete(category); // 使用 rebuildSession 合并并发的 session 重建请求,避免竞态条件 session = await rebuildSession(url, category); const { rpcResult, newSessionId } = await sendRawJsonRpc(url, session, body); if (newSessionId) { session.sessionId = newSessionId; } return rpcResult; } // 其他错误记录日志后抛出 console.error(`${LOG_TAG} RPC 请求失败 (category="${category}", method="${method}"): ${err instanceof Error ? err.message : String(err)}`); throw err; } } /** * 合并并发的 session 重建请求 * * 与 getOrCreateSession 类似,使用 inflightInitRequests 防止 * 多个并发请求同时遇到 404 时重复执行 initialize 握手。 */ async function rebuildSession(url: string, category: string): Promise { const inflight = inflightInitRequests.get(category); if (inflight) return inflight; const promise = initializeSession(url, category).finally(() => { inflightInitRequests.delete(category); }); inflightInitRequests.set(category, promise); return promise; }