{"version":3,"file":"distinct.cjs","sources":["../../../src/operators/distinct.ts"],"sourcesContent":["import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'\nimport { StreamBuilder } from '../d2.js'\nimport { hash } from '../hashing/index.js'\nimport { MultiSet } from '../multiset.js'\nimport type { Hash } from '../hashing/index.js'\nimport type { DifferenceStreamReader } from '../graph.js'\nimport type { IStreamBuilder, KeyValue } from '../types.js'\n\ntype Multiplicity = number\n\ntype GetValue<T> = T extends KeyValue<any, infer V> ? V : never\n\n/**\n * Operator that removes duplicates\n */\nexport class DistinctOperator<\n  T extends KeyValue<any, any>,\n> extends UnaryOperator<T, KeyValue<number, GetValue<T>>> {\n  #by: (value: T) => any\n  #values: Map<Hash, Multiplicity> // keeps track of the number of times each value has been seen\n\n  constructor(\n    id: number,\n    input: DifferenceStreamReader<T>,\n    output: DifferenceStreamWriter<KeyValue<number, GetValue<T>>>,\n    by: (value: T) => any = (value: T) => value,\n  ) {\n    super(id, input, output)\n    this.#by = by\n    this.#values = new Map()\n  }\n\n  run(): void {\n    const updatedValues = new Map<Hash, [Multiplicity, T]>()\n\n    // Compute the new multiplicity for each value\n    for (const message of this.inputMessages()) {\n      for (const [value, diff] of message.getInner()) {\n        const hashedValue = hash(this.#by(value))\n\n        const oldMultiplicity =\n          updatedValues.get(hashedValue)?.[0] ??\n          this.#values.get(hashedValue) ??\n          0\n        const newMultiplicity = oldMultiplicity + diff\n        updatedValues.set(hashedValue, [newMultiplicity, value])\n      }\n    }\n\n    const result: Array<[KeyValue<number, GetValue<T>>, number]> = []\n\n    // Check which values became visible or disappeared\n    for (const [\n      hashedValue,\n      [newMultiplicity, value],\n    ] of updatedValues.entries()) {\n      const oldMultiplicity = this.#values.get(hashedValue) ?? 0\n\n      if (newMultiplicity === 0) {\n        this.#values.delete(hashedValue)\n      } else {\n        this.#values.set(hashedValue, newMultiplicity)\n      }\n\n      if (oldMultiplicity <= 0 && newMultiplicity > 0) {\n        // The value wasn't present in the stream\n        // but with this change it is now present in the stream\n        result.push([[hash(this.#by(value)), value[1]], 1])\n      } else if (oldMultiplicity > 0 && newMultiplicity <= 0) {\n        // The value was present in the stream\n        // but with this change it is no longer present in the stream\n        result.push([[hash(this.#by(value)), value[1]], -1])\n      }\n    }\n\n    if (result.length > 0) {\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n}\n\n/**\n * Removes duplicate values\n */\nexport function distinct<T extends KeyValue<any, any>>(\n  by: (value: T) => any = (value: T) => value,\n) {\n  return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {\n    const output = new StreamBuilder<T>(\n      stream.graph,\n      new DifferenceStreamWriter<T>(),\n    )\n    const operator = new DistinctOperator<T>(\n      stream.graph.getNextOperatorId(),\n      stream.connectReader(),\n      output.writer,\n      by,\n    )\n    stream.graph.addOperator(operator)\n    return output\n  }\n}\n"],"names":["UnaryOperator","hash","MultiSet","StreamBuilder","DifferenceStreamWriter"],"mappings":";;;;;;AAeO,MAAM,yBAEHA,MAAAA,cAAgD;AAAA,EACxD;AAAA,EACA;AAAA;AAAA,EAEA,YACE,IACA,OACA,QACA,KAAwB,CAAC,UAAa,OACtC;AACA,UAAM,IAAI,OAAO,MAAM;AACvB,SAAK,MAAM;AACX,SAAK,8BAAc,IAAA;AAAA,EACrB;AAAA,EAEA,MAAY;AACV,UAAM,oCAAoB,IAAA;AAG1B,eAAW,WAAW,KAAK,iBAAiB;AAC1C,iBAAW,CAAC,OAAO,IAAI,KAAK,QAAQ,YAAY;AAC9C,cAAM,cAAcC,KAAAA,KAAK,KAAK,IAAI,KAAK,CAAC;AAExC,cAAM,kBACJ,cAAc,IAAI,WAAW,IAAI,CAAC,KAClC,KAAK,QAAQ,IAAI,WAAW,KAC5B;AACF,cAAM,kBAAkB,kBAAkB;AAC1C,sBAAc,IAAI,aAAa,CAAC,iBAAiB,KAAK,CAAC;AAAA,MACzD;AAAA,IACF;AAEA,UAAM,SAAyD,CAAA;AAG/D,eAAW;AAAA,MACT;AAAA,MACA,CAAC,iBAAiB,KAAK;AAAA,IAAA,KACpB,cAAc,WAAW;AAC5B,YAAM,kBAAkB,KAAK,QAAQ,IAAI,WAAW,KAAK;AAEzD,UAAI,oBAAoB,GAAG;AACzB,aAAK,QAAQ,OAAO,WAAW;AAAA,MACjC,OAAO;AACL,aAAK,QAAQ,IAAI,aAAa,eAAe;AAAA,MAC/C;AAEA,UAAI,mBAAmB,KAAK,kBAAkB,GAAG;AAG/C,eAAO,KAAK,CAAC,CAACA,KAAAA,KAAK,KAAK,IAAI,KAAK,CAAC,GAAG,MAAM,CAAC,CAAC,GAAG,CAAC,CAAC;AAAA,MACpD,WAAW,kBAAkB,KAAK,mBAAmB,GAAG;AAGtD,eAAO,KAAK,CAAC,CAACA,KAAAA,KAAK,KAAK,IAAI,KAAK,CAAC,GAAG,MAAM,CAAC,CAAC,GAAG,EAAE,CAAC;AAAA,MACrD;AAAA,IACF;AAEA,QAAI,OAAO,SAAS,GAAG;AACrB,WAAK,OAAO,SAAS,IAAIC,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AACF;AAKO,SAAS,SACd,KAAwB,CAAC,UAAa,OACtC;AACA,SAAO,CAAC,WAAiD;AACvD,UAAM,SAAS,IAAIC,GAAAA;AAAAA,MACjB,OAAO;AAAA,MACP,IAAIC,MAAAA,uBAAA;AAAA,IAA0B;AAEhC,UAAM,WAAW,IAAI;AAAA,MACnB,OAAO,MAAM,kBAAA;AAAA,MACb,OAAO,cAAA;AAAA,MACP,OAAO;AAAA,MACP;AAAA,IAAA;AAEF,WAAO,MAAM,YAAY,QAAQ;AACjC,WAAO;AAAA,EACT;AACF;;;"}