import { spawn } from "node:child_process";
import http from "node:http";
import { URL } from "node:url";
import {
  isRequestBodyLimitError,
  readRequestBodyWithLimit,
  requestBodyErrorToText,
} from "openclaw/plugin-sdk";
import type { WebSocket as NodeWebSocket } from "ws";
import { WebSocketServer } from "ws";
import type { VoiceCallConfig } from "./config.js";
import type { CoreConfig } from "./core-bridge.js";
import type { CallManager } from "./manager.js";
import type { MediaStreamConfig } from "./media-stream.js";
import { MediaStreamHandler } from "./media-stream.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { NormalizedEvent, WebhookContext } from "./types.js";

/** Upper bound on the size of a webhook request body (1 MiB). */
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;

/** Minimal surface this server needs from a realtime call session. */
type RealtimeSessionHandle = { close: () => void };

/** Module shape of the lazily imported realtime session implementation. */
type RealtimeSessionModule = typeof import("./providers/realtime-session.js");

/**
 * HTTP server for receiving voice call webhooks from providers.
 * Supports WebSocket upgrades for media streams when streaming is enabled.
 */
export class VoiceCallWebhookServer {
  /** Regexes used to guess whether an utterance will require tool calls. */
  private static readonly TOOL_HINT_PATTERNS = [
    /\b(stripe|revenue|charges|gross|mtd|sales|refund|subscription)\b/i,
    /\b(check|look up|pull|get|show|what(?:'s| is| are| was| were))\b.*\b(number|metric|data|status|uptime|error|log)\b/i,
    /\b(yesterday|today|this week|this month|last month)\b.*\b(performance|revenue|gross|net)\b/i,
    /\b(search|google|find out|look into|news|happening|weather)\b/i,
  ];

  private server: http.Server | null = null;
  private readonly config: VoiceCallConfig;
  private readonly manager: CallManager;
  private readonly provider: VoiceCallProvider;
  private readonly coreConfig: CoreConfig | null;
  private staleCallReaperInterval: ReturnType<typeof setInterval> | null = null;

  /** Media stream handler for bidirectional audio (when streaming enabled, embedded mode) */
  private mediaStreamHandler: MediaStreamHandler | null = null;

  /** Whether we're using the Realtime API for speech-to-speech (bypasses STT/TTS chain) */
  private readonly isRealtimeMode: boolean;

  /** Realtime WebSocket server for direct Twilio → OpenAI Realtime API bridging */
  private realtimeWss: WebSocketServer | null = null;

  /** Active realtime sessions, keyed by the session key generated per WebSocket connection */
  private readonly realtimeSessions = new Map<string, RealtimeSessionHandle>();

  /** Pre-cached realtime session module (avoids dynamic import delay that drops Twilio messages) */
  private realtimeModulePromise: Promise<RealtimeSessionModule> | null = null;

  /**
   * @param config - Voice call configuration (serve, streaming and response-mode settings).
   * @param manager - Call manager that events and speech requests are forwarded to.
   * @param provider - Telephony provider used to verify and parse webhooks.
   * @param coreConfig - Optional core config; auto-responses are skipped without it.
   */
  constructor(
    config: VoiceCallConfig,
    manager: CallManager,
    provider: VoiceCallProvider,
    coreConfig?: CoreConfig,
  ) {
    this.config = config;
    this.manager = manager;
    this.provider = provider;
    this.coreConfig = coreConfig ?? null;
    this.isRealtimeMode = config.responseMode === "realtime";

    if (config.streaming?.enabled) {
      if (this.isRealtimeMode) {
        this.initializeRealtimeStreaming();
      } else {
        this.initializeMediaStreaming();
      }
    }
  }

  /**
   * Get the media stream handler (for wiring to provider).
   */
  getMediaStreamHandler(): MediaStreamHandler | null {
    return this.mediaStreamHandler;
  }

  /**
   * Return the provider narrowed to Twilio when the configured provider's
   * name is "twilio", otherwise null.
   */
  private twilioProvider(): TwilioProvider | null {
    return this.provider.name === "twilio" ? (this.provider as TwilioProvider) : null;
  }

  /**
   * Initialize realtime streaming mode.
   * Pre-imports the realtime-session module so it's cached before the first call.
   */
  private initializeRealtimeStreaming(): void {
    const apiKey = this.config.streaming?.openaiApiKey || process.env.OPENAI_API_KEY;
    if (!apiKey) {
      console.warn("[voice-call] Realtime mode enabled but no OpenAI API key found");
      return;
    }

    this.realtimeModulePromise = import("./providers/realtime-session.js");
    this.realtimeModulePromise
      .then(() => console.log("[voice-call] Realtime module pre-loaded"))
      .catch((err) => console.error("[voice-call] Realtime module pre-load failed:", err));

    console.log("[voice-call] Realtime speech-to-speech mode initialized");
  }

  /**
   * Handle WebSocket upgrade for realtime mode.
   * Lazily creates the no-server WebSocketServer on first use and hands each
   * upgraded socket to {@link handleRealtimeConnection}.
   */
  private handleRealtimeUpgrade(
    request: http.IncomingMessage,
    socket: import("node:stream").Duplex,
    head: Buffer,
  ): void {
    if (!this.realtimeWss) {
      this.realtimeWss = new WebSocketServer({ noServer: true });
      this.realtimeWss.on("connection", (ws, req) => {
        this.handleRealtimeConnection(ws as unknown as NodeWebSocket, req);
      });
    }

    this.realtimeWss.handleUpgrade(request, socket, head, (ws) => {
      this.realtimeWss?.emit("connection", ws, request);
    });
  }

  /**
   * Handle a new Twilio WebSocket connection in realtime mode.
   * Creates the realtime call session as soon as the (normally pre-loaded)
   * session module is available, and tears it down when the socket closes.
   */
  private handleRealtimeConnection(ws: NodeWebSocket, _request: http.IncomingMessage): void {
    console.log("[voice-call] [realtime] New WebSocket, creating session immediately");

    const sessionKey = `realtime-${Date.now()}`;
    // NOTE(review): callSid is never assigned anywhere in this method, so the
    // provider/manager cleanup in the close handler below currently never runs.
    // Confirm whether the session is supposed to report the call SID back.
    let callSid: string | null = null;
    // Set once the socket has closed so a late-resolving import does not
    // register a session that nothing would ever clean up.
    let socketClosed = false;

    const modulePromise =
      this.realtimeModulePromise || import("./providers/realtime-session.js");

    modulePromise
      .then(({ createRealtimeCallSession }) => {
        if (socketClosed) {
          return;
        }
        const realtimeSession = createRealtimeCallSession({
          twilioWebSocket: ws,
          voiceConfig: this.config,
          callId: sessionKey,
          from: "unknown",
        });
        this.realtimeSessions.set(sessionKey, realtimeSession);
      })
      .catch((err) => {
        console.error("[voice-call] [realtime] Failed to create session:", err);
        ws.close(1011, "Session creation failed");
      });

    ws.on("close", () => {
      socketClosed = true;
      const key = callSid || sessionKey;
      console.log(`[voice-call] [realtime] WebSocket closed for ${key}`);

      const session = this.realtimeSessions.get(key);
      if (session) {
        session.close();
        this.realtimeSessions.delete(key);
      }

      if (callSid) {
        this.twilioProvider()?.unregisterCallStream(callSid);
        const call = this.manager.getCallByProviderCallId(callSid);
        if (call) {
          void this.manager.endCall(call.callId).catch((err) => {
            console.warn(`[voice-call] [realtime] Failed to end call ${call.callId}:`, err);
          });
        }
      }
    });

    ws.on("error", (error) => {
      console.error("[voice-call] [realtime] WebSocket error:", error);
    });
  }

  /**
   * Initialize media streaming with OpenAI Realtime STT.
   * Builds the stream callbacks (accept check, transcripts, connect and
   * disconnect handling) and installs the resulting MediaStreamHandler.
   */
  private initializeMediaStreaming(): void {
    const apiKey = this.config.streaming?.openaiApiKey || process.env.OPENAI_API_KEY;
    if (!apiKey) {
      console.warn("[voice-call] Streaming enabled but no OpenAI API key found");
      return;
    }

    const sttProvider = new OpenAIRealtimeSTTProvider({
      apiKey,
      model: this.config.streaming?.sttModel,
      silenceDurationMs: this.config.streaming?.silenceDurationMs,
      vadThreshold: this.config.streaming?.vadThreshold,
    });

    const streamConfig: MediaStreamConfig = {
      sttProvider,

      // Only accept streams for calls we know about; Twilio streams must also
      // present a valid stream token.
      shouldAcceptStream: ({ callId, token }) => {
        const call = this.manager.getCallByProviderCallId(callId);
        if (!call) {
          return false;
        }
        const twilio = this.twilioProvider();
        if (twilio && !twilio.isValidStreamToken(callId, token)) {
          console.warn(`[voice-call] Rejecting media stream: invalid token for ${callId}`);
          return false;
        }
        return true;
      },

      onTranscript: (providerCallId, transcript) => {
        console.log(`[voice-call] Transcript for ${providerCallId}: ${transcript}`);

        // Clear TTS queue on barge-in (user started speaking, interrupt current playback)
        this.twilioProvider()?.clearTtsQueue(providerCallId);

        // Look up our internal call ID from the provider call ID
        const call = this.manager.getCallByProviderCallId(providerCallId);
        if (!call) {
          console.warn(`[voice-call] No active call found for provider ID: ${providerCallId}`);
          return;
        }

        // Create a speech event and process it through the manager
        const event: NormalizedEvent = {
          id: `stream-transcript-${Date.now()}`,
          type: "call.speech",
          callId: call.callId,
          providerCallId,
          timestamp: Date.now(),
          transcript,
          isFinal: true,
        };
        this.manager.processEvent(event);

        // Auto-respond in conversation mode (inbound always, outbound if mode is conversation)
        const callMode = call.metadata?.mode as string | undefined;
        const shouldRespond = call.direction === "inbound" || callMode === "conversation";
        if (shouldRespond) {
          this.handleInboundResponse(call.callId, transcript).catch((err) => {
            console.warn(`[voice-call] Failed to auto-respond:`, err);
          });
        }
      },

      onSpeechStart: (providerCallId) => {
        this.twilioProvider()?.clearTtsQueue(providerCallId);
      },

      onPartialTranscript: (callId, partial) => {
        console.log(`[voice-call] Partial for ${callId}: ${partial}`);
      },

      onConnect: (callId, streamSid) => {
        console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`);

        // Register stream with provider for TTS routing
        this.twilioProvider()?.registerCallStream(callId, streamSid);

        // Speak initial message if one was provided when call was initiated.
        // Use setTimeout to allow stream setup to complete.
        setTimeout(() => {
          this.manager.speakInitialMessage(callId).catch((err) => {
            console.warn(`[voice-call] Failed to speak initial message:`, err);
          });
        }, 500);
      },

      onDisconnect: (callId) => {
        console.log(`[voice-call] Media stream disconnected: ${callId}`);

        // Auto-end call when media stream disconnects to prevent stuck calls.
        // Without this, calls can remain active indefinitely after the stream closes.
        const disconnectedCall = this.manager.getCallByProviderCallId(callId);
        if (disconnectedCall) {
          console.log(
            `[voice-call] Auto-ending call ${disconnectedCall.callId} on stream disconnect`,
          );
          void this.manager.endCall(disconnectedCall.callId).catch((err) => {
            console.warn(`[voice-call] Failed to auto-end call ${disconnectedCall.callId}:`, err);
          });
        }

        this.twilioProvider()?.unregisterCallStream(callId);
      },
    };

    this.mediaStreamHandler = new MediaStreamHandler(streamConfig);
    console.log("[voice-call] Media streaming initialized");
  }

  /**
   * Start the webhook server.
   *
   * @returns The local webhook URL once the server is listening. The promise
   *   rejects if the server emits an error (e.g. the port is unavailable).
   */
  async start(): Promise<string> {
    const { port, bind, path: webhookPath } = this.config.serve;
    const streamPath = this.config.streaming?.streamPath || "/voice/stream";

    return new Promise<string>((resolve, reject) => {
      const server = http.createServer((req, res) => {
        this.handleRequest(req, res, webhookPath).catch((err) => {
          console.error("[voice-call] Webhook error:", err);
          // Only rewrite the status when nothing has been written yet.
          if (!res.headersSent) {
            res.statusCode = 500;
          }
          res.end("Internal Server Error");
        });
      });
      this.server = server;

      // Handle WebSocket upgrades for media streams
      if (this.isRealtimeMode || this.mediaStreamHandler) {
        server.on("upgrade", (request, socket, head) => {
          // Only the path matters here. A fixed base plus try/catch keeps a
          // malformed Host header or request target from throwing out of this
          // event listener (which would be an uncaught exception).
          let pathname: string | null = null;
          try {
            pathname = new URL(request.url || "/", "http://localhost").pathname;
          } catch {
            pathname = null;
          }

          if (pathname !== streamPath) {
            socket.destroy();
            return;
          }

          if (this.isRealtimeMode) {
            console.log("[voice-call] WebSocket upgrade for realtime session");
            this.handleRealtimeUpgrade(request, socket, head);
          } else {
            console.log("[voice-call] WebSocket upgrade for media stream");
            this.mediaStreamHandler?.handleUpgrade(request, socket, head);
          }
        });
      }

      server.on("error", reject);

      server.listen(port, bind, () => {
        const url = `http://${bind}:${port}${webhookPath}`;
        console.log(`[voice-call] Webhook server listening on ${url}`);
        if (this.isRealtimeMode) {
          console.log(
            `[voice-call] Realtime speech-to-speech WebSocket on ws://${bind}:${port}${streamPath}`,
          );
        } else if (this.mediaStreamHandler) {
          console.log(`[voice-call] Media stream WebSocket on ws://${bind}:${port}${streamPath}`);
        }
        resolve(url);

        // Start the stale call reaper if configured
        this.startStaleCallReaper();
      });
    });
  }

