{"version":3,"file":"groupedTopKWithFractionalIndex.cjs","sources":["../../../src/operators/groupedTopKWithFractionalIndex.ts"],"sourcesContent":["import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'\nimport { StreamBuilder } from '../d2.js'\nimport { MultiSet } from '../multiset.js'\nimport { TopKState, handleMoveIn, handleMoveOut } from './topKState.js'\nimport { TopKArray, createKeyedComparator } from './topKArray.js'\nimport type { IndexedValue, TopK } from './topKArray.js'\nimport type { DifferenceStreamReader } from '../graph.js'\nimport type { IStreamBuilder, PipedOperator } from '../types.js'\n\nexport interface GroupedTopKWithFractionalIndexOptions<K, T> {\n  limit?: number\n  offset?: number\n  setSizeCallback?: (getSize: () => number) => void\n  setWindowFn?: (\n    windowFn: (options: { offset?: number; limit?: number }) => void,\n  ) => void\n  /**\n   * Function to extract a group key from the element's key and value.\n   * Elements with the same group key will be sorted and limited together.\n   */\n  groupKeyFn: (key: K, value: T) => unknown\n}\n\n/**\n * Operator for grouped fractional indexed topK operations.\n * This operator maintains separate topK windows for each group,\n * allowing per-group limits and ordering.\n *\n * The input is a keyed stream [K, T] and outputs [K, IndexedValue<T>].\n * Elements are grouped by the groupKeyFn, and each group maintains\n * its own sorted collection with independent limit/offset.\n */\nexport class GroupedTopKWithFractionalIndexOperator<\n  K extends string | number,\n  T,\n> extends UnaryOperator<[K, T], [K, IndexedValue<T>]> {\n  #groupStates: Map<unknown, TopKState<K, T>> = new Map()\n  #groupKeyFn: (key: K, value: T) => unknown\n  #comparator: (a: [K, T], b: [K, T]) => number\n  #offset: number\n  #limit: number\n\n  constructor(\n    id: number,\n    inputA: DifferenceStreamReader<[K, T]>,\n    output: DifferenceStreamWriter<[K, IndexedValue<T>]>,\n    comparator: (a: T, b: T) => number,\n    options: GroupedTopKWithFractionalIndexOptions<K, T>,\n  ) {\n    super(id, inputA, output)\n    this.#groupKeyFn = options.groupKeyFn\n    this.#limit = options.limit ?? Infinity\n    this.#offset = options.offset ?? 0\n    this.#comparator = createKeyedComparator(comparator)\n    options.setSizeCallback?.(() => this.#getTotalSize())\n    options.setWindowFn?.(this.#moveTopK.bind(this))\n  }\n\n  /**\n   * Creates a new TopK data structure for a group.\n   * Can be overridden in subclasses to use different implementations (e.g., B+ tree).\n   */\n  protected createTopK(\n    offset: number,\n    limit: number,\n    comparator: (a: [K, T], b: [K, T]) => number,\n  ): TopK<[K, T]> {\n    return new TopKArray(offset, limit, comparator)\n  }\n\n  #getTotalSize(): number {\n    let size = 0\n    for (const state of this.#groupStates.values()) {\n      size += state.size\n    }\n    return size\n  }\n\n  #getOrCreateGroupState(groupKey: unknown): TopKState<K, T> {\n    let state = this.#groupStates.get(groupKey)\n    if (!state) {\n      const topK = this.createTopK(this.#offset, this.#limit, this.#comparator)\n      state = new TopKState(topK)\n      this.#groupStates.set(groupKey, state)\n    }\n    return state\n  }\n\n  #cleanupGroupIfEmpty(groupKey: unknown, state: TopKState<K, T>): void {\n    if (state.isEmpty) {\n      this.#groupStates.delete(groupKey)\n    }\n  }\n\n  /**\n   * Moves the topK window for all groups based on the provided offset and limit.\n   * Any changes to the topK are sent to the output.\n   */\n  #moveTopK({ offset, limit }: { offset?: number; limit?: number }): void {\n    if (offset !== undefined) {\n      this.#offset = offset\n    }\n    if (limit !== undefined) {\n      this.#limit = limit\n    }\n\n    const result: Array<[[K, IndexedValue<T>], number]> = []\n    let hasChanges = false\n\n    for (const state of this.#groupStates.values()) {\n      const diff = state.move({ offset: this.#offset, limit: this.#limit }) // TODO: think we should just pass offset and limit\n\n      diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result))\n      diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result))\n\n      if (diff.changes) {\n        hasChanges = true\n      }\n    }\n\n    if (hasChanges) {\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n\n  run(): void {\n    const result: Array<[[K, IndexedValue<T>], number]> = []\n    for (const message of this.inputMessages()) {\n      for (const [item, multiplicity] of message.getInner()) {\n        const [key, value] = item\n        this.#processElement(key, value, multiplicity, result)\n      }\n    }\n\n    if (result.length > 0) {\n      this.output.sendData(new MultiSet(result))\n    }\n  }\n\n  #processElement(\n    key: K,\n    value: T,\n    multiplicity: number,\n    result: Array<[[K, IndexedValue<T>], number]>,\n  ): void {\n    const groupKey = this.#groupKeyFn(key, value)\n    const state = this.#getOrCreateGroupState(groupKey)\n\n    const changes = state.processElement(key, value, multiplicity)\n    handleMoveIn(changes.moveIn, result)\n    handleMoveOut(changes.moveOut, result)\n\n    // Cleanup empty groups to prevent memory leaks\n    this.#cleanupGroupIfEmpty(groupKey, state)\n  }\n}\n\n/**\n * Limits the number of results per group based on a comparator, with optional offset.\n * Uses fractional indexing to minimize the number of changes when elements move positions.\n * Each element is assigned a fractional index that is lexicographically sortable.\n * When elements move, only the indices of the moved elements are updated, not all elements.\n *\n * This operator groups elements by the provided groupKeyFn and applies the limit/offset\n * independently to each group.\n *\n * @param comparator - A function that compares two elements for ordering\n * @param options - Configuration including groupKeyFn, limit, and offset\n * @returns A piped operator that orders elements per group and limits results per group\n */\nexport function groupedTopKWithFractionalIndex<K extends string | number, T>(\n  comparator: (a: T, b: T) => number,\n  options: GroupedTopKWithFractionalIndexOptions<K, T>,\n): PipedOperator<[K, T], [K, IndexedValue<T>]> {\n  return (\n    stream: IStreamBuilder<[K, T]>,\n  ): IStreamBuilder<[K, IndexedValue<T>]> => {\n    const output = new StreamBuilder<[K, IndexedValue<T>]>(\n      stream.graph,\n      new DifferenceStreamWriter<[K, IndexedValue<T>]>(),\n    )\n    const operator = new GroupedTopKWithFractionalIndexOperator<K, T>(\n      stream.graph.getNextOperatorId(),\n      stream.connectReader(),\n      output.writer,\n      comparator,\n      options,\n    )\n    stream.graph.addOperator(operator)\n    return output\n  }\n}\n"],"names":["UnaryOperator","createKeyedComparator","TopKArray","TopKState","handleMoveIn","handleMoveOut","MultiSet","StreamBuilder","DifferenceStreamWriter"],"mappings":";;;;;;;AAgCO,MAAM,+CAGHA,MAAAA,cAA4C;AAAA,EACpD,mCAAkD,IAAA;AAAA,EAClD;AAAA,EACA;AAAA,EACA;AAAA,EACA;AAAA,EAEA,YACE,IACA,QACA,QACA,YACA,SACA;AACA,UAAM,IAAI,QAAQ,MAAM;AACxB,SAAK,cAAc,QAAQ;AAC3B,SAAK,SAAS,QAAQ,SAAS;AAC/B,SAAK,UAAU,QAAQ,UAAU;AACjC,SAAK,cAAcC,UAAAA,sBAAsB,UAAU;AACnD,YAAQ,kBAAkB,MAAM,KAAK,cAAA,CAAe;AACpD,YAAQ,cAAc,KAAK,UAAU,KAAK,IAAI,CAAC;AAAA,EACjD;AAAA;AAAA;AAAA;AAAA;AAAA,EAMU,WACR,QACA,OACA,YACc;AACd,WAAO,IAAIC,UAAAA,UAAU,QAAQ,OAAO,UAAU;AAAA,EAChD;AAAA,EAEA,gBAAwB;AACtB,QAAI,OAAO;AACX,eAAW,SAAS,KAAK,aAAa,OAAA,GAAU;AAC9C,cAAQ,MAAM;AAAA,IAChB;AACA,WAAO;AAAA,EACT;AAAA,EAEA,uBAAuB,UAAoC;AACzD,QAAI,QAAQ,KAAK,aAAa,IAAI,QAAQ;AAC1C,QAAI,CAAC,OAAO;AACV,YAAM,OAAO,KAAK,WAAW,KAAK,SAAS,KAAK,QAAQ,KAAK,WAAW;AACxE,cAAQ,IAAIC,UAAAA,UAAU,IAAI;AAC1B,WAAK,aAAa,IAAI,UAAU,KAAK;AAAA,IACvC;AACA,WAAO;AAAA,EACT;AAAA,EAEA,qBAAqB,UAAmB,OAA8B;AACpE,QAAI,MAAM,SAAS;AACjB,WAAK,aAAa,OAAO,QAAQ;AAAA,IACnC;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,UAAU,EAAE,QAAQ,SAAoD;AACtE,QAAI,WAAW,QAAW;AACxB,WAAK,UAAU;AAAA,IACjB;AACA,QAAI,UAAU,QAAW;AACvB,WAAK,SAAS;AAAA,IAChB;AAEA,UAAM,SAAgD,CAAA;AACtD,QAAI,aAAa;AAEjB,eAAW,SAAS,KAAK,aAAa,OAAA,GAAU;AAC9C,YAAM,OAAO,MAAM,KAAK,EAAE,QAAQ,KAAK,SAAS,OAAO,KAAK,QAAQ;AAEpE,WAAK,QAAQ,QAAQ,CAAC,WAAWC,uBAAa,QAAQ,MAAM,CAAC;AAC7D,WAAK,SAAS,QAAQ,CAAC,YAAYC,wBAAc,SAAS,MAAM,CAAC;AAEjE,UAAI,KAAK,SAAS;AAChB,qBAAa;AAAA,MACf;AAAA,IACF;AAEA,QAAI,YAAY;AACd,WAAK,OAAO,SAAS,IAAIC,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AAAA,EAEA,MAAY;AACV,UAAM,SAAgD,CAAA;AACtD,eAAW,WAAW,KAAK,iBAAiB;AAC1C,iBAAW,CAAC,MAAM,YAAY,KAAK,QAAQ,YAAY;AACrD,cAAM,CAAC,KAAK,KAAK,IAAI;AACrB,aAAK,gBAAgB,KAAK,OAAO,cAAc,MAAM;AAAA,MACvD;AAAA,IACF;AAEA,QAAI,OAAO,SAAS,GAAG;AACrB,WAAK,OAAO,SAAS,IAAIA,SAAAA,SAAS,MAAM,CAAC;AAAA,IAC3C;AAAA,EACF;AAAA,EAEA,gBACE,KACA,OACA,cACA,QACM;AACN,UAAM,WAAW,KAAK,YAAY,KAAK,KAAK;AAC5C,UAAM,QAAQ,KAAK,uBAAuB,QAAQ;AAElD,UAAM,UAAU,MAAM,eAAe,KAAK,OAAO,YAAY;AAC7DF,2BAAa,QAAQ,QAAQ,MAAM;AACnCC,4BAAc,QAAQ,SAAS,MAAM;AAGrC,SAAK,qBAAqB,UAAU,KAAK;AAAA,EAC3C;AACF;AAeO,SAAS,+BACd,YACA,SAC6C;AAC7C,SAAO,CACL,WACyC;AACzC,UAAM,SAAS,IAAIE,GAAAA;AAAAA,MACjB,OAAO;AAAA,MACP,IAAIC,MAAAA,uBAAA;AAAA,IAA6C;AAEnD,UAAM,WAAW,IAAI;AAAA,MACnB,OAAO,MAAM,kBAAA;AAAA,MACb,OAAO,cAAA;AAAA,MACP,OAAO;AAAA,MACP;AAAA,MACA;AAAA,IAAA;AAEF,WAAO,MAAM,YAAY,QAAQ;AACjC,WAAO;AAAA,EACT;AACF;;;"}