{"version":3,"file":"join.cjs","sources":["../../../src/operators/join.ts"],"sourcesContent":["/**\n * # Direct Join Algorithms for Incremental View Maintenance\n *\n * High-performance join operations implementing all join types (inner, left, right, full, anti)\n * with minimal state and optimized performance.\n *\n * ## Algorithm\n *\n * For each tick, the algorithm processes incoming changes (deltas) and emits join results:\n *\n * 1. **Build deltas**: Create delta indexes from input messages using `Index.fromMultiSet()`\n * 2. **Inner results**: Emit `ΔA⋈B_old + A_old⋈ΔB + ΔA⋈ΔB` (matched pairs)\n * 3. **Outer results**: For unmatched rows, emit null-extended tuples:\n *    - New unmatched rows from deltas (when opposite side empty)\n *    - Presence transitions: when key goes `0→>0` (retract nulls) or `>0→0` (emit nulls)\n * 4. **Update state**: Append deltas to indexes (consolidated multiplicity tracking automatic)\n *\n * **Consolidated multiplicity tracking** enables O(1) presence checks instead of scanning index buckets.\n *\n * ## State\n *\n * **Indexes** store the actual data:\n * - `indexA: Index<K, V1>` - all left-side rows accumulated over time\n * - `indexB: Index<K, V2>` - all right-side rows accumulated over time\n *\n * **Consolidated multiplicity tracking** (built into Index):\n * - Each Index maintains sum of multiplicities per key internally\n * - Provides O(1) presence checks: `index.hasPresence(key)` and `index.getConsolidatedMultiplicity(key)`\n * - Avoids scanning entire index buckets just to check if key has any rows\n *\n * ## Join Types\n *\n * - **Inner**: Standard delta terms only\n * - **Outer**: Inner results + null-extended unmatched rows with transition handling\n * - **Anti**: Unmatched rows only (no inner results)\n *\n * ## Key Optimizations\n *\n * - **No temp copying**: Uses `(A⊎ΔA)⋈ΔB = A⋈ΔB ⊎ ΔA⋈ΔB` distributive property\n * - **Early-out checks**: Skip phases when no deltas present\n * - **Zero-entry pruning**: Keep maps compact, O(distinct keys) memory\n * - **Final presence logic**: Avoid emit→retract churn within same tick\n *\n * ## Correctness\n *\n * - **Ordering**: Pre-append snapshots for emissions, post-emit state updates\n * - **Presence**: Key matched iff mass ≠ 0, transitions trigger null handling\n * - **Bag semantics**: Proper multiplicity handling including negatives\n */\n\nimport { BinaryOperator, DifferenceStreamWriter } from '../graph.js'\nimport { StreamBuilder } from '../d2.js'\nimport { MultiSet } from '../multiset.js'\nimport { Index } from '../indexes.js'\nimport type { DifferenceStreamReader } from '../graph.js'\nimport type { IStreamBuilder, KeyValue, PipedOperator } from '../types.js'\n\n/**\n * Type of join to perform\n */\nexport type JoinType = `inner` | `left` | `right` | `full` | `anti`\n\n/**\n * Operator that joins two input streams using direct join algorithms\n */\nexport class JoinOperator<K, V1, V2> extends BinaryOperator<\n  [K, V1] | [K, V2] | [K, [V1, V2]] | [K, [V1 | null, V2 | null]]\n> {\n  #indexA = new Index<K, V1>()\n  #indexB = new Index<K, V2>()\n  #mode: JoinType\n\n  constructor(\n    id: number,\n    inputA: DifferenceStreamReader<[K, V1]>,\n    inputB: DifferenceStreamReader<[K, V2]>,\n    output: DifferenceStreamWriter<any>,\n    mode: JoinType = `inner`,\n  ) {\n    super(id, inputA, inputB, output)\n    this.#mode = mode\n  }\n\n  run(): void {\n    // Build deltas from input messages\n    const deltaA = Index.fromMultiSets<K, V1>(\n      this.inputAMessages() as Array<MultiSet<[K, V1]>>,\n    )\n    const deltaB = Index.fromMultiSets<K, V2>(\n      this.inputBMessages() as Array<MultiSet<[K, V2]>>,\n    )\n\n    // Early-out if nothing changed\n    if (deltaA.size === 0 && deltaB.size === 0) return\n\n    const results = new MultiSet<any>()\n\n    // Emit inner results (all modes except anti)\n    if (this.#mode !== `anti`) {\n      this.emitInnerResults(deltaA, deltaB, results)\n    }\n\n    // Emit left outer/anti results\n    if (\n      this.#mode === `left` ||\n      this.#mode === `full` ||\n      this.#mode === `anti`\n    ) {\n      this.emitLeftOuterResults(deltaA, deltaB, results)\n    }\n\n    // Emit right outer results\n    if (this.#mode === `right` || this.#mode === `full`) {\n      this.emitRightOuterResults(deltaA, deltaB, results)\n    }\n\n    // Update state and send results\n    // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB.\n    // Now append ALL deltas to indices - this happens unconditionally for every key,\n    // regardless of whether presence flipped. Consolidated multiplicity tracking is automatic.\n    this.#indexA.append(deltaA)\n    this.#indexB.append(deltaB)\n\n    // Send results\n    if (results.getInner().length > 0) {\n      this.output.sendData(results)\n    }\n  }\n\n  private emitInnerResults(\n    deltaA: Index<K, V1>,\n    deltaB: Index<K, V2>,\n    results: MultiSet<any>,\n  ): void {\n    // Emit the three standard delta terms: ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB\n    if (deltaA.size > 0) results.extend(deltaA.join(this.#indexB))\n    if (deltaB.size > 0) results.extend(this.#indexA.join(deltaB))\n    if (deltaA.size > 0 && deltaB.size > 0) results.extend(deltaA.join(deltaB))\n  }\n\n  private emitLeftOuterResults(\n    deltaA: Index<K, V1>,\n    deltaB: Index<K, V2>,\n    results: MultiSet<any>,\n  ): void {\n    // Emit unmatched left rows from deltaA\n    if (deltaA.size > 0) {\n      for (const [key, valueIterator] of deltaA.entriesIterators()) {\n        const currentMultiplicityB =\n          this.#indexB.getConsolidatedMultiplicity(key)\n        const deltaMultiplicityB = deltaB.getConsolidatedMultiplicity(key)\n        const finalMultiplicityB = currentMultiplicityB + deltaMultiplicityB\n\n        if (finalMultiplicityB === 0) {\n          for (const [value, multiplicity] of valueIterator) {\n            if (multiplicity !== 0) {\n              results.add([key, [value, null]], multiplicity)\n            }\n          }\n        }\n      }\n    }\n\n    // Handle presence transitions from right side changes\n    if (deltaB.size > 0) {\n      for (const key of deltaB.getPresenceKeys()) {\n        const before = this.#indexB.getConsolidatedMultiplicity(key)\n        const deltaMult = deltaB.getConsolidatedMultiplicity(key)\n        if (deltaMult === 0) continue\n        const after = before + deltaMult\n\n        // Skip transition handling if presence doesn't flip (both zero or both non-zero)\n        // Note: Index updates happen later regardless - we're only skipping null-extension emissions here\n        if ((before === 0) === (after === 0)) continue\n\n        // Determine the type of transition:\n        // - 0 → non-zero: Right becomes non-empty, left rows transition from unmatched to matched\n        //   → RETRACT previously emitted null-extended rows (emit with negative multiplicity)\n        // - non-zero → 0: Right becomes empty, left rows transition from matched to unmatched\n        //   → EMIT new null-extended rows (emit with positive multiplicity)\n        const transitioningToMatched = before === 0\n\n        for (const [value, multiplicity] of this.#indexA.getIterator(key)) {\n          if (multiplicity !== 0) {\n            results.add(\n              [key, [value, null]],\n              transitioningToMatched ? -multiplicity : +multiplicity,\n            )\n          }\n        }\n      }\n    }\n  }\n\n  private emitRightOuterResults(\n    deltaA: Index<K, V1>,\n    deltaB: Index<K, V2>,\n    results: MultiSet<any>,\n  ): void {\n    // Emit unmatched right rows from deltaB\n    if (deltaB.size > 0) {\n      for (const [key, valueIterator] of deltaB.entriesIterators()) {\n        const currentMultiplicityA =\n          this.#indexA.getConsolidatedMultiplicity(key)\n        const deltaMultiplicityA = deltaA.getConsolidatedMultiplicity(key)\n        const finalMultiplicityA = currentMultiplicityA + deltaMultiplicityA\n\n        if (finalMultiplicityA === 0) {\n          for (const [value, multiplicity] of valueIterator) {\n            if (multiplicity !== 0) {\n              results.add([key, [null, value]], multiplicity)\n            }\n          }\n        }\n      }\n    }\n\n    // Handle presence transitions from left side changes\n    if (deltaA.size > 0) {\n      for (const key of deltaA.getPresenceKeys()) {\n        const before = this.#indexA.getConsolidatedMultiplicity(key)\n        const deltaMult = deltaA.getConsolidatedMultiplicity(key)\n        if (deltaMult === 0) continue\n        const after = before + deltaMult\n\n        // Skip transition handling if presence doesn't flip (both zero or both non-zero)\n        // Note: Index updates happen later regardless - we're only skipping null-extension emissions here\n        if ((before === 0) === (after === 0)) continue\n\n        // Determine the type of transition:\n        // - 0 → non-zero: Left becomes non-empty, right rows transition from unmatched to matched\n        //   → RETRACT previously emitted null-extended rows (emit with negative multiplicity)\n        // - non-zero → 0: Left becomes empty, right rows transition from matched to unmatched\n        //   → EMIT new null-extended rows (emit with positive multiplicity)\n        const transitioningToMatched = before === 0\n\n        for (const [value, multiplicity] of this.#indexB.getIterator(key)) {\n          if (multiplicity !== 0) {\n            results.add(\n              [key, [null, value]],\n              transitioningToMatched ? -multiplicity : +multiplicity,\n            )\n          }\n        }\n      }\n    }\n  }\n}\n\n/**\n * Joins two input streams\n * @param other - The other stream to join with\n * @param type - The type of join to perform\n */\nexport function join<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n  type: JoinType = `inner`,\n): PipedOperator<T, KeyValue<K, [V1 | null, V2 | null]>> {\n  return (\n    stream: IStreamBuilder<T>,\n  ): IStreamBuilder<KeyValue<K, [V1 | null, V2 | null]>> => {\n    if (stream.graph !== other.graph) {\n      throw new Error(`Cannot join streams from different graphs`)\n    }\n    const output = new StreamBuilder<KeyValue<K, [V1 | null, V2 | null]>>(\n      stream.graph,\n      new DifferenceStreamWriter<KeyValue<K, [V1 | null, V2 | null]>>(),\n    )\n    const operator = new JoinOperator<K, V1, V2>(\n      stream.graph.getNextOperatorId(),\n      stream.connectReader() as DifferenceStreamReader<KeyValue<K, V1>>,\n      other.connectReader(),\n      output.writer,\n      type,\n    )\n    stream.graph.addOperator(operator)\n    return output\n  }\n}\n\n/**\n * Joins two input streams (inner join)\n * @param other - The other stream to join with\n */\nexport function innerJoin<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n): PipedOperator<T, KeyValue<K, [V1, V2]>> {\n  return join(other, `inner`) as unknown as PipedOperator<\n    T,\n    KeyValue<K, [V1, V2]>\n  >\n}\n\n/**\n * Joins two input streams (anti join)\n * @param other - The other stream to join with\n */\nexport function antiJoin<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n): PipedOperator<T, KeyValue<K, [V1, null]>> {\n  return join(other, `anti`) as unknown as PipedOperator<\n    T,\n    KeyValue<K, [V1, null]>\n  >\n}\n\n/**\n * Joins two input streams (left join)\n * @param other - The other stream to join with\n */\nexport function leftJoin<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n): PipedOperator<T, KeyValue<K, [V1, V2 | null]>> {\n  return join(other, `left`) as unknown as PipedOperator<\n    T,\n    KeyValue<K, [V1, V2 | null]>\n  >\n}\n\n/**\n * Joins two input streams (right join)\n * @param other - The other stream to join with\n */\nexport function rightJoin<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n): PipedOperator<T, KeyValue<K, [V1 | null, V2]>> {\n  return join(other, `right`) as unknown as PipedOperator<\n    T,\n    KeyValue<K, [V1 | null, V2]>\n  >\n}\n\n/**\n * Joins two input streams (full join)\n * @param other - The other stream to join with\n */\nexport function fullJoin<\n  K,\n  V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,\n  V2,\n  T,\n>(\n  other: IStreamBuilder<KeyValue<K, V2>>,\n): PipedOperator<T, KeyValue<K, [V1 | null, V2 | null]>> {\n  return join(other, `full`) as unknown as PipedOperator<\n    T,\n    KeyValue<K, [V1 | null, V2 | null]>\n  >\n}\n"],"names":["BinaryOperator","Index","MultiSet","StreamBuilder","DifferenceStreamWriter"],"mappings":";;;;;;AAiEO,MAAM,qBAAgCA,MAAAA,eAE3C;AAAA,EACA,UAAU,IAAIC,QAAAA,MAAA;AAAA,EACd,UAAU,IAAIA,QAAAA,MAAA;AAAA,EACd;AAAA,EAEA,YACE,IACA,QACA,QACA,QACA,OAAiB,SACjB;AACA,UAAM,IAAI,QAAQ,QAAQ,MAAM;AAChC,SAAK,QAAQ;AAAA,EACf;AAAA,EAEA,MAAY;AAEV,UAAM,SAASA,QAAAA,MAAM;AAAA,MACnB,KAAK,eAAA;AAAA,IAAe;AAEtB,UAAM,SAASA,QAAAA,MAAM;AAAA,MACnB,KAAK,eAAA;AAAA,IAAe;AAItB,QAAI,OAAO,SAAS,KAAK,OAAO,SAAS,EAAG;AAE5C,UAAM,UAAU,IAAIC,kBAAA;AAGpB,QAAI,KAAK,UAAU,QAAQ;AACzB,WAAK,iBAAiB,QAAQ,QAAQ,OAAO;AAAA,IAC/C;AAGA,QACE,KAAK,UAAU,UACf,KAAK,UAAU,UACf,KAAK,UAAU,QACf;AACA,WAAK,qBAAqB,QAAQ,QAAQ,OAAO;AAAA,IACnD;AAGA,QAAI,KAAK,UAAU,WAAW,KAAK,UAAU,QAAQ;AACnD,WAAK,sBAAsB,QAAQ,QAAQ,OAAO;AAAA,IACpD;AAMA,SAAK,QAAQ,OAAO,MAAM;AAC1B,SAAK,QAAQ,OAAO,MAAM;AAG1B,QAAI,QAAQ,WAAW,SAAS,GAAG;AACjC,WAAK,OAAO,SAAS,OAAO;AAAA,IAC9B;AAAA,EACF;AAAA,EAEQ,iBACN,QACA,QACA,SACM;AAEN,QAAI,OAAO,OAAO,EAAG,SAAQ,OAAO,OAAO,KAAK,KAAK,OAAO,CAAC;AAC7D,QAAI,OAAO,OAAO,EAAG,SAAQ,OAAO,KAAK,QAAQ,KAAK,MAAM,CAAC;AAC7D,QAAI,OAAO,OAAO,KAAK,OAAO,OAAO,EAAG,SAAQ,OAAO,OAAO,KAAK,MAAM,CAAC;AAAA,EAC5E;AAAA,EAEQ,qBACN,QACA,QACA,SACM;AAEN,QAAI,OAAO,OAAO,GAAG;AACnB,iBAAW,CAAC,KAAK,aAAa,KAAK,OAAO,oBAAoB;AAC5D,cAAM,uBACJ,KAAK,QAAQ,4BAA4B,GAAG;AAC9C,cAAM,qBAAqB,OAAO,4BAA4B,GAAG;AACjE,cAAM,qBAAqB,uBAAuB;AAElD,YAAI,uBAAuB,GAAG;AAC5B,qBAAW,CAAC,OAAO,YAAY,KAAK,eAAe;AACjD,gBAAI,iBAAiB,GAAG;AACtB,sBAAQ,IAAI,CAAC,KAAK,CAAC,OAAO,IAAI,CAAC,GAAG,YAAY;AAAA,YAChD;AAAA,UACF;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAGA,QAAI,OAAO,OAAO,GAAG;AACnB,iBAAW,OAAO,OAAO,mBAAmB;AAC1C,cAAM,SAAS,KAAK,QAAQ,4BAA4B,GAAG;AAC3D,cAAM,YAAY,OAAO,4BAA4B,GAAG;AACxD,YAAI,cAAc,EAAG;AACrB,cAAM,QAAQ,SAAS;AAIvB,YAAK,WAAW,OAAQ,UAAU,GAAI;AAOtC,cAAM,yBAAyB,WAAW;AAE1C,mBAAW,CAAC,OAAO,YAAY,KAAK,KAAK,QAAQ,YAAY,GAAG,GAAG;AACjE,cAAI,iBAAiB,GAAG;AACtB,oBAAQ;AAAA,cACN,CAAC,KAAK,CAAC,OAAO,IAAI,CAAC;AAAA,cACnB,yBAAyB,CAAC,eAAe,CAAC;AAAA,YAAA;AAAA,UAE9C;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAAA,EACF;AAAA,EAEQ,sBACN,QACA,QACA,SACM;AAEN,QAAI,OAAO,OAAO,GAAG;AACnB,iBAAW,CAAC,KAAK,aAAa,KAAK,OAAO,oBAAoB;AAC5D,cAAM,uBACJ,KAAK,QAAQ,4BAA4B,GAAG;AAC9C,cAAM,qBAAqB,OAAO,4BAA4B,GAAG;AACjE,cAAM,qBAAqB,uBAAuB;AAElD,YAAI,uBAAuB,GAAG;AAC5B,qBAAW,CAAC,OAAO,YAAY,KAAK,eAAe;AACjD,gBAAI,iBAAiB,GAAG;AACtB,sBAAQ,IAAI,CAAC,KAAK,CAAC,MAAM,KAAK,CAAC,GAAG,YAAY;AAAA,YAChD;AAAA,UACF;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAGA,QAAI,OAAO,OAAO,GAAG;AACnB,iBAAW,OAAO,OAAO,mBAAmB;AAC1C,cAAM,SAAS,KAAK,QAAQ,4BAA4B,GAAG;AAC3D,cAAM,YAAY,OAAO,4BAA4B,GAAG;AACxD,YAAI,cAAc,EAAG;AACrB,cAAM,QAAQ,SAAS;AAIvB,YAAK,WAAW,OAAQ,UAAU,GAAI;AAOtC,cAAM,yBAAyB,WAAW;AAE1C,mBAAW,CAAC,OAAO,YAAY,KAAK,KAAK,QAAQ,YAAY,GAAG,GAAG;AACjE,cAAI,iBAAiB,GAAG;AACtB,oBAAQ;AAAA,cACN,CAAC,KAAK,CAAC,MAAM,KAAK,CAAC;AAAA,cACnB,yBAAyB,CAAC,eAAe,CAAC;AAAA,YAAA;AAAA,UAE9C;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAAA,EACF;AACF;AAOO,SAAS,KAMd,OACA,OAAiB,SACsC;AACvD,SAAO,CACL,WACwD;AACxD,QAAI,OAAO,UAAU,MAAM,OAAO;AAChC,YAAM,IAAI,MAAM,2CAA2C;AAAA,IAC7D;AACA,UAAM,SAAS,IAAIC,GAAAA;AAAAA,MACjB,OAAO;AAAA,MACP,IAAIC,MAAAA,uBAAA;AAAA,IAA4D;AAElE,UAAM,WAAW,IAAI;AAAA,MACnB,OAAO,MAAM,kBAAA;AAAA,MACb,OAAO,cAAA;AAAA,MACP,MAAM,cAAA;AAAA,MACN,OAAO;AAAA,MACP;AAAA,IAAA;AAEF,WAAO,MAAM,YAAY,QAAQ;AACjC,WAAO;AAAA,EACT;AACF;AAMO,SAAS,UAMd,OACyC;AACzC,SAAO,KAAK,OAAO,OAAO;AAI5B;AAMO,SAAS,SAMd,OAC2C;AAC3C,SAAO,KAAK,OAAO,MAAM;AAI3B;AAMO,SAAS,SAMd,OACgD;AAChD,SAAO,KAAK,OAAO,MAAM;AAI3B;AAMO,SAAS,UAMd,OACgD;AAChD,SAAO,KAAK,OAAO,OAAO;AAI5B;AAMO,SAAS,SAMd,OACuD;AACvD,SAAO,KAAK,OAAO,MAAM;AAI3B;;;;;;;;"}