/** Sentinel stored in place of an element that a `filter` step rejected. */
const FILTERED = Symbol("filtered AsyncArray element");
type FilteredElement = typeof FILTERED;

/** Type guard: true for real values, false for the `FILTERED` sentinel. */
const isNotFilteredElement = <T>(elem: T | FilteredElement): elem is T =>
  elem !== FILTERED;

/** Wraps a plain value in a resolved Promise; Promises are passed through. */
const promisifyValue = <V>(val: V | Promise<V>): Promise<V> =>
  Promise.resolve(val);

/**
 * An array-like container of Promises with two main characteristics:
 *
 * 1. It takes asynchronous functions for operating on arrays.
 * 2. It operates on arrays concurrently.
 *
 * For example, `map` maps values to a potential value and calls the mapping
 * function on elements non-deterministically (in whatever order they
 * resolve). `some`/`every` take asynchronous predicates and race the value
 * Promises for the first one to pass/fail the predicate, which is not
 * necessarily the one at the lowest position.
 *
 * Elements rejected by `filter` are kept internally as a `FILTERED` sentinel
 * and skipped by every read operation.
 */
export default class AsyncArray<T> {
  private readonly elements: Promise<T | FilteredElement>[];

  private constructor(from: Iterable<Promise<T | FilteredElement>>) {
    this.elements = [...from];
  }

  /** Creates an AsyncArray from an iterable of values and/or Promises. */
  static from<T>(from: Iterable<T | Promise<T>>): AsyncArray<T> {
    return new AsyncArray<T>(
      Array.from(from, (val) => promisifyValue<T>(val))
    );
  }

  /**
   * Waits for every element and returns the non-filtered values in order.
   * Rejects if any element rejects.
   */
  async toArray(): Promise<T[]> {
    const settled = await Promise.all(this.elements);
    return settled.filter(isNotFilteredElement);
  }

  /**
   * Returns a new AsyncArray with `vals` appended. Array arguments are
   * flattened one level, like `Array.prototype.concat`.
   */
  concat(...vals: (T | Promise<T> | T[] | Promise<T>[])[]): AsyncArray<T> {
    const appended: Promise<T | FilteredElement>[] = [];
    for (const val of vals) {
      if (Array.isArray(val)) {
        for (const item of val) {
          appended.push(promisifyValue<T>(item));
        }
      } else {
        appended.push(promisifyValue<T>(val));
      }
    }
    return new AsyncArray<T>([...this.elements, ...appended]);
  }

  /**
   * Calls `iterator` for each non-filtered element as soon as that element
   * resolves; resolves once every call has completed.
   */
  async forEach(iterator: (val: T) => void | Promise<void>): Promise<void> {
    await Promise.all(
      this.elements.map(async (pending) => {
        const resolved = await pending;
        if (isNotFilteredElement(resolved)) {
          await iterator(resolved);
        }
      })
    );
  }

  /**
   * Resolves to `false` as soon as any element fails `predicate`, otherwise
   * `true` (including for an empty array).
   */
  async every(
    predicate: (val: T) => boolean | Promise<boolean>
  ): Promise<boolean> {
    // Use .some for short-circuiting behaviour.
    return !(await this.some(async (val) => !(await predicate(val))));
  }

  /**
   * Resolves to `true` as soon as any element passes `predicate`, or `false`
   * once all elements have been checked without a pass (immediately for an
   * empty array). Rejects if an element or the predicate rejects before a
   * result has been decided.
   */
  some(predicate: (val: T) => boolean | Promise<boolean>): Promise<boolean> {
    return new Promise<boolean>((resolve, reject) => {
      let remaining = this.elements.length;

      // With no elements the per-element callback below never runs, so the
      // promise would otherwise never settle.
      if (remaining === 0) {
        resolve(false);
        return;
      }

      const check = async (
        pending: Promise<T | FilteredElement>
      ): Promise<void> => {
        try {
          const resolved = await pending;
          if (isNotFilteredElement(resolved) && (await predicate(resolved))) {
            resolve(true);
          }
        } catch (e) {
          reject(e);
        }
        // Only the first resolve/reject call has any effect, so this is a
        // no-op when a result was already decided.
        if (--remaining === 0) {
          resolve(false);
        }
      };

      for (const pending of this.elements) {
        void check(pending);
      }
    });
  }

  /** Returns a new AsyncArray of the (possibly asynchronous) mapped values. */
  map<M>(mapper: (val: T) => M | Promise<M>): AsyncArray<M> {
    return this.next<M>((val) => promisifyValue<M>(mapper(val)));
  }

  /** Returns a new AsyncArray that skips elements failing `predicate`. */
  filter(predicate: (val: T) => boolean | Promise<boolean>): AsyncArray<T> {
    return this.next<T>(async (elem) =>
      (await predicate(elem)) ? elem : FILTERED
    );
  }

  /**
   * Filters using a synchronous user-defined type guard, narrowing the
   * element type. TypeScript does not currently allow asynchronous
   * user-defined type guards, hence the separate method.
   */
  filterType<R extends T>(predicate: (val: T) => val is R): AsyncArray<R> {
    return this.next<R>(async (elem) => (predicate(elem) ? elem : FILTERED));
  }

  /** Appends values in place and returns the new internal length. */
  push(...vals: (T | Promise<T>)[]): number {
    return this.elements.push(...vals.map((val) => promisifyValue<T>(val)));
  }

  /**
   * Removes elements from the end until a non-filtered one is found and
   * resolves to it, or to `undefined` when the array runs out.
   */
  async pop(): Promise<T | undefined> {
    for (;;) {
      const pending = this.elements.pop();
      if (pending === undefined) {
        return undefined;
      }
      const resolved = await pending;
      if (isNotFilteredElement(resolved)) {
        return resolved;
      }
    }
  }

  /**
   * Prepends values in place and returns the new internal length.
   * Note: unlike `push`, this takes a single array rather than rest
   * arguments; the signature is kept as-is for existing callers.
   */
  unshift(vals: (T | Promise<T>)[]): number {
    return this.elements.unshift(
      ...vals.map((val) => promisifyValue<T>(val))
    );
  }

  /**
   * Removes elements from the front until a non-filtered one is found and
   * resolves to it, or to `undefined` when the array runs out.
   */
  async shift(): Promise<T | undefined> {
    for (;;) {
      const pending = this.elements.shift();
      if (pending === undefined) {
        return undefined;
      }
      const resolved = await pending;
      if (isNotFilteredElement(resolved)) {
        return resolved;
      }
    }
  }

  /**
   * Builds the next AsyncArray by applying `fn` to each element once it
   * resolves; already-filtered elements stay filtered without calling `fn`.
   */
  private next<M>(
    fn: (val: T) => Promise<M | FilteredElement>
  ): AsyncArray<M> {
    return new AsyncArray<M>(
      this.elements.map(async (pending) => {
        const resolved = await pending;
        return isNotFilteredElement(resolved) ? fn(resolved) : FILTERED;
      })
    );
  }
}