///
import {Readable, ReadableOptions} from 'node:stream';
import {TypedReadable} from 'stream-chain/typed-streams.js';
/**
* Combines N object-mode Readable streams into a single Readable by selecting one of N currently
* buffered slots per round, emitting it, refilling that slot from its source stream, and looping
* until every stream is exhausted (or `pick` signals stop).
*
* Different from `zip()`: `zip` advances all N streams per round and combines the values; `select`
* advances ONE stream per round and emits one item. K-way merge of sorted streams, priority-queue
* merge, and drift-tolerant merge are all `select` use cases.
*
* The output element type is the union of the input streams' value types (`SlotItemType`).
* For homogeneous input — e.g., `TypedReadable[]` — the output is `TypedReadable`. For
* heterogeneous input or untyped `Readable[]`, it's the union or `unknown`.
*
* Throws `TypeError` if `streams` is missing/empty, `options.pick` is missing or not a function,
* or `options.windowSize` is not a positive integer.
*
* @typeParam S — the tuple type of input streams.
* @param streams — non-empty array of object-mode Readable streams.
* @param options — required. Must include `pick`; may include `insert`, `remove`, `windowSize`,
* plus any `ReadableOptions`.
* @returns a `TypedReadable>` that emits one value per round. Always object-mode.
* Propagates `'error'` events from any input stream; ends when every input stream has ended
* (or `pick` returns a stop signal).
*/
declare function select(
streams: S,
options: select.SelectOptions
): TypedReadable>;
declare namespace select {
/**
* Resolves to the value type of a `Readable` — `R` for `TypedReadable`, otherwise `unknown`.
*
* @typeParam R — a `Readable` (or `TypedReadable`) whose value type to extract.
*/
export type StreamValue = R extends TypedReadable ? V : unknown;
/**
* Union of value types across the input streams tuple. For homogeneous inputs this resolves to
* the single shared value type; for heterogeneous inputs, it's the union.
*
* @typeParam S — the tuple type of input streams.
*/
export type SlotItemType = StreamValue;
/**
* An entry in the buffer the picker / inserter / remover sees.
*
* @typeParam T — the value type carried by this slot.
*/
export interface Slot {
/** The value pulled from the source stream. */
item: T;
/** The position of the source stream in the input `streams` array. */
index: number;
}
/**
* Options accepted by `select()`. Extends `ReadableOptions`; the output Readable is always
* created with `objectMode: true`.
*
* @typeParam S — the tuple type of input streams (used to type the slots passed to the hooks).
*/
export interface SelectOptions<
S extends readonly Readable[] = readonly Readable[]
> extends ReadableOptions {
/**
* Required. Given the current buffer of slots, returns the index of the slot to emit and refill.
*
* The return value MUST be an integer in `[0, items.length)`. Any other value
* (`undefined`, `null`, `NaN`, ±`Infinity`, negative, non-integer, ≥ length)
* stops the merge — useful for early termination.
*
* @param items — read-only view of the current buffer (length depends on `windowSize`,
* number of streams, and how many have exhausted).
* @returns the index of the slot to emit, or any non-integer / out-of-range value to stop.
*/
pick: (items: readonly Slot>[]) => number;
/**
* Optional. Places a freshly-pulled slot into `items` (mutates in place). Called both during
* the parallel initial fill (`lastPos === undefined`, length MAY grow) and during steady-state
* refill (`lastPos` defined, length MUST stay unchanged).
*
* Default: replace at `lastPos` (or `push` when `lastPos` is undefined).
*
* @param items — the mutable buffer to update in place.
* @param newSlot — the freshly-pulled slot to insert.
* @param lastPos — `undefined` during initial fill (grow the buffer); otherwise the index
* of the slot just emitted (refill in place, no length change).
* @returns nothing.
*/
insert?: (
items: Slot>[],
newSlot: Slot>,
lastPos?: number
) => void;
/**
* Optional. Called when the source stream of `items[lastPos]` has just exhausted; the hook
* MUST decrease `items.length` by 1. Default: `items.splice(lastPos, 1)`.
*
* @param items — the mutable buffer; the hook must shorten it by 1.
* @param lastPos — index of the slot whose source stream has exhausted.
* @returns nothing.
*/
remove?: (items: Slot>[], lastPos: number) => void;
/**
* Optional. Per-stream buffer depth — how many items from each stream the initial fill
* pre-buffers, and the steady-state buffer length per stream. Default `1`. Must be a
* positive integer.
*/
windowSize?: number;
}
}
type StreamValue = select.StreamValue;
type SlotItemType = select.SlotItemType;
type Slot = select.Slot;
type SelectOptions = select.SelectOptions;
export default select;
export {select};
export type {StreamValue, SlotItemType, Slot, SelectOptions};