  /**
   * Start a periodic reaper that ends calls older than the configured threshold.
   * Catches calls stuck in unexpected states (e.g., notify-mode calls that never
   * receive a terminal webhook from the provider).
   * Does nothing when `staleCallReaperSeconds` is unset or not positive.
   */
  private startStaleCallReaper(): void {
    const maxAgeSeconds = this.config.staleCallReaperSeconds;
    if (!maxAgeSeconds || maxAgeSeconds <= 0) {
      return;
    }

    const CHECK_INTERVAL_MS = 30_000; // Check every 30 seconds
    const maxAgeMs = maxAgeSeconds * 1000;

    this.staleCallReaperInterval = setInterval(() => {
      const now = Date.now();
      for (const call of this.manager.getActiveCalls()) {
        const age = now - call.startedAt;
        if (age <= maxAgeMs) {
          continue;
        }
        console.log(
          `[voice-call] Reaping stale call ${call.callId} (age: ${Math.round(age / 1000)}s, state: ${call.state})`,
        );
        void this.manager.endCall(call.callId).catch((err) => {
          console.warn(`[voice-call] Reaper failed to end call ${call.callId}:`, err);
        });
      }
    }, CHECK_INTERVAL_MS);
  }

  /**
   * Stop the webhook server.
   * Cancels the stale-call reaper, closes every realtime session and the
   * realtime WebSocket server, then closes the HTTP server.
   */
  async stop(): Promise<void> {
    if (this.staleCallReaperInterval) {
      clearInterval(this.staleCallReaperInterval);
      this.staleCallReaperInterval = null;
    }

    for (const session of this.realtimeSessions.values()) {
      session.close();
    }
    this.realtimeSessions.clear();

    // The realtime WebSocketServer is created lazily on the first upgrade;
    // release it too so nothing lingers after shutdown.
    this.realtimeWss?.close();
    this.realtimeWss = null;

    const server = this.server;
    if (!server) {
      return;
    }
    await new Promise<void>((resolve) => {
      server.close(() => {
        this.server = null;
        resolve();
      });
    });
  }

