import { AsyncIterableX } from './asynciterablex.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number; done?: boolean; error?: any }; function wrapPromiseWithIndex(promise: Promise>, index: number) { return promise .then(({ value, done }) => ({ value, done, index })) .catch((error) => ({ error, index })) as Promise>; } /** @ignore */ export class MergeAsyncIterable extends AsyncIterableX { private _source: AsyncIterable[]; constructor(source: AsyncIterable[]) { super(); this._source = source; } async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { throwIfAborted(signal); const length = this._source.length; const iterators = new Array>(length); const nexts = new Array>>(length); let active = length; for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } while (active > 0) { const next = await safeRace(nexts); if (next.hasOwnProperty('error')) { throw next.error; } else if (next.done) { nexts[next.index] = >>NEVER_PROMISE; active--; } else { const iterator$ = iterators[next.index]; nexts[next.index] = wrapPromiseWithIndex(iterator$.next(), next.index); yield next.value; } } } } /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the first async-iterable sequence. * @template T2 The type of the second async-iterable sequence. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {AsyncIterable} v2 The second async-iterable source to merge. * @returns {(AsyncIterableX)} The merged elements from all of the specified async-iterable sequences into a single async-iterable sequence. */ export function merge( source: AsyncIterable, v2: AsyncIterable ): AsyncIterableX; /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the first async-iterable sequence. * @template T2 The type of the second async-iterable sequence. * @template T3 The type of the third async-iterable sequence. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {AsyncIterable} v2 The second async-iterable source to merge. * @param {AsyncIterable} v3 The third async-iterable source to merge. * @returns {(AsyncIterableX)} The merged elements from all of the specified async-iterable sequences * into a single async-iterable sequence. */ export function merge( source: AsyncIterable, v2: AsyncIterable, v3: AsyncIterable ): AsyncIterableX; /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the first async-iterable sequence. * @template T2 The type of the second async-iterable sequence. * @template T3 The type of the third async-iterable sequence. * @template T4 The type of the fourth async-iterable sequence. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {AsyncIterable} v2 The second async-iterable source to merge. * @param {AsyncIterable} v3 The third async-iterable source to merge. * @param {AsyncIterable} v4 The fourth async-iterable source to merge. * @returns {(AsyncIterableX)} The merged elements from all of the specified async-iterable sequences * into a single async-iterable sequence. */ export function merge( source: AsyncIterable, v2: AsyncIterable, v3: AsyncIterable, v4: AsyncIterable ): AsyncIterableX; /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the first async-iterable sequence. * @template T2 The type of the second async-iterable sequence. * @template T3 The type of the third async-iterable sequence. * @template T4 The type of the fourth async-iterable sequence. * @template T5 The type of the fifth async-iterable sequence. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {AsyncIterable} v2 The second async-iterable source to merge. * @param {AsyncIterable} v3 The third async-iterable source to merge. * @param {AsyncIterable} v4 The fourth async-iterable source to merge. * @param {AsyncIterable} v5 The fifth async-iterable source to merge. * @returns {(AsyncIterableX)} The merged elements from all of the specified async-iterable sequences * into a single async-iterable sequence. */ export function merge( source: AsyncIterable, v2: AsyncIterable, v3: AsyncIterable, v4: AsyncIterable, v5: AsyncIterable ): AsyncIterableX; /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the first async-iterable sequence. * @template T2 The type of the second async-iterable sequence. * @template T3 The type of the third async-iterable sequence. * @template T4 The type of the fourth async-iterable sequence. * @template T5 The type of the fifth async-iterable sequence. * @template T6 The type of the sixth async-iterable sequence. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {AsyncIterable} v2 The second async-iterable source to merge. * @param {AsyncIterable} v3 The third async-iterable source to merge. * @param {AsyncIterable} v4 The fourth async-iterable source to merge. * @param {AsyncIterable} v5 The fifth async-iterable source to merge. * @param {AsyncIterable} v6 The sixth async-iterable source to merge. * @returns {(AsyncIterableX)} The merged elements from all of the specified async-iterable sequences * into a single async-iterable sequence. */ export function merge( source: AsyncIterable, v2: AsyncIterable, v3: AsyncIterable, v4: AsyncIterable, v5: AsyncIterable, v6: AsyncIterable ): AsyncIterableX; /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * * @template T The type of the elements in the sequence to merge. * @param {AsyncIterable} source The first async-iterable source to merge. * @param {...AsyncIterable[]} args The async-iterable sources to merge. * @returns {AsyncIterableX} The merged elements from all of the specified async-iterable sequences into a single async-iterable sequence. */ export function merge(source: AsyncIterable, ...args: AsyncIterable[]): AsyncIterableX; export function merge(source: AsyncIterable, ...args: AsyncIterable[]): AsyncIterableX { return new MergeAsyncIterable([source, ...args]); }