import { publish } from './operators/publish.js';
import { fromDOMStream } from './fromdomstream.js';
import { AsyncIterableX } from './asynciterablex.js';

/** @ignore */
export type ReadableBYOBStreamOptions<T = any> = QueuingStrategy<T> & { type: 'bytes' };
/** @ignore */
export type ReadableByteStreamOptions<T = any> = QueuingStrategy<T> & {
  type: 'bytes';
  autoAllocateChunkSize?: number;
};

/**
 * The iterator shape consumed by the underlying sources below: the value
 * passed to `next()` is either the controller's desired size (default
 * streams) or the BYOB view the consumer wants filled (byte streams).
 */
type AsyncSourceIterator<TSource> = AsyncIterator<
  TSource,
  any,
  number | ArrayBufferView | undefined | null
>;

/**
 * Copies bytes from `source` into `target` and reports how many were copied.
 *
 * The copy is clamped to whichever is smallest: `sourceByteLength`, the room
 * left in `target` after `targetByteOffset`, and the bytes available in the
 * source's backing buffer from `source.byteOffset` onwards.
 *
 * @ignore
 * @param target The view to write into.
 * @param source The view to read from.
 * @param targetByteOffset Byte offset inside `target` at which writing starts.
 * @param sourceByteLength Maximum number of bytes to read from `source`.
 * @returns The number of bytes actually copied (0 when `target` has no room).
 */
function memcpy<TTarget extends ArrayBufferView, TSource extends ArrayBufferView>(
  target: TTarget,
  source: TSource,
  targetByteOffset = 0,
  sourceByteLength = source.byteLength
) {
  const targetByteLength = target.byteLength;
  // Room left in the target once the write offset is taken into account.
  // Without this clamp `dst.set()` throws a RangeError whenever
  // `targetByteOffset + src.length` exceeds the target's length.
  const room = targetByteLength - targetByteOffset;
  if (room <= 0) {
    return 0;
  }
  const dst = new Uint8Array(target.buffer, target.byteOffset, targetByteLength);
  const src = new Uint8Array(
    source.buffer,
    source.byteOffset,
    Math.min(sourceByteLength, room, source.buffer.byteLength - source.byteOffset)
  );
  dst.set(src, targetByteOffset);
  return src.byteLength;
}

/**
 * Shared base for the underlying sources: holds the source iterator and
 * implements `cancel()` by forwarding to the iterator's `return()`.
 */
abstract class AbstractUnderlyingSource<TSource> {
  constructor(protected _source: AsyncSourceIterator<TSource> | null) {}

  /** Calls the iterator's `return()` (when it has one) and drops the reference. */
  async cancel() {
    const source = this._source;
    try {
      if (source && source.return) {
        await source.return();
      }
    } finally {
      // Drop the iterator even when `return()` rejects, so a failed cancel
      // cannot leave a half-closed iterator behind for a later pull.
      this._source = null;
    }
  }
}

/**
 * Underlying source for default (object) streams: every pull asks the
 * iterator for one value and enqueues it, closing the stream when the
 * iterator is done or has already been released.
 */
class UnderlyingAsyncIterableDefaultSource<TSource = any>
  extends AbstractUnderlyingSource<TSource>
  implements UnderlyingSource<TSource>
{
  constructor(source: AsyncSourceIterator<TSource> | null) {
    super(source);
  }

  // eslint-disable-next-line consistent-return
  async pull(controller: ReadableStreamController<TSource>) {
    const source = this._source;
    if (source) {
      // The controller's desired size is handed to the iterator as the
      // argument of `next()`.
      const r = await source.next(controller.desiredSize);
      if (!r.done) {
        return (controller as ReadableStreamDefaultController<TSource>).enqueue(r.value);
      }
    }
    controller.close();
  }
}

/**
 * Underlying source for byte streams. When the controller exposes a BYOB
 * request the view is passed to the iterator so it can be filled in place;
 * otherwise values are enqueued exactly like the default source does.
 */
