import { AsyncIterableX } from './asynciterablex.js'; import { identity } from '../util/identity.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number }; function wrapPromiseWithIndex(promise: Promise, index: number) { return promise.then((value) => ({ value, index })) as Promise>; } /** @ignore */ export class CombineLatestAsyncIterable extends AsyncIterableX { private _sources: AsyncIterable[]; constructor(sources: AsyncIterable[]) { super(); this._sources = sources; } async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); const length = this._sources.length; const iterators = new Array>(length); const nexts = new Array>>>(length); let hasValueAll = false; const values = new Array(length); const hasValues = new Array(length); let active = length; hasValues.fill(false); for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } while (active > 0) { const next = safeRace(nexts); const { value: { value: value$, done: done$ }, index, } = await next; if (done$) { nexts[index] = >>>NEVER_PROMISE; active--; } else { values[index] = value$; hasValues[index] = true; const iterator$ = iterators[index]; nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); if (hasValueAll || (hasValueAll = hasValues.every(identity))) { yield values; } } } } } /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The type of the elements in the first source sequence. * @template T2 The type of the elements in the second source sequence. * @param {AsyncIterable} source First async-iterable source. * @param {AsyncIterable} source2 Second async-iterable source. * @returns {AsyncIterableX<[T, T2]>} An async-iterable sequence containing an array of all sources. */ export function combineLatest( source: AsyncIterable, source2: AsyncIterable ): AsyncIterableX<[T, T2]>; /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The type of the elements in the first source sequence. * @template T2 The type of the elements in the second source sequence. * @template T3 The type of the elements in the third source sequence. * @param {AsyncIterable} source First async-iterable source. * @param {AsyncIterable} source2 Second async-iterable source. * @param {AsyncIterable} source3 Third async-iterable source. * @returns {AsyncIterableX<[T, T2, T3]>} An async-iterable sequence containing an array of all sources. */ export function combineLatest( source: AsyncIterable, source2: AsyncIterable, source3: AsyncIterable ): AsyncIterableX<[T, T2, T3]>; /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The type of the elements in the first source sequence. * @template T2 The type of the elements in the second source sequence. * @template T3 The type of the elements in the third source sequence. * @template T4 The type of the elements in the fourth source sequence. * @param {AsyncIterable} source First async-iterable source. * @param {AsyncIterable} source2 Second async-iterable source. * @param {AsyncIterable} source3 Third async-iterable source. * @param {AsyncIterable} source4 Fourth async-iterable source. * @returns {AsyncIterableX<[T, T2, T3, T4]>} An async-iterable sequence containing an array of all sources. */ export function combineLatest( source: AsyncIterable, source2: AsyncIterable, source3: AsyncIterable, source4: AsyncIterable ): AsyncIterableX<[T, T2, T3, T4]>; /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The type of the elements in the first source sequence. * @template T2 The type of the elements in the second source sequence. * @template T3 The type of the elements in the third source sequence. * @template T4 The type of the elements in the fourth source sequence. * @template T5 The type of the elements in the fifth source sequence. * @param {AsyncIterable} source First async-iterable source. * @param {AsyncIterable} source2 Second async-iterable source. * @param {AsyncIterable} source3 Third async-iterable source. * @param {AsyncIterable} source4 Fourth async-iterable source. * @param {AsyncIterable} source5 Fifth async-iterable source. * @returns {AsyncIterableX<[T, T2, T3, T4, T5]>} An async-iterable sequence containing an array of all sources. */ export function combineLatest( source: AsyncIterable, source2: AsyncIterable, source3: AsyncIterable, source4: AsyncIterable, source5: AsyncIterable ): AsyncIterableX<[T, T2, T3, T4, T5]>; /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The type of the elements in the first source sequence. * @template T2 The type of the elements in the second source sequence. * @template T3 The type of the elements in the third source sequence. * @template T4 The type of the elements in the fourth source sequence. * @template T5 The type of the elements in the fifth source sequence. * @template T6 The type of the elements in the sixth source sequence. * @param {AsyncIterable} source First async-iterable source. * @param {AsyncIterable} source2 Second async-iterable source. * @param {AsyncIterable} source3 Third async-iterable source. * @param {AsyncIterable} source4 Fourth async-iterable source. * @param {AsyncIterable} source5 Fifth async-iterable source. * @param {AsyncIterable} source6 Sixth async-iterable source. * @returns {AsyncIterableX<[T, T2, T3, T4, T5, T6]>} An async-iterable sequence containing an array of all sources. */ export function combineLatest( source: AsyncIterable, source2: AsyncIterable, source3: AsyncIterable, source4: AsyncIterable, source5: AsyncIterable, source6: AsyncIterable ): AsyncIterableX<[T, T2, T3, T4, T5, T6]>; /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. * * @template T The of the elements in the source sequences. * @param {...AsyncIterable[]} sources The async-iterable sources. * @returns {AsyncIterableX} An async-iterable sequence containing an array of all sources. */ export function combineLatest(...sources: AsyncIterable[]): AsyncIterableX; export function combineLatest(...sources: any[]): AsyncIterableX { return new CombineLatestAsyncIterable(sources); }