# stream-join > A toolkit of N→1 stream combinators — combine values from multiple streams into one, with proper backpressure handling. Four primitives cover the four useful control-flow shapes. Two flavors share a single algorithm per component: Node Streams (default entry, `stream-join`) and Web Streams (`stream-join/web`). ESM-only, Node 22+. Built on `stream-chain` ^4.0.2. - `zip(streams, options?)` — symmetric advance: every round pulls one value per non-ended stream and combines them via `joinItems` - `select(streams, options)` — asymmetric advance: buffered pick-one-of-N per round, with `windowSize` for drift tolerance - `race(streams, options?)` — emit-as-ready: whichever stream resolves first wins each round, no buffering - `concat(streams, options?)` — sequential drain: stream 0, then stream 1, …, then stream N-1 - Helpers under `utils/` for composing common merge patterns: `pickFirst`, `pickMin`, `sortedInsert`, `mergeSorted` All four primitives: - Take an array of streams (`Readable[]` on the Node side, `ReadableStream[]` on the Web side) as the first argument; throw `TypeError` on an empty/missing array. - Return a stream of the matching flavor — Node `Readable` via `stream-chain`'s `readableFrom`, or Web `ReadableStream` via an internal `fromAsyncIterable` shim. Node side is always `objectMode: true`. - Read input streams through `stream-chain` v4's pullers (`streamPuller` / `webStreamPuller`), which preserve original `'error'` values and survive consumer-side early exit. - Handle backpressure pull-based, end-to-end. ## Quick start Install: ```bash npm i stream-join ``` Usage (`example.mjs`): ```js import zip from 'stream-join'; import {Readable} from 'node:stream'; const s1 = Readable.from([1, 2, 3]); const s2 = Readable.from(['a', 'b', 'c']); zip([s1, s2]).on('data', x => console.log(x)); // [1, 'a'], [2, 'b'], [3, 'c'] ``` ## Importing ```js // Node default = zip (back-compat with 1.x naming `join`) import zip from 'stream-join'; import {zip, select, race, concat} from 'stream-join'; // Per-component subpaths (Node side) import zip from 'stream-join/zip.js'; import select from 'stream-join/select.js'; import race from 'stream-join/race.js'; import concat from 'stream-join/concat.js'; // Web Streams variant import zip from 'stream-join/web'; import {zip, select, race, concat} from 'stream-join/web'; import select from 'stream-join/web/select.js'; // Helpers (Node + Web share these except merge-sorted) import pickFirst from 'stream-join/utils/pick-first.js'; import pickMin from 'stream-join/utils/pick-min.js'; import sortedInsert from 'stream-join/utils/sorted-insert.js'; import mergeSorted from 'stream-join/utils/merge-sorted.js'; // Node merge import mergeSorted from 'stream-join/web/utils/merge-sorted.js'; // Web merge ``` The package is ESM-only (`"type": "module"`). `import` syntax is required; subpath imports include the explicit `.js` extension. ## API ### `zip(streams[, options])` Symmetric N→1 combine. Per round, pulls one value from every non-ended stream; passes the per-round items array (with `null` for ended streams) to `joinItems`; yields the collected outputs. Arguments: - `streams` — non-empty array of object-mode Readable streams. Throws `TypeError` if missing/empty. - `options` — optional. Readable options plus: - `joinItems?(sink, items) => void | Promise` — combine callback. `sink` is `{push(value): void}`; call `sink.push(value)` 0+ times per round. `items` is the per-stream values array (length = `streams.length`; `null` for ended streams). May be async. Default: `(sink, items) => sink.push(items)`. - `skipEvents?: boolean` — legacy 1.x flag; accepted as no-op. Returns: an object-mode `Readable` emitting the combined values. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved. Output cardinality = longest input stream's length × the `joinItems` fan. ### `select(streams, options)` Asymmetric N→1 select. Parallel initial fill of up to `windowSize` items per stream; per round, the user's `pick(items)` returns the index of the slot to emit; the source stream is refilled (`insert`) or removed (`remove`). Arguments: - `streams` — non-empty array of object-mode Readable streams. Throws `TypeError` if missing/empty. - `options` — **required**. Must include `pick`; plus any Readable options. - `pick(items): number` — **required**. `items` is a `readonly Slot[]` where `Slot = {item: T, index: number}`. Returns the index of the slot to emit. **Stop signal**: any value not in `[0, items.length)` (negative, `NaN`, `undefined`, `null`, ±`Infinity`, non-integer, ≥ length) ends the merge. Throws `TypeError` if missing or not a function. - `insert?(items, newSlot, lastPos?): void` — optional. Mutates `items` in place. `lastPos === undefined` during initial fill (length MAY grow); `lastPos` defined during steady-state refill (length MUST stay unchanged). Default: replace at `lastPos`, or `push` when undefined. - `remove?(items, lastPos): void` — optional. Called when the source stream of `items[lastPos]` has exhausted; must decrease `items.length` by 1. Default: `items.splice(lastPos, 1)`. - `windowSize?: number` — optional positive integer. Per-stream buffer depth. Default `1`. Throws `TypeError` if not a positive integer. Returns: an object-mode `Readable` emitting one value per round (`items[pos].item`); output element type is the union of the input streams' value types. Ends when every input stream has exhausted or `pick` returns a stop signal; propagates input-stream `'error'` events with the original value preserved. Output cardinality = sum of all input streams' lengths (assuming no early stop). ### `race(streams[, options])` Emit-as-ready. All N streams have a pull in flight; `Promise.race` selects whichever resolves first; restart the pull on that stream. Arguments: - `streams` — non-empty array of object-mode Readable streams. Throws `TypeError` if missing/empty. - `options` — optional Readable options (no `race`-specific fields). Returns: an object-mode `Readable` emitting values in event-loop-arrival order; output element type is the union of the input streams' value types. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved. Order is non-deterministic. ### `concat(streams[, options])` Sequential drain. Stream 0, then 1, …, then N-1. Lazy puller creation — only one stream's puller exists at a time. Arguments: - `streams` — non-empty array of object-mode Readable streams. Drained left-to-right. Throws `TypeError` if missing/empty. - `options` — optional Readable options (no `concat`-specific fields). Returns: an object-mode `Readable` emitting stream 0's values, then stream 1's, …, then stream N-1's. Output element type is the union of the input streams' value types. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved. ## Helpers ```js import pickFirst from 'stream-join/utils/pick-first.js'; import pickMin from 'stream-join/utils/pick-min.js'; import sortedInsert from 'stream-join/utils/sorted-insert.js'; import mergeSorted from 'stream-join/utils/merge-sorted.js'; ``` ### `pickFirst()` Arguments: none. Returns: the integer `0`. Constant-time, no allocations. Pair with `sortedInsert` for k-way merge of sorted streams (the smallest slot is always at index 0). ### `pickMin(lessFn)` Arguments: - `lessFn(a, b): boolean` — comparator on item values. Returns `true` if `a` should come before `b`. Must be a strict-weak-order predicate. Returns: a picker function `(items: readonly Slot[]) => number` suitable for `select`'s `pick`. Returns the index of the slot whose `item` is smallest per `lessFn`. Ties resolve to the first occurrence. O(items.length) per call. ### `sortedInsert(lessFn)` Arguments: - `lessFn(a, b): boolean` — comparator on item values (same shape as `pickMin`). Returns: an `insert` callback `(items: Slot[], newSlot: Slot, lastPos?: number) => void` suitable for `select`'s `insert`. Mutates `items` in place to maintain sorted order. Built on `nano-binary-search`. Smart-replace optimization: when the new slot belongs at the same position as the just-removed one, replaces in place (one assignment, no splice). ### `mergeSorted(streams, lessFn[, options])` Umbrella combining `select + pickFirst + sortedInsert(lessFn)`. K-way merge of sorted streams in one line. Arguments: - `streams` — non-empty array of object-mode Readable streams, each sorted per `lessFn` (or locally disordered within `windowSize`). - `lessFn(a, b): boolean` — comparator on item values. - `options` — optional Readable options plus `windowSize?: number` (default `1`; larger values tolerate local disorder). Returns: an object-mode `Readable` emitting the merged sequence in sorted order per `lessFn`. ```js import mergeSorted from 'stream-join/utils/merge-sorted.js'; mergeSorted([sortedStream1, sortedStream2, sortedStream3], (a, b) => a < b).on('data', x => console.log(x) ); ``` ## Common patterns ### Symmetric zip with custom combine ```js zip([s1, s2], { joinItems(sink, items) { sink.push(items.join('-')); } }); ``` ### K-way merge of sorted streams ```js mergeSorted([s1, s2, s3], (a, b) => a.timestamp < b.timestamp); ``` ### Drift-tolerant merge ```js mergeSorted([s1, s2], (a, b) => a < b, {windowSize: 4}); ``` ### Priority-queue merge (no pre-sort) ```js import select from 'stream-join/select.js'; import pickMin from 'stream-join/utils/pick-min.js'; select([s1, s2, s3], {pick: pickMin((a, b) => a.priority < b.priority)}); ``` ### Emit-as-ready (live event streams) ```js import race from 'stream-join/race.js'; race([liveStream1, liveStream2, liveStream3]).on('data', event => console.log(event)); ``` ### Sequential drain ```js import concat from 'stream-join/concat.js'; concat([part1, part2, part3]).on('data', chunk => console.log(chunk)); ``` ### Web Streams flavor ```js import zip from 'stream-join/web'; const out = zip([webReadable1, webReadable2]); // ReadableStream<[T1, T2]> const reader = out.getReader(); for (;;) { const {value, done} = await reader.read(); if (done) break; console.log(value); } ``` ### Composition with stream-chain Every Node-side component returns a plain Readable, so each composes naturally as the first item in a `stream-chain` pipeline: ```js import chain from 'stream-chain'; import mergeSorted from 'stream-join/utils/merge-sorted.js'; chain([ mergeSorted([s1, s2], (a, b) => a < b), x => x * 2, x => x.toString() ]); ``` ## Notable changes in 2.x - ESM-only (`"type": "module"`). Was CommonJS in 1.x. - Built on `stream-chain` ^4.0.2 (runtime dep). Was zero-dep in 1.x. - Node 22+ required. - `select`, `race`, `concat` added alongside `zip` (formerly the single function `join`). - Web Streams variant at `stream-join/web` — same API surface, `ReadableStream` in/out. - Internal stream reads go through stream-chain v4's `streamPuller` (Node) / `webStreamPuller` (Web), not the package's previous in-tree `src/stream-puller.js`. Original `'error'` values preserved. - `skipEvents` from 1.x is accepted as a no-op. ## Links - Repository: https://github.com/uhop/stream-join - npm: https://www.npmjs.com/package/stream-join - Full LLM reference: https://raw.githubusercontent.com/uhop/stream-join/master/llms-full.txt