import { Readable } from 'node:stream'
import type { Transform } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import type { ReadableStream as WebReadableStream } from 'node:stream/web'
import { createGzip, createUnzip, createZstdCompress, createZstdDecompress } from 'node:zlib'
import type { ZlibOptions, ZstdOptions } from 'node:zlib'
import { createAbortableSignal } from '@naturalcycles/js-lib'
import { _passthroughPredicate } from '@naturalcycles/js-lib/types'
import type {
  AbortableAsyncMapper,
  AsyncIndexedMapper,
  AsyncPredicate,
  END,
  IndexedMapper,
  Integer,
  NonNegativeInteger,
  PositiveInteger,
  Predicate,
  SKIP,
} from '@naturalcycles/js-lib/types'
import { fs2 } from '../fs/fs2.js'
import { zip2 } from '../zip/zip2.js'
import { createReadStreamAsNDJson } from './ndjson/createReadStreamAsNDJson.js'
import { transformJsonParse } from './ndjson/transformJsonParse.js'
import { transformToNDJson } from './ndjson/transformToNDJson.js'
import { createReadableFromAsync } from './readable/createReadable.js'
import type {
  ReadableTyped,
  TransformOptions,
  TransformTyped,
  WritableTyped,
} from './stream.model.js'
import { PIPELINE_GRACEFUL_ABORT } from './stream.util.js'
import { transformChunk } from './transform/transformChunk.js'
import { transformFilterSync } from './transform/transformFilter.js'
import { transformFlatten, transformFlattenIfNeeded } from './transform/transformFlatten.js'
// oxlint-disable-next-line import/no-cycle -- intentional cycle
import { transformFork } from './transform/transformFork.js'
import { transformLimit } from './transform/transformLimit.js'
import { transformLogProgress } from './transform/transformLogProgress.js'
import type { TransformLogProgressOptions } from './transform/transformLogProgress.js'
import { transformMap } from './transform/transformMap.js'
import type { TransformMapOptions } from './transform/transformMap.js'
import { transformMapSimple } from './transform/transformMapSimple.js'
import type { TransformMapSimpleOptions } from './transform/transformMapSimple.js'
import { transformMapSync } from './transform/transformMapSync.js'
import type { TransformMapSyncOptions } from './transform/transformMapSync.js'
import { transformOffset } from './transform/transformOffset.js'
import type { TransformOffsetOptions } from './transform/transformOffset.js'
import { transformSplitOnNewline } from './transform/transformSplit.js'
import { transformTap, transformTapSync } from './transform/transformTap.js'
import { transformThrottle } from './transform/transformThrottle.js'
import type { TransformThrottleOptions } from './transform/transformThrottle.js'
import { transformThrottleByRSS } from './transform/transformThrottleByRSS.js'
import type { TransformThrottleByRSSOptions } from './transform/transformThrottleByRSS.js'
import { transformWarmup } from './transform/transformWarmup.js'
import type { TransformWarmupOptions } from './transform/transformWarmup.js'
import { writablePushToArray } from './writable/writablePushToArray.js'
import { writableVoid } from './writable/writableVoid.js'

/**
 * Fluent builder around `node:stream/promises` `pipeline`.
 *
 * A Pipeline is created from a source Readable (see the static `from*` factories),
 * then zero or more transforms are appended via the chainable methods, and finally
 * one of the terminal methods (`run`, `to`, `toArray`, `toFile`, `toNDJsonFile`,
 * `forEach`, `forEachSync`) sets the destination and executes the stream.
 *
 * `T` is the type of the items flowing out of the last appended stage.
 * Methods that change the item type mutate the same instance and return it
 * re-typed (`this as any`) - no new Pipeline object is created.
 *
 * NOTE(review): the generic type parameters/arguments in this class were restored
 * after having been lost; confirm them (especially the arity of the project-local
 * option types) against the declarations of the imported types.
 */
export class Pipeline<T = unknown> {
  private readonly source: Readable
  private transforms: NodeJS.ReadWriteStream[] = []
  private destination?: NodeJS.WritableStream
  private readableLimit?: Integer
  // Tracked by the methods below; not read anywhere within this class.
  private objectMode: boolean
  // Shared between the abortable transforms (limit/map/mapSync/filter/forEach*)
  // and the `pipeline()` call in `run()`.
  private abortableSignal = createAbortableSignal()

