import { Readable, Writable } from 'stream' import { WritableTyped } from '../stream.model' /** * Allows to stop the Readable stream after the pipeline has processed X number of rows. * It counts OUTPUT rows (not input), because this Writable is always at the end of the Pipeline. * It ensures that everything has been processed before issuing a STOP on the readable. */ export function writableLimit(readable: Readable, limit: number): WritableTyped { let i = 0 return new Writable({ objectMode: true, write(chunk, _, cb) { if (limit === 0) return cb() // no limit, just passthrough i++ if (i === limit) { console.log(`writableLimit of ${limit} reached`) readable.destroy() cb() // do we need it? } else { cb() // passthrough } }, }) }