/**
 * OpenAI Realtime voice provider.
 *
 * Uses WebSocket to connect to OpenAI's Realtime API for bidirectional
 * audio streaming (speech-to-speech). This is the one capability the
 * Vercel AI SDK does not cover.
 *
 * Reference: .mastra-ref/voice/openai-realtime-api/src/index.ts
 */

import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { Effect, Exit, Scope } from "effect";
import type {
  VoiceProvider,
  SpeakOptions,
  ListenOptions,
  SendOptions,
  VoiceEventType,
  VoiceEventCallback,
} from "./types";

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

const DEFAULT_VOICE = "alloy";
const DEFAULT_MODEL = "gpt-4o-mini-realtime-preview-2024-12-17";
const DEFAULT_URL = "wss://api.openai.com/v1/realtime";
/** Upper bound for waiting on a session, a speaker stream, or a transcription. */
const RESPONSE_TIMEOUT_MS = 30_000;
/** Grace period for a WebSocket close handshake before we stop waiting. */
const SOCKET_CLOSE_TIMEOUT_MS = 1_000;

const VOICES = [
  "alloy",
  "ash",
  "ballad",
  "coral",
  "echo",
  "sage",
  "shimmer",
  "verse",
] as const;

// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------

export type OpenAIRealtimeVoiceConfig = {
  /** OpenAI API key. Falls back to OPENAI_API_KEY env var. */
  apiKey?: string;
  /** Realtime model ID. Sent as the `model` query parameter. */
  model?: string;
  /** WebSocket URL override. Existing query parameters are preserved. */
  url?: string;
  /** Default speaker voice. Defaults to "alloy". */
  speaker?: string;
  /** Transcription model for input audio. Defaults to "whisper-1". */
  transcriber?: string;
  /** Enable debug logging of every server event. */
  debug?: boolean;
};

// ---------------------------------------------------------------------------
// Internal types
// ---------------------------------------------------------------------------

type StreamWithId = PassThrough & { id: string };

/** Storage shape for every registered listener (internal and user-facing). */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type EventCallback = (...args: any[]) => void;

type ManagedRealtimeVoiceProvider = VoiceProvider & {
  connectEffect?: (
    options?: Record<string, unknown>,
  ) => Effect.Effect<void, Error, Scope.Scope>;
};

type ConnectionResource = {
  socket: import("ws").WebSocket;
  teardown: () => void;
  closed: boolean;
};

// ---------------------------------------------------------------------------
// Stateless helpers
// ---------------------------------------------------------------------------

/**
 * Normalizes an unknown failure into an Error.
 *
 * Besides Error instances and non-empty strings, this understands OpenAI
 * server `error` events (`{ type: "error", error: { message } }`) and plain
 * objects carrying a `message`, so their text is not lost.
 */
function toError(error: unknown, fallbackMessage: string): Error {
  if (error instanceof Error) {
    return error;
  }
  if (typeof error === "string" && error.length > 0) {
    return new Error(error);
  }
  if (typeof error === "object" && error !== null) {
    const nested = (error as { error?: { message?: unknown } | null }).error
      ?.message;
    if (typeof nested === "string" && nested.length > 0) {
      return new Error(nested);
    }
    const direct = (error as { message?: unknown }).message;
    if (typeof direct === "string" && direct.length > 0) {
      return new Error(direct);
    }
  }
  return new Error(fallbackMessage);
}

/** Encodes Int16 samples as little-endian PCM16 bytes in base64. */
function int16ToBase64(samples: Int16Array): string {
  const bytes = Buffer.alloc(samples.length * 2);
  for (let i = 0; i < samples.length; i++) {
    bytes.writeInt16LE(samples[i]!, i * 2);
  }
  return bytes.toString("base64");
}

/**
 * Converts a stream chunk into a Buffer.
 *
 * Typed arrays are viewed byte-for-byte. `Buffer.from(typedArray)` would copy
 * element-wise and truncate e.g. Int16 samples to one byte each.
 */
function chunkToBuffer(chunk: unknown): Buffer {
  if (Buffer.isBuffer(chunk)) {
    return chunk;
  }
  if (typeof chunk === "string") {
    return Buffer.from(chunk, "utf-8");
  }
  if (ArrayBuffer.isView(chunk)) {
    return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
  }
  if (chunk instanceof ArrayBuffer) {
    return Buffer.from(chunk);
  }
  return Buffer.from(String(chunk), "utf-8");
}