  /**
   * Handle incoming HTTP request.
   *
   * Responds 404 for paths outside `webhookPath`, 405 for non-POST methods,
   * 413/408 for body limit or timeout errors and 401 when the provider rejects
   * the webhook. Otherwise forwards each parsed event to the call manager and
   * replies with the provider-supplied status, headers and body.
   */
  private async handleRequest(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    webhookPath: string,
  ): Promise<void> {
    const url = new URL(req.url || "/", `http://${req.headers.host}`);

    // Check path
    if (!url.pathname.startsWith(webhookPath)) {
      res.statusCode = 404;
      res.end("Not Found");
      return;
    }

    // Only accept POST
    if (req.method !== "POST") {
      res.statusCode = 405;
      res.end("Method Not Allowed");
      return;
    }

    // Read body
    let body = "";
    try {
      body = await this.readBody(req, MAX_WEBHOOK_BODY_BYTES);
    } catch (err) {
      if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) {
        res.statusCode = 413;
        res.end("Payload Too Large");
        return;
      }
      if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) {
        res.statusCode = 408;
        res.end(requestBodyErrorToText("REQUEST_BODY_TIMEOUT"));
        return;
      }
      throw err;
    }

    // Build webhook context
    const ctx: WebhookContext = {
      headers: req.headers as Record<string, string | string[] | undefined>,
      rawBody: body,
      url: `http://${req.headers.host}${req.url}`,
      method: "POST",
      query: Object.fromEntries(url.searchParams),
      remoteAddress: req.socket.remoteAddress ?? undefined,
    };

    // Verify signature
    const verification = this.provider.verifyWebhook(ctx);
    if (!verification.ok) {
      console.warn(`[voice-call] Webhook verification failed: ${verification.reason}`);
      res.statusCode = 401;
      res.end("Unauthorized");
      return;
    }

    // Parse events
    const result = this.provider.parseWebhookEvent(ctx);

    // Process each event; one failing event must not block the others
    for (const event of result.events) {
      try {
        this.manager.processEvent(event);
      } catch (err) {
        console.error(`[voice-call] Error processing event ${event.type}:`, err);
      }
    }

    // Send response
    res.statusCode = result.statusCode || 200;
    if (result.providerResponseHeaders) {
      for (const [key, value] of Object.entries(result.providerResponseHeaders)) {
        res.setHeader(key, value);
      }
    }
    res.end(result.providerResponseBody || "OK");
  }

  /**
   * Read request body as string with timeout protection.
   *
   * @param req - Incoming request whose body is read.
   * @param maxBytes - Maximum accepted body size in bytes.
   * @param timeoutMs - Maximum time to wait for the body, in milliseconds.
   */
  private readBody(
    req: http.IncomingMessage,
    maxBytes: number,
    timeoutMs = 30_000,
  ): Promise<string> {
    return readRequestBodyWithLimit(req, { maxBytes, timeoutMs });
  }

  /** Whether the message matches any of the tool-hint patterns. */
  private looksLikeToolQuery(message: string): boolean {
    return VoiceCallWebhookServer.TOOL_HINT_PATTERNS.some((p) => p.test(message));
  }

  /**
   * Handle auto-response for inbound calls using the agent system.
   * Supports tool calling for richer voice interactions.
   * When a tool-heavy query is detected, speaks filler audio immediately
   * so the caller knows we're working on it.
   */
  private async handleInboundResponse(callId: string, userMessage: string): Promise<void> {
    console.log(`[voice-call] Auto-responding to inbound call ${callId}: "${userMessage}"`);

    const call = this.manager.getCall(callId);
    if (!call) {
      console.warn(`[voice-call] Call ${callId} not found for auto-response`);
      return;
    }

    if (!this.coreConfig) {
      console.warn("[voice-call] Core config missing; skipping auto-response");
      return;
    }

    // Speak filler audio immediately for queries that likely need tools,
    // so the caller doesn't wait in silence while scripts run.
    // Fire-and-forget: a filler failure is logged but never blocks the reply.
    if (this.looksLikeToolQuery(userMessage)) {
      void this.manager.speak(callId, "Gimme a sec, let me check that for ya.").catch((err) => {
        console.warn(`[voice-call] Failed to speak filler audio:`, err);
      });
    }

    try {
      const { generateVoiceResponse } = await import("./response-generator.js");

      const result = await generateVoiceResponse({
        voiceConfig: this.config,
        coreConfig: this.coreConfig,
        callId,
        from: call.from,
        transcript: call.transcript,
        userMessage,
      });

      if (result.error) {
        console.error(`[voice-call] Response generation error: ${result.error}`);
        await this.manager.speak(callId, "Sorry, I hit a snag there. Could you try again?");
        return;
      }

      if (result.text) {
        console.log(`[voice-call] AI response: "${result.text}"`);
        await this.manager.speak(callId, result.text);
      } else {
        console.warn(`[voice-call] Empty response for call ${callId} — speaking fallback`);
        await this.manager.speak(callId, "Hmm, I didn't quite get that. Could you say that again?");
      }
    } catch (err) {
      console.error(`[voice-call] Auto-response error:`, err);
      try {
        await this.manager.speak(callId, "Sorry, something went wrong. Give me a moment.");
      } catch {
        // best-effort
      }
    }
  }
}

