import { AsyncIterableStream, WriterStreamOptions, PipeStreamOptions, PipeStreamResult, ReadStreamOptions, AppendStreamOptions, RealtimeDefinedStream, InferStreamType, type RealtimeDefinedInputStream, type InferInputStreamType } from "@trigger.dev/core/v3"; /** * Pipes data to a realtime stream using the default stream key (`"default"`). * * This is a convenience overload that allows you to pipe data without specifying a stream key. * The stream will be created/accessed with the key `"default"`. * * @template T - The type of data chunks in the stream * @param value - The stream of data to pipe from. Can be an `AsyncIterable` or `ReadableStream`. * @param options - Optional configuration for the stream operation * @returns A promise that resolves to an object containing: * - `stream`: The original stream (can be consumed in your task) * - `waitUntilComplete`: A function that returns a promise resolving when the stream is fully sent * * @example * ```ts * import { streams } from "@trigger.dev/sdk"; * * // Stream OpenAI completion chunks to the default stream * const completion = await openai.chat.completions.create({ * model: "gpt-4", * messages: [{ role: "user", content: "Hello" }], * stream: true, * }); * * const { waitUntilComplete } = await streams.pipe(completion); * * // Process the stream locally * for await (const chunk of completion) { * console.log(chunk); * } * * // Or alternatievely wait for all chunks to be sent to the realtime stream * await waitUntilComplete(); * ``` */ declare function pipe(value: AsyncIterable | ReadableStream, options?: PipeStreamOptions): PipeStreamResult; /** * Pipes data to a realtime stream with a specific stream key. * * Use this overload when you want to use a custom stream key instead of the default. * * @template T - The type of data chunks in the stream * @param key - The unique identifier for this stream. If multiple streams use the same key, * they will be merged into a single stream. Defaults to `"default"` if not provided. * @param value - The stream of data to pipe from. Can be an `AsyncIterable` or `ReadableStream`. * @param options - Optional configuration for the stream operation * @returns A promise that resolves to an object containing: * - `stream`: The original stream (can be consumed in your task) * - `waitUntilComplete`: A function that returns a promise resolving when the stream is fully sent * * @example * ```ts * import { streams } from "@trigger.dev/sdk"; * * // Stream data to a specific stream key * const myStream = createAsyncGenerator(); * const { waitUntilComplete } = await streams.pipe("my-custom-stream", myStream); * * // Process the stream locally * for await (const chunk of myStream) { * console.log(chunk); * } * * // Wait for all chunks to be sent * await waitUntilComplete(); * ``` * * @example * ```ts * // Stream to a parent run * await streams.pipe("output", myStream, { * target: "parent", * }); * ``` */ declare function pipe(key: string, value: AsyncIterable | ReadableStream, options?: PipeStreamOptions): PipeStreamResult; /** * Reads data from a realtime stream using the default stream key (`"default"`). * * This is a convenience overload that allows you to read from the default stream without * specifying a stream key. The stream will be accessed with the key `"default"`. * * @template T - The type of data chunks in the stream * @param runId - The unique identifier of the run to read the stream from * @param options - Optional configuration for reading the stream * @returns A promise that resolves to an `AsyncIterableStream` that can be consumed * using `for await...of` or as a `ReadableStream`. * * @example * ```ts * import { streams } from "@trigger.dev/sdk/v3"; * * // Read from the default stream * const stream = await streams.read(runId); * * for await (const chunk of stream) { * console.log("Received chunk:", chunk); * } * ``` * * @example * ```ts * // Read with custom timeout and starting position * const stream = await streams.read(runId, { * timeoutInSeconds: 120, * startIndex: 10, // Start from the 10th chunk * }); * ``` */ declare function read(runId: string, options?: ReadStreamOptions): Promise>; /** * Reads data from a realtime stream with a specific stream key. * * Use this overload when you want to read from a stream with a custom key. * * @template T - The type of data chunks in the stream * @param runId - The unique identifier of the run to read the stream from * @param key - The unique identifier of the stream to read from. Defaults to `"default"` if not provided. * @param options - Optional configuration for reading the stream * @returns A promise that resolves to an `AsyncIterableStream` that can be consumed * using `for await...of` or as a `ReadableStream`. * * @example * ```ts * import { streams } from "@trigger.dev/sdk"; * * // Read from a specific stream key * const stream = await streams.read(runId, "my-custom-stream"); * * for await (const chunk of stream) { * console.log("Received chunk:", chunk); * } * ``` * * @example * ```ts * // Read with signal for cancellation * const controller = new AbortController(); * const stream = await streams.read(runId, "my-stream", { * signal: controller.signal, * timeoutInSeconds: 30, * }); * * // Cancel after 5 seconds * setTimeout(() => controller.abort(), 5000); * ``` */ declare function read(runId: string, key: string, options?: ReadStreamOptions): Promise>; declare function append(value: TPart, options?: AppendStreamOptions): Promise; declare function append(key: string, value: TPart, options?: AppendStreamOptions): Promise; /** * Writes data to a realtime stream using the default stream key (`"default"`). * * This is a convenience overload that allows you to write to the default stream without * specifying a stream key. The stream will be created/accessed with the key `"default"`. * * @template TPart - The type of data chunks in the stream * @param options - The options for writing to the stream * @returns A promise that resolves to an object containing: * - `stream`: The original stream (can be consumed in your task) * - `waitUntilComplete`: A function that returns a promise resolving when the stream is fully sent * * @example * ```ts * import { streams } from "@trigger.dev/sdk"; * * // Write to the default stream * const { waitUntilComplete } = await streams.writer({ * execute: ({ write, merge }) => { * write("chunk 1"); * write("chunk 2"); * write("chunk 3"); * }, * }); * * // Wait for all chunks to be written * await waitUntilComplete(); * ``` * * @example * ```ts * // Write to a specific stream key * const { waitUntilComplete } = await streams.writer("my-custom-stream", { * execute: ({ write, merge }) => { * write("chunk 1"); * write("chunk 2"); * write("chunk 3"); * }, * }); * * // Wait for all chunks to be written * await waitUntilComplete(); * ``` * * @example * ```ts * // Write to a parent run * await streams.writer("output", { * execute: ({ write, merge }) => { * write("chunk 1"); * write("chunk 2"); * write("chunk 3"); * }, * }); * * // Wait for all chunks to be written * await waitUntilComplete(); * ``` * * @example * ```ts * // Write to a specific stream key * await streams.writer("my-custom-stream", { * execute: ({ write, merge }) => { * write("chunk 1"); * write("chunk 2"); * write("chunk 3"); * }, * }); * * // Wait for all chunks to be written * await waitUntilComplete(); * ``` */ declare function writer(options: WriterStreamOptions): PipeStreamResult; /** * Writes data to a realtime stream with a specific stream key. * * @template TPart - The type of data chunks in the stream * @param key - The unique identifier of the stream to write to. Defaults to `"default"` if not provided. * @param options - The options for writing to the stream * @returns A promise that resolves to an object containing: * - `stream`: The original stream (can be consumed in your task) * - `waitUntilComplete`: A function that returns a promise resolving when the stream is fully sent * * @example * ```ts * import { streams } from "@trigger.dev/sdk"; * * // Write to a specific stream key * const { waitUntilComplete } = await streams.writer("my-custom-stream", { * execute: ({ write, merge }) => { * write("chunk 1"); * write("chunk 2"); * write("chunk 3"); * }, * }); * * // Wait for all chunks to be written * await waitUntilComplete(); * ``` */ declare function writer(key: string, options: WriterStreamOptions): PipeStreamResult; export type RealtimeDefineStreamOptions = { id: string; }; declare function define(opts: RealtimeDefineStreamOptions): RealtimeDefinedStream; export type { InferStreamType, InferInputStreamType }; /** * Define an input stream that can receive typed data from external callers. * * Inside a task, use `.on()`, `.once()`, or `.peek()` to receive data. * Outside a task (e.g., from your backend), use `.send(runId, data)` to send data. * * @template TData - The type of data this input stream receives * @param opts - Options including a unique `id` for this input stream * * @example * ```ts * import { streams, task } from "@trigger.dev/sdk"; * * const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" }); * * export const myTask = task({ * id: "my-task", * run: async (payload) => { * // Wait for the next approval * const data = await approval.once().unwrap(); * console.log(data.approved, data.reviewer); * }, * }); * * // From your backend: * // await approval.send(runId, { approved: true, reviewer: "alice" }); * ``` */ declare function input(opts: { id: string; }): RealtimeDefinedInputStream; export declare const streams: { pipe: typeof pipe; read: typeof read; append: typeof append; writer: typeof writer; define: typeof define; input: typeof input; };