import { BinaryOperator, DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import type { IStreamBuilder, PipedOperator } from '../types.js' /** * Operator that concatenates two input streams */ export class ConcatOperator extends BinaryOperator { run(): void { for (const message of this.inputAMessages()) { this.output.sendData(message) } for (const message of this.inputBMessages()) { this.output.sendData(message) } } } /** * Concatenates two input streams * @param other - The other stream to concatenate */ export function concat( other: IStreamBuilder, ): PipedOperator { return (stream: IStreamBuilder): IStreamBuilder => { if (stream.graph !== other.graph) { throw new Error(`Cannot concat streams from different graphs`) } const output = new StreamBuilder( stream.graph, new DifferenceStreamWriter(), ) const operator = new ConcatOperator( stream.graph.getNextOperatorId(), stream.connectReader(), other.connectReader(), output.writer, ) stream.graph.addOperator(operator) return output } }