# stream-chain > Chain functions, generators, and streams into a single Duplex stream with proper backpressure handling. Zero dependencies. ## Install npm i stream-chain ## Quick start ```js import chain from 'stream-chain'; const pipeline = chain([ x => x * x, x => x % 2 ? x : null, async x => await process(x) ]); dataSource.pipe(pipeline).pipe(destination); ``` ## API ### chain(fns[, options]) Creates a Duplex stream from an array of functions, streams, or arrays (flattened). - `fns` (array) — functions, streams, or nested arrays. Falsy values are ignored. - `options` (object, optional) — Duplex options plus: - `noGrouping` (boolean) — disable function grouping optimization (default: false). - `skipEvents` (boolean) — disable error event forwarding (default: false). - Default: `{writableObjectMode: true, readableObjectMode: true}`. - Returns: `Duplex` stream with `.streams`, `.input`, `.output` properties. Supported function types: regular, async, generator, async generator. ### chainUnchecked(fns[, options]) Same as `chain()` but bypasses TypeScript type checking on the `fns` parameter. ```js import {chainUnchecked} from 'stream-chain'; const pipeline = chainUnchecked([x => x * x]); ``` ### Special return values - `none` — skip, produce no value (same as returning `null`/`undefined`). - `stop` — skip and terminate the generator pipeline. - `many(values)` — emit multiple values from a single input. - `finalValue(value)` — skip remaining chain steps, emit value directly (gen/fun only). - `flushable(fn, final?)` — mark function to be called at stream end. ```js import chain from 'stream-chain'; import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js'; chain([ x => x % 2 ? x : none, x => many([x, x * 10]), ]); ``` ### gen(...fns) Creates an async generator pipeline from functions. Used internally by `chain()` for grouping. ```js import gen from 'stream-chain/gen.js'; const g = gen(x => x + 1, x => x * x); for await (const v of g(2)) console.log(v); // 9 ``` ### fun(...fns) Like `gen()` but returns an async function. Generator results are collected into `many()`. ```js import fun from 'stream-chain/fun.js'; const f = fun(x => x + 1, x => x * x); console.log(await f(2)); // 9 ``` ### asStream(fn[, options]) Wraps any function as a Duplex stream. ```js import asStream from 'stream-chain/asStream.js'; const stream = asStream(x => x * x); ``` ### dataSource(fn) Takes a function or iterable and returns the underlying iterator function. ```js import {dataSource} from 'stream-chain'; const iter = dataSource([1, 2, 3]); ``` ## Utilities All utilities return functions for use in `chain()`. ### Slicing - `take(n, finalValue?)` — take N items then stop. - `takeWhile(fn, finalValue?)` — take while predicate is true. - `takeWithSkip(n, skip?, finalValue?)` — skip then take. - `skip(n)` — skip N items. - `skipWhile(fn)` — skip while predicate is true. ### Folding - `fold(fn, initial)` — reduce stream to single value at end. - `reduce(fn, initial)` — alias for fold. - `scan(fn, initial)` — emit running accumulator after each item. - `reduceStream(fn, initial)` — reduce as Writable stream with `.accumulator`. ### Stream helpers - `batch(size)` — group items into arrays of `size`. - `lines()` — split byte stream into lines. - `fixUtf8Stream()` — repartition chunks for valid UTF-8. - `readableFrom({iterable})` — convert iterable to Readable stream. ```js import take from 'stream-chain/utils/take.js'; import fold from 'stream-chain/utils/fold.js'; import batch from 'stream-chain/utils/batch.js'; chain([ take(10, stop), batch(3), fold((acc, x) => acc + x.length, 0) ]); ``` ## JSONL support - `parser(reviver?)` — JSONL parser function (returns gen() pipeline). - `parserStream(options?)` — JSONL parser as a stream. - `stringerStream(options?)` — JSONL stringer as a stream. ```js import chain from 'stream-chain'; import parser from 'stream-chain/jsonl/parser.js'; import fs from 'node:fs'; chain([ fs.createReadStream('data.jsonl'), parser(), obj => console.log(obj) ]); ``` ## Common patterns ### Object processing pipeline ```js import chain from 'stream-chain'; const pipeline = chain([ x => x * x, x => chain.many([x - 1, x, x + 1]), x => x % 2 ? x : null, ]); dataSource.pipe(pipeline); pipeline.on('data', x => console.log(x)); ``` ### Async pipeline with filtering ```js chain([ async x => await fetchData(x), x => x.status === 200 ? x.body : null, x => JSON.parse(x), ]); ``` ### Generator producing multiple values ```js chain([ function* (x) { for (let i = 0; i < x; ++i) yield i; }, x => x * x, ]); ``` ### Accumulate and emit at end ```js import {none, flushable} from 'stream-chain/defs.js'; let sum = 0; chain([ flushable(x => { if (x === none) return sum; sum += x; return none; }) ]); ``` ### Web streams ```js const readable = new ReadableStream({ /* ... */ }); const writable = new WritableStream({ /* ... */ }); chain([readable, x => x * 2, writable]); ``` ## TypeScript ```ts import chain from 'stream-chain'; import {TypedTransform} from 'stream-chain/typed-streams.js'; const transform = new TypedTransform({ objectMode: true, transform(x, _, cb) { cb(null, String(x)); } }); const pipeline = chain([transform] as const); ``` ## Links - Docs: https://github.com/uhop/stream-chain/wiki - npm: https://www.npmjs.com/package/stream-chain - Full LLM reference: https://github.com/uhop/stream-chain/blob/master/llms-full.txt