import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
import { StreamBuilder } from '../d2.js'
import type { IStreamBuilder, PipedOperator } from '../types.js'
import type { DifferenceStreamReader } from '../graph.js'
import type { MultiSet } from '../multiset.js'

/**
 * Pass-through operator that hands every incoming message to a callback
 * and then forwards that same message to its output writer.
 */
export class OutputOperator extends UnaryOperator {
  #callback: (data: MultiSet) => void

  /**
   * @param id - Identifier passed through to the `UnaryOperator` base class
   * @param inputA - Reader the operator takes its messages from
   * @param outputWriter - Writer the messages are forwarded to
   * @param fn - Callback invoked with each message before it is forwarded
   */
  constructor(
    id: number,
    inputA: DifferenceStreamReader,
    outputWriter: DifferenceStreamWriter,
    fn: (data: MultiSet) => void,
  ) {
    super(id, inputA, outputWriter)
    this.#callback = fn
  }

  /**
   * For each message returned by `inputMessages()`: invoke the callback
   * first, then send the message on via `this.output`.
   */
  run(): void {
    const pending = this.inputMessages()
    for (const batch of pending) {
      // Callback runs before forwarding; if it throws, this message is not sent.
      this.#callback(batch)
      this.output.sendData(batch)
    }
  }
}

/**
 * Builds a piped operator that calls `fn` with each message of the stream
 * and returns a new stream carrying the same messages.
 * @param fn - The function to call with each message
 * @returns A function that registers an `OutputOperator` on the given
 *   stream's graph and returns the stream builder fed by that operator
 */
export function output(
  fn: (data: MultiSet) => void,
): PipedOperator {
  return (stream: IStreamBuilder): IStreamBuilder => {
    const { graph } = stream

    // Fresh writer/stream pair that the new operator publishes into.
    const writer = new DifferenceStreamWriter()
    const downstream = new StreamBuilder(graph, writer)

    // Allocate the operator id before connecting the reader.
    const operatorId = graph.getNextOperatorId()
    const upstreamReader = stream.connectReader()

    graph.addOperator(
      new OutputOperator(operatorId, upstreamReader, downstream.writer, fn),
    )
    return downstream
  }
}