import WebSocket from 'ws';
import { handleQiaoqiaoMessage, type QiaoqiaoMessageEvent } from './bot.js';
import { listQiaoqiaoAccountIds, resolveQiaoqiaoAccount } from './accounts.js';
import {
  clearReplyCapture,
  consumeReplyCapture,
  runWithReplyRequestContext,
  startReplyCapture,
} from './reply-handlers.js';
import type { ClawdbotConfig, RuntimeEnv } from 'openclaw/plugin-sdk';
import type { QiaoqiaoConfig } from './types.js';

const DEFAULT_BACKEND_WS_URL = 'wss://ws.qiaoqiao.social/qiaoqiao-ws';
const RECONNECT_BASE_DELAY_MS = 1000;
const RECONNECT_MAX_DELAY_MS = 30000;
const REGISTER_ACK_TIMEOUT_MS = 10000;
const HEARTBEAT_INTERVAL_MS = 25000;
const HEARTBEAT_ACK_TIMEOUT_MS = 12000;
const MAX_MISSED_HEARTBEATS = 2;
const DEFAULT_MESSAGE_REPLY_TIMEOUT_MS = 180000;
const REQUEST_DEDUP_TTL_MS = 2 * 60 * 1000;

/** Handle type returned by `setTimeout` in the current runtime. */
type TimeoutHandle = ReturnType<typeof setTimeout>;
/** Handle type returned by `setInterval` in the current runtime. */
type IntervalHandle = ReturnType<typeof setInterval>;

/** Reply payload produced when the bot handler captured a non-empty reply. */
interface PluginReplySuccess {
  success: true;
  reply: string;
  agentInfo: { agentId: string | undefined; agentName: string; avatarName: string };
  timestamp: string;
}

/** Reply payload produced when handling the message failed. */
interface PluginReplyFailure {
  success: false;
  reply: string;
  error: string;
}

type PluginReply = PluginReplySuccess | PluginReplyFailure;

/** The `data` field of a `qiaoqiao_message`, as far as this module reads it. */
interface BackendMessagePayload {
  event?: Pick<QiaoqiaoMessageEvent, 'sender' | 'message'>;
  agentId?: string;
}

/** Fields this module reads from a decoded backend frame. Everything is optional: the frame is untrusted input. */
interface BackendMessage {
  type?: unknown;
  requestId?: unknown;
  agentId?: string;
  success?: unknown;
  error?: unknown;
  data?: BackendMessagePayload;
}

// Module-level connection state. All sockets created by this module share it.
let channelSocket: WebSocket | null = null;
let reconnectTimer: TimeoutHandle | null = null;
let isConnecting = false;
let isRegistered = false;
let registerAckTimer: TimeoutHandle | null = null;
let heartbeatTimer: IntervalHandle | null = null;
let heartbeatAckTimer: TimeoutHandle | null = null;
let reconnectAttempt = 0;
let missedHeartbeatAcks = 0;

/** Request ids whose handling is currently in flight. */
const processingRequestIds = new Set<string>();
/** Replies produced within the last REQUEST_DEDUP_TTL_MS, keyed by request id. */
const recentReplies = new Map<string, { at: number; payload: PluginReply }>();

/**
 * Extracts a human-readable message from a caught value.
 *
 * @param error - The value caught by a `catch` clause.
 * @returns `error.message` when `error` is an `Error` with a non-empty message, otherwise `'Unknown error'`.
 */
function describeError(error: unknown): string {
  return error instanceof Error && error.message ? error.message : 'Unknown error';
}

/** Serializes `payload` as JSON and sends it on `ws`. */
function sendJson(ws: WebSocket, payload: Record<string, unknown>): void {
  ws.send(JSON.stringify(payload));
}

/**
 * Decodes a raw frame into a backend message object.
 *
 * @param raw - Frame data; only its `toString()` is used.
 * @returns The parsed object.
 * @throws If the frame is not valid JSON or does not decode to an object.
 */
function parseBackendMessage(raw: { toString(): string }): BackendMessage {
  const parsed: unknown = JSON.parse(raw.toString());
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Invalid backend message: expected a JSON object');
  }
  return parsed as BackendMessage;
}