/** Drains a readable stream into a single Buffer. */
async function readAll(stream: NodeJS.ReadableStream): Promise<Buffer> {
  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    chunks.push(chunkToBuffer(chunk));
  }
  return Buffer.concat(chunks);
}

/** Decodes any `ws` message payload (Buffer, ArrayBuffer, fragments) as UTF-8. */
function rawDataToString(
  data: Buffer | ArrayBuffer | Buffer[] | string,
): string {
  if (typeof data === "string") {
    return data;
  }
  if (Array.isArray(data)) {
    return Buffer.concat(data).toString("utf-8");
  }
  if (data instanceof ArrayBuffer) {
    return Buffer.from(data).toString("utf-8");
  }
  return data.toString("utf-8");
}

/** Same duck-typing the provider has always used to recognise streams. */
function isReadableStream(value: unknown): value is NodeJS.ReadableStream {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as {
    [Symbol.asyncIterator]?: unknown;
    on?: unknown;
  };
  return (
    typeof candidate[Symbol.asyncIterator] === "function" ||
    typeof candidate.on === "function"
  );
}

/** Absorbs socket errors emitted after our listeners were torn down. */
function ignoreLateSocketError(): void {
  // Intentionally empty: EventEmitter throws on an unhandled "error" event.
}

// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------

/**
 * Creates a realtime speech-to-speech provider backed by a single WebSocket.
 *
 * Events sent before the session is ready are queued and flushed once the
 * server emits `session.created`. Connection lifetime is managed through an
 * Effect scope: `connect()` owns one internally, and `connectEffect` lets
 * callers supply their own.
 *
 * @param config - Optional credentials, model, URL and voice settings.
 * @returns A VoiceProvider that also exposes `connectEffect`.
 */
