import type { EnvelopedMessage } from "./envelope.js";
import type { Serialization } from "./serialization.js";
import type { Compression } from "./compression.js";
/**
 * A function that takes an asynchronous iterable as a source, and returns a
 * transformed asynchronous iterable.
 *
 * The following function is a simple no-op implementation that yields every
 * element from the source:
 *
 * ```ts
 * async function* t<T>(input: AsyncIterable<T>) {
 *   yield* input;
 * }
 * ```
 *
 * The following function takes fetch responses as a source, and yields the
 * text body of each:
 *
 * ```ts
 * async function* t(input: AsyncIterable<Response>): AsyncIterable<string> {
 *   for await (const r of input) {
 *     yield await r.text();
 *   }
 * }
 * ```
 *
 * Transformation functions can be passed to pipe() and pipeTo().
 *
 * @typeParam I - Element type of the source iterable.
 * @typeParam O - Element type of the returned iterable. Defaults to I.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export type AsyncIterableTransform<I, O = I> = (
  data: AsyncIterable<I>,
) => AsyncIterable<O>;
/**
 * A function that takes an asynchronous iterable as a source and consumes it
 * to the end, optionally returning a cumulative value.
 *
 * Sinks are used with pipeTo().
 *
 * @typeParam T - Element type of the consumed iterable.
 * @typeParam R - Type of the cumulative value. Defaults to void.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export type AsyncIterableSink<T, R = void> = (
  iterable: AsyncIterable<T>,
) => Promise<R>;
/**
 * Options for pipe() and pipeTo().
 *
 * @private Internal code, does not follow semantic versioning.
 */
interface PipeOptions {
  /**
   * Set to true to abort the source iterable on downstream errors.
   * The source iterable must not swallow errors raised by yield.
   *
   * Why? If iterators are chained, any error raised by the source or any
   * transform travels down the stream. But if an error happens downstream, the
   * source and transformations are left dangling:
   *
   * ```ts
   * async function* source() {
   *   const conn = await dbConn();
   *   yield await conn.query("SELECT 1"); // consumed downstream
   *   yield await conn.query("SELECT 2"); // never consumed
   *   conn.close(); // never runs
   * }
   * for await (const element of source()) {
   *   // let's say we try to write the element to disk, but the disk is full
   *   throw "err";
   * }
   * ```
   *
   * If this option is set to true, an error raised by the sink function given
   * to pipeTo() will raise the same error in the source iterable.
   *
   * ```ts
   * async function* source() {
   *   const conn = await dbConn();
   *   try {
   *     yield await conn.query("SELECT 1"); // consumed downstream
   *     yield await conn.query("SELECT 2"); // never consumed
   *   } finally {
   *     conn.close(); // runs!
   *   }
   * }
   * await pipeTo(source(), async iterable => {
   *   for await (const element of iterable) {
   *     // let's say we try to write the element to disk, but the disk is full
   *     throw "err";
   *   }
   * }, { propagateDownStreamError: true });
   * ```
   *
   * If this option is set to true with pipe(), the downstream consumer of the
   * iterable returned by pipe() can abort the source iterable by calling throw()
   * on the iterator.
   */
  propagateDownStreamError?: boolean;
}
/**
 * ParsedEnvelopedMessage is the deserialized counterpart to an
 * EnvelopedMessage.
 *
 * It is either a deserialized message M, or a deserialized end-of-stream
 * message E, typically distinguished by a flag on an enveloped message.
 *
 * @private Internal code, does not follow semantic versioning.
 */
type ParsedEnvelopedMessage<M, E> =
  | {
      end: false;
      value: M;
    }
  | {
      end: true;
      value: E;
    };
