{"version":3,"file":"graph.cjs","sources":["../../src/graph.ts"],"sourcesContent":["import { MultiSet } from './multiset.js'\nimport type { MultiSetArray } from './multiset.js'\nimport type {\n  IDifferenceStreamReader,\n  IDifferenceStreamWriter,\n  IOperator,\n} from './types.js'\n\n/**\n * A read handle to a dataflow edge that receives data from a writer.\n */\nexport class DifferenceStreamReader<T> implements IDifferenceStreamReader<T> {\n  #queue: Array<MultiSet<T>>\n\n  constructor(queue: Array<MultiSet<T>>) {\n    this.#queue = queue\n  }\n\n  drain(): Array<MultiSet<T>> {\n    const out = [...this.#queue].reverse()\n    this.#queue.length = 0\n    return out\n  }\n\n  isEmpty(): boolean {\n    return this.#queue.length === 0\n  }\n}\n\n/**\n * A write handle to a dataflow edge that is allowed to publish data.\n */\nexport class DifferenceStreamWriter<T> implements IDifferenceStreamWriter<T> {\n  #queues: Array<Array<MultiSet<T>>> = []\n\n  sendData(collection: MultiSet<T> | MultiSetArray<T>): void {\n    if (!(collection instanceof MultiSet)) {\n      collection = new MultiSet(collection)\n    }\n\n    for (const q of this.#queues) {\n      q.unshift(collection)\n    }\n  }\n\n  newReader(): DifferenceStreamReader<T> {\n    const q: Array<MultiSet<T>> = []\n    this.#queues.push(q)\n    return new DifferenceStreamReader(q)\n  }\n}\n\n/**\n * A generic implementation of a dataflow operator (node) that has multiple incoming edges (read handles) and\n * one outgoing edge (write handle).\n */\nexport abstract class Operator<T> implements IOperator<T> {\n  protected inputs: Array<DifferenceStreamReader<T>>\n  protected output: DifferenceStreamWriter<T>\n\n  constructor(\n    public id: number,\n    inputs: Array<DifferenceStreamReader<T>>,\n    output: DifferenceStreamWriter<T>,\n  ) {\n    this.inputs = inputs\n    this.output = output\n  }\n\n  abstract run(): void\n\n  hasPendingWork(): boolean {\n    return this.inputs.some((input) => !input.isEmpty())\n  }\n}\n\n/**\n * A convenience implementation of a dataflow operator that has a handle to one\n * incoming stream of data, and one handle to an outgoing stream of data.\n */\nexport abstract class UnaryOperator<Tin, Tout = Tin> extends Operator<\n  Tin | Tout\n> {\n  constructor(\n    public id: number,\n    inputA: DifferenceStreamReader<Tin>,\n    output: DifferenceStreamWriter<Tout>,\n  ) {\n    super(id, [inputA], output)\n  }\n\n  inputMessages(): Array<MultiSet<Tin>> {\n    return this.inputs[0]!.drain() as Array<MultiSet<Tin>>\n  }\n}\n\n/**\n * A convenience implementation of a dataflow operator that has a handle to two\n * incoming streams of data, and one handle to an outgoing stream of data.\n */\nexport abstract class BinaryOperator<T> extends Operator<T> {\n  constructor(\n    public id: number,\n    inputA: DifferenceStreamReader<T>,\n    inputB: DifferenceStreamReader<T>,\n    output: DifferenceStreamWriter<T>,\n  ) {\n    super(id, [inputA, inputB], output)\n  }\n\n  inputAMessages(): Array<MultiSet<T>> {\n    return this.inputs[0]!.drain()\n  }\n\n  inputBMessages(): Array<MultiSet<T>> {\n    return this.inputs[1]!.drain()\n  }\n}\n\n/**\n * Base class for operators that process a single input stream\n */\nexport abstract class LinearUnaryOperator<T, U> extends UnaryOperator<T | U> {\n  abstract inner(collection: MultiSet<T | U>): MultiSet<U>\n\n  run(): void {\n    for (const message of this.inputMessages()) {\n      this.output.sendData(this.inner(message))\n    }\n  }\n}\n"],"names":["MultiSet"],"mappings":";;;AAWO,MAAM,uBAAgE;AAAA,EAC3E;AAAA,EAEA,YAAY,OAA2B;AACrC,SAAK,SAAS;AAAA,EAChB;AAAA,EAEA,QAA4B;AAC1B,UAAM,MAAM,CAAC,GAAG,KAAK,MAAM,EAAE,QAAA;AAC7B,SAAK,OAAO,SAAS;AACrB,WAAO;AAAA,EACT;AAAA,EAEA,UAAmB;AACjB,WAAO,KAAK,OAAO,WAAW;AAAA,EAChC;AACF;AAKO,MAAM,uBAAgE;AAAA,EAC3E,UAAqC,CAAA;AAAA,EAErC,SAAS,YAAkD;AACzD,QAAI,EAAE,sBAAsBA,SAAAA,WAAW;AACrC,mBAAa,IAAIA,SAAAA,SAAS,UAAU;AAAA,IACtC;AAEA,eAAW,KAAK,KAAK,SAAS;AAC5B,QAAE,QAAQ,UAAU;AAAA,IACtB;AAAA,EACF;AAAA,EAEA,YAAuC;AACrC,UAAM,IAAwB,CAAA;AAC9B,SAAK,QAAQ,KAAK,CAAC;AACnB,WAAO,IAAI,uBAAuB,CAAC;AAAA,EACrC;AACF;AAMO,MAAe,SAAoC;AAAA,EAIxD,YACS,IACP,QACA,QACA;AAHO,SAAA,KAAA;AAIP,SAAK,SAAS;AACd,SAAK,SAAS;AAAA,EAChB;AAAA,EAIA,iBAA0B;AACxB,WAAO,KAAK,OAAO,KAAK,CAAC,UAAU,CAAC,MAAM,SAAS;AAAA,EACrD;AACF;AAMO,MAAe,sBAAuC,SAE3D;AAAA,EACA,YACS,IACP,QACA,QACA;AACA,UAAM,IAAI,CAAC,MAAM,GAAG,MAAM;AAJnB,SAAA,KAAA;AAAA,EAKT;AAAA,EAEA,gBAAsC;AACpC,WAAO,KAAK,OAAO,CAAC,EAAG,MAAA;AAAA,EACzB;AACF;AAMO,MAAe,uBAA0B,SAAY;AAAA,EAC1D,YACS,IACP,QACA,QACA,QACA;AACA,UAAM,IAAI,CAAC,QAAQ,MAAM,GAAG,MAAM;AAL3B,SAAA,KAAA;AAAA,EAMT;AAAA,EAEA,iBAAqC;AACnC,WAAO,KAAK,OAAO,CAAC,EAAG,MAAA;AAAA,EACzB;AAAA,EAEA,iBAAqC;AACnC,WAAO,KAAK,OAAO,CAAC,EAAG,MAAA;AAAA,EACzB;AACF;AAKO,MAAe,4BAAkC,cAAqB;AAAA,EAG3E,MAAY;AACV,eAAW,WAAW,KAAK,iBAAiB;AAC1C,WAAK,OAAO,SAAS,KAAK,MAAM,OAAO,CAAC;AAAA,IAC1C;AAAA,EACF;AACF;;;;;;;"}