import {
  EventSourceMessage,
  EventSourceParserStream,
} from 'eventsource-parser/stream';
import { ParseResult, safeParseJSON } from './parse-json';
import { FlexibleSchema } from './schema';

/**
 * Parses a JSON event stream into a stream of parsed JSON objects.
 *
 * The incoming stream is decoded to text, split into server-sent events by
 * `EventSourceParserStream`, and the `data` field of every event is passed to
 * `safeParseJSON` together with `schema`. Each parse result is forwarded as-is,
 * so callers receive one `ParseResult<T>` per event.
 *
 * Events whose data is exactly `[DONE]` are dropped and produce no output.
 *
 * NOTE(review): `safeParseJSON` is presumably non-throwing and reports
 * parse/validation failures inside the returned `ParseResult` — confirm
 * against `./parse-json`.
 *
 * @param options.stream - Raw event stream to decode and parse.
 * @param options.schema - Schema handed to `safeParseJSON` for every event payload.
 * @returns A stream emitting the `safeParseJSON` result for each non-`[DONE]` event.
 */
export function parseJsonEventStream<T>({
  stream,
  schema,
}: {
  stream: ReadableStream<Uint8Array>;
  schema: FlexibleSchema<T>;
}): ReadableStream<ParseResult<T>> {
  return stream
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new EventSourceParserStream())
    .pipeThrough(
      new TransformStream<EventSourceMessage, ParseResult<T>>({
        async transform({ data }, controller) {
          // Ignore the '[DONE]' sentinel event that e.g. OpenAI sends;
          // it is not JSON and must not reach the parser.
          if (data === '[DONE]') {
            return;
          }

          controller.enqueue(await safeParseJSON({ text: data, schema }));
        },
      }),
    );
}