/**
 * Identity of the local Tailscale node as read from `tailscale status --json`.
 */
export type TailscaleSelfInfo = {
  /** DNS name of this node with the trailing dot removed, or null if absent. */
  dnsName: string | null;
  /** ID of this node, or null if absent. */
  nodeId: string | null;
};

/**
 * Run a tailscale command with timeout, collecting stdout.
 *
 * Never rejects: a spawn failure (e.g. the `tailscale` binary is missing) or a
 * timeout resolves with `code: -1` and empty stdout.
 *
 * @param args - Arguments passed to the `tailscale` executable.
 * @param timeoutMs - Time after which the process is killed with SIGKILL.
 * @returns The exit code (or -1) and everything written to stdout.
 */
function runTailscaleCommand(
  args: string[],
  timeoutMs = 2500,
): Promise<{ code: number; stdout: string }> {
  return new Promise((resolve) => {
    // stderr is discarded rather than piped: an unread pipe can fill up and
    // stall the child until the timeout fires.
    const proc = spawn("tailscale", args, {
      stdio: ["ignore", "pipe", "ignore"],
    });

    let stdout = "";
    let settled = false;

    // Resolve at most once, whichever of close/error/timeout comes first.
    function finish(result: { code: number; stdout: string }): void {
      if (settled) {
        return;
      }
      settled = true;
      clearTimeout(timer);
      resolve(result);
    }

    const timer = setTimeout(() => {
      proc.kill("SIGKILL");
      finish({ code: -1, stdout: "" });
    }, timeoutMs);

    // Decode as UTF-8 in the stream so multi-byte characters split across
    // chunks are reassembled correctly.
    proc.stdout.setEncoding("utf8");
    proc.stdout.on("data", (chunk: string) => {
      stdout += chunk;
    });

    // Without this listener a failed spawn (ENOENT) raises an unhandled
    // 'error' event and takes the whole process down.
    proc.on("error", () => {
      finish({ code: -1, stdout: "" });
    });

    proc.on("close", (code) => {
      finish({ code: code ?? -1, stdout });
    });
  });
}

