/**
 * A single Server-Sent Event extracted from an event stream.
 *
 * `T` is the payload type: `string` for the raw events returned by
 * `EventStreamDecoder.push`, or the JSON-decoded shape for the events
 * yielded by `EventStreamDecoder.decode`.
 */
export type SseEvent<T = unknown> = {
  /** Value of the event's `id:` field, when one was present. */
  id?: string;
  /** Value of the event's `event:` field (never empty for emitted events). */
  eventType: string;
  /** Event payload built from the event's `data:` lines. */
  data: T;
};

/**
 * Incremental decoder for `text/event-stream` bodies.
 *
 * Feed arbitrary chunks to {@link EventStreamDecoder.push}; complete events
 * (terminated by a blank line) are returned as soon as they are available,
 * and any trailing partial event is kept until later chunks complete it.
 * Each instance is stateful and must be used for one stream only.
 */
export class EventStreamDecoder {
  /** Decoded, newline-normalised text that has not yet formed a complete event. */
  private buffer = '';

  /**
   * Streaming UTF-8 decoder, so a multi-byte character split across two
   * binary chunks is reassembled instead of being turned into U+FFFD.
   * It also drops a leading byte-order mark.
   */
  private readonly utf8 = new TextDecoder('utf-8');

  /**
   * True when the previous chunk ended with a bare `\r`. That CR is held back
   * until the next chunk shows whether it is the first half of a `\r\n` pair.
   */
  private pendingCr = false;

  /**
   * Decodes a stream into events whose `data` is parsed as JSON.
   *
   * Events whose payload is not valid JSON are skipped; the stream keeps
   * going. An event still incomplete when the stream ends is discarded.
   *
   * @param body Async-iterable stream of binary (or string) chunks.
   * @returns An async generator of events with JSON-decoded payloads. The
   *   payload is not validated against `T`; callers choose `T` at their own risk.
   */
  static async *decode<T = unknown>(body: ReadableStream): AsyncGenerator<SseEvent<T>> {
    const decoder = new EventStreamDecoder();
    for await (const chunk of body) {
      for (const { id, eventType, data } of decoder.push(chunk)) {
        let parsed: T;
        try {
          parsed = JSON.parse(data);
        } catch {
          // Malformed payload: deliberately non-fatal, skip just this event.
          continue;
        }
        // Yield outside the try so errors injected into the generator by the
        // consumer are not mistaken for JSON parse failures and swallowed.
        yield { id, eventType, data: parsed };
      }
    }
  }

  /**
   * Appends a chunk and returns every event it completes.
   *
   * @param chunk Next piece of the stream: UTF-8 bytes (a `Buffer` is fine)
   *   or already-decoded text.
   * @returns Completed events in stream order, with `data` as the raw string
   *   (multiple `data:` lines joined by `\n`). Events that lack an `event:`
   *   field or have no data are dropped.
   */
  push(chunk: Uint8Array | string): SseEvent<string>[] {
    let text = typeof chunk === 'string' ? chunk : this.utf8.decode(chunk, { stream: true });

    // Re-attach a CR held back from the previous chunk, then hold back a new
    // trailing CR: if the next chunk starts with "\n" the pair is ONE line
    // ending, and normalising the halves separately would fabricate a blank line.
    if (this.pendingCr) text = '\r' + text;
    this.pendingCr = text.endsWith('\r');
    if (this.pendingCr) text = text.slice(0, -1);

    // Normalise CRLF and lone CR line endings to LF.
    this.buffer += text.replace(/\r\n?/g, '\n');

    const events: SseEvent<string>[] = [];
    let boundaryIdx = this.buffer.indexOf('\n\n');
    while (boundaryIdx !== -1) {
      const eventText = this.buffer.slice(0, boundaryIdx);
      this.buffer = this.buffer.slice(boundaryIdx + 2);
      const event = EventStreamDecoder.parseEvent(eventText);
      if (event !== undefined) events.push(event);
      boundaryIdx = this.buffer.indexOf('\n\n');
    }
    return events;
  }

  /** Parses one blank-line-delimited block; returns undefined when it is not an emittable event. */
  private static parseEvent(eventText: string): SseEvent<string> | undefined {
    if (!eventText.trim()) return undefined;

    let id: string | undefined;
    let eventType = '';
    const dataLines: string[] = [];
    for (const line of eventText.split('\n')) {
      // Lines with any other prefix (including ":" comments) are ignored.
      if (line.startsWith('id:')) {
        id = line.slice(3).trim();
      } else if (line.startsWith('event:')) {
        eventType = line.slice(6).trim();
      } else if (line.startsWith('data:')) {
        dataLines.push(line.slice(5).trim());
      }
    }

    const data = dataLines.join('\n');
    return eventType && data ? { id, eventType, data } : undefined;
  }
}