import { DifferenceStreamWriter } from './graph.js'
import type {
  BinaryOperator,
  DifferenceStreamReader,
  UnaryOperator,
} from './graph.js'
import type { MultiSet, MultiSetArray } from './multiset.js'
import type { ID2, IStreamBuilder, PipedOperator } from './types.js'

// NOTE(review): the generic type arguments in this file (`<T>`, `<any>`,
// `PipedOperator<In, Out>`, ...) were reconstructed after they had been stripped
// from the text. Confirm them against the declarations in ./graph.js and
// ./types.js.

/**
 * A dataflow graph: an ordered list of operators plus the bookkeeping needed
 * to build it (operator id allocation) and to drive it (`step` / `run`).
 *
 * The graph has two phases. While it is not finalized, inputs and operators
 * can be added. After `finalize()` it can be stepped, and every building
 * method throws.
 */
export class D2 implements ID2 {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  #operators: Array<UnaryOperator<any> | BinaryOperator<any>> = []
  #nextOperatorId = 0
  #finalized = false

  /**
   * Guard used by every graph-building method.
   *
   * @throws Error if `finalize()` has already been called.
   */
  #checkNotFinalized(): void {
    if (this.#finalized) {
      throw new Error(`Graph already finalized`)
    }
  }

  /**
   * Hands out the next operator id; ids start at 0 and increase by one per call.
   *
   * @returns The id to assign to a newly created operator.
   * @throws Error if the graph is already finalized.
   */
  getNextOperatorId(): number {
    this.#checkNotFinalized()
    return this.#nextOperatorId++
  }

  /**
   * Creates a new input stream backed by a fresh `DifferenceStreamWriter`.
   *
   * @returns A root stream builder, which additionally exposes `sendData`
   *   for pushing collections into the stream.
   * @throws Error if the graph is already finalized.
   */
  newInput<T>(): RootStreamBuilder<T> {
    this.#checkNotFinalized()
    const writer = new DifferenceStreamWriter<T>()
    // Use the root stream builder because it exposes the sendData method.
    return new RootStreamBuilder<T>(this, writer)
  }

  /**
   * Appends an operator to the graph. Operators are run by `step()` in the
   * order in which they were added.
   *
   * @param operator - The operator to register.
   * @throws Error if the graph is already finalized.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  addOperator(operator: UnaryOperator<any> | BinaryOperator<any>): void {
    this.#checkNotFinalized()
    this.#operators.push(operator)
  }

  /**
   * Marks the graph as complete. After this call no more inputs or operators
   * can be added, and `step()` becomes usable.
   *
   * @throws Error if the graph is already finalized.
   */
  finalize(): void {
    this.#checkNotFinalized()
    this.#finalized = true
  }

  /**
   * Calls `run()` once on every registered operator, in registration order.
   *
   * @throws Error if the graph has not been finalized yet.
   */
  step(): void {
    if (!this.#finalized) {
      throw new Error(`Graph not finalized`)
    }
    for (const op of this.#operators) {
      op.run()
    }
  }

  /**
   * @returns `true` if at least one operator reports pending work.
   */
  pendingWork(): boolean {
    return this.#operators.some((op) => op.hasPendingWork())
  }

  /**
   * Repeatedly calls `step()` until no operator reports pending work.
   * Because it delegates to `step()`, it throws if there is pending work and
   * the graph has not been finalized.
   */
  run(): void {
    while (this.pendingWork()) {
      this.step()
    }
  }
}

/**
 * Fluent handle on one stream of the graph. It pairs the owning graph with the
 * writer that feeds the stream, and lets callers chain operators via `pipe`.
 */
export class StreamBuilder<T> implements IStreamBuilder<T> {
  #graph: ID2
  #writer: DifferenceStreamWriter<T>

  /**
   * @param graph - The graph this stream belongs to.
   * @param writer - The writer that feeds this stream.
   */
  constructor(graph: ID2, writer: DifferenceStreamWriter<T>) {
    this.#graph = graph
    this.#writer = writer
  }

  /**
   * @returns A new reader obtained from this stream's writer.
   */
  connectReader(): DifferenceStreamReader<T> {
    return this.#writer.newReader()
  }

  /** The writer that feeds this stream. */
  get writer(): DifferenceStreamWriter<T> {
    return this.#writer
  }

  /** The graph this stream belongs to. */
  get graph(): ID2 {
    return this.#graph
  }

  // Don't judge, this is the only way to type this function.
  // rxjs has very similar code to type its pipe function
  // https://github.com/ReactiveX/rxjs/blob/master/packages/rxjs/src/internal/util/pipe.ts
  // We go to 20 operators deep, because surely that's enough for anyone...
  // A user can always split the pipe into multiple pipes to get around this.
  pipe<O>(o1: PipedOperator<T, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, O>): IStreamBuilder<O>
  // prettier-ignore
  pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, T20>, o20: PipedOperator<T20, O>): IStreamBuilder<O>

  /**
   * Chains operators onto this stream: each operator is called with the
   * stream produced by the previous one, starting from this builder.
   *
   * @param operators - The operators to apply, in order.
   * @returns The stream returned by the last operator.
   */
  // The implementation signature is deliberately loose; callers only ever see
  // the typed overloads above.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  pipe(...operators: Array<PipedOperator<any, any>>): IStreamBuilder<any> {
    return operators.reduce(
      (stream, operator) => operator(stream),
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      this as IStreamBuilder<any>,
    )
  }
}

/**
 * Stream builder returned by `D2.newInput()`. In addition to everything a
 * `StreamBuilder` offers, it lets callers push data into the stream.
 */
export class RootStreamBuilder<T> extends StreamBuilder<T> {
  /**
   * Forwards a collection to this stream's writer.
   *
   * @param collection - The data to send, as a `MultiSet` or `MultiSetArray`.
   */
  sendData(collection: MultiSet<T> | MultiSetArray<T>): void {
    this.writer.sendData(collection)
  }
}