const ARRAY_VALUE = 'value'; const ARRAY_ERROR = 'error'; interface AsyncSinkItem { type: string; value?: T; error?: any; } interface AsyncResolver { resolve: (value: T | PromiseLike) => void; reject: (reason?: any) => void; } /** @ignore */ export class AsyncSink implements AsyncIterableIterator { private _ended: boolean; private _values: AsyncSinkItem[]; private _resolvers: AsyncResolver>[]; constructor() { this._ended = false; this._values = []; this._resolvers = []; } [Symbol.asyncIterator]() { return this; } write(value: TSource) { this._push({ type: ARRAY_VALUE, value }); } error(error: any) { this._push({ type: ARRAY_ERROR, error }); } private _push(item: AsyncSinkItem) { if (this._ended) { throw new Error('AsyncSink already ended'); } if (this._resolvers.length > 0) { const { resolve, reject } = this._resolvers.shift()!; if (item.type === ARRAY_ERROR) { reject(item.error!); } else { resolve({ done: false, value: item.value! }); } } else { this._values.push(item); } } next() { if (this._values.length > 0) { const { type, value, error } = this._values.shift()!; if (type === ARRAY_ERROR) { return Promise.reject(error); } else { return Promise.resolve({ done: false, value } as IteratorResult); } } if (this._ended) { return Promise.resolve({ done: true } as IteratorResult); } return new Promise>((resolve, reject) => { this._resolvers.push({ resolve, reject }); }); } end() { while (this._resolvers.length > 0) { this._resolvers.shift()!.resolve({ done: true } as IteratorResult); } this._ended = true; } }