/**
 * Server-Sent Events parser as an AsyncGenerator.
 *
 * Yields parsed events from a `Response` body. Handles the split-read case
 * where `event:` and `data:` arrive in separate TCP packets, and accepts both
 * LF and CRLF line endings. Skips malformed JSON gracefully. Honors
 * AbortSignal (caller passes one to fetch).
 */

import type { ChatStreamEvent } from '../../types';
import { LIMITS } from '../../constants';

/** The fields of one SSE event block that this parser extracts. */
interface RawEvent {
  /** Value of the last `event:` line in the block, if any. */
  event?: string;
  /** All `data:` lines of the block joined with `\n`, if any. */
  data?: string;
}

export interface ParseSSEOptions {
  /**
   * Checked before every read. Once aborted, the generator finishes without
   * throwing and cancels the underlying stream.
   */
  signal?: AbortSignal;
  /** Map a raw SSE event to zero, one, or many `ChatStreamEvent`s.
   *  Default: parse `data` as JSON and assume the JSON shape already
   *  matches `ChatStreamEvent`. */
  map?: (raw: RawEvent) => ChatStreamEvent | ChatStreamEvent[] | null;
  /**
   * Longest time, in milliseconds, a single read may stay pending before the
   * generator throws an idle-timeout error. Defaults to `LIMITS.sseIdleMs`.
   */
  idleTimeoutMs?: number;
}

/**
 * A blank line terminates an SSE event. Each of the two line breaks may be
 * LF or CRLF, matching the line endings `parseEventBlock` accepts.
 */
const EVENT_BOUNDARY = /\r?\n\r?\n/;

/**
 * Default mapper: parses `data` as JSON. When the payload has no `type`
 * property of its own, the SSE `event:` name is used as `type`.
 *
 * Returns `null` (event skipped) when there is no data, the data is not valid
 * JSON, or the JSON is not an object.
 */
const DEFAULT_MAP = (raw: RawEvent): ChatStreamEvent | null => {
  if (!raw.data) return null;

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw.data);
  } catch {
    return null;
  }

  // A JSON primitive cannot carry a `type` field. Previously such payloads
  // were dropped only when an `event:` name was present; drop them always.
  if (typeof parsed !== 'object' || parsed === null) return null;

  if (raw.event && !('type' in parsed)) {
    return { ...parsed, type: raw.event } as ChatStreamEvent;
  }
  return parsed as ChatStreamEvent;
};

/** Normalizes a mapper result (nothing, one event, or many) to an array. */
function toEventList(
  mapped: ChatStreamEvent | ChatStreamEvent[] | null,
): ChatStreamEvent[] {
  if (!mapped) return [];
  return Array.isArray(mapped) ? mapped : [mapped];
}

/**
 * Reads `response.body` as an SSE stream and yields the mapped events.
 *
 * @param response - Fetch response whose body is the event stream.
 * @param options - Optional abort signal, custom mapper and idle timeout.
 * @throws Error if the response has no body, or if a single read stays
 *   pending longer than the idle timeout. Errors raised by the stream itself
 *   or by `options.map` propagate unchanged.
 */
export async function* parseSSE(
  response: Response,
  options: ParseSSEOptions = {},
): AsyncGenerator<ChatStreamEvent> {
  if (!response.body) {
    throw new Error('SSE response has no body');
  }

  const map = options.map ?? DEFAULT_MAP;
  const idleMs = options.idleTimeoutMs ?? LIMITS.sseIdleMs;
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  // Race `reader.read()` against an idle timer so that a stalled connection
  // (one that accepts the request but then stops sending bytes) surfaces as
  // an error instead of hanging the generator forever. The timer has to run
  // while the read is pending; checking after the read returns is too late.
  const readWithIdleTimeout = (): Promise<ReadableStreamReadResult<Uint8Array>> => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const idle = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new Error(`SSE idle timeout (${idleMs}ms)`)),
        idleMs,
      );
    });
    return Promise.race([reader.read(), idle]).finally(() => {
      if (timer !== undefined) clearTimeout(timer);
    });
  };

  // Becomes true only once the stream itself reports `done`.
  let streamEnded = false;

  try {
    while (true) {
      if (options.signal?.aborted) {
        return;
      }

      const { value, done } = await readWithIdleTimeout();
      if (done) {
        streamEnded = true;
        break;
      }

      // `stream: true` keeps a multi-byte character that straddles two chunks
      // buffered inside the decoder until its remaining bytes arrive.
      buffer += decoder.decode(value, { stream: true });

      // Emit every complete event currently in the buffer. A boundary that is
      // itself split across chunks simply matches after the next read.
      let boundary = EVENT_BOUNDARY.exec(buffer);
      while (boundary !== null) {
        const rawBlock = buffer.slice(0, boundary.index);
        buffer = buffer.slice(boundary.index + boundary[0].length);
        for (const evt of toEventList(map(parseEventBlock(rawBlock)))) {
          yield evt;
        }
        boundary = EVENT_BOUNDARY.exec(buffer);
      }
    }

    // Flush any trailing partial event.
    if (buffer.trim()) {
      for (const evt of toEventList(map(parseEventBlock(buffer)))) {
        yield evt;
      }
    }
  } finally {
    if (!streamEnded) {
      // Abort, idle timeout, a thrown error, or the consumer leaving the loop
      // early: cancel so the underlying stream is closed and any read that is
      // still pending gets settled before the lock is released.
      void reader.cancel().catch(() => undefined);
    }
    try {
      reader.releaseLock();
    } catch {
      /* ignore */
    }
  }
}

/**
 * Parses the lines of one SSE event block into its `event` and `data` fields.
 *
 * Blank lines, comment lines (starting with `:`) and lines without a colon
 * are skipped, and fields other than `event` and `data` are ignored. A single
 * leading space after the colon is removed from the value. Multiple `data:`
 * lines are joined with `\n`; a repeated `event:` line overrides the earlier
 * one.
 */
function parseEventBlock(block: string): RawEvent {
  const out: RawEvent = {};
  const dataLines: string[] = [];

  for (const line of block.split(/\r?\n/)) {
    if (!line || line.startsWith(':')) continue;

    const colon = line.indexOf(':');
    if (colon === -1) continue;

    const field = line.slice(0, colon).trim();
    const value = line.slice(colon + 1).replace(/^ /, '');

    if (field === 'event') out.event = value;
    else if (field === 'data') dataLines.push(value);
  }

  if (dataLines.length) out.data = dataLines.join('\n');
  return out;
}