  private constructor(source: ReadableTyped<T>, objectMode = true) {
    this.source = source
    this.objectMode = objectMode
  }

  /**
   * Creates a Pipeline from an existing Readable (objectMode=true).
   */
  static from<T>(source: ReadableTyped<T>): Pipeline<T> {
    return new Pipeline<T>(source)
  }

  /**
   * Useful in cases when Readable is not immediately available,
   * but only available after an async operation is completed.
   * Implemented via a proxy Transform, which should be transparent.
   */
  static fromAsyncReadable<T = unknown>(fn: () => Promise<ReadableTyped<T>>): Pipeline<T> {
    return new Pipeline<T>(createReadableFromAsync(fn))
  }

  /**
   * Creates a Pipeline from a Web ReadableStream, converted via `Readable.fromWeb`.
   */
  static fromWeb<T>(webReadableStream: WebReadableStream<T>): Pipeline<T> {
    return new Pipeline<T>(Readable.fromWeb(webReadableStream))
  }

  /**
   * Technically same as `fromIterable` (since Array is Iterable),
   * but named a bit friendlier.
   */
  static fromArray<T>(input: T[]): Pipeline<T> {
    return new Pipeline<T>(Readable.from(input))
  }

  /**
   * Creates a Pipeline from a sync or async Iterable, via `Readable.from`.
   */
  static fromIterable<T>(input: Iterable<T> | AsyncIterable<T>): Pipeline<T> {
    return new Pipeline<T>(Readable.from(input))
  }

  /**
   * Creates a Pipeline that reads the given file as NDJson.
   */
  static fromNDJsonFile<T>(sourceFilePath: string): Pipeline<T> {
    // Important that createReadStreamAsNDJson function is used
    // (and not Pipeline set of individual transforms),
    // because createReadStreamAsNDJson returns a Readable,
    // hence it allows to apply .take(limit) on it
    // e.g like Pipeline.fromNDJsonFile().limitSource(limit)
    return new Pipeline<T>(createReadStreamAsNDJson(sourceFilePath))
  }

  /**
   * Creates a binary (objectMode=false) Pipeline that reads the given file.
   */
  static fromFile(sourceFilePath: string): Pipeline<Uint8Array> {
    return new Pipeline<Uint8Array>(
      fs2.createReadStream(sourceFilePath, {
        highWaterMark: 64 * 1024, // no observed speedup
      }),
      false,
    )
  }

  /**
   * Limits the source Readable, by using `.take(limit)` on it.
   * This is THE preferred way of limiting the source.
   *
   * The limit is only stored here; it is applied in `run()`.
   * A falsy limit (`undefined` or `0`) leaves the source unlimited.
   */
  limitSource(limit: NonNegativeInteger | undefined): this {
    this.readableLimit = limit
    return this
  }

  /**
   * If possible - STRONGLY PREFER applying `.take(limit)` on the source Readable,
   * as it's a clean graceful way of limiting the Readable. Example:
   *
   * Pipeline.from(myReadable.take(10))
   *
   * or
   *
   * Pipeline
   *   .from(myReadable)
   *   .limitSource(10)
   *
   * If applying `take` on Readable is not possible - use this method at your own risk.
   * Why warning?
   * The limit works by aborting the stream, and then catching the error - certainly
   * less clean than `.take()` on the source.
   *
   * When called before any transform has been added, it logs a warning and
   * delegates to `limitSource` instead.
   */
  limit(limit: NonNegativeInteger | undefined): this {
    if (!this.transforms.length) {
      console.warn(
        `Pipeline.limit was used as a very first Transfrom - please use Pipeline.limitSource instead`,
      )
      this.limitSource(limit)
      return this
    }

    this.transforms.push(
      transformLimit({
        limit,
        signal: this.abortableSignal,
      }),
    )
    return this
  }

  /**
   * Appends `transformChunk(chunkSize, opt)`; the Pipeline becomes a stream of arrays.
   */
  chunk(chunkSize: PositiveInteger, opt?: TransformOptions): Pipeline<T[]> {
    this.transforms.push(transformChunk(chunkSize, opt))
    return this as any
  }

  /**
   * Appends `transformFlatten()`. Only callable on a Pipeline of arrays.
   */
  flatten<TO>(this: Pipeline<readonly TO[]>): Pipeline<TO> {
    this.transforms.push(transformFlatten())
    return this as any
  }