/** Drops cached replies older than REQUEST_DEDUP_TTL_MS. */
function pruneRecentReplies(): void {
  const cutoff = Date.now() - REQUEST_DEDUP_TTL_MS;
  for (const [requestId, record] of recentReplies.entries()) {
    if (record.at < cutoff) {
      recentReplies.delete(requestId);
    }
  }
}

/** Cancels the register-ack, heartbeat and heartbeat-ack timers and resets their handles. */
function clearSocketTimers(): void {
  if (registerAckTimer) {
    clearTimeout(registerAckTimer);
    registerAckTimer = null;
  }
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
  if (heartbeatAckTimer) {
    clearTimeout(heartbeatAckTimer);
    heartbeatAckTimer = null;
  }
}

/**
 * Starts the periodic heartbeat for `ws`, replacing any previously scheduled heartbeat.
 *
 * Every HEARTBEAT_INTERVAL_MS a `heartbeat` frame is sent if the socket is open and
 * registered. Each send arms an ack timer; when it fires, the missed-ack counter is
 * incremented, and once it reaches MAX_MISSED_HEARTBEATS the socket is terminated.
 *
 * @param ws - The socket to keep alive.
 */
function scheduleHeartbeat(ws: WebSocket): void {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer);
  }
  heartbeatTimer = setInterval(() => {
    if (ws.readyState !== WebSocket.OPEN || !isRegistered) {
      return;
    }
    sendJson(ws, { type: 'heartbeat', ts: Date.now() });
    if (heartbeatAckTimer) {
      clearTimeout(heartbeatAckTimer);
    }
    heartbeatAckTimer = setTimeout(() => {
      missedHeartbeatAcks += 1;
      console.warn(
        `[Qiaoqiao Plugin] Heartbeat ACK timeout (${missedHeartbeatAcks}/${MAX_MISSED_HEARTBEATS})`,
      );
      if (missedHeartbeatAcks >= MAX_MISSED_HEARTBEATS) {
        console.warn('[Qiaoqiao Plugin] Consecutive heartbeat ACK timeouts reached threshold, closing socket');
        // The peer is presumed unresponsive, so skip the close handshake (which close()
        // would wait on) and destroy the connection immediately.
        ws.terminate();
      }
    }, HEARTBEAT_ACK_TIMEOUT_MS);
  }, HEARTBEAT_INTERVAL_MS);
}

/**
 * Resolves the backend WebSocket URL.
 *
 * Precedence: channel-level `backendWsUrl`, then the first account with a non-empty
 * `backendWsUrl`, then `QIAOQIAO_BACKEND_WS_URL`, then DEFAULT_BACKEND_WS_URL.
 *
 * @param cfg - Plugin host configuration.
 * @returns The URL to connect to.
 */
function resolveBackendWsUrl(cfg: ClawdbotConfig): string {
  const qiaoqiaoCfg = cfg.channels?.qiaoqiao as QiaoqiaoConfig | undefined;
  const topLevel = qiaoqiaoCfg?.backendWsUrl?.trim();
  if (topLevel) return topLevel;

  for (const accountId of listQiaoqiaoAccountIds(cfg)) {
    const account = resolveQiaoqiaoAccount({ cfg, accountId });
    const accountLevel = account.config.backendWsUrl?.trim();
    if (accountLevel) return accountLevel;
  }

  return process.env.QIAOQIAO_BACKEND_WS_URL?.trim() || DEFAULT_BACKEND_WS_URL;
}

/**
 * Collects the agent ids to register on the socket.
 *
 * Sources, in order: the `appId` of every configured account, `QIAOQIAO_AGENT_ID`,
 * and the comma-separated `QIAOQIAO_AGENT_IDS`. Environment values are trimmed and
 * empty entries are dropped; duplicates are removed while keeping first-seen order.
 *
 * @param cfg - Plugin host configuration.
 * @returns De-duplicated agent ids.
 */