/**
 * Resolve the current machine's Tailscale DNS name and node ID.
 *
 * @returns The node info, or null when the command fails or its output is not
 *   valid JSON.
 */
export async function getTailscaleSelfInfo(): Promise<TailscaleSelfInfo | null> {
  const { code, stdout } = await runTailscaleCommand(["status", "--json"]);
  if (code !== 0) {
    return null;
  }

  try {
    const status: { Self?: { DNSName?: string; ID?: string } } = JSON.parse(stdout);
    return {
      dnsName: status.Self?.DNSName?.replace(/\.$/, "") || null,
      nodeId: status.Self?.ID || null,
    };
  } catch {
    return null;
  }
}

/**
 * Convenience wrapper returning only the Tailscale DNS name, or null when it
 * cannot be determined.
 */
export async function getTailscaleDnsName(): Promise<string | null> {
  const info = await getTailscaleSelfInfo();
  return info?.dnsName ?? null;
}

/**
 * Expose a local URL through `tailscale serve` or `tailscale funnel`.
 *
 * @param opts.mode - Which tailscale subcommand to run.
 * @param opts.path - Mount path passed to `--set-path`.
 * @param opts.localUrl - Local target the route forwards to.
 * @returns The public HTTPS URL on success, otherwise null.
 */
export async function setupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
  localUrl: string;
}): Promise<string | null> {
  const dnsName = await getTailscaleDnsName();
  if (!dnsName) {
    console.warn("[voice-call] Could not get Tailscale DNS name");
    return null;
  }

  const { code } = await runTailscaleCommand([
    opts.mode,
    "--bg",
    "--yes",
    "--set-path",
    opts.path,
    opts.localUrl,
  ]);

  if (code !== 0) {
    console.warn(`[voice-call] Tailscale ${opts.mode} failed`);
    return null;
  }

  const publicUrl = `https://${dnsName}${opts.path}`;
  console.log(`[voice-call] Tailscale ${opts.mode} active: ${publicUrl}`);
  return publicUrl;
}

