import * as http from "http";
import crypto from "node:crypto";
import * as Lark from "@larksuiteoapi/node-sdk";
import {
  applyBasicWebhookRequestGuards,
  readJsonBodyWithLimit,
  type RuntimeEnv,
  installRequestBodyLimitGuard,
} from "./nextclaw-sdk/feishu.js";
import { createFeishuWSClient } from "./client.js";
import {
  botNames,
  botOpenIds,
  FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
  FEISHU_WEBHOOK_MAX_BODY_BYTES,
  feishuWebhookRateLimiter,
  httpServers,
  recordWebhookStatus,
  wsClients,
} from "./monitor.state.js";
import type { ResolvedFeishuAccount } from "./types.js";

/** Inputs shared by both transport monitors (WebSocket and Webhook). */
export type MonitorTransportParams = {
  account: ResolvedFeishuAccount;
  accountId: string;
  /** Optional runtime hooks; `log` / `error` fall back to the console when absent. */
  runtime?: RuntimeEnv;
  /** When aborted, the monitor tears down its per-account state and resolves. */
  abortSignal?: AbortSignal;
  eventDispatcher: Lark.EventDispatcher;
};

/**
 * Narrows a parsed JSON value to a plain (non-null, non-array) object.
 */
function isFeishuWebhookPayload(value: unknown): value is Record<string, unknown> {
  return !!value && typeof value === "object" && !Array.isArray(value);
}

/**
 * Builds the object handed to the event dispatcher: the payload's own
 * properties, with the request headers reachable as `headers` through the
 * prototype chain (so `headers` is not an own property of the envelope).
 */
function buildFeishuWebhookEnvelope(
  req: http.IncomingMessage,
  payload: Record<string, unknown>,
): Record<string, unknown> {
  return Object.assign(Object.create({ headers: req.headers }), payload) as Record<
    string,
    unknown
  >;
}

/**
 * Compares two strings without short-circuiting on the first differing byte.
 *
 * The length check returns early, which is acceptable here because the
 * expected value is a fixed-length hex digest.
 */
function timingSafeEqualString(left: string, right: string): boolean {
  const leftBytes = Buffer.from(left, "utf8");
  const rightBytes = Buffer.from(right, "utf8");
  if (leftBytes.length !== rightBytes.length) {
    return false;
  }
  return crypto.timingSafeEqual(leftBytes, rightBytes);
}

/**
 * Verifies the `x-lark-signature` header of a webhook request.
 *
 * The expected signature is the hex SHA-256 of
 * `timestamp + nonce + encryptKey + JSON.stringify(payload)`.
 *
 * @returns `true` when no (non-blank) encrypt key is configured, or when the
 *   header signature matches; `false` when any of the three headers is missing
 *   or the signature differs.
 */
function isFeishuWebhookSignatureValid(params: {
  headers: http.IncomingHttpHeaders;
  payload: Record<string, unknown>;
  encryptKey?: string;
}): boolean {
  const encryptKey = params.encryptKey?.trim();
  if (!encryptKey) {
    // No key configured: there is nothing to verify against.
    return true;
  }

  const timestampHeader = params.headers["x-lark-request-timestamp"];
  const nonceHeader = params.headers["x-lark-request-nonce"];
  const signatureHeader = params.headers["x-lark-signature"];
  // Repeated headers arrive as arrays; only the first value is considered.
  const timestamp = Array.isArray(timestampHeader) ? timestampHeader[0] : timestampHeader;
  const nonce = Array.isArray(nonceHeader) ? nonceHeader[0] : nonceHeader;
  const signature = Array.isArray(signatureHeader) ? signatureHeader[0] : signatureHeader;
  if (!timestamp || !nonce || !signature) {
    return false;
  }

  // NOTE(review): the digest covers the re-serialized payload, not the raw
  // request bytes; this matches only if JSON.stringify reproduces the sender's
  // body exactly — confirm against the sender's serialization.
  const computedSignature = crypto
    .createHash("sha256")
    .update(timestamp + nonce + encryptKey + JSON.stringify(params.payload))
    .digest("hex");

  // Constant-time comparison: the header value is attacker-controlled.
  return timingSafeEqualString(computedSignature, signature);
}

/** Ends the response with a UTF-8 plain-text body and the given status code. */
function respondText(res: http.ServerResponse, statusCode: number, body: string): void {
  res.statusCode = statusCode;
  res.setHeader("Content-Type", "text/plain; charset=utf-8");
  res.end(body);
}

/**
 * Starts the WebSocket client for an account and registers it in `wsClients`.
 *
 * The returned promise resolves when `abortSignal` fires (or is already
 * aborted on entry) and rejects if `wsClient.start` throws synchronously.
 * In both cases the account's entries in `wsClients`, `botOpenIds` and
 * `botNames` are removed.
 */
