import http from "http";
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk";
import type { PopoConfig } from "./types.js";
import { resolvePopoCredentials } from "./accounts.js";
import { verifySignature, decryptMessage, encryptMessage } from "./crypto.js";
import { handlePopoMessage, type PopoMessageEvent } from "./bot.js";
import { probePopo } from "./probe.js";

/** Options accepted by {@link monitorPopoProvider}. */
export type MonitorPopoOpts = {
  config?: ClawdbotConfig;
  runtime?: RuntimeEnv;
  abortSignal?: AbortSignal;
  accountId?: string;
};

/** Fields this module reads from the outer JSON body of a webhook POST. */
type WebhookEnvelope = {
  encrypt?: string;
  nonce?: string;
  timestamp?: string;
  signature?: string;
};

/** Upper bound on a buffered webhook request body, in bytes (1 MiB). */
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;

/** Raised by {@link readRequestBody} when a request body exceeds the allowed size. */
class PayloadTooLargeError extends Error {
  constructor(limit: number) {
    super(`request body exceeds ${limit} bytes`);
    this.name = "PayloadTooLargeError";
  }
}

/** The webhook server most recently started by {@link monitorPopoProvider}, if any. */
let currentServer: http.Server | null = null;

/** Returns true when `value` is a plain (non-null, non-array) object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Starts an HTTP server that receives POPO webhook callbacks and forwards
 * message events to `handlePopoMessage`.
 *
 * The server answers three kinds of requests on the configured webhook path
 * (default `/popo/events`, port 3001):
 * - `OPTIONS`: CORS preflight.
 * - `GET`: URL validation; echoes `nonce` when the signature verifies.
 * - `POST`: event delivery, optionally encrypted with the configured AES key.
 *
 * @param opts - Monitor options; `opts.config` is required.
 * @returns A promise that resolves once the server has stopped (abort signal
 *   fired or the server was closed) and rejects if the server emits an error.
 * @throws Error if the config or POPO credentials are missing, or if the
 *   credential probe fails.
 */
