/** * HTTP + SSE transport. Default implementation for web hosts. * * Backend contract (see @dev/@refactoring7-chat/06-integration.md): * POST /sessions → SessionInfo (JSON) * GET /sessions/:id/history?cursor= → HistoryPage (JSON) * POST /sessions/:id/messages → SSE stream of ChatStreamEvent * POST /sessions/:id/messages/buffered → ChatMessage (JSON, fallback) * DELETE /sessions/:id → 204 */ import type { ChatMessage, ChatStreamEvent, ChatTransport, CreateSessionOptions, HistoryPage, SendOptions, SessionInfo, StreamOptions, } from '../../types'; import { TransportError } from './types'; import { parseSSE } from './sse'; export interface HttpTransportConfig { /** Base URL without trailing slash, e.g. '/api/chat' or 'https://api.example.com/v1/chat'. */ baseUrl: string; /** Optional slug appended/forwarded as project identifier. */ slug?: string; /** Returns headers applied to every request — e.g. Authorization. */ getAuthHeader?: () => Record | Promise>; /** Default fetch timeout (per non-streaming request). */ timeoutMs?: number; /** Override fetch implementation (useful for tests or custom retry layers). */ fetchImpl?: typeof fetch; } const DEFAULT_TIMEOUT = 20_000; async function jsonOrThrow(res: Response, label: string): Promise { if (!res.ok) { const text = await res.text().catch(() => ''); throw new TransportError( `${label} failed (${res.status}): ${text || res.statusText}`, mapStatusToCode(res.status), ); } try { return (await res.json()) as T; } catch { throw new TransportError(`${label} returned invalid JSON`, 'invalid_response'); } } function mapStatusToCode(status: number): string { if (status === 401) return 'unauthorized'; if (status === 403) return 'forbidden'; if (status === 404) return 'not_found'; if (status === 429) return 'rate_limited'; if (status >= 500) return 'server_error'; return 'http_error'; } /** * Derive an AbortSignal that fires after `timeoutMs` or when the caller's * `signal` aborts. The returned `clear()` MUST be called once the request * settles — otherwise the timer keeps the AbortController (and its abort * listener on the parent signal) alive until the full timeout elapses, * leaking one timer + one listener per request. */ function withTimeout( signal: AbortSignal | undefined, timeoutMs: number, ): { signal: AbortSignal; clear: () => void } { const ctrl = new AbortController(); const onAbort = () => ctrl.abort(); signal?.addEventListener('abort', onAbort, { once: true }); const timer = setTimeout(() => ctrl.abort(), timeoutMs); return { signal: ctrl.signal, clear: () => { clearTimeout(timer); signal?.removeEventListener('abort', onAbort); }, }; } export function createHttpTransport(config: HttpTransportConfig): ChatTransport { const fetchImpl = config.fetchImpl ?? fetch; const timeout = config.timeoutMs ?? DEFAULT_TIMEOUT; const base = config.baseUrl.replace(/\/$/, ''); async function buildHeaders(extra?: Record): Promise> { const auth = (await config.getAuthHeader?.()) ?? {}; return { 'Content-Type': 'application/json', ...auth, ...(extra ?? {}), }; } return { async createSession(opts?: CreateSessionOptions): Promise { const t = withTimeout(undefined, timeout); try { const res = await fetchImpl(`${base}/sessions`, { method: 'POST', headers: await buildHeaders(), body: JSON.stringify({ slug: config.slug, metadata: opts?.metadata ?? {} }), signal: t.signal, }); return await jsonOrThrow(res, 'createSession'); } finally { t.clear(); } }, async loadHistory(sessionId, cursor, limit): Promise { const params = new URLSearchParams(); if (cursor) params.set('cursor', cursor); if (limit) params.set('limit', String(limit)); const url = `${base}/sessions/${encodeURIComponent(sessionId)}/history${ params.toString() ? `?${params.toString()}` : '' }`; const t = withTimeout(undefined, timeout); try { const res = await fetchImpl(url, { method: 'GET', headers: await buildHeaders(), signal: t.signal, }); return await jsonOrThrow(res, 'loadHistory'); } finally { t.clear(); } }, async *stream( sessionId: string, content: string, options: StreamOptions, ): AsyncGenerator { const url = `${base}/sessions/${encodeURIComponent(sessionId)}/messages`; const res = await fetchImpl(url, { method: 'POST', headers: await buildHeaders({ Accept: 'text/event-stream' }), body: JSON.stringify({ content, attachments: options.attachments ?? [], metadata: options.metadata ?? {}, }), signal: options.signal, }); if (!res.ok) { const text = await res.text().catch(() => ''); throw new TransportError( `stream failed (${res.status}): ${text || res.statusText}`, mapStatusToCode(res.status), ); } yield* parseSSE(res, { signal: options.signal }); }, async send(sessionId, content, options?: SendOptions): Promise { const url = `${base}/sessions/${encodeURIComponent(sessionId)}/messages/buffered`; // Honour a caller-supplied signal as-is; otherwise apply our own // timeout (and clean it up in `finally`). const t = options?.signal ? null : withTimeout(undefined, timeout); try { const res = await fetchImpl(url, { method: 'POST', headers: await buildHeaders(), body: JSON.stringify({ content, attachments: options?.attachments ?? [], metadata: options?.metadata ?? {}, }), signal: options?.signal ?? t!.signal, }); return await jsonOrThrow(res, 'send'); } finally { t?.clear(); } }, async closeSession(sessionId: string): Promise { const url = `${base}/sessions/${encodeURIComponent(sessionId)}`; const t = withTimeout(undefined, timeout); let res: Response; try { res = await fetchImpl(url, { method: 'DELETE', headers: await buildHeaders(), signal: t.signal, }); } finally { t.clear(); } if (!res.ok && res.status !== 404) { throw new TransportError(`closeSession failed (${res.status})`, mapStatusToCode(res.status)); } }, }; }