export async function monitorWebSocket({
  account,
  accountId,
  runtime,
  abortSignal,
  eventDispatcher,
}: MonitorTransportParams): Promise<void> {
  const log = runtime?.log ?? console.log;
  log(`feishu[${accountId}]: starting WebSocket connection...`);

  const wsClient = createFeishuWSClient(account);
  wsClients.set(accountId, wsClient);

  return new Promise<void>((resolve, reject) => {
    const cleanup = () => {
      wsClients.delete(accountId);
      botOpenIds.delete(accountId);
      botNames.delete(accountId);
    };

    const handleAbort = () => {
      log(`feishu[${accountId}]: abort signal received, stopping`);
      cleanup();
      resolve();
    };

    // Already aborted before we started: undo the registration and finish.
    if (abortSignal?.aborted) {
      cleanup();
      resolve();
      return;
    }

    abortSignal?.addEventListener("abort", handleAbort, { once: true });

    try {
      wsClient.start({ eventDispatcher });
      log(`feishu[${accountId}]: WebSocket client started`);
    } catch (err) {
      cleanup();
      abortSignal?.removeEventListener("abort", handleAbort);
      reject(err);
    }
  });
}

/**
 * Starts an HTTP server that receives webhook events for an account and
 * registers it in `httpServers`.
 *
 * Host, port and path come from `account.config` with defaults
 * `127.0.0.1`, `3000` and `/feishu/events`. Each request passes through the
 * basic request guards and the body-size/timeout guard, is parsed as JSON,
 * signature-checked, answered directly if it is a challenge, and otherwise
 * forwarded to `eventDispatcher.invoke`.
 *
 * The returned promise resolves when `abortSignal` fires (or is already
 * aborted on entry) and rejects when the server emits `"error"`. In every
 * case the server is closed and the account's entries in `httpServers`,
 * `botOpenIds` and `botNames` are removed.
 */
export async function monitorWebhook({
  account,
  accountId,
  runtime,
  abortSignal,
  eventDispatcher,
}: MonitorTransportParams): Promise<void> {
  const log = runtime?.log ?? console.log;
  const error = runtime?.error ?? console.error;

  const port = account.config.webhookPort ?? 3000;
  const path = account.config.webhookPath ?? "/feishu/events";
  const host = account.config.webhookHost ?? "127.0.0.1";

  log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);

  const server = http.createServer();

  server.on("request", (req, res) => {
    // Record the final status of every request, whichever branch answers it.
    res.on("finish", () => {
      recordWebhookStatus(runtime, accountId, path, res.statusCode);
    });

    const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
    if (
      !applyBasicWebhookRequestGuards({
        req,
        res,
        rateLimiter: feishuWebhookRateLimiter,
        rateLimitKey,
        nowMs: Date.now(),
        requireJsonContentType: true,
      })
    ) {
      return;
    }

    const guard = installRequestBodyLimitGuard(req, res, {
      maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
      timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
      responseFormat: "text",
    });
    if (guard.isTripped()) {
      return;
    }

    // Fire-and-forget: every failure is handled inside the async body.
    void (async () => {
      try {
        const bodyResult = await readJsonBodyWithLimit(req, {
          maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
          timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
        });
        if (guard.isTripped() || res.writableEnded) {
          return;
        }
        if (!bodyResult.ok) {
          // Only malformed JSON is answered here; other failure codes get no
          // response from this handler.
          if (bodyResult.code === "INVALID_JSON") {
            respondText(res, 400, "Invalid JSON");
          }
          return;
        }
        if (!isFeishuWebhookPayload(bodyResult.value)) {
          respondText(res, 400, "Invalid JSON");
          return;
        }

        // Lark's default adapter drops invalid signatures as an empty 200. Reject here instead.
        if (
          !isFeishuWebhookSignatureValid({
            headers: req.headers,
            payload: bodyResult.value,
            encryptKey: account.encryptKey,
          })
        ) {
          respondText(res, 401, "Invalid signature");
          return;
        }

        const { isChallenge, challenge } = Lark.generateChallenge(bodyResult.value, {
          encryptKey: account.encryptKey ?? "",
        });
        if (isChallenge) {
          res.statusCode = 200;
          res.setHeader("Content-Type", "application/json; charset=utf-8");
          res.end(JSON.stringify(challenge));
          return;
        }

        // The signature was already verified above, hence `needCheck: false`.
        const value = await eventDispatcher.invoke(
          buildFeishuWebhookEnvelope(req, bodyResult.value),
          { needCheck: false },
        );
        if (!res.headersSent) {
          res.statusCode = 200;
          res.setHeader("Content-Type", "application/json; charset=utf-8");
          res.end(JSON.stringify(value));
        }
      } catch (err) {
        // Stay quiet when the body guard tripped.
        if (!guard.isTripped()) {
          error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
          if (!res.headersSent) {
            respondText(res, 500, "Internal Server Error");
          }
        }
      } finally {
        guard.dispose();
      }
    })();
  });

  httpServers.set(accountId, server);

  return new Promise<void>((resolve, reject) => {
    const cleanup = () => {
      server.close();
      httpServers.delete(accountId);
      botOpenIds.delete(accountId);
      botNames.delete(accountId);
    };

    const handleAbort = () => {
      log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
      cleanup();
      resolve();
    };

    // Already aborted before we started: undo the registration and finish.
    if (abortSignal?.aborted) {
      cleanup();
      resolve();
      return;
    }

    abortSignal?.addEventListener("abort", handleAbort, { once: true });

    server.on("error", (err) => {
      error(`feishu[${accountId}]: Webhook server error: ${err}`);
      abortSignal?.removeEventListener("abort", handleAbort);
      // Release the server and per-account state before rejecting, mirroring
      // the start-failure path of monitorWebSocket.
      cleanup();
      reject(err);
    });

    server.listen(port, host, () => {
      log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
    });
  });
}