import { Consumable } from "../consumable.js";
import type { QueuingStrategy } from "../stream.js";
import { ReadableStream } from "../stream.js";

/**
 * Controller handed to a {@link ConsumableReadableStreamSource}.
 *
 * It has the same three operations the wrapped stream controller is used for
 * here (`enqueue`, `close`, `error`), except that `enqueue` takes a raw chunk
 * and returns a promise.
 */
export interface ConsumableReadableStreamController<T> {
    /**
     * Wraps `chunk` in a `Consumable`, enqueues it, and waits on the wrapper's
     * `consumed` property.
     *
     * NOTE(review): `consumed` presumably settles once the reader has finished
     * with the chunk — confirm against `Consumable`.
     */
    enqueue(chunk: T): Promise<void>;
    /** Forwards to the underlying stream controller's `close()`. */
    close(): void;
    /** Forwards `reason` to the underlying stream controller's `error()`. */
    error(reason: unknown): void;
}

/**
 * Underlying source for a {@link ConsumableReadableStream}.
 *
 * `start` and `pull` receive a {@link ConsumableReadableStreamController}
 * rather than the raw stream controller; `cancel` is forwarded unchanged.
 */
export interface ConsumableReadableStreamSource<T> {
    start?(
        controller: ConsumableReadableStreamController<T>,
    ): void | PromiseLike<void>;
    pull?(
        controller: ConsumableReadableStreamController<T>,
    ): void | PromiseLike<void>;
    cancel?(reason: unknown): void | PromiseLike<void>;
}

/**
 * A `ReadableStream` whose chunks are `Consumable<T>` wrappers.
 *
 * The source works with plain `T` values; each one is wrapped before it is
 * placed in the stream's queue.
 */
export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
    /**
     * Wraps `chunk` in a `Consumable`, enqueues the wrapper on `controller`,
     * then awaits the wrapper's `consumed` property.
     *
     * @param controller Any object exposing a compatible `enqueue` method.
     * @param chunk The raw value to wrap and enqueue.
     */
    static async enqueue<T>(
        controller: { enqueue: (chunk: Consumable<T>) => void },
        chunk: T,
    ): Promise<void> {
        const output = new Consumable(chunk);
        controller.enqueue(output);
        await output.consumed;
    }

    /**
     * @param source Callbacks that produce raw `T` chunks.
     * @param strategy Optional queuing strategy expressed in terms of raw `T`
     * chunks; it is adapted so that `size` is computed from the wrapped value.
     */
    constructor(
        source: ConsumableReadableStreamSource<T>,
        strategy?: QueuingStrategy<T>,
    ) {
        // Created inside `start` and reused by `pull`, so it has to live in a
        // local shared by both callbacks.
        let wrappedController!: ConsumableReadableStreamController<T>;

        let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
        if (strategy) {
            wrappedStrategy = {};

            if (strategy.highWaterMark !== undefined) {
                wrappedStrategy.highWaterMark = strategy.highWaterMark;
            }

            // Check the value, not just key presence: `{ size: undefined }`
            // must not install a wrapper that throws on the first chunk.
            const size = strategy.size;
            if (size !== undefined) {
                wrappedStrategy.size = (chunk) => {
                    // Unwrap so the caller's `size` sees the raw value; keep
                    // `strategy` as `this`, as a method call would.
                    return size.call(strategy, chunk.value);
                };
            }
        }

        super(
            {
                start(controller) {
                    wrappedController = {
                        enqueue(chunk) {
                            return ConsumableReadableStream.enqueue(
                                controller,
                                chunk,
                            );
                        },
                        close() {
                            controller.close();
                        },
                        error(reason) {
                            controller.error(reason);
                        },
                    };

                    return source.start?.(wrappedController);
                },
                pull() {
                    return source.pull?.(wrappedController);
                },
                cancel(reason) {
                    return source.cancel?.(reason);
                },
            },
            wrappedStrategy,
        );
    }
}