import type { MaybePromiseLike } from "@yume-chan/async"; import type { TransformStream, WritableStreamDefaultWriter } from "./stream.js"; import { WritableStream } from "./stream.js"; export type WrapWritableStreamStart = () => MaybePromiseLike< WritableStream >; export interface WritableStreamWrapper { start: WrapWritableStreamStart; close?(): void | Promise; } async function getWrappedWritableStream( start: | WritableStream | WrapWritableStreamStart | WritableStreamWrapper, ) { if ("start" in start) { return await start.start(); } else if (typeof start === "function") { return await start(); } else { // Can't use `wrapper instanceof WritableStream` // Because we want to be compatible with any WritableStream-like objects return start; } } export class WrapWritableStream extends WritableStream { writable!: WritableStream; #writer!: WritableStreamDefaultWriter; constructor( start: | WritableStream | WrapWritableStreamStart | WritableStreamWrapper, ) { super({ start: async () => { const writable = await getWrappedWritableStream(start); // `start` is called in `super()`, so can't use `this` synchronously. // but it's fine after the first `await` this.writable = writable; this.#writer = this.writable.getWriter(); }, write: async (chunk) => { await this.#writer.write(chunk); }, abort: async (reason) => { await this.#writer.abort(reason); if (start !== this.writable && "close" in start) { await start.close?.(); } }, close: async () => { // Close the inner stream first. // Usually the inner stream is a logical sub-stream over the outer stream, // closing the outer stream first will make the inner stream incapable of // sending data in its `close` handler. await this.#writer.close(); if (start !== this.writable && "close" in start) { await start.close?.(); } }, }); } bePipedThroughFrom(transformer: TransformStream) { let promise: Promise; return new WrapWritableStream({ start: () => { promise = transformer.readable.pipeTo(this); return transformer.writable; }, async close() { await promise; }, }); } }