import { AsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
import * as fs from 'fs'
import { createUnzip } from 'zlib'
import {
  requireFileToExist,
  transformJsonParse,
  transformLogProgress,
  TransformLogProgressOptions,
  transformMap,
  TransformMapOptions,
  transformSplit,
  writableVoid,
  _pipeline,
} from '../..'

/**
 * Options for `ndjsonStreamForEach`.
 *
 * Combines the options accepted by `transformMap` and `transformLogProgress`
 * (the same object is passed to both) with the path of the file to read.
 */
export interface NDJSONStreamForEachOptions
  extends TransformMapOptions,
    TransformLogProgressOptions {
  /**
   * Path of the ndjson file to read.
   * When it ends with `.gz`, the stream is passed through `zlib.createUnzip()` first.
   */
  inputFilePath: string
}

/**
 * Convenience function to `forEach` through an ndjson file.
 *
 * Builds and awaits a stream pipeline:
 * read file -> (unzip, if the path ends with `.gz`) -> split -> JSON.parse -> `mapper`
 * -> log progress -> discard.
 *
 * @param mapper Function invoked (via `transformMap`) for every parsed item.
 * @param opt Options forwarded to both `transformMap` and `transformLogProgress`.
 *   `errorMode` defaults to `ErrorMode.THROW_AGGREGATED` but can be overridden via `opt`.
 *   Any `predicate` in `opt` is ignored (see the inline note below).
 * @returns A promise that settles when `_pipeline` settles; no value is produced.
 */
export async function ndjsonStreamForEach(
  mapper: AsyncMapper,
  opt: NDJSONStreamForEachOptions,
): Promise<void> {
  // NOTE(review): presumably throws when the file is missing - confirm against the helper.
  requireFileToExist(opt.inputFilePath)

  // Decompression stage is only inserted for `.gz` inputs; otherwise nothing is spread in.
  const transformUnzip = opt.inputFilePath.endsWith('.gz') ? [createUnzip()] : []

  await _pipeline([
    fs.createReadStream(opt.inputFilePath),
    ...transformUnzip,
    transformSplit(),
    transformJsonParse(),
    transformMap(mapper, {
      // Listed before `...opt` so that a caller-provided `errorMode` takes precedence.
      errorMode: ErrorMode.THROW_AGGREGATED,
      ...opt,
      // Listed after `...opt` so that it always wins over a caller-provided predicate.
      predicate: () => true, // to log progress properly
    }),
    transformLogProgress(opt),
    writableVoid(),
  ])
}