{"version":3,"file":"groupBy.cjs","sources":["../../../src/operators/groupBy.ts"],"sourcesContent":["import { serializeValue } from '../utils.js'\nimport { map } from './map.js'\nimport { reduce } from './reduce.js'\nimport type { IStreamBuilder, KeyValue } from '../types.js'\n\ntype GroupKey = Record<string, unknown>\n\ntype BasicAggregateFunction<T, R, V = unknown> = {\n  preMap: (data: T) => V\n  reduce: (values: Array<[V, number]>) => V\n  postMap?: (result: V) => R\n}\n\ntype PipedAggregateFunction<T, R> = {\n  pipe: (stream: IStreamBuilder<T>) => IStreamBuilder<KeyValue<string, R>>\n}\n\ntype AggregateFunction<T, R, V = unknown> =\n  | BasicAggregateFunction<T, R, V>\n  | PipedAggregateFunction<T, R>\n\ntype ExtractAggregateReturnType<T, A> =\n  A extends AggregateFunction<T, infer R, any> ? R : never\n\ntype AggregatesReturnType<T, A> = {\n  [K in keyof A]: ExtractAggregateReturnType<T, A[K]>\n}\n\nfunction isPipedAggregateFunction<T, R>(\n  aggregate: AggregateFunction<T, R>,\n): aggregate is PipedAggregateFunction<T, R> {\n  return `pipe` in aggregate\n}\n\n/**\n * Groups data by key and applies multiple aggregate operations\n * @param keyExtractor Function to extract grouping key from data\n * @param aggregates Object mapping aggregate names to aggregate functions\n */\nexport function groupBy<\n  T,\n  K extends GroupKey,\n  A extends Record<string, AggregateFunction<T, any, any>>,\n>(keyExtractor: (data: T) => K, aggregates: A = {} as A) {\n  type ResultType = K & AggregatesReturnType<T, A>\n\n  const basicAggregates = Object.fromEntries(\n    Object.entries(aggregates).filter(\n      ([_, aggregate]) => !isPipedAggregateFunction(aggregate),\n    ),\n  ) as Record<string, BasicAggregateFunction<T, any, any>>\n\n  // @ts-expect-error - TODO: we don't use this yet, but we will\n  // eslint-disable-next-line @typescript-eslint/no-unused-vars\n  const pipedAggregates = Object.fromEntries(\n    Object.entries(aggregates).filter(([_, aggregate]) =>\n      isPipedAggregateFunction(aggregate),\n    ),\n  ) as Record<string, PipedAggregateFunction<T, any>>\n\n  return (\n    stream: IStreamBuilder<T>,\n  ): IStreamBuilder<KeyValue<string, ResultType>> => {\n    // Special key to store the original key object\n    const KEY_SENTINEL = `__original_key__`\n\n    // First map to extract keys and pre-aggregate values\n    const withKeysAndValues = stream.pipe(\n      map((data) => {\n        const key = keyExtractor(data)\n        const keyString = serializeValue(key)\n\n        // Create values object with pre-aggregated values\n        const values: Record<string, unknown> = {}\n\n        // Store the original key object\n        values[KEY_SENTINEL] = key\n\n        // Add pre-aggregated values\n        for (const [name, aggregate] of Object.entries(basicAggregates)) {\n          values[name] = aggregate.preMap(data)\n        }\n\n        return [keyString, values] as KeyValue<string, Record<string, unknown>>\n      }),\n    )\n\n    // Then reduce to compute aggregates\n    const reduced = withKeysAndValues.pipe(\n      reduce((values) => {\n        // Calculate total multiplicity to check if the group should exist\n        let totalMultiplicity = 0\n        for (const [_, multiplicity] of values) {\n          totalMultiplicity += multiplicity\n        }\n\n        // If total multiplicity is 0 or negative, the group should be removed completely\n        if (totalMultiplicity <= 0) {\n          return []\n        }\n\n        const result: Record<string, unknown> = {}\n\n        // Get the original key from first value in group\n        const originalKey = values[0]?.[0]?.[KEY_SENTINEL]\n        result[KEY_SENTINEL] = originalKey\n\n        // Apply each aggregate function\n        for (const [name, aggregate] of Object.entries(basicAggregates)) {\n          const preValues = values.map(\n            ([v, m]) => [v[name], m] as [any, number],\n          )\n          result[name] = aggregate.reduce(preValues)\n        }\n\n        return [[result, 1]]\n      }),\n    )\n\n    // Finally map to extract the key and include all values\n    return reduced.pipe(\n      map(([keyString, values]) => {\n        // Extract the original key\n        const key = values[KEY_SENTINEL] as K\n\n        // Create intermediate result with key values and aggregate results\n        const result: Record<string, unknown> = {}\n\n        // Add key properties to result\n        Object.assign(result, key)\n\n        // Apply postMap if provided\n        for (const [name, aggregate] of Object.entries(basicAggregates)) {\n          if (aggregate.postMap) {\n            result[name] = aggregate.postMap(values[name])\n          } else {\n            result[name] = values[name]\n          }\n        }\n\n        // Return with the string key instead of the object\n        return [keyString, result] as KeyValue<string, ResultType>\n      }),\n    )\n  }\n}\n\n/**\n * Creates a sum aggregate function\n */\nexport function sum<T>(\n  valueExtractor: (value: T) => number = (v) => v as unknown as number,\n): AggregateFunction<T, number, number> {\n  return {\n    preMap: (data: T) => valueExtractor(data),\n    reduce: (values: Array<[number, number]>) => {\n      let total = 0\n      for (const [value, multiplicity] of values) {\n        total += value * multiplicity\n      }\n      return total\n    },\n  }\n}\n\n/**\n * Creates a count aggregate function\n */\nexport function count<T>(\n  valueExtractor: (value: T) => any = (v) => v,\n): AggregateFunction<T, number, number> {\n  return {\n    // Count only not-null values (the `== null` comparison gives true for both null and undefined)\n    preMap: (data: T) => (valueExtractor(data) == null ? 0 : 1),\n    reduce: (values: Array<[number, number]>) => {\n      let totalCount = 0\n      for (const [nullMultiplier, multiplicity] of values) {\n        totalCount += nullMultiplier * multiplicity\n      }\n      return totalCount\n    },\n  }\n}\n\n/**\n * Creates an average aggregate function\n */\nexport function avg<T>(\n  valueExtractor: (value: T) => number = (v) => v as unknown as number,\n): AggregateFunction<T, number, { sum: number; count: number }> {\n  return {\n    preMap: (data: T) => ({\n      sum: valueExtractor(data),\n      count: 0,\n    }),\n    reduce: (values: Array<[{ sum: number; count: number }, number]>) => {\n      let totalSum = 0\n      let totalCount = 0\n      for (const [value, multiplicity] of values) {\n        totalSum += value.sum * multiplicity\n        totalCount += multiplicity\n      }\n      return {\n        sum: totalSum,\n        count: totalCount,\n      }\n    },\n    postMap: (result: { sum: number; count: number }) => {\n      return result.sum / result.count\n    },\n  }\n}\n\ntype CanMinMax = number | Date | bigint | string\n\n/**\n * Creates a min aggregate function that computes the minimum value in a group\n * @param valueExtractor Function to extract a comparable value from each data entry\n */\nexport function min<T extends CanMinMax>(): AggregateFunction<\n  T,\n  T | undefined,\n  T | undefined\n>\nexport function min<T, V extends CanMinMax>(\n  valueExtractor: (value: T) => V,\n): AggregateFunction<T, V | undefined, V | undefined>\nexport function min<T, V extends CanMinMax>(\n  valueExtractor?: (value: T) => V,\n): AggregateFunction<T, V | undefined, V | undefined> {\n  const extractor = valueExtractor ?? ((v: T) => v as unknown as V)\n  return {\n    preMap: (data: T) => extractor(data),\n    reduce: (values) => {\n      let minValue: V | undefined\n      for (const [value, _multiplicity] of values) {\n        if (!minValue || (value && value < minValue)) {\n          minValue = value\n        }\n      }\n      return minValue\n    },\n  }\n}\n\n/**\n * Creates a max aggregate function that computes the maximum value in a group\n * @param valueExtractor Function to extract a comparable value from each data entry\n */\nexport function max<T extends CanMinMax>(): AggregateFunction<\n  T,\n  T | undefined,\n  T | undefined\n>\nexport function max<T, V extends CanMinMax>(\n  valueExtractor: (value: T) => V,\n): AggregateFunction<T, V | undefined, V | undefined>\nexport function max<T, V extends CanMinMax>(\n  valueExtractor?: (value: T) => V,\n): AggregateFunction<T, V | undefined, V | undefined> {\n  const extractor = valueExtractor ?? ((v: T) => v as unknown as V)\n  return {\n    preMap: (data: T) => extractor(data),\n    reduce: (values) => {\n      let maxValue: V | undefined\n      for (const [value, _multiplicity] of values) {\n        if (!maxValue || (value && value > maxValue)) {\n          maxValue = value\n        }\n      }\n      return maxValue\n    },\n  }\n}\n\n/**\n * Creates a median aggregate function that computes the middle value in a sorted group\n * If there's an even number of values, returns the average of the two middle values\n * @param valueExtractor Function to extract a numeric value from each data entry\n */\nexport function median<T>(\n  valueExtractor: (value: T) => number = (v) => v as unknown as number,\n): AggregateFunction<T, number, Array<number>> {\n  return {\n    preMap: (data: T) => [valueExtractor(data)],\n    reduce: (values: Array<[Array<number>, number]>) => {\n      // Flatten all values, taking multiplicity into account\n      const allValues: Array<number> = []\n      for (const [valueArray, multiplicity] of values) {\n        for (const value of valueArray) {\n          // Add each value multiple times based on multiplicity\n          for (let i = 0; i < multiplicity; i++) {\n            allValues.push(value)\n          }\n        }\n      }\n\n      // Return empty array if no values\n      if (allValues.length === 0) {\n        return []\n      }\n\n      // Sort values\n      allValues.sort((a, b) => a - b)\n\n      return allValues\n    },\n    postMap: (result: Array<number>) => {\n      if (result.length === 0) return 0\n\n      const mid = Math.floor(result.length / 2)\n\n      // If even number of values, average the two middle values\n      if (result.length % 2 === 0) {\n        return (result[mid - 1]! + result[mid]!) / 2\n      }\n\n      // If odd number of values, return the middle value\n      return result[mid]!\n    },\n  }\n}\n\n/**\n * Creates a mode aggregate function that computes the most frequent value in a group\n * If multiple values have the same highest frequency, returns the first one encountered\n * @param valueExtractor Function to extract a value from each data entry\n */\nexport function mode<T>(\n  valueExtractor: (value: T) => number = (v) => v as unknown as number,\n): AggregateFunction<T, number, Map<number, number>> {\n  return {\n    preMap: (data: T) => {\n      const value = valueExtractor(data)\n      const frequencyMap = new Map<number, number>()\n      frequencyMap.set(value, 1)\n      return frequencyMap\n    },\n    reduce: (values: Array<[Map<number, number>, number]>) => {\n      // Combine all frequency maps\n      const combinedMap = new Map<number, number>()\n\n      for (const [frequencyMap, multiplicity] of values) {\n        for (const [value, frequencyCount] of frequencyMap.entries()) {\n          const currentCount = combinedMap.get(value) || 0\n          combinedMap.set(value, currentCount + frequencyCount * multiplicity)\n        }\n      }\n\n      return combinedMap\n    },\n    postMap: (result: Map<number, number>) => {\n      if (result.size === 0) return 0\n\n      let modeValue = 0\n      let maxFrequency = 0\n\n      for (const [value, frequency] of result.entries()) {\n        if (frequency > maxFrequency) {\n          maxFrequency = frequency\n          modeValue = value\n        }\n      }\n\n      return modeValue\n    },\n  }\n}\n\nexport const groupByOperators = {\n  sum,\n  count,\n  avg,\n  min,\n  max,\n  median,\n  mode,\n}\n"],"names":["map","serializeValue","reduce"],"mappings":";;;;;AA4BA,SAAS,yBACP,WAC2C;AAC3C,SAAO,UAAU;AACnB;AAOO,SAAS,QAId,cAA8B,aAAgB,IAAS;AAGvD,QAAM,kBAAkB,OAAO;AAAA,IAC7B,OAAO,QAAQ,UAAU,EAAE;AAAA,MACzB,CAAC,CAAC,GAAG,SAAS,MAAM,CAAC,yBAAyB,SAAS;AAAA,IAAA;AAAA,EACzD;AAKsB,SAAO;AAAA,IAC7B,OAAO,QAAQ,UAAU,EAAE;AAAA,MAAO,CAAC,CAAC,GAAG,SAAS,MAC9C,yBAAyB,SAAS;AAAA,IAAA;AAAA,EACpC;AAGF,SAAO,CACL,WACiD;AAEjD,UAAM,eAAe;AAGrB,UAAM,oBAAoB,OAAO;AAAA,MAC/BA,IAAAA,IAAI,CAAC,SAAS;AACZ,cAAM,MAAM,aAAa,IAAI;AAC7B,cAAM,YAAYC,MAAAA,eAAe,GAAG;AAGpC,cAAM,SAAkC,CAAA;AAGxC,eAAO,YAAY,IAAI;AAGvB,mBAAW,CAAC,MAAM,SAAS,KAAK,OAAO,QAAQ,eAAe,GAAG;AAC/D,iBAAO,IAAI,IAAI,UAAU,OAAO,IAAI;AAAA,QACtC;AAEA,eAAO,CAAC,WAAW,MAAM;AAAA,MAC3B,CAAC;AAAA,IAAA;AAIH,UAAM,UAAU,kBAAkB;AAAA,MAChCC,OAAAA,OAAO,CAAC,WAAW;AAEjB,YAAI,oBAAoB;AACxB,mBAAW,CAAC,GAAG,YAAY,KAAK,QAAQ;AACtC,+BAAqB;AAAA,QACvB;AAGA,YAAI,qBAAqB,GAAG;AAC1B,iBAAO,CAAA;AAAA,QACT;AAEA,cAAM,SAAkC,CAAA;AAGxC,cAAM,cAAc,OAAO,CAAC,IAAI,CAAC,IAAI,YAAY;AACjD,eAAO,YAAY,IAAI;AAGvB,mBAAW,CAAC,MAAM,SAAS,KAAK,OAAO,QAAQ,eAAe,GAAG;AAC/D,gBAAM,YAAY,OAAO;AAAA,YACvB,CAAC,CAAC,GAAG,CAAC,MAAM,CAAC,EAAE,IAAI,GAAG,CAAC;AAAA,UAAA;AAEzB,iBAAO,IAAI,IAAI,UAAU,OAAO,SAAS;AAAA,QAC3C;AAEA,eAAO,CAAC,CAAC,QAAQ,CAAC,CAAC;AAAA,MACrB,CAAC;AAAA,IAAA;AAIH,WAAO,QAAQ;AAAA,MACbF,IAAAA,IAAI,CAAC,CAAC,WAAW,MAAM,MAAM;AAE3B,cAAM,MAAM,OAAO,YAAY;AAG/B,cAAM,SAAkC,CAAA;AAGxC,eAAO,OAAO,QAAQ,GAAG;AAGzB,mBAAW,CAAC,MAAM,SAAS,KAAK,OAAO,QAAQ,eAAe,GAAG;AAC/D,cAAI,UAAU,SAAS;AACrB,mBAAO,IAAI,IAAI,UAAU,QAAQ,OAAO,IAAI,CAAC;AAAA,UAC/C,OAAO;AACL,mBAAO,IAAI,IAAI,OAAO,IAAI;AAAA,UAC5B;AAAA,QACF;AAGA,eAAO,CAAC,WAAW,MAAM;AAAA,MAC3B,CAAC;AAAA,IAAA;AAAA,EAEL;AACF;AAKO,SAAS,IACd,iBAAuC,CAAC,MAAM,GACR;AACtC,SAAO;AAAA,IACL,QAAQ,CAAC,SAAY,eAAe,IAAI;AAAA,IACxC,QAAQ,CAAC,WAAoC;AAC3C,UAAI,QAAQ;AACZ,iBAAW,CAAC,OAAO,YAAY,KAAK,QAAQ;AAC1C,iBAAS,QAAQ;AAAA,MACnB;AACA,aAAO;AAAA,IACT;AAAA,EAAA;AAEJ;AAKO,SAAS,MACd,iBAAoC,CAAC,MAAM,GACL;AACtC,SAAO;AAAA;AAAA,IAEL,QAAQ,CAAC,SAAa,eAAe,IAAI,KAAK,OAAO,IAAI;AAAA,IACzD,QAAQ,CAAC,WAAoC;AAC3C,UAAI,aAAa;AACjB,iBAAW,CAAC,gBAAgB,YAAY,KAAK,QAAQ;AACnD,sBAAc,iBAAiB;AAAA,MACjC;AACA,aAAO;AAAA,IACT;AAAA,EAAA;AAEJ;AAKO,SAAS,IACd,iBAAuC,CAAC,MAAM,GACgB;AAC9D,SAAO;AAAA,IACL,QAAQ,CAAC,UAAa;AAAA,MACpB,KAAK,eAAe,IAAI;AAAA,MACxB,OAAO;AAAA,IAAA;AAAA,IAET,QAAQ,CAAC,WAA4D;AACnE,UAAI,WAAW;AACf,UAAI,aAAa;AACjB,iBAAW,CAAC,OAAO,YAAY,KAAK,QAAQ;AAC1C,oBAAY,MAAM,MAAM;AACxB,sBAAc;AAAA,MAChB;AACA,aAAO;AAAA,QACL,KAAK;AAAA,QACL,OAAO;AAAA,MAAA;AAAA,IAEX;AAAA,IACA,SAAS,CAAC,WAA2C;AACnD,aAAO,OAAO,MAAM,OAAO;AAAA,IAC7B;AAAA,EAAA;AAEJ;AAgBO,SAAS,IACd,gBACoD;AACpD,QAAM,YAAY,mBAAmB,CAAC,MAAS;AAC/C,SAAO;AAAA,IACL,QAAQ,CAAC,SAAY,UAAU,IAAI;AAAA,IACnC,QAAQ,CAAC,WAAW;AAClB,UAAI;AACJ,iBAAW,CAAC,OAAO,aAAa,KAAK,QAAQ;AAC3C,YAAI,CAAC,YAAa,SAAS,QAAQ,UAAW;AAC5C,qBAAW;AAAA,QACb;AAAA,MACF;AACA,aAAO;AAAA,IACT;AAAA,EAAA;AAEJ;AAcO,SAAS,IACd,gBACoD;AACpD,QAAM,YAAY,mBAAmB,CAAC,MAAS;AAC/C,SAAO;AAAA,IACL,QAAQ,CAAC,SAAY,UAAU,IAAI;AAAA,IACnC,QAAQ,CAAC,WAAW;AAClB,UAAI;AACJ,iBAAW,CAAC,OAAO,aAAa,KAAK,QAAQ;AAC3C,YAAI,CAAC,YAAa,SAAS,QAAQ,UAAW;AAC5C,qBAAW;AAAA,QACb;AAAA,MACF;AACA,aAAO;AAAA,IACT;AAAA,EAAA;AAEJ;AAOO,SAAS,OACd,iBAAuC,CAAC,MAAM,GACD;AAC7C,SAAO;AAAA,IACL,QAAQ,CAAC,SAAY,CAAC,eAAe,IAAI,CAAC;AAAA,IAC1C,QAAQ,CAAC,WAA2C;AAElD,YAAM,YAA2B,CAAA;AACjC,iBAAW,CAAC,YAAY,YAAY,KAAK,QAAQ;AAC/C,mBAAW,SAAS,YAAY;AAE9B,mBAAS,IAAI,GAAG,IAAI,cAAc,KAAK;AACrC,sBAAU,KAAK,KAAK;AAAA,UACtB;AAAA,QACF;AAAA,MACF;AAGA,UAAI,UAAU,WAAW,GAAG;AAC1B,eAAO,CAAA;AAAA,MACT;AAGA,gBAAU,KAAK,CAAC,GAAG,MAAM,IAAI,CAAC;AAE9B,aAAO;AAAA,IACT;AAAA,IACA,SAAS,CAAC,WAA0B;AAClC,UAAI,OAAO,WAAW,EAAG,QAAO;AAEhC,YAAM,MAAM,KAAK,MAAM,OAAO,SAAS,CAAC;AAGxC,UAAI,OAAO,SAAS,MAAM,GAAG;AAC3B,gBAAQ,OAAO,MAAM,CAAC,IAAK,OAAO,GAAG,KAAM;AAAA,MAC7C;AAGA,aAAO,OAAO,GAAG;AAAA,IACnB;AAAA,EAAA;AAEJ;AAOO,SAAS,KACd,iBAAuC,CAAC,MAAM,GACK;AACnD,SAAO;AAAA,IACL,QAAQ,CAAC,SAAY;AACnB,YAAM,QAAQ,eAAe,IAAI;AACjC,YAAM,mCAAmB,IAAA;AACzB,mBAAa,IAAI,OAAO,CAAC;AACzB,aAAO;AAAA,IACT;AAAA,IACA,QAAQ,CAAC,WAAiD;AAExD,YAAM,kCAAkB,IAAA;AAExB,iBAAW,CAAC,cAAc,YAAY,KAAK,QAAQ;AACjD,mBAAW,CAAC,OAAO,cAAc,KAAK,aAAa,WAAW;AAC5D,gBAAM,eAAe,YAAY,IAAI,KAAK,KAAK;AAC/C,sBAAY,IAAI,OAAO,eAAe,iBAAiB,YAAY;AAAA,QACrE;AAAA,MACF;AAEA,aAAO;AAAA,IACT;AAAA,IACA,SAAS,CAAC,WAAgC;AACxC,UAAI,OAAO,SAAS,EAAG,QAAO;AAE9B,UAAI,YAAY;AAChB,UAAI,eAAe;AAEnB,iBAAW,CAAC,OAAO,SAAS,KAAK,OAAO,WAAW;AACjD,YAAI,YAAY,cAAc;AAC5B,yBAAe;AACf,sBAAY;AAAA,QACd;AAAA,MACF;AAEA,aAAO;AAAA,IACT;AAAA,EAAA;AAEJ;AAEO,MAAM,mBAAmB;AAAA,EAC9B;AAAA,EACA;AAAA,EACA;AAAA,EACA;AAAA,EACA;AAAA,EACA;AAAA,EACA;AACF;;;;;;;;;;"}