import {
  applyBasicWebhookRequestGuards,
  type RuntimeEnv,
  installRequestBodyLimitGuard,
} from "@hanzo/bot/plugin-sdk/feishu";
import * as Lark from "@larksuiteoapi/node-sdk";
import * as http from "http";
import type { ResolvedFeishuAccount } from "./types.js";
import { createFeishuWSClient } from "./client.js";
import {
  botNames,
  botOpenIds,
  FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
  FEISHU_WEBHOOK_MAX_BODY_BYTES,
  feishuWebhookRateLimiter,
  httpServers,
  recordWebhookStatus,
  wsClients,
} from "./monitor.state.js";

/**
 * Arguments shared by both transport monitors in this module.
 *
 * - `account` / `accountId`: the account being monitored and the key under
 *   which its per-account state is stored in the shared registries.
 * - `runtime`: optional logging sink; `console.log` / `console.error` are used
 *   when it (or the relevant method) is absent.
 * - `abortSignal`: when aborted, the monitor cleans up and its promise resolves.
 * - `eventDispatcher`: Lark dispatcher that receives the incoming events.
 */
export type MonitorTransportParams = {
  account: ResolvedFeishuAccount;
  accountId: string;
  runtime?: RuntimeEnv;
  abortSignal?: AbortSignal;
  eventDispatcher: Lark.EventDispatcher;
};

/**
 * Starts a WebSocket client for the account and keeps the returned promise
 * pending until the monitor is stopped.
 *
 * The client is registered in `wsClients` under `accountId`. The promise
 * resolves when `abortSignal` fires (or is already aborted on entry) and
 * rejects when starting the client fails; in both cases the per-account
 * entries in `wsClients`, `botOpenIds` and `botNames` are removed. Without an
 * abort signal and without a start failure the promise never settles.
 *
 * @returns A promise that resolves on abort and rejects on start failure.
 */
export async function monitorWebSocket({
  account,
  accountId,
  runtime,
  abortSignal,
  eventDispatcher,
}: MonitorTransportParams): Promise<void> {
  const log = runtime?.log ?? console.log;

  log(`feishu[${accountId}]: starting WebSocket connection...`);

  const wsClient = createFeishuWSClient(account);
  wsClients.set(accountId, wsClient);

  return new Promise<void>((resolve, reject) => {
    // Guards against running cleanup twice, e.g. a late start failure that
    // arrives after the monitor was already stopped by the abort signal.
    let settled = false;

    const cleanup = () => {
      wsClients.delete(accountId);
      botOpenIds.delete(accountId);
      botNames.delete(accountId);
    };

    const handleAbort = () => {
      if (settled) {
        return;
      }
      settled = true;
      log(`feishu[${accountId}]: abort signal received, stopping`);
      cleanup();
      resolve();
    };

    const fail = (err: unknown) => {
      if (settled) {
        return;
      }
      settled = true;
      cleanup();
      abortSignal?.removeEventListener("abort", handleAbort);
      reject(err);
    };

    if (abortSignal?.aborted) {
      settled = true;
      cleanup();
      resolve();
      return;
    }

    abortSignal?.addEventListener("abort", handleAbort, { once: true });

    try {
      const started: unknown = wsClient.start({ eventDispatcher });
      // If start() hands back a promise, a rejection would bypass the
      // surrounding try/catch; route it through the same failure path.
      void Promise.resolve(started).catch(fail);
      log(`feishu[${accountId}]: WebSocket client started`);
    } catch (err) {
      fail(err);
    }
  });
}

/**
 * Starts an HTTP server that feeds webhook requests into the Lark event
 * dispatcher and keeps the returned promise pending until the monitor stops.
 *
 * Listen address comes from `account.config` with these fallbacks:
 * `webhookPort` 3000, `webhookPath` "/feishu/events", `webhookHost`
 * "127.0.0.1". The server is registered in `httpServers` under `accountId`.
 *
 * The promise resolves when `abortSignal` fires (or is already aborted on
 * entry) and rejects when the server emits `"error"`. In both cases the server
 * is closed and the per-account entries in `httpServers`, `botOpenIds` and
 * `botNames` are removed.
 *
 * @returns A promise that resolves on abort and rejects on server error.
 */
export async function monitorWebhook({
  account,
  accountId,
  runtime,
  abortSignal,
  eventDispatcher,
}: MonitorTransportParams): Promise<void> {
  const log = runtime?.log ?? console.log;
  const error = runtime?.error ?? console.error;

  const port = account.config.webhookPort ?? 3000;
  const path = account.config.webhookPath ?? "/feishu/events";
  const host = account.config.webhookHost ?? "127.0.0.1";

  log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);

  const server = http.createServer();
  const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });

  server.on("request", (req, res) => {
    // Report the final status code of every response, including early rejections.
    res.on("finish", () => {
      recordWebhookStatus(runtime, accountId, path, res.statusCode);
    });

    const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
    const passedGuards = applyBasicWebhookRequestGuards({
      req,
      res,
      rateLimiter: feishuWebhookRateLimiter,
      rateLimitKey,
      nowMs: Date.now(),
      requireJsonContentType: true,
    });
    if (!passedGuards) {
      // NOTE(review): the guard helper is expected to have written the
      // response itself when it returns false; confirm against the SDK.
      return;
    }

    const guard = installRequestBodyLimitGuard(req, res, {
      maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
      timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
      responseFormat: "text",
    });
    if (guard.isTripped()) {
      return;
    }

    void Promise.resolve(webhookHandler(req, res))
      .catch((err: unknown) => {
        // A tripped guard means the failure is a consequence of the body
        // limit/timeout, so it is not reported as a handler error.
        if (!guard.isTripped()) {
          error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
        }
      })
      .finally(() => {
        guard.dispose();
      });
  });

  httpServers.set(accountId, server);

  return new Promise<void>((resolve, reject) => {
    // Guards against running cleanup twice if several terminal events arrive.
    let settled = false;

    const cleanup = () => {
      server.close();
      httpServers.delete(accountId);
      botOpenIds.delete(accountId);
      botNames.delete(accountId);
    };

    const handleAbort = () => {
      if (settled) {
        return;
      }
      settled = true;
      log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
      cleanup();
      resolve();
    };

    if (abortSignal?.aborted) {
      settled = true;
      cleanup();
      resolve();
      return;
    }

    abortSignal?.addEventListener("abort", handleAbort, { once: true });

    // Attach the error listener before listen() so no failure can be missed.
    server.on("error", (err) => {
      error(`feishu[${accountId}]: Webhook server error: ${err}`);
      if (settled) {
        return;
      }
      settled = true;
      abortSignal?.removeEventListener("abort", handleAbort);
      // Release the registry entries and the server, matching the failure
      // path of monitorWebSocket; otherwise a failed server stays registered.
      cleanup();
      reject(err);
    });

    server.listen(port, host, () => {
      log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
    });
  });
}