  /**
   * Appends `transformFlattenIfNeeded()`.
   */
  flattenIfNeeded(): Pipeline<T extends readonly (infer TO)[] ? TO : T> {
    this.transforms.push(transformFlattenIfNeeded())
    return this as any
  }

  // TransformLogProgressOptions intentionally doesn't have <T> passed, as it's inconvenient in many cases
  logProgress(opt?: TransformLogProgressOptions): this {
    this.transforms.push(transformLogProgress(opt))
    return this
  }

  /**
   * Appends an async `transformMap` stage.
   * The pipeline's abort signal always overrides any `signal` passed in `opt`.
   */
  map<TO>(
    mapper: AbortableAsyncMapper<T, TO | typeof SKIP | typeof END>,
    opt?: TransformMapOptions<T, TO>,
  ): Pipeline<TO> {
    this.transforms.push(
      transformMap(mapper, {
        ...opt,
        signal: this.abortableSignal,
      }),
    )
    return this as any
  }

  /**
   * Appends a synchronous `transformMapSync` stage.
   * The pipeline's abort signal always overrides any `signal` passed in `opt`.
   */
  mapSync<TO>(
    mapper: IndexedMapper<T, TO | typeof SKIP | typeof END>,
    opt?: TransformMapSyncOptions<T, TO>,
  ): Pipeline<TO> {
    this.transforms.push(
      transformMapSync(mapper, {
        ...opt,
        signal: this.abortableSignal,
      }),
    )
    return this as any
  }

  /**
   * Appends a `transformMapSimple` stage (no abort signal is passed to it).
   */
  mapSimple<TO>(mapper: IndexedMapper<T, TO>, opt?: TransformMapSimpleOptions): Pipeline<TO> {
    this.transforms.push(transformMapSimple(mapper, opt))
    return this as any
  }

  /**
   * Filters items with an async predicate.
   * Implemented as an identity `transformMap` with `asyncPredicate` set;
   * an `asyncPredicate` present in `opt` takes precedence (it is spread afterwards).
   */
  filter(asyncPredicate: AsyncPredicate<T>, opt?: TransformMapOptions): this {
    this.transforms.push(
      transformMap(v => v, {
        asyncPredicate,
        ...opt,
        signal: this.abortableSignal,
      }),
    )
    return this
  }

  /**
   * Filters items with a synchronous predicate (via `transformFilterSync`).
   */
  filterSync(predicate: Predicate<T>, opt?: TransformOptions): this {
    this.transforms.push(transformFilterSync(predicate, opt))
    return this
  }

  /**
   * Appends `transformOffset(opt)`.
   */
  offset(opt: TransformOffsetOptions): this {
    this.transforms.push(transformOffset(opt))
    return this
  }

  /**
   * Appends `transformTap(fn, opt)`. Item type is unchanged.
   */
  tap(fn: AsyncIndexedMapper<T, any>, opt?: TransformOptions): this {
    this.transforms.push(transformTap(fn, opt))
    return this
  }

  /**
   * Appends `transformTapSync(fn, opt)`. Item type is unchanged.
   */
  tapSync(fn: IndexedMapper<T, any>, opt?: TransformOptions): this {
    this.transforms.push(transformTapSync(fn, opt))
    return this
  }

  /**
   * Appends `transformThrottle(opt)`.
   */
  throttle(opt: TransformThrottleOptions): this {
    this.transforms.push(transformThrottle(opt))
    return this
  }

  /**
   * Appends `transformThrottleByRSS(opt)`.
   */
  throttleByRSS(opt: TransformThrottleByRSSOptions): this {
    this.transforms.push(transformThrottleByRSS(opt))
    return this
  }

  /**
   * @experimental to be removed after transformMap2 is stable
   */
  warmup(opt: TransformWarmupOptions): this {
    this.transforms.push(transformWarmup(opt))
    return this
  }

  /**
   * Appends an arbitrary typed Transform.
   */
  transform<TO>(transform: TransformTyped<T, TO>): Pipeline<TO> {
    this.transforms.push(transform)
    return this as any
  }

  /**
   * Helper method to add multiple transforms at once.
   * Not type safe! Prefer using singular `transform()` multiple times for type safety.
   */
  transformMany<TO>(transforms: Transform[]): Pipeline<TO> {
    this.transforms.push(...transforms)
    return this as any
  }

