import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import type { IStreamBuilder, PipedOperator } from '../types.js' /** * Operator that consolidates collections */ export class ConsolidateOperator extends UnaryOperator { run(): void { const messages = this.inputMessages() if (messages.length === 0) { return } // Combine all messages into a single MultiSet const combined = new MultiSet() for (const message of messages) { combined.extend(message) } // Consolidate the combined MultiSet const consolidated = combined.consolidate() // Only send if there are results if (consolidated.getInner().length > 0) { this.output.sendData(consolidated) } } } /** * Consolidates the elements in the stream */ export function consolidate(): PipedOperator { return (stream: IStreamBuilder): IStreamBuilder => { const output = new StreamBuilder( stream.graph, new DifferenceStreamWriter(), ) const operator = new ConsolidateOperator( stream.graph.getNextOperatorId(), stream.connectReader(), output.writer, ) stream.graph.addOperator(operator) return output } }