export function createOpenAIRealtimeVoice(
  config: OpenAIRealtimeVoiceConfig = {},
): VoiceProvider {
  let ws: import("ws").WebSocket | undefined;
  let state: "connecting" | "open" | "close" = "close";
  let activeConnection: ConnectionResource | undefined;
  let connectionScope: Scope.CloseableScope | undefined;
  let connectPromise: Promise<void> | undefined;

  // Raw server events are re-emitted here by their `type`.
  const client = new EventEmitter();
  // User-facing (and internal one-shot) listeners, keyed by event name.
  const events = new Map<string, EventCallback[]>();
  // Payloads sent while no socket is open; flushed on `session.created`.
  const queue: unknown[] = [];
  // One audio stream per in-flight response, keyed by response id.
  const speakerStreams = new Map<string, StreamWithId>();

  const speaker = config.speaker ?? DEFAULT_VOICE;
  const transcriber = config.transcriber ?? "whisper-1";
  const debug = config.debug ?? false;

  // -- listener registry --

  function emit(event: string, ...args: unknown[]): void {
    const callbacks = events.get(event);
    if (!callbacks) return;
    // Iterate a snapshot. One-shot listeners unsubscribe while being called,
    // which would otherwise shift the array and skip the next listener.
    for (const callback of [...callbacks]) {
      callback(...args);
    }
  }

  function addEventListener(event: string, callback: EventCallback): void {
    const callbacks = events.get(event);
    if (callbacks) {
      callbacks.push(callback);
    } else {
      events.set(event, [callback]);
    }
  }

  function removeEventListener(event: string, callback: EventCallback): void {
    const callbacks = events.get(event);
    if (!callbacks) return;
    const idx = callbacks.indexOf(callback);
    if (idx !== -1) callbacks.splice(idx, 1);
  }

  /**
   * Resolves with a result delivered through a user-facing event.
   *
   * Rejects on any "error" event, or after RESPONSE_TIMEOUT_MS. On timeout the
   * error is also broadcast to "error" listeners.
   */
  function waitForResult<T>(
    event: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    handle: (payload: any, done: (value: T) => void) => void,
    failureMessage: string,
    timeoutMessage: string,
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      let timeout: NodeJS.Timeout | undefined;

      const cleanup = () => {
        if (timeout) {
          clearTimeout(timeout);
          timeout = undefined;
        }
        removeEventListener(event, onEvent);
        removeEventListener("error", onError);
      };
      const onEvent = (payload: unknown) => {
        handle(payload, (value) => {
          cleanup();
          resolve(value);
        });
      };
      const onError = (error: unknown) => {
        cleanup();
        reject(toError(error, failureMessage));
      };

      timeout = setTimeout(() => {
        cleanup();
        const error = new Error(timeoutMessage);
        emit("error", error);
        reject(error);
      }, RESPONSE_TIMEOUT_MS);

      addEventListener(event, onEvent);
      addEventListener("error", onError);
    });
  }

  // -- socket plumbing --

  function sendEvent(type: string, data: Record<string, unknown>): void {
    const payload = { type, ...data };
    if (!ws || ws.readyState !== ws.OPEN) {
      queue.push(payload);
    } else {
      ws.send(JSON.stringify(payload));
    }
  }

  function cleanupSpeakerStream(responseId: string, error?: Error): void {
    const stream = speakerStreams.get(responseId);
    if (!stream) return;
    speakerStreams.delete(responseId);
    if (error) {
      stream.destroy(error);
      return;
    }
    if (!stream.destroyed) {
      stream.end();
    }
  }

  function cleanupSpeakerStreams(error?: Error): void {
    for (const responseId of [...speakerStreams.keys()]) {
      cleanupSpeakerStream(responseId, error);
    }
  }

  /** Forgets `socket` (or everything when omitted) and marks the provider closed. */
  function clearConnectionState(socket?: import("ws").WebSocket): void {
    if (!socket || ws === socket) {
      ws = undefined;
    }
    if (!socket || activeConnection?.socket === socket) {
      activeConnection = undefined;
    }
    state = "close";
  }

  /** Closes `socket`, waiting at most SOCKET_CLOSE_TIMEOUT_MS for the handshake. */
  async function closeSocket(socket: import("ws").WebSocket): Promise<void> {
    // Our own listeners are gone by now; keep a sink so a late socket error
    // cannot surface as an unhandled EventEmitter "error".
    socket.on("error", ignoreLateSocketError);
    if (socket.readyState === socket.CLOSED) {
      return;
    }

    await new Promise<void>((resolve) => {
      let settled = false;
      let timeout: NodeJS.Timeout | undefined;

      const finish = () => {
        if (settled) return;
        settled = true;
        socket.off("close", finish);
        socket.off("error", finish);
        if (timeout) {
          clearTimeout(timeout);
          timeout = undefined;
        }
        resolve();
      };

      socket.once("close", finish);
      socket.once("error", finish);
      timeout = setTimeout(finish, SOCKET_CLOSE_TIMEOUT_MS);
      timeout.unref?.();

      try {
        socket.close();
      } catch {
        finish();
      }
    });
  }

  /** Idempotently tears down a connection: state, queue, streams, listeners, socket. */
  async function releaseConnection(
    connection: ConnectionResource,
    error?: Error,
  ): Promise<void> {
    if (connection.closed) {
      return;
    }
    connection.closed = true;
    clearConnectionState(connection.socket);
    queue.length = 0;
    cleanupSpeakerStreams(error);
    connection.teardown();
    await closeSocket(connection.socket);
  }

  async function closeStoredScope(): Promise<void> {
    if (!connectionScope) {
      return;
    }
    const scope = connectionScope;
    connectionScope = undefined;
    await Effect.runPromise(Scope.close(scope, Exit.void));
  }

  async function releaseActiveConnection(error?: Error): Promise<void> {
    if (!activeConnection) {
      return;
    }
    const connection = activeConnection;
    activeConnection = undefined;
    await releaseConnection(connection, error);
  }

  /**
   * Wires socket events into `client` and server events into user events.
   *
   * @returns A teardown function that removes every listener added here.
   */
  function setupEventListeners(socket: import("ws").WebSocket): () => void {
    const onMessage = (message: Buffer | ArrayBuffer | Buffer[] | string) => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      let data: any;
      try {
        data = JSON.parse(rawDataToString(message));
      } catch (error) {
        client.emit(
          "error",
          toError(error, "Failed to parse OpenAI realtime WebSocket message"),
        );
        return;
      }

      client.emit(data.type, data);

      if (debug) {
        const { delta, ...fields } = data;
        console.info(data.type, fields, delta?.length < 100 ? delta : "");
      }
    };

    const onClose = () => {
      clearConnectionState(socket);
      cleanupSpeakerStreams();
    };

    const onSocketError = (error: Error) => {
      clearConnectionState(socket);
      cleanupSpeakerStreams(error);
      client.emit("error", error);
    };

    /* eslint-disable @typescript-eslint/no-explicit-any */
    const onSessionCreated = (ev: any) => {
      emit("session.created", ev);
      const queued = queue.splice(0, queue.length);
      for (const item of queued) {
        socket.send(JSON.stringify(item));
      }
    };

    const onSessionUpdated = (ev: any) => {
      emit("session.updated", ev);
    };

    const onResponseCreated = (ev: any) => {
      emit("response.created", ev);
      const responseId: unknown = ev.response?.id;
      if (typeof responseId !== "string") {
        return;
      }

      const stream = new PassThrough() as StreamWithId;
      stream.id = responseId;
      const removeStream = () => {
        speakerStreams.delete(stream.id);
      };
      stream.once("close", removeStream);
      stream.once("end", removeStream);
      stream.once("error", removeStream);
      speakerStreams.set(responseId, stream);
      emit("speaker", stream);
    };

    const onResponseAudioDelta = (ev: any) => {
      const audio = Buffer.from(ev.delta, "base64");
      emit("speaking", { audio, response_id: ev.response_id });
      speakerStreams.get(ev.response_id)?.write(audio);
    };

    const onResponseAudioDone = (ev: any) => {
      emit("speaking.done", { response_id: ev.response_id });
      cleanupSpeakerStream(ev.response_id);
    };

    // Text and audio-transcript deltas are surfaced identically as "writing".
    const onAssistantTextDelta = (ev: any) => {
      emit("writing", {
        text: ev.delta,
        response_id: ev.response_id,
        role: "assistant",
      });
    };

    // A lone newline marks the end of an assistant text/transcript part.
    const onAssistantTextDone = (ev: any) => {
      emit("writing", {
        text: "\n",
        response_id: ev.response_id,
        role: "assistant",
      });
    };

    const onResponseDone = (ev: any) => {
      emit("response.done", ev);
      if (typeof ev.response?.id === "string") {
        cleanupSpeakerStream(ev.response.id);
      }
    };

    const onClientError = (ev: any) => {
      emit("error", ev);
    };

    const clientHandlers: ReadonlyArray<readonly [string, (ev: any) => void]> =
      [
        ["session.created", onSessionCreated],
        ["session.updated", onSessionUpdated],
        ["response.created", onResponseCreated],
        ["response.audio.delta", onResponseAudioDelta],
        ["response.audio.done", onResponseAudioDone],
        ["response.audio_transcript.delta", onAssistantTextDelta],
        ["response.audio_transcript.done", onAssistantTextDone],
        ["response.text.delta", onAssistantTextDelta],
        ["response.text.done", onAssistantTextDone],
        ["response.done", onResponseDone],
        ["error", onClientError],
      ];
    /* eslint-enable @typescript-eslint/no-explicit-any */

    socket.on("message", onMessage);
    socket.on("close", onClose);
    socket.on("error", onSocketError);
    for (const [name, handler] of clientHandlers) {
      client.on(name, handler);
    }

    return () => {
      socket.off("message", onMessage);
      socket.off("close", onClose);
      socket.off("error", onSocketError);
      for (const [name, handler] of clientHandlers) {
        client.off(name, handler);
      }
    };
  }

  /**
   * Resolves once the socket is open AND the server sent `session.created`.
   *
   * Rejects on a socket error, on a close before readiness, or after
   * RESPONSE_TIMEOUT_MS. Without the close and timeout paths, `connect()`
   * could hang forever.
   */
  function waitForSession(socket: import("ws").WebSocket): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let isOpen = false;
      let isSessionCreated = false;
      let timeout: NodeJS.Timeout | undefined;

      const cleanup = () => {
        if (timeout) {
          clearTimeout(timeout);
          timeout = undefined;
        }
        socket.off("open", onOpen);
        socket.off("error", onError);
        socket.off("close", onClose);
        client.off("session.created", onSessionCreated);
      };
      const fail = (error: Error) => {
        cleanup();
        reject(error);
      };
      const maybeResolve = () => {
        if (!isOpen || !isSessionCreated) {
          return;
        }
        cleanup();
        resolve();
      };
      const onOpen = () => {
        isOpen = true;
        maybeResolve();
      };
      const onSessionCreated = () => {
        isSessionCreated = true;
        maybeResolve();
      };
      const onError = (error: Error) => {
        fail(error);
      };
      const onClose = (code: number, reason: Buffer) => {
        const detail = reason.length > 0 ? `: ${reason.toString()}` : "";
        fail(
          new Error(
            `OpenAI realtime socket closed before the session was ready (code ${code}${detail})`,
          ),
        );
      };

      socket.on("open", onOpen);
      socket.on("error", onError);
      socket.on("close", onClose);
      client.on("session.created", onSessionCreated);
      timeout = setTimeout(() => {
        fail(
          new Error(
            `Timed out waiting for OpenAI realtime session after ${RESPONSE_TIMEOUT_MS}ms`,
          ),
        );
      }, RESPONSE_TIMEOUT_MS);
    });
  }

  /**
   * Replaces any existing connection with a fresh, session-ready one.
   *
   * @throws Error when the API key is missing, the URL is invalid, or the
   *   session cannot be established (the half-open socket is released first).
   */
  async function openConnection(
    _options?: Record<string, unknown>,
  ): Promise<ConnectionResource> {
    await closeStoredScope();
    await releaseActiveConnection();

    // Validate before touching `state`, so a failed attempt can never leave
    // the provider stuck in "connecting".
    const apiKey = config.apiKey ?? process.env.OPENAI_API_KEY;
    if (!apiKey) {
      throw new Error("Missing OPENAI_API_KEY for OpenAI realtime voice");
    }
    const url = new URL(config.url ?? DEFAULT_URL);
    url.searchParams.set("model", config.model ?? DEFAULT_MODEL);

    // Dynamic import ws to avoid hard dependency for users who don't need realtime
    const { WebSocket } = await import("ws");

    const socket = new WebSocket(url.toString(), undefined, {
      headers: {
        Authorization: "Bearer " + apiKey,
        "OpenAI-Beta": "realtime=v1",
      },
    });
    ws = socket;
    state = "connecting";

    // No await between socket creation and listener registration, so no
    // socket event can be missed.
    const connection: ConnectionResource = {
      socket,
      teardown: setupEventListeners(socket),
      closed: false,
    };

    try {
      await waitForSession(socket);
      sendEvent("session.update", {
        session: {
          input_audio_transcription: { model: transcriber },
          voice: speaker,
        },
      });
      state = "open";
      activeConnection = connection;
      return connection;
    } catch (error) {
      const failure = toError(
        error,
        "Failed to initialize OpenAI realtime connection",
      );
      await releaseConnection(connection, failure);
      throw failure;
    }
  }

  /**
   * Scoped connection: acquired on entry, released when the scope closes.
   * Release failures are ignored. `Effect.promise` would turn them into
   * defects that `catchAll` cannot catch, hence `tryPromise` + `ignore`.
   */
  function connectEffect(
    options?: Record<string, unknown>,
  ): Effect.Effect<void, Error, Scope.Scope> {
    return Effect.acquireRelease(
      Effect.tryPromise({
        try: () => openConnection(options),
        catch: (cause) =>
          toError(cause, "Failed to connect to OpenAI realtime voice"),
      }),
      (connection) =>
        Effect.tryPromise(() => releaseConnection(connection)).pipe(
          Effect.ignore,
        ),
    ).pipe(Effect.asVoid);
  }

  // -- provider --

  const provider: ManagedRealtimeVoiceProvider = {
    name: "openai-realtime",

    /**
     * Asks the model to say `input` aloud.
     *
     * @returns The audio stream of the next response created by the server.
     * @throws Error when the input is blank, on an "error" event, or on timeout.
     */
    async speak(
      input: string | NodeJS.ReadableStream,
      options?: SpeakOptions,
    ): Promise<NodeJS.ReadableStream> {
      const text =
        typeof input === "string"
          ? input
          : (await readAll(input)).toString("utf-8");

      if (text.trim().length === 0) {
        throw new Error("Input text is empty");
      }

      // In realtime mode, speak sends a response.create event.
      // The audio comes back through the 'speaker' event.
      const audioPromise = waitForResult<NodeJS.ReadableStream>(
        "speaker",
        (stream, done) => done(stream),
        "OpenAI realtime speak request failed",
        `Timed out waiting for realtime speaker response after ${RESPONSE_TIMEOUT_MS}ms`,
      );

      sendEvent("response.create", {
        response: {
          instructions: `Repeat the following text: ${text}`,
          voice: options?.speaker ?? speaker,
        },
      });

      return audioPromise;
    },

    /**
     * Transcribes PCM16 (little-endian) audio by asking the model to repeat it
     * as text.
     *
     * @returns The assistant text up to the first end-of-part newline.
     * @throws Error when the audio holds no complete sample, on an "error"
     *   event, or on timeout.
     */
    async listen(
      audioData: NodeJS.ReadableStream,
      _options?: ListenOptions,
    ): Promise<string> {
      const audio = await readAll(audioData);
      // PCM16 consists of whole 2-byte samples; drop a trailing odd byte.
      const pcm = audio.subarray(0, audio.length - (audio.length % 2));
      if (pcm.length === 0) {
        throw new Error("Audio data is empty");
      }
      const base64Audio = pcm.toString("base64");

      // Collect the text response
      let responseText = "";
      const textPromise = waitForResult<string>(
        "writing",
        (data, done) => {
          if (data.role !== "assistant") {
            return;
          }
          if (data.text === "\n") {
            done(responseText.trim());
            return;
          }
          responseText += data.text;
        },
        "OpenAI realtime listen request failed",
        `Timed out waiting for realtime transcription after ${RESPONSE_TIMEOUT_MS}ms`,
      );

      sendEvent("conversation.item.create", {
        item: {
          type: "message",
          role: "user",
          content: [{ type: "input_audio", audio: base64Audio }],
        },
      });

      sendEvent("response.create", {
        response: {
          modalities: ["text"],
          instructions: "ONLY repeat the input and DO NOT say anything else",
        },
      });

      return textPromise;
    },

    /**
     * Appends audio to the server input buffer. Per-chunk failures are
     * reported through "error" listeners rather than thrown.
     */
    async send(
      audioData: NodeJS.ReadableStream | Int16Array | Buffer,
      options?: SendOptions,
    ): Promise<void> {
      if (state !== "open") {
        console.warn("Cannot send audio when not connected. Call connect() first.");
        return;
      }

      const append = (encode: () => string): void => {
        try {
          sendEvent("input_audio_buffer.append", {
            audio: encode(),
            event_id: options?.eventId,
          });
        } catch (err) {
          emit("error", err);
        }
      };

      if (isReadableStream(audioData)) {
        for await (const chunk of audioData) {
          append(() => chunkToBuffer(chunk).toString("base64"));
        }
      } else if (audioData instanceof Int16Array) {
        const samples = audioData;
        append(() => int16ToBase64(samples));
      } else if (Buffer.isBuffer(audioData)) {
        const bytes = audioData;
        append(() => bytes.toString("base64"));
      }
    },

    /** Opens a connection owned by an internal scope; concurrent calls share it. */
    async connect(options?: Record<string, unknown>): Promise<void> {
      if (state === "open") {
        return;
      }
      if (connectPromise) {
        return connectPromise;
      }

      connectPromise = (async () => {
        const scope = await Effect.runPromise(Scope.make());
        try {
          await Effect.runPromise(
            connectEffect(options).pipe(
              Effect.provideService(Scope.Scope, scope),
            ),
          );
          connectionScope = scope;
        } catch (error) {
          await Effect.runPromise(Scope.close(scope, Exit.void));
          throw error;
        } finally {
          connectPromise = undefined;
        }
      })();

      return connectPromise;
    },

    /** Fire-and-forget shutdown of the current connection. */
    close(): void {
      const scope = connectionScope;
      connectionScope = undefined;
      if (scope) {
        void Effect.runPromise(Scope.close(scope, Exit.void)).catch(() => {});
        return;
      }

      if (activeConnection) {
        void releaseConnection(activeConnection).catch(() => {});
      }
    },

    async answer(options?: Record<string, unknown>): Promise<void> {
      sendEvent("response.create", { response: options ?? {} });
    },

    on<E extends VoiceEventType>(
      event: E,
      callback: VoiceEventCallback<E>,
    ): void {
      addEventListener(event as string, callback);
    },

    off<E extends VoiceEventType>(
      event: E,
      callback: VoiceEventCallback<E>,
    ): void {
      removeEventListener(event as string, callback);
    },

    async getSpeakers(): Promise<
      Array<{ voiceId: string; [key: string]: unknown }>
    > {
      return VOICES.map((voiceId) => ({ voiceId }));
    },

    updateConfig(sessionConfig: Record<string, unknown>): void {
      sendEvent("session.update", { session: sessionConfig });
    },

    connectEffect,
  };

  return provider;
}