import { formatHashLine } from "./hash";
import type { HashlineStreamOptions } from "./types";

/** Stream options with every default filled in. */
interface ResolvedHashlineStreamOptions {
  /** Line number assigned to the first emitted line. */
  startLine: number;
  /** A chunk is flushed as soon as it holds this many lines. */
  maxChunkLines: number;
  /** A chunk is flushed as soon as its UTF-8 size (including "\n" separators) reaches this many bytes. */
  maxChunkBytes: number;
}

/**
 * Fills in defaults for any option the caller left `undefined`:
 * first line 1, at most 200 lines and 64 KiB per chunk.
 */
function resolveHashlineStreamOptions(options: HashlineStreamOptions): ResolvedHashlineStreamOptions {
  return {
    startLine: options.startLine ?? 1,
    maxChunkLines: options.maxChunkLines ?? 200,
    maxChunkBytes: options.maxChunkBytes ?? 64 * 1024,
  };
}

/** Accumulates formatted lines and hands back "\n"-joined chunks when a size limit is hit. */
interface HashlineChunkEmitter {
  /** Formats `line` with the next line number; returns the chunks completed as a result (0-2). */
  pushLine: (line: string) => string[];
  /** Returns the buffered partial chunk and resets the buffer, or `undefined` if it is empty. */
  flush: () => string | undefined;
}

/**
 * Creates an emitter that numbers lines starting at `options.startLine` and groups them
 * into chunks bounded by `options.maxChunkLines` and `options.maxChunkBytes`.
 * A single formatted line larger than the byte budget is emitted as a chunk of its own.
 */
function createHashlineChunkEmitter(options: ResolvedHashlineStreamOptions): HashlineChunkEmitter {
  let lineNumber = options.startLine;
  let outLines: string[] = [];
  let outBytes = 0;

  const flush = (): string | undefined => {
    if (outLines.length === 0) return undefined;
    const chunk = outLines.join("\n");
    outLines = [];
    outBytes = 0;
    return chunk;
  };

  const pushLine = (line: string): string[] => {
    const formatted = formatHashLine(lineNumber, line);
    lineNumber++;
    const lineBytes = Buffer.byteLength(formatted, "utf-8");
    const chunks: string[] = [];

    // Close the current chunk first if appending this line (plus its "\n" separator)
    // would exceed either limit, so chunks only overshoot the byte budget when a single
    // line is larger than the budget on its own.
    if (outLines.length > 0) {
      const wouldOverflow =
        outLines.length >= options.maxChunkLines ||
        outBytes + 1 + lineBytes > options.maxChunkBytes;
      if (wouldOverflow) {
        const flushed = flush();
        if (flushed !== undefined) chunks.push(flushed);
      }
    }

    // Every line after the first in a chunk is preceded by one "\n" byte when joined.
    outBytes += (outLines.length === 0 ? 0 : 1) + lineBytes;
    outLines.push(formatted);

    // Emit eagerly once the chunk is full instead of waiting for the next line.
    if (outLines.length >= options.maxChunkLines || outBytes >= options.maxChunkBytes) {
      const flushed = flush();
      if (flushed !== undefined) chunks.push(flushed);
    }
    return chunks;
  };

  return { pushLine, flush };
}

/** Duck-types a WHATWG `ReadableStream` by the presence of a callable `getReader`. */
function isReadableStream(value: unknown): value is ReadableStream<unknown> {
  return (
    typeof value === "object" &&
    value !== null &&
    "getReader" in value &&
    typeof (value as { getReader?: unknown }).getReader === "function"
  );
}

/**
 * Adapts a `ReadableStream` of byte chunks into an async generator.
 * The reader lock is always released. If the consumer stops before the stream is
 * exhausted, the stream is cancelled first so its source can release resources
 * (the same default as the standard `ReadableStream` async iterator).
 */
async function* bytesFromReadableStream(stream: ReadableStream<Uint8Array>): AsyncGenerator<Uint8Array> {
  const reader = stream.getReader();
  // True once the stream ended or errored, i.e. there is nothing left to cancel.
  let finished = false;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        finished = true;
        return;
      }
      if (value !== undefined) yield value;
    }
  } catch (err: unknown) {
    finished = true;
    throw err;
  } finally {
    if (!finished) {
      // Best-effort cleanup on early exit: a failing cancel must not mask the original exit path.
      await reader.cancel().catch(() => undefined);
    }
    reader.releaseLock();
  }
}

/** Drops a single trailing "\r" so CRLF-terminated lines come out without it. */
function stripTrailingCr(line: string): string {
  return line.endsWith("\r") ? line.slice(0, -1) : line;
}

/**
 * Decodes UTF-8 bytes from `source` and yields hash-formatted lines grouped into chunks.
 *
 * Lines are split on "\n" and one trailing "\r" per line is dropped, so CRLF input works.
 * A final line without a terminating newline is still emitted; a trailing newline does not
 * produce an extra empty line; completely empty input yields a single empty line.
 * Each yielded string is one or more `formatHashLine` results joined with "\n".
 *
 * @param source - A `ReadableStream` or any async iterable of UTF-8 byte chunks.
 * @param options - First line number and per-chunk line/byte limits
 *   (defaults: line 1, 200 lines, 64 KiB).
 * @returns Async generator of chunk strings.
 */
export async function* streamHashLinesFromUtf8(
  source: ReadableStream<Uint8Array> | AsyncIterable<Uint8Array>,
  options: HashlineStreamOptions = {},
): AsyncGenerator<string> {
  const resolved = resolveHashlineStreamOptions(options);
  const decoder = new TextDecoder("utf-8");
  const chunks = isReadableStream(source) ? bytesFromReadableStream(source) : source;
  const emitter = createHashlineChunkEmitter(resolved);

  // Decoded text not yet terminated by "\n".
  let pending = "";
  let sawAnyLine = false;

  for await (const chunk of chunks) {
    // `stream: true` keeps a multi-byte sequence split across chunks buffered in the decoder.
    pending += decoder.decode(chunk, { stream: true });

    // Walk complete lines by index and trim `pending` once per chunk rather than
    // re-slicing the remaining text after every line.
    let start = 0;
    let nl = pending.indexOf("\n", start);
    while (nl !== -1) {
      sawAnyLine = true;
      for (const out of emitter.pushLine(stripTrailingCr(pending.slice(start, nl)))) yield out;
      start = nl + 1;
      nl = pending.indexOf("\n", start);
    }
    if (start > 0) pending = pending.slice(start);
  }

  // Flush whatever the streaming decoder is still holding back.
  pending += decoder.decode();
  if (pending.length > 0) {
    sawAnyLine = true;
    for (const out of emitter.pushLine(stripTrailingCr(pending))) yield out;
  }

  // Empty input is still reported as one (empty) line.
  if (!sawAnyLine) {
    for (const out of emitter.pushLine("")) yield out;
  }

  const last = emitter.flush();
  if (last !== undefined) yield last;
}