import { Writable } from 'stream'
import { readableCreate, ReadableTyped, _pipeline } from '../..'
import { TransformOptions, WritableTyped } from '../stream.model'

/**
 * Allows "forking" a stream inside a pipeline into a number of pipeline chains (2 or more).
 *
 * Every chunk written to the returned Writable is pushed to one freshly created
 * Readable per chain; each such Readable is piped through its chain via `_pipeline`.
 *
 * Currently does NOT (!) maintain backpressure: chunks are pushed to every fork
 * regardless of what `push()` returns.
 *
 * An error in any forked pipeline is logged (to be sure) and propagated to the
 * main pipeline by destroying the returned Writable with that error. This happens
 * as soon as the fork fails, not only when the main stream is finishing.
 *
 * Will wait until all forked pipelines are completed before completing the stream.
 *
 * NOTE(review): if `ReadableTyped` / `WritableTyped` are generic in this project,
 * `T` should be threaded through them as well - confirm against `stream.model`.
 *
 * @param chains one array of writable/transform streams per fork
 * @param opt extra options spread into the Writable constructor (may override `objectMode`)
 * @returns a Writable (object mode by default) that fans chunks out to all chains
 */
export function writableFork<T = unknown>(
  chains: NodeJS.WritableStream[][],
  opt?: TransformOptions,
): WritableTyped {
  const readables: ReadableTyped[] = []

  // The synchronous part of each async callback (up to the first `await`) runs
  // immediately, so `readables` is fully populated before this function returns.
  const allChainsDone = Promise.all(
    chains.map(async chain => {
      const readable = readableCreate()
      readables.push(readable)

      return await _pipeline([readable, ...chain])
    }),
  ).catch(err => {
    console.error(err) // ensure the error is logged
    throw err
  })

  const writable = new Writable({
    objectMode: true,
    ...opt,
    write(chunk: T, _encoding, cb) {
      // Push/fork to all sub-streams.
      // No backpressure is ensured here: it'll push regardless of the sub-streams'
      // buffer state (the return value of `push` is ignored).
      readables.forEach(readable => readable.push(chunk))

      cb()
    },
    // Deliberately NOT `async`: completion is signalled only through `cb`,
    // never through a returned promise.
    final(cb) {
      // Push null (complete) to all sub-streams
      readables.forEach(readable => readable.push(null))

      console.log(`writableFork.final is waiting for all chains to be done`)

      allChainsDone.then(
        () => {
          console.log(`writableFork.final all chains done`)
          cb()
        },
        err => cb(err),
      )
    },
  })

  // Always keep a rejection handler attached, so a fork failing before `final`
  // is called neither becomes an unhandled rejection nor goes unnoticed by the
  // main pipeline. If the stream is already destroyed, `destroy` is a no-op.
  allChainsDone.catch(err => {
    writable.destroy(err)
  })

  return writable
}