/**
 * @fileoverview Library for working with lazy tree like stream of chunks.
 * @license
 * SPDX-License-Identifier: Apache-2.0
 */

import type { Stream as StreamInterface, StreamItems } from './interfaces.js';

// Polyfill for Promise.withResolvers
if (typeof Promise.withResolvers === 'undefined') {
  Promise.withResolvers = <T>(): PromiseWithResolvers<T> => {
    // The Promise executor runs synchronously, so both callbacks are assigned
    // before the object below is built.
    let resolve!: (value: T | PromiseLike<T>) => void;
    let reject!: (reason?: unknown) => void;
    const promise = new Promise<T>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    return { promise, resolve, reject };
  };
}

/**
 * A writable AsyncIterable.
 *
 * Every written item is kept in `children`, so an iterator created at any
 * point first replays everything written so far and then receives later
 * writes as they happen. Items may be plain values, arrays or async iterables;
 * the default async iterator flattens them into leaves in order, while
 * `rawAsyncIterator()` exposes the items exactly as written.
 */
class Stream<T> implements StreamInterface<T> {
  private closed = false;
  private readonly children: StreamItems<T>[] = [];
  private readonly iterators: StreamIterator<T>[] = [];
  private errorValue: string | undefined;

  /**
   * Writes a value to the stream and forwards it to every live iterator.
   *
   * @throws Error synchronously (not as a rejected promise) when the stream
   *     has already been closed or errored.
   */
  write(value: StreamItems<T>): Promise<void> {
    if (this.closed) {
      throw new Error('Stream is closed');
    }
    this.children.push(value);
    // Iterate over a copy: iterators may deregister themselves while notified.
    for (const iterator of [...this.iterators]) {
      iterator.write(value);
    }
    return Promise.resolve();
  }

  /**
   * Write and close.
   */
  async writeAndClose(value: StreamItems<T>): Promise<void> {
    await this.write(value);
    await this.close();
  }

  /**
   * Closes the stream and releases every read currently waiting on it.
   */
  close(): Promise<void> {
    this.closed = true;
    for (const iterator of [...this.iterators]) {
      iterator.close();
    }
    return Promise.resolve();
  }

  /** True once `close()` or `error()` has been called. */
  get isClosed(): boolean {
    return this.closed;
  }

  /** Number of items written so far (raw items, not flattened leaves). */
  get size(): number {
    return this.children.length;
  }

  /** A shallow copy of the raw items written so far. */
  get items(): StreamItems<T>[] {
    return [...this.children];
  }

  /** The reason passed to `error()`, or `undefined` if there is none. */
  getError(): string | undefined {
    return this.errorValue;
  }

  /**
   * Errors the stream if there has been a problem.
   *
   * Marks the stream closed and rejects every read currently waiting on it.
   * NOTE: when `reason` is omitted `getError()` stays `undefined`, so reads
   * issued afterwards observe a normal end of stream instead of a rejection.
   */
  error(reason?: string): void {
    this.errorValue = reason;
    this.closed = true;
    for (const iterator of [...this.iterators]) {
      iterator.error(reason);
    }
  }

  /**
   * Async iterator over the raw StreamItems being pushed in.
   *
   * The iterator starts with a snapshot of the items written so far and stays
   * registered with the stream until it reports completion or `return()` is
   * called on it.
   */
  rawAsyncIterator(): AsyncIterator<StreamItems<T>> {
    const iterator: StreamIterator<T> = new StreamIterator<T>(
      this,
      this.children,
      () => {
        const index = this.iterators.indexOf(iterator);
        if (index >= 0) {
          this.iterators.splice(index, 1);
        }
      },
    );
    this.iterators.push(iterator);
    return iterator;
  }

  /** Async iterator over the flattened leaves of the stream, in order. */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    const iter = this.rawAsyncIterator();
    const stream = iteratorToIterable(iter);
    const chunks = (async function* () {
      try {
        yield* leaves<T>(stream);
      } finally {
        // Need to explicitly release this iterator memory.
        void iter.return?.();
      }
    })();
    return chunks[Symbol.asyncIterator]();
  }

  /** Makes the stream awaitable: resolves to the array of all its leaves. */
  then = thenableAsyncIterable;
}

/** Adapts a bare AsyncIterator into an AsyncIterable that drains it. */
async function* iteratorToIterable<T>(
  iter: AsyncIterator<T>,
): AsyncIterable<T> {
  while (true) {
    const result = await iter.next();
    if (result.done) break;
    yield result.value;
  }
}

/**
 * One reader's view of a Stream.
 *
 * `writeQueue` buffers items that have not been read yet; `readQueue` holds
 * reads that arrived while no item was buffered. A read is only queued when
 * the buffer is empty and a write is handed to a queued read before it is
 * buffered, so the two queues are never both non-empty.
 */
class StreamIterator<T> implements AsyncIterator<StreamItems<T>> {
  private readonly writeQueue: StreamItems<T>[];
  private readonly readQueue: PromiseWithResolvers<
    IteratorResult<StreamItems<T>>
  >[] = [];