export async function monitorPopoProvider(opts: MonitorPopoOpts = {}): Promise<void> {
  const cfg = opts.config;
  if (!cfg) {
    throw new Error("Config is required for POPO monitor");
  }

  const popoCfg = cfg.channels?.popo as PopoConfig | undefined;
  const creds = resolvePopoCredentials(popoCfg);
  if (!creds) {
    throw new Error("POPO credentials not configured (appKey, appSecret required)");
  }

  const log = opts.runtime?.log ?? console.log;
  const error = opts.runtime?.error ?? console.error;

  // Fail fast if the probe reports the credentials as unusable.
  const probeResult = await probePopo(popoCfg);
  if (!probeResult.ok) {
    throw new Error(`POPO probe failed: ${probeResult.error}`);
  }
  log(`popo: credentials verified for appKey ${probeResult.appKey}`);

  const webhookPath = popoCfg?.webhookPath ?? "/popo/events";
  const webhookPort = popoCfg?.webhookPort ?? 3001;
  const chatHistories = new Map<string, HistoryEntry[]>();

  /** Sends `body` as a 200 JSON response, encrypted when an AES key is configured. */
  const sendJson = (res: http.ServerResponse, body: unknown): void => {
    const serialized = JSON.stringify(body);
    const out = creds.aesKey
      ? JSON.stringify({ encrypt: encryptMessage(serialized, creds.aesKey) })
      : serialized;
    res.writeHead(200, { "Content-Type": "application/json" });
    res.end(out);
  };

  /** Handles a single webhook request; see the function-level docs for the protocol. */
  const handleRequest = async (
    req: http.IncomingMessage,
    res: http.ServerResponse,
  ): Promise<void> => {
    // Handle CORS preflight
    if (req.method === "OPTIONS") {
      res.writeHead(200, {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization",
      });
      res.end();
      return;
    }

    // Only the pathname and query are used, so a constant base is sufficient.
    // Building the base from the client-supplied Host header would let a
    // malformed header throw here.
    let url: URL;
    try {
      url = new URL(req.url ?? "/", "http://localhost");
    } catch {
      res.writeHead(400);
      res.end("Bad Request");
      return;
    }

    // Only handle the webhook path
    if (url.pathname !== webhookPath) {
      res.writeHead(404);
      res.end("Not Found");
      return;
    }

    // Handle URL validation (GET request)
    if (req.method === "GET") {
      const nonce = url.searchParams.get("nonce");
      const timestamp = url.searchParams.get("timestamp");
      const signature = url.searchParams.get("signature");

      if (nonce && timestamp && signature && creds.token) {
        const valid = verifySignature({ token: creds.token, nonce, timestamp, signature });
        if (valid) {
          log(`popo: URL validation successful`);
          res.writeHead(200, { "Content-Type": "text/plain" });
          res.end(nonce);
          return;
        }
      }

      res.writeHead(400);
      res.end("Invalid validation request");
      return;
    }

    // Handle webhook event (POST request)
    if (req.method === "POST") {
      try {
        const body = await readRequestBody(req);

        let parsed: unknown;
        try {
          parsed = JSON.parse(body);
        } catch {
          log(`popo: rejected webhook with malformed JSON body`);
          res.writeHead(400);
          res.end("Invalid JSON");
          return;
        }
        if (!isRecord(parsed)) {
          log(`popo: rejected webhook with non-object JSON body`);
          res.writeHead(400);
          res.end("Invalid JSON");
          return;
        }
        const payload = parsed as WebhookEnvelope;

        // Check for encrypted payload
        let eventData: unknown;
        if (payload.encrypt && creds.aesKey) {
          // Verify signature first.
          // NOTE(review): verification is skipped when any of nonce/timestamp/
          // signature is absent or no token is configured. Confirm against the
          // POPO protocol whether such requests should be rejected instead.
          const { nonce, timestamp, signature } = payload;
          if (nonce && timestamp && signature && creds.token) {
            const valid = verifySignature({ token: creds.token, nonce, timestamp, signature });
            if (!valid) {
              log(`popo: invalid signature in webhook event`);
              res.writeHead(403);
              res.end("Invalid signature");
              return;
            }
          }

          // Decrypt the message
          const decrypted = decryptMessage(payload.encrypt, creds.aesKey);
          eventData = JSON.parse(decrypted);
        } else {
          // NOTE(review): an unencrypted body is accepted as-is even when an AES
          // key is configured. Confirm whether plaintext should be refused then.
          eventData = payload;
        }

        const eventType = isRecord(eventData) ? eventData.eventType : undefined;

        // Handle valid_url event (inline URL validation)
        if (eventType === "valid_url") {
          log(`popo: received valid_url event`);
          sendJson(res, { eventType: "valid_url" });
          return;
        }

        // Handle message events
        if (eventType === "IM_P2P_TO_ROBOT_MSG" || eventType === "IM_CHAT_TO_ROBOT_AT_MSG") {
          const messageEvent = eventData as PopoMessageEvent;
          log(`popo: received ${eventType} event`);

          // Process asynchronously: the webhook is acknowledged without waiting
          // for the handler, and handler failures are only logged.
          handlePopoMessage({
            cfg,
            event: messageEvent,
            runtime: opts.runtime,
            chatHistories,
          }).catch((err: unknown) => {
            error(`popo: error handling message: ${String(err)}`);
          });
        }

        // Handle ACTION events (card interactions)
        if (eventType === "ACTION") {
          log(`popo: received ACTION event`);
          // TODO: Implement card action handling if needed
        }

        // Return success response
        sendJson(res, { success: true });
      } catch (err: unknown) {
        if (err instanceof PayloadTooLargeError) {
          log(`popo: rejected oversized webhook body`);
          // Ask Node to drop the connection after replying so the rest of the
          // oversized upload is not kept alive.
          res.writeHead(413, { Connection: "close" });
          res.end("Payload Too Large");
        } else {
          error(`popo: error processing webhook: ${String(err)}`);
          res.writeHead(500);
          res.end("Internal Server Error");
        }
      }
      return;
    }

    res.writeHead(405);
    res.end("Method Not Allowed");
  };

  return new Promise<void>((resolve, reject) => {
    const server = http.createServer((req, res) => {
      // Safety net: a rejection escaping the async handler would otherwise be
      // an unhandled promise rejection.
      handleRequest(req, res).catch((err: unknown) => {
        error(`popo: unhandled error in webhook handler: ${String(err)}`);
        if (!res.headersSent) {
          res.writeHead(500);
        }
        if (!res.writableEnded) {
          res.end("Internal Server Error");
        }
      });
    });

    currentServer = server;

    // Closes this server only if it is still the one tracked at module level.
    const cleanup = () => {
      if (currentServer === server) {
        server.close();
        currentServer = null;
      }
    };

    const handleAbort = () => {
      log("popo: abort signal received, stopping webhook server");
      cleanup();
      resolve();
    };

    if (opts.abortSignal?.aborted) {
      cleanup();
      resolve();
      return;
    }

    opts.abortSignal?.addEventListener("abort", handleAbort, { once: true });

    server.on("error", (err) => {
      cleanup();
      opts.abortSignal?.removeEventListener("abort", handleAbort);
      reject(err);
    });

    // Settle the promise when the server is closed by other means (for example
    // stopPopoMonitor) so callers awaiting the monitor do not hang. Resolving
    // after an earlier resolve/reject is a no-op.
    server.on("close", () => {
      opts.abortSignal?.removeEventListener("abort", handleAbort);
      if (currentServer === server) {
        currentServer = null;
      }
      resolve();
    });

    server.listen(webhookPort, () => {
      log(`popo: webhook server started on port ${webhookPort}, path ${webhookPath}`);
    });
  });
}

/**
 * Buffers an incoming request body and decodes it as UTF-8.
 *
 * @param req - The incoming HTTP request.
 * @param maxBytes - Maximum number of body bytes to accept.
 * @returns The full request body as a string.
 * @throws PayloadTooLargeError (as a rejection) once more than `maxBytes`
 *   bytes have been received; remaining data is drained without buffering.
 */
function readRequestBody(
  req: http.IncomingMessage,
  maxBytes: number = MAX_WEBHOOK_BODY_BYTES,
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let received = 0;
    let overflowed = false;

    req.on("data", (chunk: Buffer) => {
      if (overflowed) {
        return; // keep consuming so the stream can finish, but hold nothing
      }
      received += chunk.length;
      if (received > maxBytes) {
        overflowed = true;
        chunks.length = 0;
        reject(new PayloadTooLargeError(maxBytes));
        return;
      }
      chunks.push(chunk);
    });
    req.on("end", () => {
      if (!overflowed) {
        resolve(Buffer.concat(chunks).toString("utf8"));
      }
    });
    req.on("error", reject);
  });
}

/** Stops the currently tracked webhook server, if one is running. */
export function stopPopoMonitor(): void {
  if (currentServer) {
    currentServer.close();
    currentServer = null;
  }
}