# stream-chain

> A library for chaining functions, generators, and streams into a single Duplex stream with proper backpressure handling. Zero runtime dependencies. Works with Node.js and Bun. Supports both CommonJS and ESM consumers.

- Chain regular functions, async functions, generator functions, async generator functions, and existing streams
- Proper backpressure handling via Node.js stream infrastructure
- Object mode by default, ideal for data processing pipelines
- Web stream support (ReadableStream, WritableStream, duplex {readable, writable})
- Function grouping optimization for efficient pipelines
- Special return values for flow control: skip, stop, emit multiple, final value, flush at end
- Built-in JSONL (line-separated JSON) parser and stringer
- Utility functions for common stream operations: take, skip, fold, scan, batch, lines
- TypeScript support with typed streams

## Quick start

Install:

```bash
npm i stream-chain
```

Create a pipeline (`example.mjs`):

```js
import chain from 'stream-chain';
import {Readable} from 'node:stream';

const pipeline = chain([
  x => x * x,
  x => x % 2 ? x : null,
  x => x + 1
]);

const source = Readable.from([1, 2, 3, 4, 5]);
source.pipe(pipeline);
pipeline.on('data', x => console.log(x));
// Output: 2, 10, 26
```

Run: `node example.mjs`

## Importing

```js
// ESM (default export)
import chain from 'stream-chain';

// ESM (named exports)
import {chain, none, stop, many, finalValue, flushable, gen, asStream} from 'stream-chain';

// CommonJS
const chain = require('stream-chain');
const {none, stop, many} = require('stream-chain');

// Individual modules (ESM)
import gen from 'stream-chain/gen.js';
import fun from 'stream-chain/fun.js';
import asStream from 'stream-chain/asStream.js';
import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js';

// Utilities
import take from 'stream-chain/utils/take.js';
import takeWhile from 'stream-chain/utils/takeWhile.js';
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
import skip from 'stream-chain/utils/skip.js';
import skipWhile from 'stream-chain/utils/skipWhile.js';
import fold from 'stream-chain/utils/fold.js';
import reduce from 'stream-chain/utils/reduce.js';
import reduceStream from 'stream-chain/utils/reduceStream.js';
import scan from 'stream-chain/utils/scan.js';
import batch from 'stream-chain/utils/batch.js';
import lines from 'stream-chain/utils/lines.js';
import fixUtf8Stream from 'stream-chain/utils/fixUtf8Stream.js';
import readableFrom from 'stream-chain/utils/readableFrom.js';

// JSONL
import parser from 'stream-chain/jsonl/parser.js';
import parserStream from 'stream-chain/jsonl/parserStream.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';

// TypeScript helpers
import {TypedReadable, TypedWritable, TypedDuplex, TypedTransform} from 'stream-chain/typed-streams.js';
```

## chain() API

`chain(fns, options?)` creates a Duplex stream from an array of pipeline steps.

Arguments:

- `fns` (array). Items can be:
  - **Functions**: regular, async, generator, async generator.
  - **Streams**: Transform, Duplex.
    First item can be Readable. Last item can be Writable.
  - **Web streams**: ReadableStream, WritableStream, `{readable, writable}` pair. Adapted automatically.
  - **Arrays**: flattened recursively; elements included verbatim.
  - **Falsy values**: ignored (filtered out).
- `options` (object, optional). Extends `DuplexOptions` with:
  - `noGrouping` (boolean, default: false): if true, each function becomes a separate stream. If false, consecutive functions are grouped via `gen()` for efficiency.
  - `skipEvents` (boolean, default: false): if true, error events from internal streams are not forwarded.
  - Default: `{writableObjectMode: true, readableObjectMode: true}`.

Returns: `Duplex` stream with additional properties:

- `streams`: array of all internal streams created by the chain.
- `input`: the first stream (write to it or attach event handlers).
- `output`: the last stream (read from it or attach event handlers).

### How functions are called

Functions receive `(chunk, encoding)`. The return value determines what happens:

- **`null`, `undefined`, or `none`**: no value is passed downstream (filter). Note: `null`/`undefined` are treated as `none` in `asStream()`/`chain()` because Node.js streams reserve them for end-of-stream signaling. `gen()` and `fun()` pass `null`/`undefined` through like any other value.
- **`stop`**: no value is passed and the generator terminates (gen/fun only).
- **Regular value**: passed to the next step.
- **`many(values)`**: all values in the array are emitted sequentially.
- **`finalValue(value)`**: the value is emitted directly and the remaining steps are skipped (gen/fun only).
- **Promise**: awaited; the resolved value is handled as above.
- **Generator/iterator**: iterated; each yielded value is handled as above.
- **Thrown exception**: caught and emitted as a stream error event.

### chainUnchecked(fns, options?)

Same as `chain()` in JavaScript. In TypeScript, it bypasses type checking on `fns` and accepts optional type parameters.

```ts
import {chainUnchecked} from 'stream-chain';

const pipeline = chainUnchecked([x => String(x * x)]);
```

## Special values (defs.js)

All special values can be imported from `'stream-chain'` or `'stream-chain/defs.js'`.

### none

`Symbol.for('object-stream.none')`. Return it from a function to skip producing output for this input.

```js
chain([x => x > 0 ? x : none]); // filter: pass only positive values
```

### stop

`Symbol.for('object-stream.stop')`. Return it to skip the current value and terminate the generator. Works only within `gen()` or `fun()` segments.

```js
chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  n => n > 100 ? stop : n
]);
// produces 0..100
```

### many(values)

Wraps an array to emit multiple values from a single input.

```js
chain([x => many([x - 1, x, x + 1])]);
// 5 → 4, 5, 6
```

Helper functions:

- `isMany(value)`: checks if the value is a Many wrapper.
- `getManyValues(value)`: extracts the array from a Many wrapper.
- `toMany(value)`: converts any value to Many (`none` → `many([])`, `x` → `many([x])`, `many(arr)` → `many(arr)`).
- `normalizeMany(value)`: simplifies a Many (`many([])` → `none`, `many([x])` → `x`, `many([...])` → `many([...])`).
- `combineMany(...args)`: merges any number of values (any of none/value/many) into a single Many. Returns a new Many.
- `combineManyMut(a, ...args)`: like `combineMany` but may mutate `a` (the first argument) for performance.

### finalValue(value)

Wraps a value to skip all remaining functions in the current gen/fun segment. The value is emitted directly. Does not work in native streams (treated as a regular value).

```js
chain([[
  x => x * x,
  x => finalValue(x), // skip the next step
  x => x + 1          // never called
]]);
```

Helper functions:

- `isFinalValue(value)`: checks if the value is a final wrapper.
- `getFinalValue(value)`: extracts the value.

### flushable(fn, final?)

Marks a function to be called when the stream ends. When called at end-of-stream, `fn` receives `none` as its argument.
Alternatively, provide a separate `final` function that is called with no arguments at end-of-stream.

```js
let sum = 0;
chain([
  flushable(x => {
    if (x === none) return sum; // emit accumulated value at end
    sum += x;
    return none;
  })
]);
// input: 1, 2, 3 → output: 6
```

Equivalent with a separate final function:

```js
let sum = 0;
chain([
  flushable(
    x => { sum += x; return none; },
    () => sum
  )
]);
```

Helper function:

- `isFlushable(fn)`: checks if the function is marked as flushable.

### Function lists

Used internally for optimization. When `chain()` encounters a function tagged with a function list, it can inline the underlying functions for better performance.

- `setFunctionList(fn, fns)`: tags a function with its underlying function array.
- `isFunctionList(fn)`: checks if the function has a function list.
- `getFunctionList(fn)`: extracts the function array.
- `clearFunctionList(fn)`: removes the tag to prevent inlining.

```js
import {gen, clearFunctionList} from 'stream-chain';

const inlined = gen(x => x + 1, x => x * x);       // will be inlined by chain()
const opaque = clearFunctionList(gen(x => x + 1)); // will NOT be inlined
```

### Stop (exception class)

Can be thrown instead of returning `stop`:

```js
import {Stop} from 'stream-chain/defs.js';

chain([n => {
  if (n > 100) throw new Stop();
  return n;
}]);
```

## gen(...fns)

Creates an async generator pipeline from functions. Each input value is passed through all functions sequentially. Supports all special return values.

- Functions can be regular, async, generator, or async generator.
- Arrays in the argument list are flattened. Falsy values are ignored.
- If any function is flushable, the result is also flushable.
- The result is tagged with a function list for chain() optimization.
- Unlike `asStream()`/`chain()`, `gen()` passes `null`/`undefined` through the pipeline like any other value. Use `none` for consistent skip behavior.

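
The behavior described in the list above can be approximated with plain async generators. The sketch below is a simplified stand-in, not the library's implementation: `miniGen`, `step`, `collect`, and `NONE` are hypothetical names, and `stop`, `many()`, `finalValue()`, and flushing are deliberately left out. It only illustrates how each value travels through every function, how generator results fan out, and why `null` passes through while a skip marker does not.

```js
// Hypothetical skip marker; the library exports its own `none` symbol instead.
const NONE = Symbol('none');

// Push one value through fns[index..]; yield whatever survives the last function.
async function* step(fns, index, value) {
  if (value === NONE) return;   // skip: later functions never see this value
  if (index >= fns.length) {    // past the last function: emit the value
    yield value;
    return;
  }
  let result = fns[index](value);
  if (result && typeof result.then === 'function') result = await result;
  if (result && typeof result.next === 'function') {
    // generator or async generator: every yielded item continues on its own
    for await (const item of result) yield* step(fns, index + 1, item);
    return;
  }
  yield* step(fns, index + 1, result);
}

// Simplified stand-in for gen(): flattens arrays and ignores falsy entries.
const miniGen = (...fns) => {
  const list = fns.flat(Infinity).filter(Boolean);
  return async function* (value) {
    yield* step(list, 0, value);
  };
};

// Small helper that drains an async iterable into an array.
const collect = async iterable => {
  const out = [];
  for await (const v of iterable) out.push(v);
  return out;
};

const pipeline = miniGen(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => (x % 2 ? x * x : NONE), // keep odd numbers, squared
  x => (x === 9 ? null : x)    // null is an ordinary value here
);

collect(pipeline(5)).then(out => console.log(out)); // [ 1, null ]
```

In this sketch the `null` produced by the last function reaches the output, which mirrors the documented difference from `asStream()`/`chain()`, where `null` would be dropped.
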
```js
import gen from 'stream-chain/gen.js';

const pipeline = gen(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);

for await (const v of pipeline(3)) {
  console.log(v); // 0, 1, 4
}
```

## fun(...fns)

Like `gen()` but returns a function instead of a generator. Values from generators are collected into `many()` arrays. For purely synchronous pipelines it returns a synchronous result; for asynchronous pipelines it returns a `Promise`. Like `gen()`, it passes `null`/`undefined` through the pipeline (unlike `asStream()`/`chain()`).

```js
import fun from 'stream-chain/fun.js';
import {getManyValues} from 'stream-chain/defs.js';

const f = fun(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);

const result = await f(3);
console.log(getManyValues(result)); // [0, 1, 4]
```

## asStream(fn, options?)

Wraps any function as a Duplex stream. Supports regular, async, generator, and async generator functions. Handles all special return values. Treats `null`/`undefined` as `none` (skip) because Node.js streams reserve them for end-of-stream signaling. This differs from `gen()`/`fun()`, which pass them through.

- `fn`: any function.
- `options`: Duplex options (default: `{writableObjectMode: true, readableObjectMode: true}`).

```js
import asStream from 'stream-chain/asStream.js';

const stream = asStream(x => x * x);
source.pipe(stream).pipe(destination);
```

## dataSource(fn)

Takes a function or iterable and returns the underlying iterator function.

- Function → returns the function as-is.
- Async iterable → returns `Symbol.asyncIterator` bound to the object.
- Iterable → returns `Symbol.iterator` bound to the object.

```js
import {dataSource} from 'stream-chain';

const iter = dataSource([1, 2, 3]); // returns [].values bound to array
```

## Utilities

All utilities return functions suitable for use in `chain()`. Each one lives in its own module under `'stream-chain/utils/'`, for example `'stream-chain/utils/take.js'`.

### take(n, finalValue?)

Take `n` items from the stream.
After `n` items, returns `finalValue` (default: `none`). Use `stop` as `finalValue` to terminate the stream early.

```js
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  take(5, stop) // take 5 items then stop
]);
```

### takeWhile(fn, finalValue?)

Take items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, returns `finalValue` for all subsequent items.

```js
import takeWhile from 'stream-chain/utils/takeWhile.js';
import {stop} from 'stream-chain/defs.js';

chain([takeWhile(x => x < 100, stop)]);
```

### takeWithSkip(n, skip?, finalValue?)

Skip `skip` items (default: 0), then take `n` items.

```js
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
import {stop} from 'stream-chain/defs.js';

chain([takeWithSkip(5, 2, stop)]); // skip 2, take 5
```

### skip(n)

Skip `n` items from the beginning, pass all remaining.

```js
import skip from 'stream-chain/utils/skip.js';

chain([skip(5)]); // skip first 5 items
```

### skipWhile(fn)

Skip items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, pass all remaining items.

```js
import skipWhile from 'stream-chain/utils/skipWhile.js';

chain([skipWhile(x => x.status !== 'ready')]);
```

### fold(fn, initial)

Reduce the entire stream to a single value emitted at the end. `fn(accumulator, item)` returns the new accumulator. `fn` can be async.

```js
import fold from 'stream-chain/utils/fold.js';

chain([fold((acc, x) => acc + x, 0)]); // sum all values
```

### reduce(fn, initial)

Alias for `fold`.

### scan(fn, initial)

Running accumulator: emits the current accumulator after each item. `fn(accumulator, item)` returns the new accumulator. `fn` can be async.

```js
import scan from 'stream-chain/utils/scan.js';

chain([scan((acc, x) => acc + x, 0)]);
// input: 1,2,3 → output: 1,3,6
```

### reduceStream(fn, initial) / reduceStream(options)

Creates a Writable stream that reduces values. The current accumulator is available as `.accumulator`.

- `reduceStream(fn, initial)`: simple form.
- `reduceStream({reducer, initial, ...writableOptions})`: options form.

```js
import reduceStream from 'stream-chain/utils/reduceStream.js';

const r = reduceStream((acc, x) => acc + x, 0);
chain([r]);
// After stream ends: r.accumulator === sum
```

### batch(size)

Group items into fixed-size arrays. The last batch may be smaller.

```js
import batch from 'stream-chain/utils/batch.js';

chain([batch(3)]);
// input: 1,2,3,4,5 → output: [1,2,3], [4,5]
```

### lines()

Split a byte/string stream into lines.

```js
import lines from 'stream-chain/utils/lines.js';

chain([lines()]);
// input: "a\nb\nc" → output: "a", "b", "c"
```

### fixUtf8Stream()

Repartition byte chunks so multi-byte UTF-8 characters are not split across chunks.

```js
import fixUtf8Stream from 'stream-chain/utils/fixUtf8Stream.js';

chain([fixUtf8Stream(), lines()]);
```

### readableFrom(options)

Convert an iterable/iterator to a Readable stream.

- `options.iterable`: iterable or iterator to read from.
- `options.objectMode`: default `true`.

```js
import readableFrom from 'stream-chain/utils/readableFrom.js';

chain([readableFrom({iterable: [1, 2, 3]}), x => x * 2]);
```

## JSONL support

### parser(reviver?)

Returns a `gen()` pipeline that parses JSONL: `fixUtf8Stream → lines → JSON.parse`.

- `reviver`: optional JSON.parse reviver function, or `{reviver, ignoreErrors}` object.
- `ignoreErrors` (boolean, default: false): if true, silently skip lines that fail to parse.

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('data.jsonl'),
  parser(),
  obj => console.log(obj)
]);
```

### parserStream(options?)

Wraps `parser()` with `asStream()`. Options extend Duplex options:

- `reviver`: JSON.parse reviver function.
- `ignoreErrors` (boolean): silently skip parse errors.
- Default: `{writableObjectMode: false, readableObjectMode: true}`.

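
The three stages named for `parser()` (`fixUtf8Stream → lines → JSON.parse`) can be approximated with Node.js built-ins alone. The following is a rough sketch of that idea, not stream-chain code: `parseJsonl` is a hypothetical name, it works on an in-memory list of chunks rather than a stream, and skipping blank lines is an assumption of the sketch.

```js
// Hypothetical stand-in for fixUtf8Stream → lines → JSON.parse.
function* parseJsonl(chunks, {reviver, ignoreErrors = false} = {}) {
  const decoder = new TextDecoder('utf-8'); // buffers incomplete multi-byte characters
  let rest = '';                            // unterminated tail of the previous chunk

  function* parseLine(line) {
    if (!line.trim()) return;               // assumption: blank lines are skipped
    try {
      yield JSON.parse(line, reviver);
    } catch (error) {
      if (!ignoreErrors) throw error;       // rethrow unless errors are ignored
    }
  }

  for (const chunk of chunks) {
    // {stream: true} keeps a split character pending until the next chunk arrives
    const parts = (rest + decoder.decode(chunk, {stream: true})).split('\n');
    rest = parts.pop();                     // the last piece may be an incomplete line
    for (const line of parts) yield* parseLine(line);
  }
  yield* parseLine(rest + decoder.decode()); // flush whatever is left
}

// The two-byte character \u00e9 is split across the two chunks on purpose.
const bytes = Buffer.from('{"a":1}\n{"s":"\u00e9"}\nnot json\n{"b":2}\n');
const chunks = [bytes.subarray(0, 15), bytes.subarray(15)];

console.log([...parseJsonl(chunks, {ignoreErrors: true})]);
// three objects: {a: 1}, {s: 'é'}, {b: 2}; the "not json" line is dropped
```

With `ignoreErrors` left at `false`, the same input makes the sketch throw a `SyntaxError` on the `not json` line.
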
```js
import parserStream from 'stream-chain/jsonl/parserStream.js';

chain([fs.createReadStream('data.jsonl'), parserStream()]);
```

### stringerStream(options?)

Duplex stream that serializes objects to JSONL. Options extend Duplex options:

- `replacer`: JSON.stringify replacer function.
- `space`: JSON.stringify space argument.
- `prefix` (string, default: ''): prepended to output.
- `suffix` (string, default: ''): appended to output.
- `separator` (string, default: '\n'): placed between items.
- `emptyValue` (string, default: prefix + suffix): output when there are no values.
- Default: `{writableObjectMode: true, readableObjectMode: false}`.

```js
import stringerStream from 'stream-chain/jsonl/stringerStream.js';

chain([objectSource, stringerStream(), fs.createWriteStream('out.jsonl')]);
```

Output as JSON array:

```js
stringerStream({prefix: '[', suffix: ']', separator: ','});
// input: 1, 2, 3 → output: [1,2,3]
```

## TypeScript

Typed stream wrappers for precise type inference in chains:

- `TypedReadable`: Readable with typed output.
- `TypedWritable`: Writable with typed input.
- `TypedDuplex`: Duplex with typed input and output.
- `TypedTransform`: Transform with typed input and output.

```ts
import chain from 'stream-chain';
import {TypedTransform} from 'stream-chain/typed-streams.js';

const transform = new TypedTransform({
  objectMode: true,
  transform(x, _, cb) { cb(null, String(x + 1)); }
});

const pipeline = chain([transform] as const);
// ChainOutput
```

## Common patterns

### Data processing pipeline

```js
import chain from 'stream-chain';
import fs from 'node:fs';
import zlib from 'node:zlib';

const pipeline = chain([
  x => x * x,
  x => chain.many([x - 1, x, x + 1]),
  async x => await lookupInDb(x),
  function* (x) { for (let i = x; i > 0; --i) yield i; },
  x => x % 2 ? x : null,
  x => '' + x,
  zlib.createGzip()
]);

dataSource.pipe(pipeline).pipe(fs.createWriteStream('output.gz'));
```

### Filter + transform

```js
chain([
  x => x > 0 ? x : null, // filter: positive only
  x => x * x,            // transform: square
  x => ({value: x}),     // transform: wrap in object
]);
```

### Using input/output properties

```js
const pipeline = chain([x => x * x, x => String(x)]);

pipeline.output.pipe(destination);
source.pipe(pipeline.input);
```

### Web streams integration

```js
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  }
});

chain([readable, x => x * 2]); // 2, 4
```

### JSONL file processing

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('input.jsonl'),
  parser(),
  obj => ({...obj, processed: true}),
  stringerStream(),
  fs.createWriteStream('output.jsonl')
]);
```

### Accumulate with flush

```js
// `many` is imported as well because the flush branch returns many(...)
import {none, many, flushable} from 'stream-chain/defs.js';

const items = [];
chain([
  flushable(x => {
    if (x === none) return many(items.sort());
    items.push(x);
    return none;
  })
]);
// collects all items, sorts, emits at end
```

### Paginated API consumption

```js
import chain from 'stream-chain';
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  async function* () {
    let page = 1;
    while (true) {
      const data = await fetchPage(page++);
      if (!data.length) return;
      for (const item of data) yield item;
    }
  },
  take(100, stop),
  item => processItem(item)
]);
```

## Links

- Docs: https://github.com/uhop/stream-chain/wiki
- npm: https://www.npmjs.com/package/stream-chain
- Repository: https://github.com/uhop/stream-chain