  /**
   * @param stream The owning stream, consulted for its closed/error state.
   * @param current Items already written; copied so the reader replays them.
   * @param done Callback that deregisters this iterator from the stream.
   */
  constructor(
    private readonly stream: Stream<T>,
    current: StreamItems<T>[],
    private readonly done: () => void,
  ) {
    this.writeQueue = [...current];
  }

  /** Delivers a value to the oldest waiting read, or buffers it. */
  write(value: StreamItems<T>): void {
    const queued = this.readQueue.shift();
    if (queued) {
      queued.resolve({ done: false, value });
    } else {
      this.writeQueue.push(value);
    }
  }

  /** Completes every waiting read with `done: true`. */
  close(): void {
    // Drain the whole queue: several next() calls may be outstanding and each
    // of them has to settle, otherwise its caller waits forever.
    for (const queued of this.readQueue.splice(0)) {
      queued.resolve({ done: true, value: undefined });
    }
  }

  /** Rejects every waiting read with an Error carrying `error` as message. */
  error(error?: string): void {
    // Same Error shape that next() uses for reads issued after the failure.
    const failure = new Error(error);
    for (const queued of this.readQueue.splice(0)) {
      queued.reject(failure);
    }
  }

  /**
   * Returns the next raw item.
   *
   * Buffered items are served first; after that the read rejects if the
   * stream reports an error, completes if the stream is closed, and otherwise
   * waits for the next write, close or error.
   */
  next(): Promise<IteratorResult<StreamItems<T>>> {
    // Test the queue length rather than the shifted value so that falsy items
    // (0, '', false, null, undefined) are delivered instead of being dropped.
    if (this.writeQueue.length > 0) {
      const value = this.writeQueue.shift() as StreamItems<T>;
      return Promise.resolve({ done: false, value });
    }
    const failure = this.stream.getError();
    if (failure !== undefined) {
      return Promise.reject(new Error(failure));
    }
    if (this.stream.isClosed) {
      this.done();
      return Promise.resolve({ done: true, value: undefined });
    }
    const pending =
      Promise.withResolvers<IteratorResult<StreamItems<T>>>();
    this.readQueue.push(pending);
    return pending.promise;
  }

  /** Deregisters the iterator from its stream and reports completion. */
  return(value?: T): Promise<IteratorResult<StreamItems<T>>> {
    this.done();
    return Promise.resolve({ done: true, value });
  }
}

/** Constructor function for creating a stream object. */
export function createStream<T>(): StreamInterface<T> {
  return new Stream<T>();
}

/**
 * Returns true if the provided object implements the AsyncIterator protocol via
 * implementing a `Symbol.asyncIterator` method. `null` and `undefined` yield
 * false.
 */
export function isAsyncIterable<T>(
  maybeAsyncIterable: unknown,
): maybeAsyncIterable is AsyncIterable<T> {
  // Property access on null/undefined throws, so rule those out first.
  if (maybeAsyncIterable == null) {
    return false;
  }
  const candidate = maybeAsyncIterable as Partial<AsyncIterable<T>>;
  return typeof candidate[Symbol.asyncIterator] === 'function';
}

/**
 * Leaves of a Stream. Because the stream can be recursive, this flattens it
 * in order: arrays and async iterables are descended into, anything else is
 * yielded as a leaf.
 */
async function* leaves<T>(items: StreamItems<T>): AsyncIterable<T> {
  if (Array.isArray(items)) {
    for (const node of items) {
      yield* leaves<T>(node);
    }
  } else if (isAsyncIterable<StreamItems<T>>(items)) {
    for await (const node of items) {
      yield* leaves<T>(node);
    }
  } else {
    yield items;
  }
}

/**
 * A function to aggregate an AsyncIterable to a PromiseLike thenable.
 *
 * Intended to be installed as the `then` method of an async iterable: it
 * drains `this` into an array and forwards the outcome to the callbacks.
 */
export function thenableAsyncIterable<T, TResult1 = T[], TResult2 = never>(
  this: AsyncIterable<T>,
  onfulfilled?:
    | ((value: T[]) => TResult1 | PromiseLike<TResult1>)
    | null,
  onrejected?:
    | ((reason: unknown) => TResult2 | PromiseLike<TResult2>)
    | null,
): Promise<TResult1 | TResult2> {
  const aggregate = async (): Promise<T[]> => {
    const items: T[] = [];
    for await (const item of this) {
      items.push(item);
    }
    return items;
  };
  return aggregate().then(onfulfilled, onrejected);
}

/**
 * Make an asyncIterable PromiseLike.
 *
 * Attaches `then` to the given object itself (no copy is made) and returns it.
 */
export function awaitableAsyncIterable<T>(
  iter: AsyncIterable<T>,
): AsyncIterable<T> & PromiseLike<T[]> {
  const thenable = iter as AsyncIterable<T> & PromiseLike<T[]>;
  thenable.then = thenableAsyncIterable;
  return thenable;
}