function resolveRegisterAgentIds(cfg: ClawdbotConfig): string[] {
  const agentIds: string[] = [];

  // One agent id per configured account that has an appId.
  for (const accountId of listQiaoqiaoAccountIds(cfg)) {
    const account = resolveQiaoqiaoAccount({ cfg, accountId });
    if (account.appId) {
      agentIds.push(account.appId);
    }
  }

  // Extra agent ids from the environment, trimmed the same way for both variables.
  const singleFromEnv = process.env.QIAOQIAO_AGENT_ID?.trim();
  const fromEnvSingle = singleFromEnv ? [singleFromEnv] : [];
  const fromEnvBatch = (process.env.QIAOQIAO_AGENT_IDS || '')
    .split(',')
    .map((v) => v.trim())
    .filter(Boolean);

  return [...new Set([...agentIds, ...fromEnvSingle, ...fromEnvBatch])];
}

/**
 * Resolves the credentials sent with the registration frame.
 *
 * @param cfg - Plugin host configuration.
 * @returns The first account that has both `appId` and `appSecret`; otherwise the pair from
 *   `QIAOQIAO_APP_ID` / `QIAOQIAO_APP_SECRET`; otherwise `null`.
 */
function resolveRegisterCredentials(cfg: ClawdbotConfig): { appId: string; appSecret: string } | null {
  for (const accountId of listQiaoqiaoAccountIds(cfg)) {
    const account = resolveQiaoqiaoAccount({ cfg, accountId });
    if (account.appId && account.appSecret) {
      return { appId: account.appId, appSecret: account.appSecret };
    }
  }

  const appId = process.env.QIAOQIAO_APP_ID?.trim();
  const appSecret = process.env.QIAOQIAO_APP_SECRET?.trim();
  if (appId && appSecret) return { appId, appSecret };
  return null;
}

/**
 * Resolves how long to wait for the bot handler to produce a reply.
 *
 * Precedence: a finite numeric `replyTimeoutMs` on the account config, then a numeric
 * `QIAOQIAO_REPLY_TIMEOUT_MS`, then DEFAULT_MESSAGE_REPLY_TIMEOUT_MS. Negative values
 * are clamped to 0, which callers treat as "no timeout".
 *
 * @param cfg - Plugin host configuration.
 * @param accountId - Account whose config is consulted.
 * @returns Timeout in milliseconds; 0 disables the timeout.
 */
function resolveMessageReplyTimeoutMs(cfg: ClawdbotConfig, accountId?: string): number {
  const accountConfig: unknown = resolveQiaoqiaoAccount({ cfg, accountId }).config;
  const accountTimeoutMs =
    typeof accountConfig === 'object' && accountConfig !== null
      ? (accountConfig as { replyTimeoutMs?: unknown }).replyTimeoutMs
      : undefined;
  if (typeof accountTimeoutMs === 'number' && Number.isFinite(accountTimeoutMs)) {
    return Math.max(0, accountTimeoutMs);
  }

  // A blank variable counts as unset: Number('') is 0, which would otherwise
  // silently disable the timeout.
  const rawEnvTimeout = process.env.QIAOQIAO_REPLY_TIMEOUT_MS?.trim();
  if (rawEnvTimeout) {
    const envTimeoutMs = Number(rawEnvTimeout);
    if (Number.isFinite(envTimeoutMs)) {
      return Math.max(0, envTimeoutMs);
    }
  }

  return DEFAULT_MESSAGE_REPLY_TIMEOUT_MS;
}

/**
 * Awaits `promise`, rejecting if it has not settled within `timeoutMs`.
 *
 * The underlying operation is not cancelled on timeout; only the wait is abandoned.
 *
 * @param promise - The operation to wait for.
 * @param timeoutMs - Maximum wait in milliseconds; values <= 0 disable the timeout.
 * @param message - Message of the error thrown on timeout.
 * @returns The value `promise` resolves to.
 * @throws An `Error` carrying `message` on timeout, or whatever `promise` rejects with.
 */
async function withOptionalTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string): Promise<T> {
  if (timeoutMs <= 0) {
    return promise;
  }
  let timer: TimeoutHandle | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    // Always release the timer so a settled promise does not keep it pending.
    clearTimeout(timer);
  }
}

