{"version":3,"file":"reduce.cjs","sources":["../../../src/operators/reduce.ts"],"sourcesContent":["import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'\nimport { StreamBuilder } from '../d2.js'\nimport { MultiSet } from '../multiset.js'\nimport { Index } from '../indexes.js'\nimport type { DifferenceStreamReader } from '../graph.js'\nimport type { IStreamBuilder, KeyValue } from '../types.js'\n\n/**\n * Base operator for reduction operations (version-free)\n */\nexport class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {\n  #index = new Index<K, V1>()\n  #indexOut = new Index<K, V2>()\n  #f: (values: Array<[V1, number]>) => Array<[V2, number]>\n\n  constructor(\n    id: number,\n    inputA: DifferenceStreamReader<[K, V1]>,\n    output: DifferenceStreamWriter<[K, V2]>,\n    f: (values: Array<[V1, number]>) => Array<[V2, number]>,\n  ) {\n    super(id, inputA, output)\n    this.#f = f\n  }\n\n  run(): void {\n    // Collect all input messages and update the index\n    const keysTodo = new Set<K>()\n    for (const message of this.inputMessages()) {\n      for (const [item, multiplicity] of message.getInner()) {\n        const [key, value] = item\n        this.#index.addValue(key, [value, multiplicity])\n        keysTodo.add(key)\n      }\n    }\n\n    // For each key, compute the reduction and delta\n    const result: Array<[[K, V2], number]> = []\n    for (const key of keysTodo) {\n      const curr = this.#index.get(key)\n      const currOut = this.#indexOut.get(key)\n      const out = this.#f(curr)\n\n      // Create maps for current and previous outputs using values directly as keys\n      const newOutputMap = new Map<V2, number>()\n      const oldOutputMap = new Map<V2, number>()\n\n      // Process new output\n      for (const [value, multiplicity] of out) {\n        const existing = newOutputMap.get(value) ?? 0\n        newOutputMap.set(value, existing + multiplicity)\n      }\n\n      // Process previous output\n      for (const [value, multiplicity] of currOut) {\n        const existing = oldOutputMap.get(value) ?? 0\n        oldOutputMap.set(value, existing + multiplicity)\n      }\n\n      // First, emit removals for old values that are no longer present\n      for (const [value, multiplicity] of oldOutputMap) {\n        if (!newOutputMap.has(value)) {\n          // Remove the old value entirely\n          result.push([[key, value], -multiplicity])\n          this.#indexOut.addValue(key, [value, -multiplicity])\n        }\n      }\n\n      // Then, emit additions for new values that are not present in old\n      for (const [value, multiplicity] of newOutputMap) {\n        if (!oldOutputMap.has(value)) {\n          // Add the new value only if it has non-zero multiplicity\n          if (multiplicity !== 0) {\n            result.push([[key, value], multiplicity])\n            this.#indexOut.addValue(key, [value, multiplicity])\n          }\n        }\n      }\n\n      // Finally, emit multiplicity changes for values that were present and are still present\n      for (const [value, newMultiplicity] of newOutputMap) {\n        const oldMultiplicity = oldOutputMap.get(value)\n        if (oldMultiplicity !== undefined) {\n          const delta = newMultiplicity - oldMultiplicity\n          // Only emit actual changes, i.e. non-zero deltas\n          if (delta !== 0) {\n            result.push([[key, value], delta])\n            this.#indexOut.addValue(key, [value, delta])\n          }\n        }\n      }\n    }\n\n    if (result.length > 0) {\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n}\n\n/**\n * Reduces the elements in the stream by key (version-free)\n */\nexport function reduce<\n  KType extends T extends KeyValue<infer K, infer _V> ? K : never,\n  V1Type extends T extends KeyValue<KType, infer V> ? V : never,\n  R,\n  T,\n>(f: (values: Array<[V1Type, number]>) => Array<[R, number]>) {\n  return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<KType, R>> => {\n    const output = new StreamBuilder<KeyValue<KType, R>>(\n      stream.graph,\n      new DifferenceStreamWriter<KeyValue<KType, R>>(),\n    )\n    const operator = new ReduceOperator<KType, V1Type, R>(\n      stream.graph.getNextOperatorId(),\n      stream.connectReader() as DifferenceStreamReader<KeyValue<KType, V1Type>>,\n      output.writer,\n      f,\n    )\n    stream.graph.addOperator(operator)\n    return output\n  }\n}\n"],"names":["UnaryOperator","Index","MultiSet","StreamBuilder","DifferenceStreamWriter"],"mappings":";;;;;;AAUO,MAAM,uBAAkCA,MAAAA,cAAgC;AAAA,EAC7E,SAAS,IAAIC,QAAAA,MAAA;AAAA,EACb,YAAY,IAAIA,QAAAA,MAAA;AAAA,EAChB;AAAA,EAEA,YACE,IACA,QACA,QACA,GACA;AACA,UAAM,IAAI,QAAQ,MAAM;AACxB,SAAK,KAAK;AAAA,EACZ;AAAA,EAEA,MAAY;AAEV,UAAM,+BAAe,IAAA;AACrB,eAAW,WAAW,KAAK,iBAAiB;AAC1C,iBAAW,CAAC,MAAM,YAAY,KAAK,QAAQ,YAAY;AACrD,cAAM,CAAC,KAAK,KAAK,IAAI;AACrB,aAAK,OAAO,SAAS,KAAK,CAAC,OAAO,YAAY,CAAC;AAC/C,iBAAS,IAAI,GAAG;AAAA,MAClB;AAAA,IACF;AAGA,UAAM,SAAmC,CAAA;AACzC,eAAW,OAAO,UAAU;AAC1B,YAAM,OAAO,KAAK,OAAO,IAAI,GAAG;AAChC,YAAM,UAAU,KAAK,UAAU,IAAI,GAAG;AACtC,YAAM,MAAM,KAAK,GAAG,IAAI;AAGxB,YAAM,mCAAmB,IAAA;AACzB,YAAM,mCAAmB,IAAA;AAGzB,iBAAW,CAAC,OAAO,YAAY,KAAK,KAAK;AACvC,cAAM,WAAW,aAAa,IAAI,KAAK,KAAK;AAC5C,qBAAa,IAAI,OAAO,WAAW,YAAY;AAAA,MACjD;AAGA,iBAAW,CAAC,OAAO,YAAY,KAAK,SAAS;AAC3C,cAAM,WAAW,aAAa,IAAI,KAAK,KAAK;AAC5C,qBAAa,IAAI,OAAO,WAAW,YAAY;AAAA,MACjD;AAGA,iBAAW,CAAC,OAAO,YAAY,KAAK,cAAc;AAChD,YAAI,CAAC,aAAa,IAAI,KAAK,GAAG;AAE5B,iBAAO,KAAK,CAAC,CAAC,KAAK,KAAK,GAAG,CAAC,YAAY,CAAC;AACzC,eAAK,UAAU,SAAS,KAAK,CAAC,OAAO,CAAC,YAAY,CAAC;AAAA,QACrD;AAAA,MACF;AAGA,iBAAW,CAAC,OAAO,YAAY,KAAK,cAAc;AAChD,YAAI,CAAC,aAAa,IAAI,KAAK,GAAG;AAE5B,cAAI,iBAAiB,GAAG;AACtB,mBAAO,KAAK,CAAC,CAAC,KAAK,KAAK,GAAG,YAAY,CAAC;AACxC,iBAAK,UAAU,SAAS,KAAK,CAAC,OAAO,YAAY,CAAC;AAAA,UACpD;AAAA,QACF;AAAA,MACF;AAGA,iBAAW,CAAC,OAAO,eAAe,KAAK,cAAc;AACnD,cAAM,kBAAkB,aAAa,IAAI,KAAK;AAC9C,YAAI,oBAAoB,QAAW;AACjC,gBAAM,QAAQ,kBAAkB;AAEhC,cAAI,UAAU,GAAG;AACf,mBAAO,KAAK,CAAC,CAAC,KAAK,KAAK,GAAG,KAAK,CAAC;AACjC,iBAAK,UAAU,SAAS,KAAK,CAAC,OAAO,KAAK,CAAC;AAAA,UAC7C;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAEA,QAAI,OAAO,SAAS,GAAG;AACrB,WAAK,OAAO,SAAS,IAAIC,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AACF;AAKO,SAAS,OAKd,GAA4D;AAC5D,SAAO,CAAC,WAAkE;AACxE,UAAM,SAAS,IAAIC,GAAAA;AAAAA,MACjB,OAAO;AAAA,MACP,IAAIC,MAAAA,uBAAA;AAAA,IAA2C;AAEjD,UAAM,WAAW,IAAI;AAAA,MACnB,OAAO,MAAM,kBAAA;AAAA,MACb,OAAO,cAAA;AAAA,MACP,OAAO;AAAA,MACP;AAAA,IAAA;AAEF,WAAO,MAAM,YAAY,QAAQ;AACjC,WAAO;AAAA,EACT;AACF;;;"}