/**
 * Remove a route previously created with {@link setupTailscaleExposureRoute}.
 * The command's result is ignored.
 */
export async function cleanupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
}): Promise<void> {
  await runTailscaleCommand([opts.mode, "off", opts.path]);
}

/**
 * Setup Tailscale serve/funnel for the webhook server.
 * This is a helper that shells out to `tailscale serve` or `tailscale funnel`.
 *
 * @returns The public URL, or null when Tailscale is off or setup failed.
 */
export async function setupTailscaleExposure(config: VoiceCallConfig): Promise<string | null> {
  if (config.tailscale.mode === "off") {
    return null;
  }

  const mode = config.tailscale.mode === "funnel" ? "funnel" : "serve";
  // Include the path suffix so tailscale forwards to the correct endpoint
  // (tailscale strips the mount path prefix when proxying)
  const localUrl = `http://127.0.0.1:${config.serve.port}${config.serve.path}`;
  return setupTailscaleExposureRoute({
    mode,
    path: config.tailscale.path,
    localUrl,
  });
}

/**
 * Cleanup Tailscale serve/funnel.
 * No-op when the configured Tailscale mode is "off".
 */
export async function cleanupTailscaleExposure(config: VoiceCallConfig): Promise<void> {
  if (config.tailscale.mode === "off") {
    return;
  }

  const mode = config.tailscale.mode === "funnel" ? "funnel" : "serve";
  await cleanupTailscaleExposureRoute({ mode, path: config.tailscale.path });
}