/**
 * Chunk iterables are used in two cases.
 * Case 1: To process a large datastream in batches rather than all at once or one at a time.
 *   This is a spatial performance and time-to-first-result optimization
 * Case 2: To allow query operations that cannot be done in the database to be done in the application.
 *   This allows developers to add filters to their queries that run arbitrary bits of code.
 *
 * See more on chunk iterables here: https://tantaman.com/2022-05-26-chunk-iterable.html
 *
 * NOTE(review): the generic type arguments in this file were reconstructed
 * from how `T`, `TIn` and `TOut` are used in the method bodies. Confirm the
 * exact element type (e.g. `T[]` vs `readonly T[]`) against callers.
 */
export interface ChunkIterable<T> {
  /** Iterates the data one chunk (array of items) at a time. */
  [Symbol.asyncIterator](): AsyncIterator<T[]>;
  /** Drains the iterable and resolves with every item in a single flat array. */
  gen(): Promise<T[]>;
  map<TOut>(fn: (x: T) => TOut): ChunkIterable<TOut>;
  mapAsync<TOut>(fn: (x: T) => Promise<TOut>): ChunkIterable<TOut>;
  filter(fn: (x: T) => boolean): ChunkIterable<T>;
  filterAsync(fn: (x: T) => Promise<boolean>): ChunkIterable<T>;
  orderBy(fn: (l: T, r: T) => number): ChunkIterable<T>;
  // orderByAsync(fn: (l: T, r: T) => Promise<number>): ChunkIterable<T>;
  take(n: number): ChunkIterable<T>;
  count(): ChunkIterable<number>;
  union(other: ChunkIterable<T>): ChunkIterable<T>;
}

/**
 * Shared implementation of the `ChunkIterable` combinators. Subclasses only
 * supply the async iterator; every combinator wraps `this` in a new lazy
 * iterable, so nothing is pulled from the source until iteration starts.
 */
export abstract class BaseChunkIterable<T> implements ChunkIterable<T> {
  // TODO: we should probs memoize the results of this...
  abstract [Symbol.asyncIterator](): AsyncIterator<T[]>;

  /**
   * Drains every chunk and flattens the items into one array.
   *
   * @returns all items, in iteration order.
   */
  async gen(): Promise<T[]> {
    const ret: T[] = [];
    for await (const chunk of this) {
      // Append in place: re-assigning `ret.concat(chunk)` re-copies the whole
      // accumulator for every chunk.
      for (const item of chunk) {
        ret.push(item);
      }
    }
    return ret;
  }

  /** Lazily transforms each item with an async function. */
  mapAsync<TOut>(fn: (x: T) => Promise<TOut>): ChunkIterable<TOut> {
    return new MappedChunkIterable(this, fn);
  }

  /** Lazily transforms each item with a synchronous function. */
  map<TOut>(fn: (x: T) => TOut): ChunkIterable<TOut> {
    return new SyncMappedChunkIterable(this, fn);
  }

  /** Lazily keeps only the items for which `fn` returns a truthy value. */
  filter(fn: (x: T) => boolean): ChunkIterable<T> {
    return new SyncFilteredChunkIterable(this, fn);
  }

  /** Lazily keeps only the items for which `fn` resolves to a truthy value. */
  filterAsync(fn: (x: T) => Promise<boolean>): ChunkIterable<T> {
    return new FilteredChunkIterable(this, fn);
  }

  /** Sorts all items with the comparator `fn`; buffers the entire source. */
  orderBy(fn: (l: T, r: T) => number): ChunkIterable<T> {
    return new OrderedChunkIterable(this, fn);
  }

  /** Limits the output to at most the first `n` items. */
  take(n: number): ChunkIterable<T> {
    return new TakeChunkIterable(this, n);
  }

  /** Produces a single one-element chunk holding the total item count. */
  count(): ChunkIterable<number> {
    return new CountChunkIterable(this);
  }

  /** Yields every chunk of this iterable, then every chunk of `other`. */
  union(other: ChunkIterable<T>): ChunkIterable<T> {
    return new ConcatenatedChunkIterable<T>([this, other]);
  }
}

/** Yields the chunks of each piece in turn, in the order the pieces were given. */
export class ConcatenatedChunkIterable<T> extends BaseChunkIterable<T> {
  // Accepts the interface rather than the base class so that `union` can pass
  // through any `ChunkIterable` it is handed.
  constructor(private readonly pieces: ChunkIterable<T>[]) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    for (const piece of this.pieces) {
      for await (const chunk of piece) {
        yield chunk;
      }
    }
  }
}