class UnderlyingAsyncIterableByteSource<TSource extends ArrayBufferView = Uint8Array>
  extends AbstractUnderlyingSource<TSource>
  implements UnderlyingSource<TSource>
{
  // public readonly type: 'bytes';
  public readonly autoAllocateChunkSize?: number;

  // If we can't create a "byob" reader (no browsers currently support it),
  // fall back to pulling values from the source iterator and enqueueing
  // them like object streams
  private fallbackDefaultSource: UnderlyingAsyncIterableDefaultSource<TSource>;

  constructor(
    reader: AsyncSourceIterator<TSource> | null,
    opts: { autoAllocateChunkSize?: number } = {}
  ) {
    super(reader);
    // `type` is assigned dynamically instead of being declared as a field
    // (see the commented-out declaration above).
    (this as any).type = 'bytes';
    this.autoAllocateChunkSize = opts['autoAllocateChunkSize'];
    this.fallbackDefaultSource = new UnderlyingAsyncIterableDefaultSource<TSource>(reader);
  }

  // eslint-disable-next-line consistent-return
  async pull(controller: ReadableStreamController<TSource>) {
    const byobRequest = (controller as any).byobRequest;
    if (!byobRequest) {
      return await this.fallbackDefaultSource.pull(
        controller as ReadableStreamDefaultController<TSource>
      );
    }
    if (this._source) {
      const { view } = byobRequest;
      const { done, value } = await this._source.next(view);
      if (!done) {
        // Did the source write into the BYOB view itself,
        // then yield us the `bytesWritten` value? If so,
        // pass that along
        if (typeof value === 'number') {
          return byobRequest.respond(value);
        }
        // otherwise if the source is only producing buffers
        // but doesn't expect to be given one, we should copy
        // the produced buffer into the front of the BYOB view
        if (ArrayBuffer.isView(value)) {
          // NOTE(review): memcpy clamps to the view's byteLength, so any bytes
          // of `value` beyond the view's capacity are dropped here. Confirm
          // whether the remainder should be queued for the next read.
          return value.buffer === view.buffer
            ? byobRequest.respondWithNewView(value)
            : byobRequest.respond(memcpy(view, value));
        }
      }
    }
    controller.close();
    // Closing does not settle a read that is waiting on a BYOB request; the
    // request has to be answered with zero bytes once the stream is closed.
    const pending = (controller as any).byobRequest;
    if (pending) {
      pending.respond(0);
    }
  }
}

// Generate subclasses of ReadableStream that conform to the
// AsyncIterable protocol. These classes are dynamically created
// the first time a ReadableStream is produced because ReadableStream
// is a browser-only API, and closure-compiler won't compile if they're
// statically defined at the module scope.
/**
 * Factory for `ReadableStream` instances that also implement
 * `[Symbol.asyncIterator]()`. The two stream subclasses are defined lazily,
 * on the first call, and reused by every later call.
 *
 * @ignore
 */
const asyncIterableReadableStream = (() => {
  let AsyncIterableReadableByteStream_: any;
  let AsyncIterableDefaultReadableStream_: any;

  // A function that's called the first time someone creates a
  // ReadableStream via `toDOMStream()`
  const createFirstTime = (source: any, opts?: any) => {
    // Generate the subclasses with [Symbol.asyncIterator]() methods
    class AsyncIterableDefaultReadableStream<T> extends ReadableStream<T> {
      [Symbol.asyncIterator]() {
        return fromDOMStream(this)[Symbol.asyncIterator]();
      }
    }
    class AsyncIterableReadableByteStream extends ReadableStream<Uint8Array> {
      [Symbol.asyncIterator]() {
        return fromDOMStream(this, { mode: 'byob' })[Symbol.asyncIterator]();
      }
    }
    AsyncIterableReadableByteStream_ = AsyncIterableReadableByteStream;
    AsyncIterableDefaultReadableStream_ = AsyncIterableDefaultReadableStream;
    // Now point `createAsyncIterableReadableStream` to the function that
    // instantiates the classes we just created
    // eslint-disable-next-line @typescript-eslint/no-use-before-define, no-use-before-define
    createAsyncIterableReadableStream = createAsyncIterableReadableStreamEveryOtherTime;
    // Create and return the first ReadableStream instance
    // eslint-disable-next-line @typescript-eslint/no-use-before-define, no-use-before-define
    return createAsyncIterableReadableStreamEveryOtherTime(source, opts) as ReadableStream<any>;
  };

  // Shared function pointer that's called by the wrapper closure we return
  let createAsyncIterableReadableStream = createFirstTime;

  // Create instances of the classes generated by `createFirstTime`.
  // Byte sources get the BYOB-iterating subclass, everything else the default one.
  const createAsyncIterableReadableStreamEveryOtherTime = (source: any, opts?: any) => {
    return source instanceof UnderlyingAsyncIterableByteSource
      ? (new AsyncIterableReadableByteStream_(source, opts) as ReadableStream<any>)
      : (new AsyncIterableDefaultReadableStream_(source, opts) as ReadableStream<any>);
  };

  return (source: any, opts?: any) => createAsyncIterableReadableStream(source, opts);
})();