/**
 * Takes an asynchronous iterable as a source, and passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2>(
  iterable: AsyncIterable<T1>,
  sink: AsyncIterableSink<T1, T2>,
  options?: PipeOptions,
): Promise<T2>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3>(
  iterable: AsyncIterable<T1>,
  transform: AsyncIterableTransform<T1, T2>,
  sink: AsyncIterableSink<T2, T3>,
  options?: PipeOptions,
): Promise<T3>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  sink: AsyncIterableSink<T3, T4>,
  options?: PipeOptions,
): Promise<T4>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  sink: AsyncIterableSink<T4, T5>,
  options?: PipeOptions,
): Promise<T5>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  sink: AsyncIterableSink<T5, T6>,
  options?: PipeOptions,
): Promise<T6>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6, T7>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  sink: AsyncIterableSink<T6, T7>,
  options?: PipeOptions,
): Promise<T7>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  sink: AsyncIterableSink<T7, T8>,
  options?: PipeOptions,
): Promise<T8>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  sink: AsyncIterableSink<T8, T9>,
  options?: PipeOptions,
): Promise<T9>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  sink: AsyncIterableSink<T9, T10>,
  options?: PipeOptions,
): Promise<T10>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  sink: AsyncIterableSink<T10, T11>,
  options?: PipeOptions,
): Promise<T11>;
/**
 * Takes an asynchronous iterable as a source, applies transformations, and
 * passes it to a sink.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipeTo<
  T1,
  T2,
  T3,
  T4,
  T5,
  T6,
  T7,
  T8,
  T9,
  T10,
  T11,
  T12,
>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  transform10: AsyncIterableTransform<T10, T11>,
  sink: AsyncIterableSink<T11, T12>,
  options?: PipeOptions,
): Promise<T12>;
/**
 * Creates an AsyncIterableSink that collects all elements from the input
 * into an array.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function sinkAll<T>(): AsyncIterableSink<T, T[]>;
/**
 * Creates an AsyncIterableSink that concatenates all chunks from the input into
 * a single Uint8Array.
 *
 * The iterable raises an error if more than readMaxBytes are read.
 *
 * An optional length hint can be provided to optimize allocation and validation.
 * If more or less bytes are present in the source than the length hint indicates,
 * an error is raised.
 * If the length hint is larger than readMaxBytes, an error is raised.
 * If the length hint is not a positive integer, it is ignored.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function sinkAllBytes(
  readMaxBytes: number,
  lengthHint?: number | string | null,
): AsyncIterableSink<Uint8Array, Uint8Array>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2>(
  iterable: AsyncIterable<T1>,
  transform: AsyncIterableTransform<T1, T2>,
  options?: PipeOptions,
): AsyncIterable<T2>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  options?: PipeOptions,
): AsyncIterable<T3>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  options?: PipeOptions,
): AsyncIterable<T4>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  options?: PipeOptions,
): AsyncIterable<T5>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  options?: PipeOptions,
): AsyncIterable<T6>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6, T7>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  options?: PipeOptions,
): AsyncIterable<T7>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6, T7, T8>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  options?: PipeOptions,
): AsyncIterable<T8>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  options?: PipeOptions,
): AsyncIterable<T9>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  options?: PipeOptions,
): AsyncIterable<T10>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  transform10: AsyncIterableTransform<T10, T11>,
  options?: PipeOptions,
): AsyncIterable<T11>;
/**
 * Apply one or more transformations to an asynchronous iterable.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function pipe<
  T1,
  T2,
  T3,
  T4,
  T5,
  T6,
  T7,
  T8,
  T9,
  T10,
  T11,
  T12,
>(
  iterable: AsyncIterable<T1>,
  transform1: AsyncIterableTransform<T1, T2>,
  transform2: AsyncIterableTransform<T2, T3>,
  transform3: AsyncIterableTransform<T3, T4>,
  transform4: AsyncIterableTransform<T4, T5>,
  transform5: AsyncIterableTransform<T5, T6>,
  transform6: AsyncIterableTransform<T6, T7>,
  transform7: AsyncIterableTransform<T7, T8>,
  transform8: AsyncIterableTransform<T8, T9>,
  transform9: AsyncIterableTransform<T9, T10>,
  transform10: AsyncIterableTransform<T10, T11>,
  transform11: AsyncIterableTransform<T11, T12>,
  options?: PipeOptions,
): AsyncIterable<T12>;
/**
 * Creates an AsyncIterableTransform that catches any error from the input, and
 * passes it to the given catchError function.
 *
 * The catchError function may return a final value.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformCatch<T, C = T>(
  catchError: TransformCatchErrorFn<C>,
): AsyncIterableTransform<T, T | C>;
type TransformCatchErrorFn<C> =
  | ((reason: unknown) => void)
  | ((reason: unknown) => C | undefined)
  | ((reason: unknown) => Promise<C | undefined>);
/**
 * Creates an AsyncIterableTransform that catches any error from the input, and
 * passes it to the given function. Unlike transformCatch(), the given function
 * is also called when no error is raised.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformCatchFinally<T, C = T>(
  catchFinally: TransformCatchFinallyFn<C>,
): AsyncIterableTransform<T, T | C>;
/**
 * The function to always run at the end of an async iterable.
 * If an error was caught, it is passed as the `reason` argument.
 * If the iterable finished successfully, `reason` is undefined.
 */
