import type { MaybePromiseLike } from "@yume-chan/async"; import type { QueuingStrategy, ReadableStreamDefaultController, ReadableStreamDefaultReader, } from "./stream.js"; import { ReadableStream } from "./stream.js"; export type WrapReadableStreamStart = ( controller: ReadableStreamDefaultController, ) => MaybePromiseLike>; export interface ReadableStreamWrapper { start: WrapReadableStreamStart; cancel?: (reason?: unknown) => MaybePromiseLike; close?: () => MaybePromiseLike; error?: (reason?: unknown) => MaybePromiseLike; } function getWrappedReadableStream( wrapper: | ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper, controller: ReadableStreamDefaultController, ) { if ("start" in wrapper) { return wrapper.start(controller); } else if (typeof wrapper === "function") { return wrapper(controller); } else { // Can't use `wrapper instanceof ReadableStream` // Because we want to be compatible with any ReadableStream-like objects return wrapper; } } /** * This class has multiple usages: * * 1. Get notified when the stream is cancelled or closed. * 2. Synchronously create a `ReadableStream` by asynchronously return another `ReadableStream`. * 3. Convert native `ReadableStream`s to polyfilled ones so they can `pipe` between. */ export class WrapReadableStream extends ReadableStream { readable!: ReadableStream; #reader!: ReadableStreamDefaultReader; constructor( wrapper: | ReadableStream | WrapReadableStreamStart | ReadableStreamWrapper, strategy?: QueuingStrategy, ) { super( { start: async (controller) => { const readable = await getWrappedReadableStream( wrapper, controller, ); // `start` is called in `super()`, so can't use `this` synchronously. // but it's fine after the first `await` this.readable = readable; this.#reader = this.readable.getReader(); }, pull: async (controller) => { const { done, value } = await this.#reader .read() .catch((e) => { if ("error" in wrapper) { wrapper.error(e); } throw e; }); if (done) { controller.close(); if ("close" in wrapper) { await wrapper.close?.(); } } else { controller.enqueue(value); } }, cancel: async (reason) => { await this.#reader.cancel(reason); if ("cancel" in wrapper) { await wrapper.cancel?.(reason); } }, }, strategy, ); } }