  /**
   * Appends `transformFork(fn, opt)`, where `fn` receives a Pipeline of the same item type.
   */
  fork(fn: (pipeline: Pipeline<T>) => Promise<void>, opt?: TransformOptions): this {
    this.transforms.push(transformFork(fn, opt))
    return this
  }

  /**
   * Utility method just to conveniently type-cast the current Pipeline type.
   * No runtime effect.
   */
  typeCastAs<TO>(): Pipeline<TO> {
    return this as any
  }

  /**
   * Manually overrides the tracked objectMode flag.
   */
  setObjectMode(objectMode: boolean): this {
    this.objectMode = objectMode
    return this
  }

  /**
   * Transform the stream of Objects into a stream of JSON lines.
   * Technically, it goes into objectMode=false, so it's a binary stream at the end.
   */
  toNDJson(): Pipeline<Uint8Array> {
    this.transforms.push(transformToNDJson())
    this.objectMode = false
    return this as any
  }

  /**
   * Splits a binary stream on newlines and JSON-parses every line.
   */
  parseNDJson<TO = unknown>(this: Pipeline<Uint8Array>): Pipeline<TO> {
    // It was said that transformJsonParse() separately is 10% or more slower than .map(line => JSON.parse(line))
    // So, we can investigate a speedup
    this.transforms.push(transformSplitOnNewline(), transformJsonParse())
    this.objectMode = true
    return this as any
  }

  /**
   * Splits a binary stream on newlines.
   */
  splitOnNewline(this: Pipeline<Uint8Array>): Pipeline<Buffer> {
    // Input: objectMode=false - binary stream
    // Output: objectMode=true - stream of Buffer objects (which are also strings?)
    this.transforms.push(transformSplitOnNewline())
    this.objectMode = true
    return this as any
  }

  /**
   * JSON-parses every incoming item.
   *
   * NOTE(review): the three accepted receiver types are presumed to be
   * Buffer / Uint8Array / string - confirm.
   */
  parseJson<TO = unknown>(
    this: Pipeline<Buffer> | Pipeline<Uint8Array> | Pipeline<string>,
  ): Pipeline<TO> {
    // Input: objectMode=false - takes a stream of strings one by one
    // Output: objectMode=true - stream of json-parsed Objects
    this.transforms.push(transformJsonParse())
    this.objectMode = true
    return this as any
  }

  /**
   * Appends a gzip compression stage (`createGzip`); output is binary.
   */
  gzip(this: Pipeline<Uint8Array>, opt?: ZlibOptions): Pipeline<Uint8Array> {
    this.transforms.push(
      createGzip({
        // chunkSize: 64 * 1024, // no observed speedup
        ...opt,
      }),
    )
    this.objectMode = false
    return this as any
  }

  /**
   * Appends a decompression stage (`createUnzip`); output is binary.
   * `chunkSize` defaults to 64 KiB and can be overridden via `opt`.
   */
  gunzip(this: Pipeline<Uint8Array>, opt?: ZlibOptions): Pipeline<Uint8Array> {
    this.transforms.push(
      createUnzip({
        chunkSize: 64 * 1024, // speedup from ~3200 to 3800 rps!
        ...opt,
      }),
    )
    this.objectMode = false
    return this as any
  }

  /**
   * Appends a zstd compression stage; output is binary.
   * `level` and `opt` are converted to zstd options by `zip2.zstdLevelToOptions`.
   */
  zstdCompress(
    this: Pipeline<Uint8Array>,
    level?: Integer, // defaults to 3
    opt?: ZstdOptions,
  ): Pipeline<Uint8Array> {
    this.transforms.push(createZstdCompress(zip2.zstdLevelToOptions(level, opt)))
    this.objectMode = false
    return this as any
  }

  /**
   * Appends a zstd decompression stage; output is binary.
   * `chunkSize` defaults to 64 KiB and can be overridden via `opt`.
   */
  zstdDecompress(this: Pipeline<Uint8Array>, opt?: ZstdOptions): Pipeline<Uint8Array> {
    this.transforms.push(
      createZstdDecompress({
        chunkSize: 64 * 1024, // todo: test it
        ...opt,
      }),
    )
    this.objectMode = false
    return this as any
  }