/**
 * Converts an async-iterable instance to a DOM stream.
 * @param source The source async-iterable to convert to a DOM stream.
 * @param strategy The queueing strategy to apply to the DOM stream.
 */
export function toDOMStream<T>(
  source: AsyncIterable<T>,
  strategy?: QueuingStrategy<T>
): ReadableStream<T>;
/**
 * Converts an async-iterable stream to a DOM stream.
 * @param source The async-iterable stream to convert to a DOM stream.
 * @param options The ReadableBYOBStreamOptions to apply to the DOM stream.
 */
export function toDOMStream<T>(
  source: AsyncIterable<T>,
  options: ReadableBYOBStreamOptions
): ReadableStream<Uint8Array>;
/**
 * Converts an async-iterable stream to a DOM stream.
 * @param source The async-iterable stream to convert to a DOM stream.
 * @param options The ReadableByteStreamOptions to apply to the DOM stream.
 */
export function toDOMStream<T>(
  source: AsyncIterable<T>,
  options: ReadableByteStreamOptions
): ReadableStream<Uint8Array>;
/**
 * Converts an async-iterable stream to a DOM stream.
 *
 * A byte stream is produced only when `options.type` is exactly `'bytes'`;
 * any other (or missing) options yield a default stream.
 *
 * @param source The async-iterable stream to convert to a DOM stream.
 * @param options The options to apply to the DOM stream.
 */
export function toDOMStream(
  source: AsyncIterable<any>,
  options?: QueuingStrategy<any> | ReadableBYOBStreamOptions | ReadableByteStreamOptions
) {
  if (!options || !('type' in options) || options['type'] !== 'bytes') {
    return asyncIterableReadableStream(
      new UnderlyingAsyncIterableDefaultSource(source[Symbol.asyncIterator]()),
      options
    );
  }
  return asyncIterableReadableStream(
    new UnderlyingAsyncIterableByteSource(
      source[Symbol.asyncIterator](),
      options as ReadableByteStreamOptions
    ),
    options
  );
}

// The three methods below forward to the DOM stream returned by
// `_getDOMStream`, which is created once per instance and then reused.
AsyncIterableX.prototype.tee = function <T>(this: AsyncIterableX<T>) {
  return _getDOMStream(this).tee();
};

AsyncIterableX.prototype.pipeTo = function <T>(
  this: AsyncIterableX<T>,
  writable: WritableStream<T>,
  options?: StreamPipeOptions
) {
  return _getDOMStream(this).pipeTo(writable, options);
};

AsyncIterableX.prototype.pipeThrough = function <T, R extends ReadableStream<any>>(
  this: AsyncIterableX<T>,
  duplex: { writable: WritableStream<T>; readable: R },
  options?: StreamPipeOptions
) {
  return _getDOMStream(this).pipeThrough(duplex, options);
};

/**
 * Returns the DOM stream cached on `self._DOMStream`, creating it on first
 * use by piping `self` through `publish()` and then `toDOMStream`.
 */
function _getDOMStream(self: any) {
  return self._DOMStream || (self._DOMStream = self.pipe(publish(), toDOMStream));
}

/**
 * Method form of `toDOMStream` that converts `this`; installed on
 * `AsyncIterableX.prototype` below.
 * @ignore
 */
export function toDOMStreamProto<T>(
  this: AsyncIterable<T>,
  strategy?: QueuingStrategy<T>
): ReadableStream<T>;
export function toDOMStreamProto<T>(
  this: AsyncIterable<T>,
  options: ReadableBYOBStreamOptions
): ReadableStream<Uint8Array>;
export function toDOMStreamProto<T>(
  this: AsyncIterable<T>,
  options: ReadableByteStreamOptions
): ReadableStream<Uint8Array>;
export function toDOMStreamProto(
  this: AsyncIterable<any>,
  options?: QueuingStrategy<any> | ReadableBYOBStreamOptions | ReadableByteStreamOptions
) {
  return !options ? toDOMStream(this) : toDOMStream(this, options);
}

AsyncIterableX.prototype.toDOMStream = toDOMStreamProto;

declare module '../asynciterable/asynciterablex' {
  interface AsyncIterableX<T> {
    toDOMStream: typeof toDOMStreamProto;
    tee(): [ReadableStream<T>, ReadableStream<T>];
    pipeTo(writable: WritableStream<T>, options?: StreamPipeOptions): Promise<void>;
    pipeThrough<R extends ReadableStream<any>>(
      duplex: { writable: WritableStream<T>; readable: R },
      options?: StreamPipeOptions
    ): R;
  }
}