{"version":3,"file":"topKWithFractionalIndex.cjs","sources":["../../../src/operators/topKWithFractionalIndex.ts"],"sourcesContent":["import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'\nimport { StreamBuilder } from '../d2.js'\nimport { MultiSet } from '../multiset.js'\nimport { TopKState, handleMoveIn, handleMoveOut } from './topKState.js'\nimport { TopKArray, createKeyedComparator } from './topKArray.js'\nimport type { IndexedValue, TopK } from './topKArray.js'\nimport type { DifferenceStreamReader } from '../graph.js'\nimport type { IStreamBuilder, PipedOperator } from '../types.js'\n\nexport interface TopKWithFractionalIndexOptions {\n  limit?: number\n  offset?: number\n  setSizeCallback?: (getSize: () => number) => void\n  setWindowFn?: (\n    windowFn: (options: { offset?: number; limit?: number }) => void,\n  ) => void\n}\n\n/**\n * Operator for fractional indexed topK operations\n * This operator maintains fractional indices for sorted elements\n * and only updates indices when elements move position\n */\nexport class TopKWithFractionalIndexOperator<\n  K extends string | number,\n  T,\n> extends UnaryOperator<[K, T], [K, IndexedValue<T>]> {\n  #state: TopKState<K, T>\n\n  constructor(\n    id: number,\n    inputA: DifferenceStreamReader<[K, T]>,\n    output: DifferenceStreamWriter<[K, IndexedValue<T>]>,\n    comparator: (a: T, b: T) => number,\n    options: TopKWithFractionalIndexOptions,\n  ) {\n    super(id, inputA, output)\n    const limit = options.limit ?? Infinity\n    const offset = options.offset ?? 0\n    const topK = this.createTopK(\n      offset,\n      limit,\n      createKeyedComparator(comparator),\n    )\n    this.#state = new TopKState(topK)\n    options.setSizeCallback?.(() => this.#state.size)\n    options.setWindowFn?.(this.moveTopK.bind(this))\n  }\n\n  protected createTopK(\n    offset: number,\n    limit: number,\n    comparator: (a: [K, T], b: [K, T]) => number,\n  ): TopK<[K, T]> {\n    return new TopKArray(offset, limit, comparator)\n  }\n\n  /**\n   * Moves the topK window based on the provided offset and limit.\n   * Any changes to the topK are sent to the output.\n   */\n  moveTopK({ offset, limit }: { offset?: number; limit?: number }) {\n    const result: Array<[[K, IndexedValue<T>], number]> = []\n    const diff = this.#state.move({ offset, limit })\n\n    diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result))\n    diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result))\n\n    if (diff.changes) {\n      // There are changes to the topK\n      // it could be that moveIns and moveOuts are empty\n      // because the collection is lazy, so we will run the graph again to load the data\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n\n  run(): void {\n    const result: Array<[[K, IndexedValue<T>], number]> = []\n    for (const message of this.inputMessages()) {\n      for (const [item, multiplicity] of message.getInner()) {\n        const [key, value] = item\n        this.processElement(key, value, multiplicity, result)\n      }\n    }\n\n    if (result.length > 0) {\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n\n  processElement(\n    key: K,\n    value: T,\n    multiplicity: number,\n    result: Array<[[K, IndexedValue<T>], number]>,\n  ): void {\n    const changes = this.#state.processElement(key, value, multiplicity)\n    handleMoveIn(changes.moveIn, result)\n    handleMoveOut(changes.moveOut, result)\n  }\n}\n\n/**\n * Limits the number of results based on a comparator, with optional offset.\n * Uses fractional indexing to minimize the number of changes when elements move positions.\n * Each element is assigned a fractional index that is lexicographically sortable.\n * When elements move, only the indices of the moved elements are updated, not all elements.\n *\n * @param comparator - A function that compares two elements\n * @param options - An optional object containing limit and offset properties\n * @returns A piped operator that orders the elements and limits the number of results\n */\nexport function topKWithFractionalIndex<KType extends string | number, T>(\n  comparator: (a: T, b: T) => number,\n  options?: TopKWithFractionalIndexOptions,\n): PipedOperator<[KType, T], [KType, IndexedValue<T>]> {\n  const opts = options || {}\n\n  return (\n    stream: IStreamBuilder<[KType, T]>,\n  ): IStreamBuilder<[KType, IndexedValue<T>]> => {\n    const output = new StreamBuilder<[KType, IndexedValue<T>]>(\n      stream.graph,\n      new DifferenceStreamWriter<[KType, IndexedValue<T>]>(),\n    )\n    const operator = new TopKWithFractionalIndexOperator<KType, T>(\n      stream.graph.getNextOperatorId(),\n      stream.connectReader(),\n      output.writer,\n      comparator,\n      opts,\n    )\n    stream.graph.addOperator(operator)\n    return output\n  }\n}\n"],"names":["UnaryOperator","createKeyedComparator","TopKState","TopKArray","handleMoveIn","handleMoveOut","MultiSet","StreamBuilder","DifferenceStreamWriter"],"mappings":";;;;;;;AAuBO,MAAM,wCAGHA,MAAAA,cAA4C;AAAA,EACpD;AAAA,EAEA,YACE,IACA,QACA,QACA,YACA,SACA;AACA,UAAM,IAAI,QAAQ,MAAM;AACxB,UAAM,QAAQ,QAAQ,SAAS;AAC/B,UAAM,SAAS,QAAQ,UAAU;AACjC,UAAM,OAAO,KAAK;AAAA,MAChB;AAAA,MACA;AAAA,MACAC,UAAAA,sBAAsB,UAAU;AAAA,IAAA;AAElC,SAAK,SAAS,IAAIC,UAAAA,UAAU,IAAI;AAChC,YAAQ,kBAAkB,MAAM,KAAK,OAAO,IAAI;AAChD,YAAQ,cAAc,KAAK,SAAS,KAAK,IAAI,CAAC;AAAA,EAChD;AAAA,EAEU,WACR,QACA,OACA,YACc;AACd,WAAO,IAAIC,UAAAA,UAAU,QAAQ,OAAO,UAAU;AAAA,EAChD;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,SAAS,EAAE,QAAQ,SAA8C;AAC/D,UAAM,SAAgD,CAAA;AACtD,UAAM,OAAO,KAAK,OAAO,KAAK,EAAE,QAAQ,OAAO;AAE/C,SAAK,QAAQ,QAAQ,CAAC,WAAWC,uBAAa,QAAQ,MAAM,CAAC;AAC7D,SAAK,SAAS,QAAQ,CAAC,YAAYC,wBAAc,SAAS,MAAM,CAAC;AAEjE,QAAI,KAAK,SAAS;AAIhB,WAAK,OAAO,SAAS,IAAIC,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AAAA,EAEA,MAAY;AACV,UAAM,SAAgD,CAAA;AACtD,eAAW,WAAW,KAAK,iBAAiB;AAC1C,iBAAW,CAAC,MAAM,YAAY,KAAK,QAAQ,YAAY;AACrD,cAAM,CAAC,KAAK,KAAK,IAAI;AACrB,aAAK,eAAe,KAAK,OAAO,cAAc,MAAM;AAAA,MACtD;AAAA,IACF;AAEA,QAAI,OAAO,SAAS,GAAG;AACrB,WAAK,OAAO,SAAS,IAAIA,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AAAA,EAEA,eACE,KACA,OACA,cACA,QACM;AACN,UAAM,UAAU,KAAK,OAAO,eAAe,KAAK,OAAO,YAAY;AACnEF,2BAAa,QAAQ,QAAQ,MAAM;AACnCC,4BAAc,QAAQ,SAAS,MAAM;AAAA,EACvC;AACF;AAYO,SAAS,wBACd,YACA,SACqD;AACrD,QAAM,OAAO,WAAW,CAAA;AAExB,SAAO,CACL,WAC6C;AAC7C,UAAM,SAAS,IAAIE,GAAAA;AAAAA,MACjB,OAAO;AAAA,MACP,IAAIC,MAAAA,uBAAA;AAAA,IAAiD;AAEvD,UAAM,WAAW,IAAI;AAAA,MACnB,OAAO,MAAM,kBAAA;AAAA,MACb,OAAO,cAAA;AAAA,MACP,OAAO;AAAA,MACP;AAAA,MACA;AAAA,IAAA;AAEF,WAAO,MAAM,YAAY,QAAQ;AACjC,WAAO;AAAA,EACT;AACF;;;"}