  /**
   * Terminal: runs the pipeline and resolves with all resulting items collected into an array.
   */
  async toArray(opt?: TransformOptions): Promise<T[]> {
    const arr: T[] = []
    this.destination = writablePushToArray(arr, opt)
    await this.run()
    return arr
  }

  /**
   * Terminal: runs the pipeline, writing its output to the given file
   * (`fs2.ensureFile` is called on the path first).
   */
  async toFile(outputFilePath: string): Promise<void> {
    fs2.ensureFile(outputFilePath)
    this.destination = fs2.createWriteStream(outputFilePath)
    await this.run()
  }

  /**
   * Terminal: serializes items as NDJson and writes them to the given file,
   * compressing with gzip if the filename ends with `.gz`, or with zstd if it ends with `.zst`.
   *
   * level corresponds to zstd compression level (if filename ends with .zst),
   * or gzip compression level (if filename ends with .gz).
   * Default levels are:
   * gzip: 6
   * zstd: 3 (optimized for throughput, not size, may be larger than gzip at its default level)
   */
  async toNDJsonFile(outputFilePath: string, level?: Integer): Promise<void> {
    fs2.ensureFile(outputFilePath)
    this.transforms.push(transformToNDJson())
    if (outputFilePath.endsWith('.gz')) {
      this.transforms.push(
        createGzip({
          level,
          // chunkSize: 64 * 1024, // no observed speedup
        }),
      )
    } else if (outputFilePath.endsWith('.zst')) {
      this.transforms.push(createZstdCompress(zip2.zstdLevelToOptions(level)))
    }
    this.destination = fs2.createWriteStream(outputFilePath, {
      // highWaterMark: 64 * 1024, // no observed speedup
    })
    await this.run()
  }

  /**
   * Terminal: runs the pipeline into the given Writable.
   */
  async to(destination: WritableTyped<T>): Promise<void> {
    this.destination = destination
    await this.run()
  }

  /**
   * Terminal: runs `fn` for every item (via `transformMap`) and discards the results.
   * If `opt.logEvery` is set, a `transformLogProgress` stage is appended after the mapper.
   */
  async forEach(
    fn: AsyncIndexedMapper<T, void>,
    opt: TransformMapOptions<T, void> & TransformLogProgressOptions<T> = {},
  ): Promise<void> {
    this.transforms.push(
      transformMap(fn, {
        predicate: opt.logEvery ? _passthroughPredicate : undefined, // for the logger to work
        ...opt,
        signal: this.abortableSignal,
      }),
    )
    if (opt.logEvery) {
      this.transforms.push(transformLogProgress(opt))
    }
    await this.run()
  }

  /**
   * Terminal: synchronous counterpart of `forEach` (via `transformMapSync`).
   * If `opt.logEvery` is set, a `transformLogProgress` stage is appended after the mapper.
   */
  async forEachSync(
    fn: IndexedMapper<T, void>,
    opt: TransformMapSyncOptions<T, void> & TransformLogProgressOptions<T> = {},
  ): Promise<void> {
    this.transforms.push(
      transformMapSync(fn, {
        predicate: opt.logEvery ? _passthroughPredicate : undefined, // for the logger to work
        ...opt,
        signal: this.abortableSignal,
      }),
    )
    if (opt.logEvery) {
      this.transforms.push(transformLogProgress(opt))
    }
    await this.run()
  }

  /**
   * Terminal: executes source -> transforms -> destination via `pipeline()`.
   *
   * - If no destination was set, `writableVoid()` is used.
   * - If a truthy source limit was set via `limitSource`, `source.take(limit)` is applied.
   * - An error whose `cause.message` equals PIPELINE_GRACEFUL_ABORT is treated as a
   *   graceful abort: it is logged and swallowed. Any other error is re-thrown.
   */
  async run(): Promise<void> {
    this.destination ||= writableVoid()
    let { source } = this
    if (this.readableLimit) {
      source = source.take(this.readableLimit)
    }

    try {
      await pipeline([source, ...this.transforms, this.destination], {
        signal: this.abortableSignal,
      })
    } catch (err) {
      // Structural cast instead of `any`: only `message` is ever read from the cause
      const cause = err instanceof Error ? (err.cause as { message?: unknown } | null | undefined) : undefined
      if (cause?.message === PIPELINE_GRACEFUL_ABORT) {
        console.log('pipeline gracefully aborted') // todo: this message may be removed later
        return
      }
      throw err
    }
  }
}