/**
 * Schedules a reconnect with exponential backoff and jitter.
 *
 * Does nothing when a reconnect is already pending. The base delay doubles per attempt
 * up to RECONNECT_MAX_DELAY_MS, and random jitter below 30% of that delay is added.
 *
 * @param cfg - Plugin host configuration passed to the reconnect attempt.
 * @param runtime - Optional runtime passed to the reconnect attempt.
 */
function scheduleReconnect(cfg: ClawdbotConfig, runtime?: RuntimeEnv): void {
  if (reconnectTimer) return;

  const expDelay = Math.min(RECONNECT_MAX_DELAY_MS, RECONNECT_BASE_DELAY_MS * 2 ** reconnectAttempt);
  const jitter = Math.floor(Math.random() * Math.max(1, Math.floor(expDelay * 0.3)));
  const delayMs = expDelay + jitter;
  reconnectAttempt += 1;
  console.warn(
    `[Qiaoqiao Plugin] Reconnect scheduled in ${delayMs}ms (attempt=${reconnectAttempt}, base=${expDelay}, jitter=${jitter})`,
  );
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    startQiaoqiaoPluginSocketClient(cfg, runtime);
  }, delayMs);
}

/**
 * Opens the channel socket to the backend, or returns the existing one.
 *
 * On `open` it sends a `register_agent_socket` frame and waits up to
 * REGISTER_ACK_TIMEOUT_MS for `register_ack`. After a successful ack it starts the
 * heartbeat and processes `qiaoqiao_message` frames, answering each with a
 * `qiaoqiao_ack` followed by a `qiaoqiao_reply` (or `qiaoqiao_error`). Duplicate request
 * ids are acknowledged and either skipped (still in flight) or answered from the reply
 * cache. On `close` or `error` a reconnect is scheduled.
 *
 * @param cfg - Plugin host configuration.
 * @param runtime - Optional runtime forwarded to the bot handler.
 * @returns The open or newly created socket, or the module's current socket reference
 *   (possibly `null`) when a connection attempt is already in progress.
 */
