/**
* WeCom WebSocket 长链接模式适配器
*
* 职责:管理 WSClient 生命周期,将 SDK 事件桥接到现有 monitor.ts 消息管线。
*
* SDK WsFrame 事件
* ↓
* ws-adapter 转换为 WecomBotInboundMessage 格式
* ↓
* 复用 monitor.ts 中的 shouldProcessBotInboundMessage → buildInboundBody
* → streamStore.addPendingMessage → flushPending 管线
*/
import type { OpenClawConfig, PluginRuntime } from "openclaw/plugin-sdk";
import { WSClient } from "@wecom/aibot-node-sdk";
import type {
WsFrame,
BaseMessage,
TextMessage,
ImageMessage,
MixedMessage,
VoiceMessage,
FileMessage,
EventMessage,
EventMessageWith,
ReplyMsgItem,
} from "@wecom/aibot-node-sdk";
import type { EnterChatEvent, TemplateCardEventData } from "@wecom/aibot-node-sdk";
import type { ResolvedBotAccount, WecomNetworkConfig, WecomBotInboundMessage } from "./types/index.js";
import type { WecomRuntimeEnv, WecomWebhookTarget, StreamState } from "./monitor/types.js";
import { shouldProcessBotInboundMessage, buildInboundBody } from "./monitor.js";
import { monitorState } from "./monitor/state.js";
import { getWecomRuntime } from "./runtime.js";
import { fetchAndSaveMcpConfig } from "./mcp-config.js";
// ─── Constants ─────────────────────────────────────────────────────────
/** "思考中" 占位消息,让用户立即看到机器人正在响应 */
const THINKING_MESSAGE = "";
// ─── WSClient Instance Registry ────────────────────────────────────────
const wsClients = new Map();
/**
* 获取指定账号的 WSClient 实例
*/
export function getWsClient(accountId: string): WSClient | undefined {
return wsClients.get(accountId);
}
/**
* 等待 WSClient 连接就绪,最多等待 timeoutMs 毫秒(默认 30 秒)。
* 如果已连接则立即返回;如果 client 尚未创建,会轮询等待创建后再监听连接事件。
*/
export async function waitForWsConnection(accountId: string, timeoutMs = 30_000): Promise {
const deadline = Date.now() + timeoutMs;
// 等待 client 实例出现(gateway 重启时 client 可能还没注册)
while (!wsClients.has(accountId)) {
if (Date.now() >= deadline) return false;
await new Promise((r) => setTimeout(r, 500));
}
const client = wsClients.get(accountId)!;
if (client.isConnected) return true;
const remaining = deadline - Date.now();
if (remaining <= 0) return false;
return new Promise((resolve) => {
const timer = setTimeout(() => {
cleanup();
resolve(false);
}, remaining);
const onConnected = () => {
cleanup();
resolve(true);
};
const cleanup = () => {
clearTimeout(timer);
client.off("connected", onConnected);
};
client.on("connected", onConnected);
// 再检查一次,防止在注册监听器之前已连上
if (client.isConnected) {
cleanup();
resolve(true);
}
});
}
// ─── Stream Reply Watcher ──────────────────────────────────────────────
/**
* 流式回复监听器:轮询 StreamState 变化并通过 WSClient 推送回复
*/
function watchStreamReply(params: {
wsClient: WSClient;
frame: WsFrame;
streamId: string;
log?: (msg: string) => void;
error?: (msg: string) => void;
}): void {
const { wsClient, frame, streamId, log, error } = params;
const streamStore = monitorState.streamStore;
let lastSentContent = "";
let finished = false;
const POLL_INTERVAL_MS = 200;
const tick = async () => {
if (finished) return;
const state = streamStore.getStream(streamId);
if (!state) {
finished = true;
return;
}
const content = state.content ?? "";
const isFinished = state.finished ?? false;
// 有新内容或流结束时发送
if (content !== lastSentContent || isFinished) {
try {
// 构建图片附件(仅在结束时)
let msgItems: ReplyMsgItem[] | undefined;
if (isFinished && state.images?.length) {
msgItems = state.images.map((img) => ({
msgtype: "image" as const,
image: { base64: img.base64, md5: img.md5 },
}));
}
await wsClient.replyStream(
frame,
streamId,
content,
isFinished,
msgItems,
);
lastSentContent = content;
log?.(`ws-reply: streamId=${streamId} len=${content.length} finish=${isFinished}`);
} catch (err) {
error?.(`ws-reply: replyStream failed streamId=${streamId}: ${String(err)}`);
}
}
if (isFinished) {
finished = true;
return;
}
setTimeout(tick, POLL_INTERVAL_MS);
};
// 初次延迟启动,等待 agent 开始生产内容
setTimeout(tick, POLL_INTERVAL_MS);
}
// ─── SDK Message → WecomBotInboundMessage Conversion ───────────────────
/**
* 将 SDK 的 WsFrame 转换为现有的 WecomBotInboundMessage 格式
*/
function convertSdkMessageToInbound(body: BaseMessage): WecomBotInboundMessage {
const base: WecomBotInboundMessage = {
msgid: body.msgid,
aibotid: body.aibotid,
chattype: body.chattype,
chatid: body.chatid,
response_url: body.response_url,
from: body.from ? { userid: body.from.userid } : undefined,
msgtype: body.msgtype as string,
};
const msgtype = String(body.msgtype ?? "").toLowerCase();
if (msgtype === "text") {
const textBody = body as TextMessage;
return { ...base, msgtype: "text", text: textBody.text, quote: textBody.quote as any };
}
if (msgtype === "voice") {
const voiceBody = body as VoiceMessage;
return { ...base, msgtype: "voice", voice: voiceBody.voice, quote: voiceBody.quote as any };
}
if (msgtype === "image") {
const imageBody = body as ImageMessage;
return { ...base, msgtype: "image" as any, image: imageBody.image, quote: imageBody.quote as any } as any;
}
if (msgtype === "file") {
const fileBody = body as FileMessage;
return { ...base, msgtype: "file" as any, file: fileBody.file, quote: fileBody.quote as any } as any;
}
if (msgtype === "video") {
// SDK 没有导出 VideoMessage 类型,直接从 BaseMessage 取 video 字段
return { ...base, msgtype: "video" as any, video: (body as any).video, quote: (body as any).quote } as any;
}
if (msgtype === "mixed") {
const mixedBody = body as MixedMessage;
return { ...base, msgtype: "mixed" as any, mixed: mixedBody.mixed, quote: mixedBody.quote as any } as any;
}
// Fallback: pass through as-is
return { ...base, ...body };
}
// ─── WS Event Handlers ────────────────────────────────────────────────
function setupMessageHandler(params: {
wsClient: WSClient;
accountId: string;
target: WecomWebhookTarget;
}) {
const { wsClient, accountId, target } = params;
const streamStore = monitorState.streamStore;
// 监听所有消息类型
wsClient.on("message", (frame: WsFrame) => {
const body = frame.body;
if (!body) return;
const msgtype = String(body.msgtype ?? "").toLowerCase();
// event 类型由专门的 event handler 处理
if (msgtype === "event") return;
const msg = convertSdkMessageToInbound(body);
const decision = shouldProcessBotInboundMessage(msg);
if (!decision.shouldProcess) {
target.runtime.log?.(
`[${accountId}] ws-inbound: skipped msgtype=${msgtype} reason=${decision.reason}`,
);
return;
}
const userid = decision.senderUserId!;
const chatId = decision.chatId ?? userid;
const conversationKey = `wecom:${accountId}:${userid}:${chatId}`;
const msgContent = buildInboundBody(msg);
target.runtime.log?.(
`[${accountId}] ws-inbound: msgtype=${msgtype} chattype=${String(msg.chattype ?? "")} ` +
`from=${userid} msgid=${String(msg.msgid ?? "")}`,
);
// 消息去重
if (msg.msgid) {
const existingStreamId = streamStore.getStreamByMsgId(String(msg.msgid));
if (existingStreamId) {
target.runtime.log?.(
`[${accountId}] ws-inbound: duplicate msgid=${msg.msgid}, skipping`,
);
return;
}
}
// 加入 Pending 队列(复用现有防抖/聚合逻辑)
const { streamId } = streamStore.addPendingMessage({
conversationKey,
target,
msg,
msgContent,
nonce: "",
timestamp: String(Date.now()),
debounceMs: (target.account.config as any).debounceMs,
});
// 标记 wsMode
streamStore.updateStream(streamId, (s: StreamState) => {
s.wsMode = true;
});
// 立即发送"思考中"占位消息,让用户看到即时反馈
const sendThinking = (target.account.config as any).sendThinkingMessage ?? true;
if (sendThinking) {
wsClient.replyStream(frame, streamId, THINKING_MESSAGE, false).catch((err) => {
target.runtime.error?.(
`[${accountId}] ws-thinking: failed to send thinking message: ${String(err)}`,
);
});
}
// 注册流式回复监听器
watchStreamReply({
wsClient,
frame,
streamId,
log: (msg) => target.runtime.log?.(`[${accountId}] ${msg}`),
error: (msg) => target.runtime.error?.(`[${accountId}] ${msg}`),
});
target.statusSink?.({ lastInboundAt: Date.now() });
});
}
function setupEventHandler(params: {
wsClient: WSClient;
accountId: string;
target: WecomWebhookTarget;
welcomeText?: string;
}) {
const { wsClient, accountId, target, welcomeText } = params;
const streamStore = monitorState.streamStore;
// 进入会话事件 → 欢迎语
wsClient.on("event.enter_chat", async (frame: WsFrame>) => {
const text = welcomeText?.trim();
if (!text) return;
try {
await wsClient.replyWelcome(frame, {
msgtype: "text",
text: { content: text },
});
target.runtime.log?.(`[${accountId}] ws-event: sent welcome text`);
} catch (err) {
target.runtime.error?.(`[${accountId}] ws-event: replyWelcome failed: ${String(err)}`);
}
});
// 模板卡片交互事件 → 转换为文本消息注入管线
wsClient.on("event.template_card_event", (frame: WsFrame>) => {
const body = frame.body;
if (!body) return;
const eventData = body.event;
let interactionDesc = `[卡片交互] 按钮: ${eventData?.event_key || "unknown"}`;
if (eventData?.task_id) interactionDesc += ` (任务ID: ${eventData.task_id})`;
const msgid = body.msgid ? String(body.msgid) : undefined;
// 去重
if (msgid && streamStore.getStreamByMsgId(msgid)) {
target.runtime.log?.(`[${accountId}] ws-event: template_card_event already processed msgid=${msgid}`);
return;
}
const streamId = streamStore.createStream({ msgid });
streamStore.markStarted(streamId);
streamStore.updateStream(streamId, (s: StreamState) => {
s.wsMode = true;
});
const syntheticMsg: WecomBotInboundMessage = {
msgid,
aibotid: body.aibotid,
chattype: body.chattype,
chatid: body.chatid,
from: body.from ? { userid: body.from.userid } : undefined,
msgtype: "text",
text: { content: interactionDesc },
};
let core: PluginRuntime;
try {
core = getWecomRuntime();
} catch {
target.runtime.error?.(`[${accountId}] ws-event: runtime not ready for template_card_event`);
streamStore.markFinished(streamId);
return;
}
// 由于卡片事件没有经过防抖队列,直接触发 flushPending 的等效操作
// 需要通过 addPendingMessage 注入,让现有管线处理
const userid = body.from?.userid ?? "unknown";
const chatId = body.chatid ?? userid;
const conversationKey = `wecom:${accountId}:${userid}:${chatId}`;
// 先清除之前创建的 stream(addPendingMessage 会创建新的)
// 直接用 addPendingMessage 复用完整管线
const enrichedTarget: WecomWebhookTarget = { ...target, core };
const { streamId: actualStreamId } = streamStore.addPendingMessage({
conversationKey,
target: enrichedTarget,
msg: syntheticMsg,
msgContent: interactionDesc,
nonce: "",
timestamp: String(Date.now()),
debounceMs: 0, // 卡片事件不防抖
});
streamStore.updateStream(actualStreamId, (s: StreamState) => {
s.wsMode = true;
});
watchStreamReply({
wsClient,
frame,
streamId: actualStreamId,
log: (msg) => target.runtime.log?.(`[${accountId}] ${msg}`),
error: (msg) => target.runtime.error?.(`[${accountId}] ${msg}`),
});
});
// 反馈事件 → 仅记录日志
wsClient.on("event.feedback_event", (frame) => {
target.runtime.log?.(
`[${accountId}] ws-event: feedback_event received (logged only)`,
);
});
}
// ─── WSClient Lifecycle ────────────────────────────────────────────────
export type StartWsClientParams = {
accountId: string;
botId: string;
secret: string;
account: ResolvedBotAccount;
config: OpenClawConfig;
runtime: WecomRuntimeEnv;
core: PluginRuntime;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
welcomeText?: string;
network?: WecomNetworkConfig;
};
/**
* 启动 WebSocket 长链接客户端
* @returns cleanup 函数(用于注销)
*/
export function startWsClient(params: StartWsClientParams): () => void {
const {
accountId, botId, secret,
account, config, runtime, core,
statusSink, welcomeText,
} = params;
// 如果已有实例,先停止
stopWsClient(accountId);
const wsClient = new WSClient({
botId,
secret,
maxReconnectAttempts: -1, // 无限重连
logger: {
debug: (msg: string) => runtime.log?.(`[${accountId}][ws-sdk] ${msg}`),
info: (msg: string) => runtime.log?.(`[${accountId}][ws-sdk] ${msg}`),
warn: (msg: string) => runtime.log?.(`[${accountId}][ws-sdk] WARN: ${msg}`),
error: (msg: string) => runtime.error?.(`[${accountId}][ws-sdk] ERROR: ${msg}`),
},
});
wsClients.set(accountId, wsClient);
// 构建 WecomWebhookTarget 以复用 monitor 管线
const target: WecomWebhookTarget = {
account,
config,
runtime,
core,
path: `ws://${accountId}`,
statusSink,
};
// 设置消息和事件处理
setupMessageHandler({ wsClient, accountId, target });
setupEventHandler({ wsClient, accountId, target, welcomeText });
// 连接状态日志
wsClient.on("connected", () => {
runtime.log?.(`[${accountId}] ws: connected`);
});
wsClient.on("authenticated", () => {
runtime.log?.(`[${accountId}] ws: authenticated successfully`);
// 认证成功后拉取 MCP 配置(非阻塞,失败仅记日志)
void fetchAndSaveMcpConfig(wsClient, accountId, runtime);
});
wsClient.on("disconnected", (reason: string) => {
runtime.log?.(`[${accountId}] ws: disconnected - ${reason}`);
});
wsClient.on("reconnecting", (attempt: number) => {
runtime.log?.(`[${accountId}] ws: reconnecting attempt=${attempt}`);
});
wsClient.on("error", (err: Error) => {
runtime.error?.(`[${accountId}] ws: error - ${err.message}`);
});
// 建立连接
wsClient.connect();
runtime.log?.(`[${accountId}] ws: starting connection (botId=${botId})`);
// 返回清理函数
return () => {
stopWsClient(accountId);
};
}
/**
* 停止指定账号的 WSClient
*/
export function stopWsClient(accountId: string): void {
const existing = wsClients.get(accountId);
if (existing) {
existing.disconnect();
wsClients.delete(accountId);
}
}