import { AsyncIterableX } from './asynciterablex.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; type MergeResult = { value: T; index: number }; function wrapPromiseWithIndex(promise: Promise, index: number) { return promise.then((value) => ({ value, index })) as Promise>; } /** @ignore */ export class RaceAsyncIterable extends AsyncIterableX { private _sources: AsyncIterable[]; constructor(sources: AsyncIterable[]) { super(); this._sources = sources; } async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); const sources = this._sources; const length = sources.length; const iterators = new Array>(length); const nexts = new Array>>>(length); for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } const next = safeRace(nexts); const { value: next$, index } = await next; if (!next$.done) { yield next$.value; } const iterator$ = iterators[index]; // Cancel/finish other iterators for (let i = 0; i < length; i++) { if (i === index) { continue; } const otherIterator = iterators[i]; if (otherIterator.return) { otherIterator.return(); } } let nextItem; while (!(nextItem = await iterator$.next()).done) { yield nextItem.value; } } } /** * Propagates the async sequence that reacts first. * * @param {...AsyncIterable[]} sources The source sequences. * @return {AsyncIterable} An async sequence that surfaces either of the given sequences, whichever reacted first. */ export function race(...sources: AsyncIterable[]): AsyncIterableX { return new RaceAsyncIterable(sources); }