export function startQiaoqiaoPluginSocketClient(
  cfg: ClawdbotConfig,
  runtime?: RuntimeEnv,
): WebSocket | null {
  if (channelSocket && channelSocket.readyState === WebSocket.OPEN) {
    return channelSocket;
  }
  if (isConnecting) {
    return channelSocket;
  }

  isConnecting = true;
  isRegistered = false;

  const registerAgentIds = resolveRegisterAgentIds(cfg);
  const registerCredentials = resolveRegisterCredentials(cfg);
  const backendWsUrl = resolveBackendWsUrl(cfg);
  // Logged for diagnostics; deliberately omits the secret itself.
  const registrationState = {
    agentIds: registerAgentIds,
    hasCredentials: Boolean(registerCredentials?.appId && registerCredentials?.appSecret),
    appId: registerCredentials?.appId || '',
    hasAppSecret: Boolean(registerCredentials?.appSecret),
  };

  console.log(`[Qiaoqiao Plugin] Connecting channel socket to backend: ${backendWsUrl}`);
  console.log(`[Qiaoqiao Plugin] Registering agentIds: ${registerAgentIds.join(', ') || 'none'}`);
  console.log(`[Qiaoqiao Plugin] registration_state: ${JSON.stringify(registrationState)}`);

  if (!registerCredentials) {
    console.error('[Qiaoqiao Plugin] No credentials (appId/appSecret) found in config, cannot register');
  }

  const ws = new WebSocket(backendWsUrl);
  channelSocket = ws;

  ws.on('open', () => {
    isConnecting = false;
    reconnectAttempt = 0;
    missedHeartbeatAcks = 0;
    clearSocketTimers();

    registerAckTimer = setTimeout(() => {
      console.error('[Qiaoqiao Plugin] register_ack timeout, closing socket for reconnect');
      ws.close();
    }, REGISTER_ACK_TIMEOUT_MS);

    sendJson(ws, {
      type: 'register_agent_socket',
      agentIds: registerAgentIds,
      appId: registerCredentials?.appId || '',
      appSecret: registerCredentials?.appSecret || '',
    });
  });

  ws.on('message', async (raw) => {
    let incomingRequestId: string | undefined;
    // True only once THIS invocation has added the id to processingRequestIds, so a
    // failure on a duplicate frame can never release another invocation's claim.
    let claimedRequest = false;

    try {
      const message = parseBackendMessage(raw);
      incomingRequestId = typeof message.requestId === 'string' ? message.requestId : undefined;

      if (message.type === 'register_ack') {
        if (registerAckTimer) {
          clearTimeout(registerAckTimer);
          registerAckTimer = null;
        }
        if (message.success) {
          isRegistered = true;
          missedHeartbeatAcks = 0;
          console.log(
            `[Qiaoqiao Plugin] Agent socket registered: agentId=${message.agentId || 'unknown'} requested=${registerAgentIds.join(',') || 'none'}`,
          );
          scheduleHeartbeat(ws);
        } else {
          isRegistered = false;
          console.error('[Qiaoqiao Plugin] Agent socket registration failed:', message.error || 'unknown');
          ws.close();
        }
        return;
      }

      if (message.type === 'heartbeat_ack') {
        if (heartbeatAckTimer) {
          clearTimeout(heartbeatAckTimer);
          heartbeatAckTimer = null;
        }
        missedHeartbeatAcks = 0;
        return;
      }

      if (message.type !== 'qiaoqiao_message') {
        return;
      }

      if (!isRegistered) {
        console.warn('[Qiaoqiao Plugin] Ignore message before registration ready');
        return;
      }

      const requestId = incomingRequestId ?? '';
      console.log('[Qiaoqiao Plugin] Received message with requestId:', requestId || 'missing');
      if (!requestId) {
        throw new Error('Missing requestId in qiaoqiao_message');
      }

      pruneRecentReplies();

      // Same request is still being handled: acknowledge and let the first run answer.
      if (processingRequestIds.has(requestId)) {
        sendJson(ws, { type: 'qiaoqiao_ack', requestId, agentId: message.agentId, ack: '⏳' });
        console.warn('[Qiaoqiao Plugin] Duplicate requestId in progress, skipped:', requestId);
        return;
      }

      // Same request was answered recently: replay the cached reply instead of re-running.
      const cachedReply = recentReplies.get(requestId);
      if (cachedReply) {
        sendJson(ws, { type: 'qiaoqiao_ack', requestId, agentId: message.agentId, ack: '♻️' });
        sendJson(ws, {
          type: 'qiaoqiao_reply',
          requestId,
          agentId: message.agentId,
          ...cachedReply.payload,
        });
        console.log('[Qiaoqiao Plugin] Replayed cached reply for duplicate requestId:', requestId);
        return;
      }

      processingRequestIds.add(requestId);
      claimedRequest = true;
      sendJson(ws, { type: 'qiaoqiao_ack', requestId, agentId: message.agentId, ack: '👀' });

      const response = await handleQiaoqiaoPluginMessage(
        message.data,
        cfg,
        runtime,
        requestId,
        message.agentId,
      );
      recentReplies.set(requestId, { at: Date.now(), payload: response });

      console.log('[Qiaoqiao Plugin] Sending response with requestId:', requestId);
      sendJson(ws, { type: 'qiaoqiao_reply', requestId, agentId: message.agentId, ...response });
      console.log('[Qiaoqiao Plugin] Response sent successfully');
    } catch (error: unknown) {
      console.error('[Qiaoqiao Plugin] Error processing backend message:', error);
      sendJson(ws, {
        type: 'qiaoqiao_error',
        requestId: incomingRequestId,
        success: false,
        error: describeError(error),
      });
    } finally {
      if (claimedRequest && incomingRequestId) {
        processingRequestIds.delete(incomingRequestId);
      }
    }
  });

  ws.on('close', (code, reasonBuffer) => {
    const reason = String(reasonBuffer || '').trim();

    // A socket that has already been replaced must not reset the shared state or
    // timers of its successor, nor schedule a redundant reconnect.
    if (channelSocket !== ws) {
      console.warn(
        `[Qiaoqiao Plugin] Stale channel socket closed, ignoring code=${code} reason=${reason || 'none'}`,
      );
      return;
    }

    const wasRegistered = isRegistered;
    isConnecting = false;
    isRegistered = false;
    channelSocket = null;
    clearSocketTimers();

    console.warn(
      `[Qiaoqiao Plugin] Channel socket closed, scheduling reconnect code=${code} reason=${reason || 'none'} registered=${wasRegistered}`,
    );
    scheduleReconnect(cfg, runtime);
  });

  ws.on('error', (error) => {
    console.error('[Qiaoqiao Plugin] Channel socket error:', error);
    // Errors from a replaced socket must not flip the state of the current one.
    if (channelSocket !== ws) {
      return;
    }
    isConnecting = false;
    isRegistered = false;
    scheduleReconnect(cfg, runtime);
  });

  return ws;
}

