/** * @module websocket * * WebSocket client for real-time queue message subscriptions. * * Provides both a callback-based API ({@link createQueueWebSocket}) and an * async iterator API ({@link subscribeToQueue}) for receiving messages from * a queue in real time via WebSocket. * * @example Callback-based API * ```typescript * import { createQueueWebSocket } from '@agentuity/server'; * * const connection = createQueueWebSocket({ * queueName: 'order-processing', * baseUrl: 'https://catalyst.agentuity.cloud', * onMessage: (message) => { * console.log('Received:', message.id, message.payload); * }, * onOpen: () => console.log('Connected'), * onClose: (code, reason) => console.log('Closed:', code, reason), * onError: (error) => console.error('Error:', error), * }); * * // Later: close the connection * connection.close(); * ``` * * @example Resuming from a previous session * ```typescript * import { createQueueWebSocket } from '@agentuity/server'; * * // Use a previously obtained clientId and lastOffset to resume * const connection = createQueueWebSocket({ * queueName: 'order-processing', * baseUrl: 'https://catalyst.agentuity.cloud', * clientId: previousClientId, * lastOffset: previousOffset, * onMessage: (message) => { * console.log('Received:', message.id, message.payload); * }, * }); * ``` * * @example Async iterator API * ```typescript * import { subscribeToQueue } from '@agentuity/server'; * * const controller = new AbortController(); * for await (const message of subscribeToQueue({ * queueName: 'order-processing', * baseUrl: 'https://catalyst.agentuity.cloud', * signal: controller.signal, * })) { * console.log('Received:', message.id, message.payload); * } * ``` */ import { z } from 'zod'; import { getEnv } from '@agentuity/config'; import type { Message } from './types.ts'; import { WebSocketAuthResponseSchema, WebSocketMessageSchema } from './types.ts'; import { QueueError } from './util.ts'; import { validateQueueName } from './validation.ts'; // ============================================================================ // Types // ============================================================================ /** Connection state for a queue WebSocket connection. */ export type QueueWebSocketState = | 'connecting' | 'authenticating' | 'connected' | 'reconnecting' | 'closed'; /** Options for creating a queue WebSocket subscription. */ export const QueueWebSocketOptionsSchema = z.object({ queueName: z.string().describe('Queue name to subscribe to'), apiKey: z .string() .optional() .describe('Optional API key override; falls back to AGENTUITY_SDK_KEY'), baseUrl: z.string().describe('Base Catalyst URL used to construct the WebSocket endpoint'), onMessage: z .custom<(message: Message) => void>() .describe('Callback invoked for each received queue message'), onOpen: z.custom<() => void>().optional().describe('Callback invoked after auth succeeds'), onClose: z .custom<(code: number, reason: string) => void>() .optional() .describe('Callback invoked when the socket closes'), onError: z .custom<(error: Error) => void>() .optional() .describe('Callback invoked when an error occurs'), autoReconnect: z .boolean() .optional() .describe('Whether to automatically reconnect after disconnection'), maxReconnectAttempts: z .number() .optional() .describe('Maximum reconnect attempts before giving up'), reconnectDelayMs: z.number().optional().describe('Initial reconnect delay in milliseconds'), maxReconnectDelayMs: z.number().optional().describe('Maximum reconnect delay in milliseconds'), clientId: z .string() .max(256) .optional() .describe( 'Optional client identifier (max 256 characters). Can be any string for your own identification. Reuse on reconnect for resume semantics.' ), lastOffset: z .number() .optional() .describe('Optional last processed offset used to resume stream consumption'), orgId: z .string() .optional() .describe('Optional org id for API keys requiring explicit org scope'), }); export type QueueWebSocketOptions = z.infer; /** Return type from {@link createQueueWebSocket}. */ export interface QueueWebSocketConnection { /** Close the WebSocket connection. Disables auto-reconnect. */ close(): void; /** The current connection state. */ readonly state: QueueWebSocketState; /** The client/subscription ID assigned by the server. Stable across reconnections. */ readonly clientId: string | undefined; /** The offset of the last message processed. */ readonly lastOffset: number | undefined; } /** Options for the async iterator queue subscription. */ export const SubscribeToQueueOptionsSchema = z.object({ queueName: z.string().describe('Queue name to subscribe to'), apiKey: z .string() .optional() .describe('Optional API key override; falls back to AGENTUITY_SDK_KEY'), baseUrl: z.string().describe('Base Catalyst URL used to construct the WebSocket endpoint'), signal: z.custom().optional().describe('AbortSignal used to stop the subscription'), clientId: z .string() .max(256) .optional() .describe( 'Optional client identifier (max 256 characters). Can be any string for your own identification. Reuse on reconnect for resume semantics.' ), lastOffset: z .number() .optional() .describe('Optional last processed offset used to resume stream consumption'), orgId: z .string() .optional() .describe('Optional org id for API keys requiring explicit org scope'), }); export type SubscribeToQueueOptions = z.infer; // ============================================================================ // Internal Helpers // ============================================================================ /** * Resolve the API key from the options or the AGENTUITY_SDK_KEY environment variable. * Throws a {@link QueueError} if no API key is available. */ function resolveApiKey(apiKey?: string): string { const key = apiKey ?? getEnv('AGENTUITY_SDK_KEY'); if (!key) { throw new QueueError({ message: 'No API key provided. Pass apiKey in options or set the AGENTUITY_SDK_KEY environment variable.', }); } return key; } /** * Convert an HTTP(S) base URL to a WebSocket URL and append the queue path. * * The WebSocket route is registered at `/queue/ws/{name}` (not versioned). */ function buildWebSocketUrl(baseUrl: string, queueName: string): string { const wsUrl = baseUrl.replace(/^https:\/\//, 'wss://').replace(/^http:\/\//, 'ws://'); // Remove trailing slash if present const base = wsUrl.replace(/\/$/, ''); return `${base}/queue/ws/${encodeURIComponent(queueName)}`; } // ============================================================================ // Callback-based API // ============================================================================ /** * Create a WebSocket connection to receive real-time messages from a queue. * * The connection handles authentication, automatic reconnection with exponential * backoff, and ping/pong keep-alive (handled automatically by the WebSocket * implementation). * * @param options - Configuration for the WebSocket connection * @returns A {@link QueueWebSocketConnection} handle for managing the connection * @throws {QueueError} If no API key is available * @throws {QueueValidationError} If the queue name is invalid * * @example * ```typescript * const connection = createQueueWebSocket({ * queueName: 'order-processing', * baseUrl: 'https://catalyst.agentuity.cloud', * onMessage: (message) => { * console.log('Received:', message.id, message.payload); * }, * }); * * // Later: close the connection * connection.close(); * ``` */ export function createQueueWebSocket(options: QueueWebSocketOptions): QueueWebSocketConnection { // Validate inputs eagerly so callers get immediate feedback. validateQueueName(options.queueName); const apiKey = resolveApiKey(options.apiKey); const { queueName, baseUrl, onMessage, onOpen, onClose, onError, autoReconnect = true, maxReconnectAttempts = Number.POSITIVE_INFINITY, reconnectDelayMs = 1000, maxReconnectDelayMs = 30000, orgId, } = options; let state: QueueWebSocketState = 'connecting'; let ws: WebSocket | null = null; let intentionallyClosed = false; let reconnectAttempts = 0; let reconnectTimer: ReturnType | null = null; let clientId: string | undefined = options.clientId; let lastProcessedOffset: number | undefined = options.lastOffset; function connect() { if (intentionallyClosed) return; const url = buildWebSocketUrl(baseUrl, queueName); state = reconnectAttempts > 0 ? 'reconnecting' : 'connecting'; try { ws = new WebSocket(url); } catch (err) { state = 'closed'; onError?.( new QueueError({ message: `Failed to create WebSocket connection: ${err instanceof Error ? err.message : String(err)}`, queueName, cause: err instanceof Error ? err : undefined, }) ); scheduleReconnect(); return; } ws.onopen = () => { state = 'authenticating'; // Send auth message — raw API key, no "Bearer " prefix. // Include client_id and last_offset on reconnect for resumption. const authPayload: Record = { authorization: apiKey, orgId }; if (clientId) { authPayload.client_id = clientId; } if (lastProcessedOffset !== undefined) { authPayload.last_offset = lastProcessedOffset; } ws!.send(JSON.stringify(authPayload)); }; /** Whether the auth handshake has completed successfully. */ let authenticated = false; ws.onmessage = (event: MessageEvent) => { const raw = typeof event.data === 'string' ? event.data : String(event.data); if (!authenticated) { // First message after open must be the auth response. try { const parsed = JSON.parse(raw); const authResult = WebSocketAuthResponseSchema.safeParse(parsed); if (!authResult.success) { const err = new QueueError({ message: `Unexpected auth response from server: ${raw}`, queueName, }); onError?.(err); ws?.close(4000, 'Invalid auth response'); return; } if (!authResult.data.success) { const err = new QueueError({ message: `Authentication failed: ${authResult.data.error ?? 'Unknown error'}`, queueName, }); onError?.(err); // Auth rejection is terminal — do not reconnect with the same bad credentials. intentionallyClosed = true; ws?.close(4001, 'Auth failed'); return; } authenticated = true; reconnectAttempts = 0; // Reset on successful auth. state = 'connected'; if (authResult.data.client_id) { clientId = authResult.data.client_id; } onOpen?.(); } catch { const err = new QueueError({ message: `Failed to parse auth response: ${raw}`, queueName, }); onError?.(err); ws?.close(4000, 'Invalid auth response'); } return; } // Normal message after authentication. try { const parsed = JSON.parse(raw); const msgResult = WebSocketMessageSchema.safeParse(parsed); if (msgResult.success && msgResult.data.messages.length > 0) { for (const msg of msgResult.data.messages) { onMessage(msg); if (msg.offset !== undefined) { lastProcessedOffset = msg.offset; } } } } catch { // Non-JSON frames are silently ignored; the server may send // ping text frames that are not JSON. } }; ws.onclose = (event: CloseEvent) => { state = 'closed'; ws = null; // Close codes 4000–4999 are application-level terminal errors // (auth failure, validation error, etc.) — do not reconnect. if (event.code >= 4000 && event.code < 5000) { intentionallyClosed = true; } onClose?.(event.code, event.reason); // Reconnect on any unintentional close — whether we were fully // connected, mid-auth, or never authenticated (transient network issue). if (!intentionallyClosed) { scheduleReconnect(); } }; ws.onerror = () => { // The browser/Node WebSocket fires `error` then `close`. // We report the error but let `onclose` handle reconnection. onError?.( new QueueError({ message: 'WebSocket connection error', queueName, }) ); }; } function scheduleReconnect() { if (intentionallyClosed || !autoReconnect) return; if (reconnectAttempts >= maxReconnectAttempts) { onError?.( new QueueError({ message: `Exceeded maximum reconnection attempts (${maxReconnectAttempts})`, queueName, }) ); return; } // Exponential backoff with jitter, capped at maxReconnectDelayMs. const baseDelay = reconnectDelayMs * 2 ** reconnectAttempts; const jitter = 0.5 + Math.random() * 0.5; const delay = Math.min(Math.floor(baseDelay * jitter), maxReconnectDelayMs); reconnectAttempts++; state = 'reconnecting'; reconnectTimer = setTimeout(() => { reconnectTimer = null; connect(); }, delay); } // Kick off the initial connection. connect(); return { close() { intentionallyClosed = true; if (reconnectTimer !== null) { clearTimeout(reconnectTimer); reconnectTimer = null; } if (ws) { ws.close(1000, 'Client closed'); ws = null; } state = 'closed'; }, get state() { return state; }, get clientId() { return clientId; }, get lastOffset() { return lastProcessedOffset; }, }; } // ============================================================================ // Async Iterator API // ============================================================================ /** * Subscribe to real-time messages from a queue via WebSocket. * * Returns an async iterator that yields messages as they arrive. * The connection is automatically managed (auth, reconnection, cleanup). * * @param options - Configuration for the subscription * @returns An async generator that yields {@link Message} objects * @throws {QueueError} If no API key is available * @throws {QueueValidationError} If the queue name is invalid * * @example * ```typescript * const controller = new AbortController(); * for await (const message of subscribeToQueue({ * queueName: 'order-processing', * baseUrl: 'https://catalyst.agentuity.cloud', * signal: controller.signal, * })) { * console.log('Received:', message.id, message.payload); * } * ``` */ export async function* subscribeToQueue( options: SubscribeToQueueOptions ): AsyncGenerator { const { signal } = options; // Check if already aborted. if (signal?.aborted) return; // A queue for buffering messages between the WebSocket callbacks and the // async iterator consumer. const buffer: Message[] = []; let resolve: (() => void) | null = null; let done = false; let lastError: Error | null = null; function push(message: Message) { buffer.push(message); if (resolve) { resolve(); resolve = null; } } function finish(error?: Error) { done = true; if (error) lastError = error; if (resolve) { resolve(); resolve = null; } } const connection = createQueueWebSocket({ queueName: options.queueName, apiKey: options.apiKey, baseUrl: options.baseUrl, clientId: options.clientId, lastOffset: options.lastOffset, orgId: options.orgId, onMessage: push, onError: (err) => { // Terminal errors should stop the iterator. // Auth failures and max-reconnect-exceeded are terminal. // Transient errors (connection drops, server shutdown) are handled // by the reconnection logic in createQueueWebSocket. const msg = err instanceof Error ? err.message : String(err); if ( msg.includes('Authentication failed') || msg.includes('Exceeded maximum reconnection attempts') || msg.includes('No API key provided') ) { finish(err); } else { // Transient error — don't store it; the reconnection logic in // createQueueWebSocket will handle retry. Storing it would cause // it to be thrown on clean shutdown / abort. } }, onClose: (code: number) => { // Terminal close codes (4000–4999) mean the connection will not // reconnect — signal the async iterator to stop so it doesn't // hang forever. For abort-initiated closes, `finish()` is // already called by the `onAbort` handler; calling it again is // harmless (it's idempotent). if (code >= 4000 && code < 5000) { finish(); } }, autoReconnect: true, }); // Wire up the abort signal to close the connection. const onAbort = () => { connection.close(); finish(); }; signal?.addEventListener('abort', onAbort, { once: true }); try { while (!done) { // Drain buffered messages. while (buffer.length > 0) { yield buffer.shift()!; // Re-check after yield in case signal was aborted. if (done || signal?.aborted) return; } if (done || signal?.aborted) return; // Wait for the next message or completion. await new Promise((r) => { resolve = r; }); } // Drain any remaining messages. while (buffer.length > 0) { yield buffer.shift()!; } if (lastError) { throw lastError; } } finally { signal?.removeEventListener('abort', onAbort); connection.close(); } }