# stream-join

> A toolkit of N→1 stream combinators with Node Streams **and** Web Streams flavors. Four primitives — `zip`, `select`, `race`, `concat` — cover the useful control-flow shapes. A small helper layer under `utils/` composes them into common merge patterns (k-way merge of sorted streams, priority-queue merge, drift-tolerant merge). Built on `stream-chain` ^4.0.2 and sibling to the rest of the `object-stream` family.

- Two runtime deps: `stream-chain` (^4.0.2, provides `readableFrom`, `streamPuller`, `webStreamPuller`) and `nano-binary-search` (used by `sortedInsert`).
- ESM-only (`"type": "module"`). Node 22+.
- Node-side entry (`stream-join`) returns `Readable` (always `objectMode: true`). Web-side entry (`stream-join/web`) returns `ReadableStream`.
- Each component is a thin wrapper over a shared runtime-neutral generator factory in `src/generators/`. The algorithm lives once; the two trees adapt their runtime's stream type to the same async-iterator pullers.
- Input-stream reads go through stream-chain v4's `streamPuller` (Node) / `webStreamPuller` (Web). Both preserve original `'error'` values and survive consumer-side early exit.

## Quick start

```bash
npm i stream-join
```

```js
import zip from 'stream-join';
import {Readable} from 'node:stream';

const s1 = Readable.from([1, 2, 3]);
const s2 = Readable.from(['a', 'b', 'c']);

zip([s1, s2]).on('data', x => console.log(x));
// [1, 'a'], [2, 'b'], [3, 'c']
```

## Importing

```js
// Node default = zip (back-compat with 1.x naming `join`)
import zip from 'stream-join';
import {zip, select, race, concat} from 'stream-join';

// Per-component subpaths (Node side)
import zip from 'stream-join/zip.js';
import select from 'stream-join/select.js';
import race from 'stream-join/race.js';
import concat from 'stream-join/concat.js';

// Web Streams variant — same names, ReadableStream in/out
import zip from 'stream-join/web';
import {zip, select, race, concat} from 'stream-join/web';
import select from 'stream-join/web/select.js';

// Helpers (Node + Web share the picker / insert primitives; merge-sorted has a Web mirror)
import pickFirst from 'stream-join/utils/pick-first.js';
import pickMin from 'stream-join/utils/pick-min.js';
import sortedInsert from 'stream-join/utils/sorted-insert.js';
import mergeSorted from 'stream-join/utils/merge-sorted.js'; // Node merge
import mergeSorted from 'stream-join/web/utils/merge-sorted.js'; // Web merge
```

The package is ESM-only (`"type": "module"`); subpath imports take the explicit `.js` extension.

## API — main components

The four primitives correspond to the four useful "N inputs to 1 output" control-flow shapes:

| Primitive | Per-round advance        | Output cardinality             | Order                   |
| --------- | ------------------------ | ------------------------------ | ----------------------- |
| `zip`     | All streams              | Longest input × joinItems' fan | Per-round grouped       |
| `select`  | One stream (user pick)   | Sum of all inputs              | User's pick decides     |
| `race`    | One stream (race winner) | Sum of all inputs              | Non-deterministic       |
| `concat`  | One stream sequentially  | Sum of all inputs              | Stream-major (s0…s_N-1) |

### `zip(streams[, options])`

Symmetric N→1 combine. Every round pulls one value from each non-ended stream concurrently via `Promise.all`; values from ended streams are represented as `null`. The user's `joinItems(sink, items)` callback combines per-round values into 0 or more output values.

Signature:

```ts
function zip(
  streams: S,
  options?: zip.JoinOptions
): TypedReadable;
```

Parameters:

- `streams` — non-empty array of object-mode `Readable` streams. Throws `TypeError` on missing/empty.
- `options.joinItems(sink, items)` — optional. Called once per round with the per-stream values (positionally; `null` for ended streams). `sink` is `{push(value): void}` — call `sink.push(value)` 0+ times per round. May be `async`: a returned thenable is awaited before the next round starts; synchronous throws and rejected return-promises both propagate as `'error'` events. Default: `(sink, items) => sink.push(items)` (emits the items array as a single output value).
- `options.skipEvents` — accepted for backwards compatibility with 1.x; no-op.
- Any other `ReadableOptions` are passed to the output Readable.

Returns: a `TypedReadable` (or a `TypedReadable` when no `T` is supplied), `objectMode: true`. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved.

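To make the round semantics concrete, here is a minimal sketch of the pull-one-from-each loop described above, written against plain async iterators. It is an illustration under stated assumptions, not the package's implementation: `zipRounds` is a hypothetical name, there is no `joinItems` hook, and error handling and cleanup are omitted.

```javascript
import {Readable} from 'node:stream';

// Hypothetical sketch: each round pulls from every live input concurrently;
// inputs that have already ended contribute null to the round.
async function* zipRounds(streams) {
  const iterators = streams.map(s => s[Symbol.asyncIterator]());
  const ended = iterators.map(() => false);
  for (;;) {
    const results = await Promise.all(
      iterators.map((it, i) => (ended[i] ? {done: true} : it.next()))
    );
    results.forEach((r, i) => {
      if (r.done) ended[i] = true;
    });
    if (ended.every(Boolean)) return; // every input has ended
    yield results.map(r => (r.done ? null : r.value));
  }
}

const rounds = [];
for await (const items of zipRounds([Readable.from([1, 2, 3]), Readable.from(['a'])])) {
  rounds.push(items);
}
console.log(rounds); // [[1, 'a'], [2, null], [3, null]]
```

The shorter input keeps its position in every round, which is why downstream code can rely on `items[i]` always referring to stream `i`.
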
Output cardinality = longest input stream's length × the `joinItems` fan.

Examples:

```js
// Default: array per round
zip([Readable.from([1, 2, 3]), Readable.from(['a', 'b', 'c'])]).on('data', x =>
  console.log(x)
);
// [1, 'a'], [2, 'b'], [3, 'c']

// Mismatched lengths — ended streams contribute null
zip([Readable.from([1, 2, 3, 4]), Readable.from(['a', 'b'])]);
// [1, 'a'], [2, 'b'], [3, null], [4, null]

// Custom joinItems
zip([s1, s2], {
  joinItems(sink, items) {
    if (items[0] !== null && items[1] !== null) sink.push(items.join('-'));
  }
});
```

### `select(streams, options)`

Asymmetric N→1 selection. Parallel initial fill of up to `windowSize` items per stream; per round, the user's `pick(items)` returns the index of the slot to emit. That slot's source stream is refilled (`insert`, default: replace in place) or removed (`remove`, default: splice) if exhausted. Useful for k-way merge of sorted streams, priority-queue merge, drift-tolerant merge.

Signature:

```ts
function select(
  streams: S,
  options: select.SelectOptions
): TypedReadable>;
```

Slot shape (what `pick` / `insert` / `remove` see):

```ts
interface Slot {
  item: T; // the value pulled from the source stream
  index: number; // position of the source stream in the streams array
}
```

`items` is **read-only by contract**. Modify values via a downstream `chain()` step, not by mutating slots inside callbacks.

Parameters:

- `streams` — non-empty array of object-mode Readable streams. Throws `TypeError` on missing/empty.
- `options.pick(items) => number` — **required**. `items: readonly Slot[]`. Returns the index in `items` of the slot to emit. Stop signals: `undefined`, `null`, `NaN`, ±`Infinity`, negative, non-integer, or `>= items.length` — any of these end the merge. Throws `TypeError` if missing or not a function.
- `options.insert(items, newSlot, lastPos?)` — optional. Mutates `items` in place. `lastPos === undefined` during initial fill (length MAY grow); `lastPos` defined during steady-state refill (length MUST stay unchanged). Default: replace at `lastPos`, or `push` when `lastPos` undefined.
- `options.remove(items, lastPos)` — optional. Called when the source stream of `items[lastPos]` has just exhausted. Must decrease `items.length` by 1. Default: `items.splice(lastPos, 1)`.
- `options.windowSize` — optional. Per-stream buffer depth. Default 1. Must be a positive integer; throws `TypeError` otherwise.
- Any other `ReadableOptions` are passed to the output Readable.

Returns: a `TypedReadable>` (union of input value types), `objectMode: true`. Emits one value per round (`items[pos].item`). Ends when every input stream has exhausted or `pick` returns a stop signal; propagates input-stream `'error'` events with the original value preserved.

Output cardinality = sum of all input streams' lengths assuming no early stop.

Examples:

```js
// K-way merge of sorted streams
import select from 'stream-join/select.js';
import pickFirst from 'stream-join/utils/pick-first.js';
import sortedInsert from 'stream-join/utils/sorted-insert.js';

select([s1, s2, s3], {
  pick: pickFirst,
  insert: sortedInsert((a, b) => a < b)
});

// Drift-tolerant merge: tolerate up to 4 items of local disorder per stream
select([s1, s2], {
  pick: pickFirst,
  insert: sortedInsert((a, b) => a.timestamp < b.timestamp),
  windowSize: 4
});

// Priority-queue merge with default insert (replace at lastPos)
import pickMin from 'stream-join/utils/pick-min.js';

select([s1, s2, s3], {pick: pickMin((a, b) => a.priority < b.priority)});

// Early termination
let emitted = 0;
select([s1, s2], {
  pick: items => {
    if (++emitted > 100) return -1; // stop after 100 emissions
    return 0;
  }
});
```

### `race(streams[, options])`

Emit-as-ready. All N streams have a pull in flight; `Promise.race` selects whichever resolves first; emit that value and restart the pull on that stream. No buffering across rounds.
Natural fit for merging live event streams where the output should not be bounded by the slowest source.

Signature:

```ts
function race(
  streams: S,
  options?: race.RaceOptions
): TypedReadable>;
```

Parameters:

- `streams` — non-empty array of object-mode Readable streams. Throws `TypeError` on missing/empty.
- `options` — optional `ReadableOptions` (no `race`-specific fields; output is forced to `objectMode: true`).

Returns a `TypedReadable` that emits values in event-loop-arrival order. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved.

Order is non-deterministic — depends on how the input streams' data events interleave.

Example:

```js
import race from 'stream-join/race.js';

// handleEvent is a placeholder for your own handler
// (avoid naming it `process`, which is a Node global object).
race([logStreamA, logStreamB, logStreamC]).on('data', event => handleEvent(event));
// Whichever stream produces an event first gets that event emitted first.
```

### `concat(streams[, options])`

Sequential drain. Stream 0 is fully consumed, then stream 1, …, then stream N-1. Pullers are created lazily — only one stream's puller is active at a time — so streams that haven't started yet don't buffer data prematurely.

Signature:

```ts
function concat(
  streams: S,
  options?: concat.ConcatOptions
): TypedReadable>;
```

Parameters:

- `streams` — non-empty array of object-mode Readable streams. Drained left-to-right. Throws `TypeError` on missing/empty.
- `options` — optional `ReadableOptions` (no `concat`-specific fields; output is forced to `objectMode: true`).

Returns a `TypedReadable` emitting stream 0's values, then stream 1's, …, then stream N-1's. Ends when every input stream has ended; propagates input-stream `'error'` events with the original value preserved (later streams that haven't been started yet are never read).

Example:

```js
import concat from 'stream-join/concat.js';

concat([part1, part2, part3]).on('data', chunk => out.push(chunk));
// All of part1, then all of part2, then all of part3.
```

## Helpers (`utils/`)

The helpers compose with `select()` to express common merge patterns. They live under `src/utils/` and import as `stream-join/utils/`.

### `pickFirst()`

```js
const pickFirst = () => 0;
```

Signature:

```ts
const pickFirst: () => number;
```

Parameters: none.

Returns: the integer `0`, regardless of input. Constant-time, no allocations.

Pair with `sortedInsert` for k-way merge of sorted streams — the smallest slot is always at index 0, so the picker is O(1) with zero comparisons.

### `pickMin(lessFn)`

```js
const pickMin = lessFn => items => {
  let min = 0;
  for (let i = 1; i < items.length; ++i) {
    if (lessFn(items[i].item, items[min].item)) min = i;
  }
  return min;
};
```

Signature:

```ts
function pickMin(
  lessFn: (a: T, b: T) => boolean
): (items: readonly Slot[]) => number;
```

Parameters:

- `lessFn(a, b)` — comparator on item values. Returns `true` if `a` should come before `b`. Must be a strict-weak-order predicate.

Returns: a picker function suitable for `select`'s `pick` option. The picker takes a `readonly Slot[]` and returns the index of the slot whose `item` is smallest per `lessFn`. Ties resolve to the first occurrence. O(items.length) per call; no allocations.

Operates on item values (helper unwraps `slot.item` internally), so the same comparator can be reused with `sortedInsert`.

### `sortedInsert(lessFn)`

Signature:

```ts
function sortedInsert(
  lessFn: (a: T, b: T) => boolean
): (items: Slot[], newSlot: Slot, lastPos?: number) => void;
```

Parameters:

- `lessFn(a, b)` — comparator on item values; same shape and convention as `pickMin`.

Returns: an `insert` callback suitable for `select`'s `insert` option. The callback takes:

- `items: Slot[]` — the mutable buffer maintained in sorted order;
- `newSlot: Slot` — the freshly-pulled slot to insert;
- `lastPos?: number` — `undefined` during initial fill (grow the buffer), or the index of the slot just emitted (refill in place; length unchanged).

…and returns nothing (mutates `items` in place).

Uses `nano-binary-search` to find the insertion point; on post-pick refill, detects when the new slot belongs at the same position as the just-removed one and does a single in-place replace (no splice) — the common case when source streams are themselves locally sorted.

### `mergeSorted(streams, lessFn[, options])`

Umbrella that wires `select + pickFirst + sortedInsert(lessFn)`. K-way merge of sorted streams in one line.

Signature:

```ts
function mergeSorted(
  streams: readonly Readable[],
  lessFn: (a: T, b: T) => boolean,
  options?: MergeSortedOptions
): TypedReadable;

interface MergeSortedOptions extends ReadableOptions {
  windowSize?: number; // default 1
}
```

Parameters:

- `streams` — non-empty array of object-mode Readable streams, each sorted per `lessFn` (or locally disordered within `windowSize`).
- `lessFn(a, b)` — comparator on item values; same shape and convention as `pickMin` / `sortedInsert`.
- `options` — optional. `ReadableOptions` plus `windowSize` (default `1`, positive integer; larger values tolerate that many items of local disorder per stream).

Returns a `TypedReadable` emitting the merged sequence in sorted order per `lessFn`. Propagates input-stream `'error'` events with the original value preserved; ends when every input stream has ended.

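As an illustration of the wiring above, the following sketch performs a k-way merge with a sorted slot buffer: it always emits index 0 and refills from the stream that was just emitted. `mergeSortedSketch` and `upperBound` are hypothetical names. The sketch assumes a fixed `windowSize` of 1, uses a hand-written binary search instead of `nano-binary-search`, and leaves out the in-place replace optimization and error handling, so treat it as a model of the idea rather than the package's code.

```javascript
import {Readable} from 'node:stream';

// Hypothetical helper: first position whose item is strictly greater than
// `item` per lessFn, so equal items keep their arrival order.
function upperBound(items, item, lessFn) {
  let lo = 0;
  let hi = items.length;
  while (lo < hi) {
    const mid = (lo + hi) >> 1;
    if (lessFn(item, items[mid].item)) hi = mid;
    else lo = mid + 1;
  }
  return lo;
}

async function* mergeSortedSketch(streams, lessFn) {
  const iterators = streams.map(s => s[Symbol.asyncIterator]());
  const items = []; // sorted buffer of {item, index} slots
  const insert = slot => items.splice(upperBound(items, slot.item, lessFn), 0, slot);

  // Initial fill: one slot per non-empty stream, pulled in parallel.
  const first = await Promise.all(iterators.map(it => it.next()));
  first.forEach((r, index) => {
    if (!r.done) insert({item: r.value, index});
  });

  while (items.length) {
    const {item, index} = items.shift(); // "pick first": smallest is at 0
    yield item;
    const r = await iterators[index].next(); // refill from the same source
    if (!r.done) insert({item: r.value, index});
  }
}

const merged = [];
for await (const x of mergeSortedSketch(
  [Readable.from([1, 4, 7]), Readable.from([2, 5, 8]), Readable.from([3, 6, 9])],
  (a, b) => a < b
)) {
  merged.push(x);
}
console.log(merged); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because the buffer stays sorted on insert, the pick step needs no comparisons, which is the reason `pickFirst` pairs with `sortedInsert`.
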
Example:

```js
import {Readable} from 'node:stream';
import mergeSorted from 'stream-join/utils/merge-sorted.js';

mergeSorted(
  [Readable.from([1, 4, 7]), Readable.from([2, 5, 8]), Readable.from([3, 6, 9])],
  (a, b) => a < b
).on('data', x => console.log(x));
// 1, 2, 3, 4, 5, 6, 7, 8, 9
```

Pass `windowSize` via `options` for drift tolerance:

```js
mergeSorted([s1, s2], (a, b) => a < b, {windowSize: 4});
```

## Web Streams variant

The `stream-join/web` subpath mirrors the Node side for `ReadableStream` consumers — same algorithm, same options, same helpers, only the I/O type changes:

```js
import zip from 'stream-join/web';

const out = zip([webReadable1, webReadable2]); // ReadableStream<[T1, T2]>
const reader = out.getReader();
for (;;) {
  const {value, done} = await reader.read();
  if (done) break;
  process(value);
}
```

The Web tree pulls in no `node:stream` code, so it stays bundleable for browsers and edge runtimes. The four wrappers — `web/zip.js`, `web/select.js`, `web/race.js`, `web/concat.js` — adapt `ReadableStream` inputs via `stream-chain/utils/webStreamPuller.js`, share the same generator factories with the Node side (`src/generators/`), and wrap the output via an internal `fromAsyncIterable` shim (a portable equivalent of `ReadableStream.from` for runtimes that don't yet ship the static method).

## Pullers (from `stream-chain` v4)

The Node and Web wrappers read input streams through `stream-chain` v4's pullers:

- **Node:** `streamPuller(stream)` is `stream.iterator({destroyOnReturn: false})` — preserves the original `'error'` value (no `AbortError` wrap), surfaces premature destroy as `Error('Premature close')`, and survives consumer-side early exit (`break` out of `for await` doesn't destroy the source).
- **Web:** `webStreamPuller(stream)` is `stream[Symbol.asyncIterator]({preventCancel: true})` plus an explicit `cancel(reason)` method (the iterator protocol's `return()` can't convey a cancel reason cleanly).

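The early-exit behavior noted for the Node-side puller can be observed with Node's own `readable.iterator()` call, which the list above names as the underlying mechanism. This small demonstration uses only `node:stream`, not `stream-chain` itself, and the variable names are illustrative.

```javascript
import {Readable} from 'node:stream';

const source = Readable.from([1, 2, 3]);

// First consumer leaves the loop early; with destroyOnReturn: false
// the source stream is left intact instead of being destroyed.
const head = [];
for await (const value of source.iterator({destroyOnReturn: false})) {
  head.push(value);
  break;
}
const aliveAfterBreak = !source.destroyed;

// A second iterator picks up where the first one stopped.
const rest = [];
for await (const value of source.iterator({destroyOnReturn: false})) {
  rest.push(value);
}

console.log(head, aliveAfterBreak, rest); // [ 1 ] true [ 2, 3 ]
```

With the default iterator (`for await (const value of source)`), the `break` would destroy `source` and the remaining values would be lost.
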
Both satisfy the standard `AsyncIterator` shape, so the runtime-neutral generators in `src/generators/.js` consume them through the same interface.

## Composition with stream-chain

Every Node-side component returns a plain Readable, so it composes as the first item in a `stream-chain` pipeline:

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import mergeSorted from 'stream-join/utils/merge-sorted.js';

chain([
  mergeSorted([s1, s2, s3], (a, b) => a.timestamp < b.timestamp),
  event => ({...event, processed: Date.now()}),
  // one JSON document per line, as the .jsonl extension implies
  event => JSON.stringify(event) + '\n',
  fs.createWriteStream('out.jsonl')
]);
```

The Web-side components likewise produce a `ReadableStream` and can be piped with the standard `.pipeTo(...)` / `.pipeThrough(...)` API.

## Error handling

If any input stream errors while a main component is consuming it, the error is propagated to the output with the **original value preserved**. On the Node side this surfaces as the output Readable's `'error'` event; on the Web side it appears via the output `ReadableStream`'s controller `error()` (visible to readers as a rejected `read()`). The runtime pullers from `stream-chain` v4 are non-destructive and preserve raw error values — no `AbortError` wrapping.

## What is NOT in stream-join

- **Sort.** Sorting streams is the job of `stream-sorting` (forthcoming). This package treats input order as given.
- **SQL-style key-based join.** That's `mergeJoin` in `stream-sorting`. This package's primitives don't know about keys or sortedness.
- **Set operations (union, intersection, difference) on sorted streams.** Live in `stream-sorting`.
- **1→N operations (tee, fork).** That's `stream-fork`.

## Notable changes 1.x → 2.x

- **ESM-only** (`"type": "module"`). 1.x was CommonJS.
- **Built on `stream-chain`** ^4.0.2 — a runtime dependency now (was zero-dep in 1.x). The 4.x line provides the runtime pullers (`streamPuller`, `webStreamPuller`) the components consume.
- **Node 22+** required.
- **Multi-component package.** 1.x exported a single function. 2.x exports four (`zip`, `select`, `race`, `concat`) plus helpers under `utils/`.
- **Web Streams variant** at `stream-join/web` — same surface, same algorithm, `ReadableStream` in/out.
- **Default export** remains the `zip` function (formerly named `join` in 1.x); `import zip from 'stream-join'` returns it for back-compat.
- **`skipEvents` option** from 1.x is accepted as a no-op.
- The package's previous in-tree `src/stream-puller.js` was removed — its job moved into `stream-chain` v4 as `streamPuller` / `webStreamPuller`.

## Links

- Repository: https://github.com/uhop/stream-join
- npm: https://www.npmjs.com/package/stream-join
- Wiki: https://github.com/uhop/stream-join/wiki