type TransformCatchFinallyFn<C> =
  | ((reason: unknown) => void)
  | ((reason: unknown) => C | undefined)
  | ((reason: unknown) => Promise<C | undefined>);
/**
 * Creates an AsyncIterableTransform that appends a value.
 *
 * The element to append is provided by a function. If the function returns
 * undefined, no element is appended.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformAppend<T>(
  provide: TransformXpendProvide<T>,
): AsyncIterableTransform<T, Awaited<T>>;
/**
 * Creates an AsyncIterableTransform that prepends an element.
 *
 * The element to prepend is provided by a function. If the function returns
 * undefined, no element is prepended.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformPrepend<T>(
  provide: TransformXpendProvide<T>,
): AsyncIterableTransform<T, Awaited<T>>;
type TransformXpendProvide<T> = T extends undefined
  ? never
  : (() => T | undefined) | (() => Promise<T | undefined>);
/**
 * Creates an AsyncIterableTransform that reads all bytes from the input, and
 * concatenates them to a single Uint8Array.
 *
 * The iterable raises an error if more than readMaxBytes are read.
 *
 * An optional length hint can be provided to optimize allocation and validation.
 * If more or less bytes are present in the source than the length hint indicates,
 * an error is raised.
 * If the length hint is larger than readMaxBytes, an error is raised.
 * If the length hint is not a positive integer, it is ignored.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformReadAllBytes(
  readMaxBytes: number,
  lengthHint?: number | string | null,
): AsyncIterableTransform<Uint8Array>;
/**
 * Creates an AsyncIterableTransform that takes a specified type as input,
 * and serializes it as an enveloped message.
 *
 * Note that this function has an overload that lets the input stream
 * distinguish between regular messages, and end-of-stream messages, as used
 * by the gRPC-web and Connect protocols.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformSerializeEnvelope<T>(
  serialization: Serialization<T>,
): AsyncIterableTransform<T, EnvelopedMessage>;
/**
 * Creates an AsyncIterableTransform that takes a value or special end type, and
 * serializes it as an enveloped message.
 *
 * For example, a source with { end: true, value: ... } is serialized using
 * the given endSerialization, and the resulting enveloped message has the
 * given endStreamFlag.
 *
 * A source with { end: false, value: ... } is serialized using the given
 * serialization, and the resulting enveloped message does not have the given
 * endStreamFlag.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformSerializeEnvelope<T, E>(
  serialization: Serialization<T>,
  endStreamFlag: number,
  endSerialization: Serialization<E>,
): AsyncIterableTransform<ParsedEnvelopedMessage<T, E>, EnvelopedMessage>;
/**
 * Creates an AsyncIterableTransform that takes enveloped messages as input,
 * parses the envelope payload and outputs the result.
 *
 * Note that this function has overloads that let the stream distinguish
 * between regular messages, and end-of-stream messages, as used by the
 * gRPC-web and Connect protocols.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformParseEnvelope<T>(
  serialization: Serialization<T>,
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
 * Creates an AsyncIterableTransform that takes enveloped messages as input,
 * parses the envelope payload and outputs the result.
 *
 * Note that this overload will look for the given endStreamFlag, and silently
 * ignore envelopes with this flag.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformParseEnvelope<T>(
  serialization: Serialization<T>,
  endStreamFlag: number,
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
 * Creates an AsyncIterableTransform that takes enveloped messages as input,
 * parses the envelope payload and outputs the result.
 *
 * Note that this overload will look for the given endStreamFlag, and raise
 * an error if it is set.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformParseEnvelope<T>(
  serialization: Serialization<T>,
  endStreamFlag: number,
  endSerialization: null,
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
 * Creates an AsyncIterableTransform that takes an enveloped message as input,
 * and outputs a ParsedEnvelopedMessage.
 *
 * For example, if the given endStreamFlag is set for a source envelope, its
 * payload is parsed using the given endSerialization, and an object with
 * { end: true, value: ... } is returned.
 *
 * If the endStreamFlag is not set, the payload is parsed using the given
 * serialization, and an object with { end: false, value: ... } is returned.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformParseEnvelope<T, E>(
  serialization: Serialization<T>,
  endStreamFlag: number,
  endSerialization: Serialization<E>,
): AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>>;
/**
 * Creates an AsyncIterableTransform that takes enveloped messages as a source,
 * and compresses them if they are larger than compressMinBytes.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformCompressEnvelope(
  compression: Compression | null,
  compressMinBytes: number,
): AsyncIterableTransform<EnvelopedMessage, EnvelopedMessage>;
/**
 * Creates an AsyncIterableTransform that takes enveloped messages as a source,
 * and decompresses them using the given compression.
 *
 * The iterable raises an error if the decompressed payload of an enveloped
 * message is larger than readMaxBytes, or if no compression is provided.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformDecompressEnvelope(
  compression: Compression | null,
  readMaxBytes: number,
): AsyncIterableTransform<EnvelopedMessage, EnvelopedMessage>;
/**
 * Create an AsyncIterableTransform that takes enveloped messages as a source,
 * and joins them into a stream of raw bytes.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformJoinEnvelopes(): AsyncIterableTransform<
  EnvelopedMessage,
  Uint8Array
>;
/**
 * Create an AsyncIterableTransform that takes raw bytes as a source, and splits
 * them into enveloped messages.
 *
 * The iterable raises an error
 * - if the payload of an enveloped message is larger than readMaxBytes,
 * - if the stream ended before an enveloped message fully arrived,
 * - or if the stream ended with extraneous data.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function transformSplitEnvelope(
  readMaxBytes: number,
): AsyncIterableTransform<Uint8Array, EnvelopedMessage>;
/**
 * Reads all bytes from the source, and concatenates them to a single Uint8Array.
 *
 * Raises an error if:
 * - more than readMaxBytes are read
 * - lengthHint is a positive integer, but larger than readMaxBytes
 * - lengthHint is a positive integer, and the source contains more or less bytes
 *   than promised
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function readAllBytes(
  iterable: AsyncIterable<Uint8Array>,
  readMaxBytes: number,
  lengthHint?: number | string | null,
): Promise<Uint8Array>;
/**
 * Wait for the first element of an iterable without modifying the iterable.
 * This consumes the first element, but pushes it back on the stack.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function untilFirst<T>(
  iterable: AsyncIterable<T>,
): Promise<AsyncIterable<T>>;
interface Abortable {
  /**
   * Abort the iterator.
   */
  readonly abort: (reason: unknown) => Promise<AbortState>;
}
type AbortState = "rethrown" | "completed" | "caught";
/**
 * Wrap the given iterable and return an iterable with an abort() method.
 *
 * This function exists purely for convenience. Where one would typically have
 * to access the iterator directly, advance through all elements, and call
 * AsyncIterator.throw() to notify the upstream iterable, this function allows
 * the use of convenient for-await loops while still notifying the upstream
 * iterable:
 *
 * ```ts
 * const abortable = makeIterableAbortable(iterable);
 * for await (const ele of abortable) {
 *   await abortable.abort("ERR");
 * }
 * ```
 * There are a couple of limitations of this function:
 * - the given async iterable must implement throw
 * - the async iterable cannot be re-used
 * - if source catches errors and yields values for them, they are ignored, and
 *   the source may still dangle
 *
 * There are four possible ways an async function* can handle yield errors:
 * 1. don't catch errors at all - Abortable.abort() will resolve "rethrown"
 * 2. catch errors and rethrow - Abortable.abort() will resolve "rethrown"
 * 3. catch errors and return - Abortable.abort() will resolve "completed"
 * 4. catch errors and yield a value - Abortable.abort() will resolve "caught"
 *
 * Note that catching errors and yielding a value is problematic, and it should
 * be documented that this may leave the source in a dangling state.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function makeIterableAbortable<T>(
  iterable: AsyncIterable<T>,
): AsyncIterable<T> & Abortable;
/**
 * WritableIterable is an AsyncIterable that can be used
 * to supply values imperatively to the consumer of the
 * AsyncIterable.
 */
export interface WritableIterable<T> extends AsyncIterable<T> {
  /**
   * Makes the payload available to the consumer of the
   * iterable.
   */
  write: (payload: T) => Promise<void>;
  /**
   * Closes the writer indicating to its consumer that no further
   * payloads will be received.
   *
   * Any writes that happen after close is called will return an error.
   */
  close: () => void;
}
/**
 * Create a new WritableIterable.
 */
export declare function createWritableIterable<T>(): WritableIterable<T>;
/**
 * Create an asynchronous iterable from an array.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export declare function createAsyncIterable<T>(items: T[]): AsyncIterable<T>;
export {};