import type { IncomingMessage } from "node:http"; import { Result } from "better-result"; import WebSocket from "ws"; import { CancelledError, LogStreamError } from "./errors.ts"; const isBun = "Bun" in globalThis; export type LogRecord = { type: "log"; text: string; byteStart: number; byteEnd: number; }; export type TerminalRecord = { type: "terminal"; kind: "end" | "error"; code: string; message: string; retryable: boolean; cursor: string | null; details?: Record; }; export type StreamRecord = LogRecord | TerminalRecord; export type LogStreamOptions = { baseUrl: string; token: string; versionId: string; tail?: number; fromStart?: boolean; cursor?: string; signal?: AbortSignal; }; export type StreamResult = { terminal: TerminalRecord | null; lastByteEnd: number | null; }; export type StreamLogsError = LogStreamError | CancelledError; function parseErrorBody(body: string, statusCode: number): string { try { const parsed = JSON.parse(body) as { message?: string; error?: { message?: string }; }; return parsed.error?.message ?? parsed.message ?? `HTTP ${statusCode}`; } catch { return body || `HTTP ${statusCode}`; } } function toError(value: unknown): Error { if (value instanceof Error) return value; // Bun emits ErrorEvent instead of Error on WebSocket failures if ( typeof value === "object" && value !== null && "message" in value && typeof (value as { message: unknown }).message === "string" ) { return new Error((value as { message: string }).message); } return new Error(String(value)); } function buildWebSocketUrl(options: LogStreamOptions): string { const httpUrl = new URL( `/v1/compute-services/versions/${encodeURIComponent(options.versionId)}/logs`, options.baseUrl, ); httpUrl.protocol = httpUrl.protocol === "https:" ? "wss:" : "ws:"; if (options.tail != null) httpUrl.searchParams.set("tail", String(options.tail)); if (options.fromStart) httpUrl.searchParams.set("from_start", "true"); if (options.cursor) httpUrl.searchParams.set("cursor", options.cursor); return httpUrl.toString(); } type ConnectResult = { terminal: TerminalRecord | null; lastByteEnd: number | null; }; function connectOnce( options: LogStreamOptions, onRecord: (record: StreamRecord) => void, ): Promise { return new Promise((resolve, reject) => { if (options.signal?.aborted) { reject(new CancelledError()); return; } const url = buildWebSocketUrl(options); const ws = new WebSocket(url, { headers: { Authorization: `Bearer ${options.token}` }, }); let terminal: TerminalRecord | null = null; let lastByteEnd: number | null = null; let settled = false; const cleanup = () => { options.signal?.removeEventListener("abort", onAbort); }; const onAbort = () => { cleanup(); if (!settled) { settled = true; ws.close(); reject(new CancelledError()); } }; options.signal?.addEventListener("abort", onAbort, { once: true }); if (options.signal?.aborted) { onAbort(); return; } // Node.js ws: fires when the server responds with a non-101 status, // giving us access to the HTTP response for structured error messages. // Bun doesn't implement this event — errors go to the "error" handler. if (!isBun) { ws.on("unexpected-response", (_req: unknown, res: IncomingMessage) => { const chunks: Buffer[] = []; res.on("data", (chunk: Buffer) => { chunks.push(chunk); }); res.on("end", () => { cleanup(); if (!settled) { settled = true; const statusCode = res.statusCode ?? 0; const body = Buffer.concat(chunks).toString(); reject( new LogStreamError({ statusCode, message: parseErrorBody(body, statusCode), }), ); } }); }); } ws.on("message", (data: WebSocket.RawData) => { let record: StreamRecord; try { record = JSON.parse(data.toString()) as StreamRecord; } catch { return; } if (record.type === "log") { lastByteEnd = record.byteEnd; } else if (record.type === "terminal") { terminal = record; } try { onRecord(record); } catch (err) { cleanup(); if (!settled) { settled = true; ws.close(); reject(toError(err)); } } }); ws.on("close", () => { cleanup(); if (!settled) { settled = true; resolve({ terminal, lastByteEnd }); } }); ws.on("error", (err: unknown) => { cleanup(); if (!settled) { settled = true; const error = toError(err); reject(new LogStreamError({ statusCode: 0, message: error.message })); } }); }); } /** * Open a WebSocket to the log-streaming endpoint and deliver records via callback. * * Returns `Ok` with the stream result on graceful close, or `Err` with a * `LogStreamError` (connection/HTTP errors) or `CancelledError` (abort signal). */ export async function streamLogs( options: LogStreamOptions, onRecord: (record: StreamRecord) => void, ): Promise> { return Result.tryPromise({ try: () => connectOnce(options, onRecord), catch: (e) => { if (CancelledError.is(e)) return e; if (LogStreamError.is(e)) return e; const error = toError(e); return new LogStreamError({ statusCode: 0, message: error.message }); }, }); }