import { DifferenceStreamWriter } from '../graph.js'
import { StreamBuilder } from '../d2.js'
import { ReduceOperator } from './reduce.js'
import type { DifferenceStreamReader } from '../graph.js'
import type { IStreamBuilder, KeyValue } from '../types.js'

/**
 * Operator that counts elements by key (version-free).
 *
 * It is a `ReduceOperator` whose reducer collapses a list of
 * `[value, diff]` pairs into a single `[total, 1]` pair, where `total` is
 * the sum of the `diff` components. The values themselves are ignored.
 *
 * NOTE(review): the per-key grouping is presumably performed by
 * `ReduceOperator`; this class only supplies the reducer — confirm against
 * `./reduce.js`.
 *
 * @typeParam K - Key type of the incoming `[K, V]` pairs.
 * @typeParam V - Value type of the incoming `[K, V]` pairs.
 */
export class CountOperator<K, V> extends ReduceOperator<K, V, number> {
  /**
   * @param id - Operator id, forwarded unchanged to `ReduceOperator`.
   * @param inputA - Reader supplying the `[K, V]` pairs to count.
   * @param output - Writer that receives the resulting `[K, number]` pairs.
   */
  constructor(
    id: number,
    inputA: DifferenceStreamReader<[K, V]>,
    output: DifferenceStreamWriter<[K, number]>,
  ) {
    // Reducer: add up the second component of every pair and emit the sum
    // once (paired with the constant 1).
    const countInner = (vals: Array<[V, number]>): Array<[number, number]> => {
      let totalCount = 0
      for (const [, diff] of vals) {
        totalCount += diff
      }
      return [[totalCount, 1]]
    }

    super(id, inputA, output, countInner)
  }
}

/**
 * Counts the number of elements by key (version-free).
 *
 * Returns a function that takes a stream and produces a new stream of
 * `KeyValue<KType, number>`. That function creates the output stream on the
 * input stream's graph, connects a `CountOperator` between the two, and
 * registers the operator with the graph.
 *
 * @typeParam KType - Key type, inferred from `T` when `T` is a `KeyValue`.
 * @typeParam VType - Value type, inferred from `T` when `T` is a `KeyValue`.
 * @typeParam T - Element type of the input stream.
 * @returns A function mapping `IStreamBuilder<T>` to the counted stream.
 */
export function count<
  KType extends T extends KeyValue<infer K, infer _V> ? K : never,
  VType extends T extends KeyValue<KType, infer V> ? V : never,
  T,
>() {
  return (
    stream: IStreamBuilder<T>,
  ): IStreamBuilder<KeyValue<KType, number>> => {
    const output = new StreamBuilder<KeyValue<KType, number>>(
      stream.graph,
      new DifferenceStreamWriter<KeyValue<KType, number>>(),
    )
    const operator = new CountOperator<KType, VType>(
      stream.graph.getNextOperatorId(),
      // The stream is typed over T; the cast narrows its reader to the
      // key/value shape CountOperator expects.
      stream.connectReader() as DifferenceStreamReader<KeyValue<KType, VType>>,
      output.writer,
    )
    stream.graph.addOperator(operator)
    return output
  }
}