/** Wraps an in-memory array of chunks. */
export class StaticSourceChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(private readonly source: T[][]) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    for (const chunk of this.source) {
      yield chunk;
    }
  }
}

/** An iterable that yields no chunks. */
export const emptyChunkIterable = new StaticSourceChunkIterable([]);

/** Awaits a promise of one array and yields that array as the only chunk. */
export class PromiseSourceSingleChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(private readonly source: Promise<T[]>) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    const ret = await this.source;
    yield ret;
  }
}

/**
 * Applies an async mapping function to every item. All items of one chunk are
 * mapped concurrently (`Promise.all`) before the mapped chunk is yielded.
 */
export class MappedChunkIterable<TIn, TOut> extends BaseChunkIterable<TOut> {
  constructor(
    private readonly source: ChunkIterable<TIn>,
    private readonly fn: (x: TIn) => Promise<TOut>,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<TOut[]> {
    const fn = this.fn;
    for await (const chunk of this.source) {
      // Call with the item only: `Array.prototype.map` would otherwise also
      // pass the index and the array to a callback declared as unary.
      yield await Promise.all(chunk.map((x) => fn(x)));
    }
  }
}

/** Applies a synchronous mapping function to every item, chunk by chunk. */
export class SyncMappedChunkIterable<TIn, TOut> extends BaseChunkIterable<TOut> {
  constructor(
    private readonly source: ChunkIterable<TIn>,
    private readonly fn: (x: TIn) => TOut,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<TOut[]> {
    const fn = this.fn;
    for await (const chunk of this.source) {
      // Item-only call, so functions with optional extra parameters
      // (e.g. `parseInt`) are not fed the array index.
      yield chunk.map((x) => fn(x));
    }
  }
}

/**
 * Keeps the items whose async predicate resolves truthy. Predicates for one
 * chunk run concurrently; chunks that end up empty are not yielded.
 */
export class FilteredChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(
    private readonly source: ChunkIterable<T>,
    private readonly fn: (x: T) => Promise<boolean>,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    const fn = this.fn;
    for await (const chunk of this.source) {
      const verdicts = await Promise.all(chunk.map((x) => fn(x)));
      const filteredChunk = chunk.filter((_, i) => verdicts[i]);
      if (filteredChunk.length > 0) {
        yield filteredChunk;
      }
    }
  }
}

/**
 * Keeps the items whose synchronous predicate returns truthy; chunks that end
 * up empty are not yielded.
 */
export class SyncFilteredChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(
    private readonly source: ChunkIterable<T>,
    private readonly fn: (x: T) => boolean,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    const fn = this.fn;
    for await (const chunk of this.source) {
      const filteredChunk = chunk.filter((x) => fn(x));
      if (filteredChunk.length > 0) {
        yield filteredChunk;
      }
    }
  }
}

/**
 * Yields at most `num` items, then stops pulling from the source. The final
 * chunk is truncated when the limit falls inside it.
 */
export class TakeChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(
    private readonly source: ChunkIterable<T>,
    private readonly num: number,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    let numLeft = this.num;
    // Covers 0, negative counts and NaN. A negative count would otherwise
    // reach `slice(0, numLeft)` and return all but the trailing items.
    if (!(numLeft > 0)) {
      return;
    }

    for await (const chunk of this.source) {
      if (chunk.length < numLeft) {
        yield chunk;
        numLeft -= chunk.length;
      } else {
        yield chunk.slice(0, numLeft);
        return;
      }
    }
  }
}

// Ordering in a chunk iterable is insanely expensive :(
// We need to warn the user if/when their queries do this
/**
 * Buffers every item from the source, sorts the buffer with the comparator,
 * and yields the result as a single chunk.
 */
export class OrderedChunkIterable<T> extends BaseChunkIterable<T> {
  constructor(
    private readonly source: ChunkIterable<T>,
    private readonly fn: (l: T, r: T) => number,
  ) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T[]> {
    const all: T[] = [];
    for await (const chunk of this.source) {
      // Push in place instead of `all = all.concat(chunk)`, which re-copies
      // the buffer for every chunk.
      for (const item of chunk) {
        all.push(item);
      }
    }
    // `all` is a private buffer, so sorting it in place touches no caller data.
    yield all.sort(this.fn);
  }
}

/** Counts the items across all source chunks and yields `[count]` once. */
export class CountChunkIterable<T> extends BaseChunkIterable<number> {
  constructor(private readonly source: ChunkIterable<T>) {
    super();
  }

  async *[Symbol.asyncIterator](): AsyncIterator<number[]> {
    let count = 0;
    for await (const chunk of this.source) {
      count += chunk.length;
    }
    yield [count];
  }
}