/**
 * Runs one backend Qiaoqiao message through the bot handler and builds the reply payload.
 *
 * Starts reply capture for `requestId`, invokes `handleQiaoqiaoMessage` inside the reply
 * request context (bounded by the resolved reply timeout), then consumes the captured
 * reply. Failures are logged and reported as a `success: false` payload rather than
 * thrown. Reply capture for `requestId` is always cleared on exit.
 *
 * @param messageData - The `data` field of the backend message (untrusted).
 * @param cfg - Plugin host configuration.
 * @param runtime - Optional runtime; a console-backed fallback is used when omitted.
 * @param requestId - Request id of the backend message; required.
 * @param outerAgentId - Agent id from the message envelope, used when `messageData` has none.
 * @returns The success payload with the captured reply, or a failure payload.
 */
async function handleQiaoqiaoPluginMessage(
  messageData: BackendMessagePayload | null | undefined,
  cfg: ClawdbotConfig,
  runtime?: RuntimeEnv,
  requestId?: string,
  outerAgentId?: string,
): Promise<PluginReply> {
  try {
    if (!messageData) {
      throw new Error('Missing data in qiaoqiao_message');
    }
    const { event, agentId: innerAgentId } = messageData;
    const agentId = innerAgentId || outerAgentId;

    if (!requestId) {
      throw new Error('Missing requestId from backend message');
    }
    if (!event) {
      throw new Error('Missing event in qiaoqiao_message data');
    }

    // Build the QiaoqiaoMessageEvent passed to the bot handler.
    const qiaoqiaoEvent: QiaoqiaoMessageEvent = {
      sender: event.sender,
      message: event.message,
    };

    console.log('[Qiaoqiao Plugin] Processing message with OpenClaw bot handler');

    startReplyCapture(requestId);
    const timeoutMs = resolveMessageReplyTimeoutMs(cfg, agentId);
    if (timeoutMs <= 0) {
      console.log('[Qiaoqiao Plugin] Reply timeout disabled for requestId:', requestId);
    }

    // Invoke the OpenClaw bot message handler.
    const chatHistories = new Map();
    const botOpenId = 'bot_open_id';

    await withOptionalTimeout(
      runWithReplyRequestContext(requestId, () =>
        handleQiaoqiaoMessage({
          cfg,
          event: qiaoqiaoEvent,
          botOpenId,
          runtime: runtime ?? { log: console.log, error: console.error, exit: () => {} },
          chatHistories,
          accountId: agentId,
        }),
      ),
      timeoutMs,
      `Reply timeout after ${timeoutMs}ms`,
    );

    const reply = consumeReplyCapture(requestId);
    if (!reply.trim()) {
      throw new Error('No reply captured from agent dispatch');
    }
    console.log('[Qiaoqiao Plugin] Reply captured for requestId:', requestId);

    return {
      success: true,
      reply,
      agentInfo: {
        agentId,
        agentName: 'Qiaoqiao Agent',
        avatarName: 'AI Avatar',
      },
      timestamp: new Date().toISOString(),
    };
  } catch (error: unknown) {
    console.error('[Qiaoqiao Plugin] Error handling message:', error);
    return {
      success: false,
      reply: '处理消息时出错',
      error: describeError(error),
    };
  } finally {
    if (requestId) {
      clearReplyCapture(requestId);
    }
  }
}

export default startQiaoqiaoPluginSocketClient;