import * as http from "http"; import { type ClawdbotConfig, type RuntimeEnv, type HistoryEntry, installRequestBodyLimitGuard, } from "openclaw/plugin-sdk"; import type { ResolvedQiaoqiaoAccount } from "./types.js"; import { createEventDispatcher } from "./client.js"; import { resolveQiaoqiaoAccount, listEnabledQiaoqiaoAccounts } from "./accounts.js"; import { handleQiaoqiaoMessage, type QiaoqiaoMessageEvent, type QiaoqiaoBotAddedEvent } from "./bot.js"; import { probeQiaoqiao } from "./probe.js"; import { resolveQiaoqiaoCredentials, verifyQiaoqiaoAgent, type QiaoqiaoAgentInfo, } from "./auth.js"; import { startQiaoqiaoPluginSocketClient } from "./plugin-websocket-server.js"; export type MonitorQiaoqiaoOpts = { config?: ClawdbotConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; accountId?: string; }; // Per-account HTTP servers and bot info const httpServers = new Map(); const botOpenIds = new Map(); const QIAOQIAO_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const QIAOQIAO_WEBHOOK_BODY_TIMEOUT_MS = 30_000; async function fetchBotOpenId( account: ResolvedQiaoqiaoAccount, ): Promise { try { const result = await probeQiaoqiao(account); return result.ok ? result.botOpenId : undefined; } catch { return undefined; } } /** * Per-chat serial queue that ensures messages from the same chat are processed * in arrival order while allowing different chats to run concurrently. * Uses .then(task, task) so a failure in one message never blocks the next. */ function createChatQueue() { const queues = new Map>(); return (chatId: string, task: () => Promise): Promise => { const prev = queues.get(chatId) ?? Promise.resolve(); const next = prev.then(task, task); queues.set(chatId, next); void next.finally(() => { if (queues.get(chatId) === next) { queues.delete(chatId); } }); return next; }; } /** * Register event handlers for Qiaoqiao. * Qiaoqiao uses custom WebSocket connection, not SDK's EventDispatcher. */ function registerEventHandlers( context: { cfg: ClawdbotConfig; accountId: string; runtime?: RuntimeEnv; chatHistories: Map; fireAndForget?: boolean; }, ) { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; // Event handlers are registered in plugin-websocket-server.ts // This function is kept for compatibility but doesn't register SDK events log(`qiaoqiao[${accountId}]: Event handlers registered via custom WebSocket`); } type MonitorAccountParams = { cfg: ClawdbotConfig; account: ResolvedQiaoqiaoAccount; runtime?: RuntimeEnv; abortSignal?: AbortSignal; }; /** * Monitor a single Qiaoqiao account. */ async function monitorSingleAccount(params: MonitorAccountParams): Promise { const { cfg, account, runtime, abortSignal } = params; const { accountId } = account; const log = runtime?.log ?? console.log; const errorLog = runtime?.error ?? console.error; // --- Qiaoqiao agent identity verification (optional, only when credentials are set) --- const qiaoqiaoCreds = resolveQiaoqiaoCredentials({ appId: account.appId, appSecret: account.appSecret, apiBase: account.config.apiBase, }); let qiaoqiaoAgent: QiaoqiaoAgentInfo | null = null; if (qiaoqiaoCreds) { try { qiaoqiaoAgent = await verifyQiaoqiaoAgent(qiaoqiaoCreds, log); log(`qiaoqiao[${accountId}]: Qiaoqiao agent verified: ${qiaoqiaoAgent.username}`); } catch (err) { errorLog(`qiaoqiao[${accountId}]: Qiaoqiao agent verification failed: ${String(err)}`); errorLog(`qiaoqiao[${accountId}]: Proceeding without Qiaoqiao integration.`); } } // Fetch bot open_id const botOpenId = await fetchBotOpenId(account); botOpenIds.set(accountId, botOpenId ?? ""); log(`qiaoqiao[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`); const connectionMode = account.config.connectionMode ?? "websocket"; const chatHistories = new Map(); registerEventHandlers({ cfg, accountId, runtime, chatHistories, fireAndForget: connectionMode === "webhook", }); if (connectionMode === "webhook") { return monitorWebhook({ params, accountId }); } return monitorWebSocket({ params, accountId }); } type ConnectionParams = { params: MonitorAccountParams; accountId: string; }; async function monitorWebSocket({ params, accountId }: ConnectionParams): Promise { const { account, runtime, abortSignal } = params; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; log(`qiaoqiao[${accountId}]: WebSocket connection handled by plugin-websocket-server`); // Qiaoqiao uses custom WebSocket connection via plugin-websocket-server // No need to create SDK WebSocket client return new Promise((resolve, reject) => { const cleanup = () => { botOpenIds.delete(accountId); }; const handleAbort = () => { log(`qiaoqiao[${accountId}]: abort signal received, stopping`); cleanup(); resolve(); }; if (abortSignal?.aborted) { cleanup(); resolve(); return; } abortSignal?.addEventListener("abort", handleAbort, { once: true }); // WebSocket connection is managed by plugin-websocket-server // Just keep the monitor alive log(`qiaoqiao[${accountId}]: WebSocket monitor active`); }); } async function monitorWebhook({ params, accountId }: ConnectionParams): Promise { const { account, runtime, abortSignal } = params; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; const port = account.config.webhookPort ?? 3000; const path = account.config.webhookPath ?? "/qiaoqiao/events"; log(`qiaoqiao[${accountId}]: starting Webhook server on port ${port}, path ${path}...`); const server = http.createServer(); // TODO: Implement Qiaoqiao webhook handler server.on("request", (req, res) => { if (req.url === path && req.method === 'POST') { // Handle Qiaoqiao webhook events res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('OK'); } else { res.writeHead(404); res.end('Not Found'); } }); httpServers.set(accountId, server); return new Promise((resolve, reject) => { const cleanup = () => { server.close(); httpServers.delete(accountId); botOpenIds.delete(accountId); }; const handleAbort = () => { log(`qiaoqiao[${accountId}]: abort signal received, stopping Webhook server`); cleanup(); resolve(); }; if (abortSignal?.aborted) { cleanup(); resolve(); return; } abortSignal?.addEventListener("abort", handleAbort, { once: true }); server.listen(port, () => { log(`qiaoqiao[${accountId}]: Webhook server listening on port ${port}`); }); server.on("error", (err) => { error(`qiaoqiao[${accountId}]: Webhook server error: ${err}`); abortSignal?.removeEventListener("abort", handleAbort); reject(err); }); }); } /** * Main entry: start monitoring for all enabled accounts. */ export async function monitorQiaoqiaoProvider(opts: MonitorQiaoqiaoOpts = {}): Promise { console.log('[Qiaoqiao Monitor] Starting monitor with opts:', { hasConfig: !!opts.config, accountId: opts.accountId, hasRuntime: !!opts.runtime, abortSignalAborted: opts.abortSignal?.aborted }); const cfg = opts.config; if (!cfg) { console.error('[Qiaoqiao Monitor] Config is required for Qiaoqiao monitor'); throw new Error("Config is required for Qiaoqiao monitor"); } const log = opts.runtime?.log ?? console.log; console.log('[Qiaoqiao Monitor] Log function initialized'); // If accountId is specified, only monitor that account if (opts.accountId) { console.log('[Qiaoqiao Monitor] Monitoring single account:', opts.accountId); const account = resolveQiaoqiaoAccount({ cfg, accountId: opts.accountId }); console.log('[Qiaoqiao Monitor] Account resolved:', { accountId: account.accountId, enabled: account.enabled, configured: account.configured, connectionMode: account.config?.connectionMode }); if (!account.enabled || !account.configured) { console.error('[Qiaoqiao Monitor] Account not configured or disabled:', opts.accountId); throw new Error(`Qiaoqiao account "${opts.accountId}" not configured or disabled`); } console.log('[Qiaoqiao Monitor] Starting single account monitor'); return monitorSingleAccount({ cfg, account, runtime: opts.runtime, abortSignal: opts.abortSignal, }); } // Otherwise, start all enabled accounts const accounts = listEnabledQiaoqiaoAccounts(cfg); if (accounts.length === 0) { throw new Error("No enabled Qiaoqiao accounts configured"); } log(`qiaoqiao: starting ${accounts.length} account(s): ${accounts.map((a) => a.accountId).join(", ")}`); // Start all accounts in parallel await Promise.all( accounts.map((account) => monitorSingleAccount({ cfg, account, runtime: opts.runtime, abortSignal: opts.abortSignal, }), ), ); } /** * Stop monitoring for a specific account or all accounts. */ export function stopQiaoqiaoMonitor(accountId?: string): void { if (accountId) { const server = httpServers.get(accountId); if (server) { server.close(); httpServers.delete(accountId); } botOpenIds.delete(accountId); } else { for (const server of httpServers.values()) { server.close(); } httpServers.clear(); botOpenIds.clear(); } }