import { Transform } from 'node:stream' import type { AbortableSignal } from '@naturalcycles/js-lib' import type { NonNegativeInteger } from '@naturalcycles/js-lib/types' import type { TransformOptions, TransformTyped } from '../stream.model.js' import { PIPELINE_GRACEFUL_ABORT } from '../stream.util.js' import { transformNoOp } from './transformNoOp.js' export interface TransformLimitOptions extends TransformOptions { /** * Nullish value (e.g 0 or undefined) would mean "no limit" */ limit?: NonNegativeInteger /** * Allows to abort (gracefully stop) the stream from inside the Transform. */ signal: AbortableSignal } export function transformLimit(opt: TransformLimitOptions): TransformTyped { const { limit, signal, objectMode = true, highWaterMark = 1 } = opt if (!limit) { return transformNoOp() } let i = 0 // so we start first chunk with 1 let ended = false return new Transform({ objectMode, highWaterMark, transform(chunk, _, cb) { if (ended) { return } i++ if (i === limit) { ended = true this.push(chunk) this.push(null) // tell downstream that we're done cb() queueMicrotask(() => { signal.abort(new Error(PIPELINE_GRACEFUL_ABORT)) }) return } cb(null, chunk) }, }) }