import { Consumable } from "../consumable.js"; import type { QueuingStrategy, WritableStreamDefaultController, WritableStreamDefaultWriter, } from "../stream.js"; import { WritableStream } from "../stream.js"; export interface ConsumableWritableStreamSink { start?( controller: WritableStreamDefaultController, ): void | PromiseLike; write?( chunk: T, controller: WritableStreamDefaultController, ): void | PromiseLike; abort?(reason: unknown): void | PromiseLike; close?(): void | PromiseLike; } export class ConsumableWritableStream extends WritableStream< Consumable > { static async write( writer: WritableStreamDefaultWriter>, value: T, ) { const consumable = new Consumable(value); await writer.write(consumable); await consumable.consumed; } constructor( sink: ConsumableWritableStreamSink, strategy?: QueuingStrategy, ) { let wrappedStrategy: QueuingStrategy> | undefined; if (strategy) { wrappedStrategy = {}; if ("highWaterMark" in strategy) { wrappedStrategy.highWaterMark = strategy.highWaterMark; } if ("size" in strategy) { wrappedStrategy.size = (chunk) => { return strategy.size!( chunk instanceof Consumable ? chunk.value : chunk, ); }; } } super( { start(controller) { return sink.start?.(controller); }, write(chunk, controller) { return chunk.tryConsume((chunk) => sink.write?.(chunk, controller), ); }, abort(reason) { return sink.abort?.(reason); }, close() { return sink.close?.(); }, }, wrappedStrategy, ); } }