import { aggregateErrors, bailableWait } from './util' // reads values from a generator into a list // breaks when isDone signals `true` AND `waitFor` completes OR when a max length is reached // NOTE: does not signal generator to close. it *will* continue to produce values export const readFromGenerator = async ( gen: AsyncGenerator, isDone: (last?: T) => Promise | boolean, waitFor: Promise = Promise.resolve(), maxLength = Number.MAX_SAFE_INTEGER, ): Promise => { const evts: T[] = [] let bail: undefined | (() => void) let hasBroke = false const awaitDone = async () => { if (await isDone(evts.at(-1))) { return true } const bailable = bailableWait(20) await bailable.wait() bail = bailable.bail if (hasBroke) return false return await awaitDone() } const breakOn: Promise = new Promise((resolve) => { waitFor.then(() => { awaitDone().then(() => resolve()) }) }) try { while (evts.length < maxLength) { const maybeEvt = await Promise.race([gen.next(), breakOn]) if (!maybeEvt) break const evt = maybeEvt as IteratorResult if (evt.done) break evts.push(evt.value) } } finally { hasBroke = true bail && bail() } return evts } export type Deferrable = { resolve: (value: T | PromiseLike) => void reject: (reason?: unknown) => void complete: Promise } export function createDeferrable(): Deferrable { let res: (value: T | PromiseLike) => void let rej: (reason?: unknown) => void const promise = new Promise((resolve, reject) => { res = resolve rej = reject }) return { resolve: res!, reject: rej!, complete: promise } } export const createDeferrables = (count: number): Deferrable[] => { const list: Deferrable[] = [] for (let i = 0; i < count; i++) { list.push(createDeferrable()) } return list } export const allComplete = async (deferrables: Deferrable[]): Promise => { await Promise.all(deferrables.map((d) => d.complete)) } export class AsyncBuffer { private buffer: T[] = [] private promise: Promise private resolve: () => void private closed = false private toThrow: unknown | undefined constructor(public maxSize?: number) { // Initializing to satisfy types/build, immediately reset by resetPromise() this.promise = Promise.resolve() this.resolve = () => null this.resetPromise() } get curr(): T[] { return this.buffer } get size(): number { return this.buffer.length } get isClosed(): boolean { return this.closed } resetPromise() { this.promise = new Promise((r) => (this.resolve = r)) } push(item: T) { this.buffer.push(item) this.resolve() } pushMany(items: T[]) { items.forEach((i) => this.buffer.push(i)) this.resolve() } async *events(): AsyncGenerator { while (true) { if (this.closed && this.buffer.length === 0) { if (this.toThrow) { throw this.toThrow } else { return } } await this.promise if (this.toThrow) { throw this.toThrow } if (this.maxSize && this.size > this.maxSize) { throw new AsyncBufferFullError(this.maxSize) } const [first, ...rest] = this.buffer if (first) { this.buffer = rest yield first } else { this.resetPromise() } } } throw(err: unknown) { this.toThrow = err this.closed = true this.resolve() } close() { this.closed = true this.resolve() } } export class AsyncBufferFullError extends Error { constructor(maxSize: number) { super(`ReachedMaxBufferSize: ${maxSize}`) } } /** * Utility function that behaves like {@link Promise.allSettled} but returns the * same result as {@link Promise.all} in case every promise is fulfilled, and * throws an {@link AggregateError} if there are more than one errors. */ export function allFulfilled( promises: T, ): Promise<{ -readonly [P in keyof T]: Awaited }> export function allFulfilled( promises: Iterable>, ): Promise[]> export function allFulfilled( promises: Iterable>, ): Promise { return Promise.allSettled(promises).then(handleAllSettledErrors) } export function handleAllSettledErrors< T extends readonly PromiseSettledResult[] | [], >( results: T, ): { -readonly [P in keyof T]: T[P] extends PromiseSettledResult ? U : never } export function handleAllSettledErrors( results: PromiseSettledResult[], ): T[] export function handleAllSettledErrors( results: PromiseSettledResult[], ): unknown[] { if (results.every(isFulfilledResult)) return results.map(extractValue) const errors = results.filter(isRejectedResult).map(extractReason) throw aggregateErrors(errors) } export function isRejectedResult( result: PromiseSettledResult, ): result is PromiseRejectedResult { return result.status === 'rejected' } function extractReason(result: PromiseRejectedResult): unknown { return result.reason } export function isFulfilledResult( result: PromiseSettledResult, ): result is PromiseFulfilledResult { return result.status === 'fulfilled' } function extractValue(result: PromiseFulfilledResult): T { return result.value }