import type {
  StepResult,
  StreamTextResult,
  ToolSet,
  UIMessage as AIUIMessage,
} from "ai";
import { streamText as streamTextAi } from "ai";
import {
  compressUIMessageChunks,
  DeltaStreamer,
  mergeTransforms,
  type StreamingOptions,
} from "./streaming.js";
import type {
  ActionCtx,
  AgentComponent,
  AgentPrompt,
  GenerationOutputMetadata,
  Options,
  Output,
} from "./types.js";
import { startGeneration } from "./start.js";
import type { Agent } from "./index.js";
import { getModelName, getProviderName } from "../shared.js";
import { errorToString, willContinue } from "./utils.js";

/**
 * This behaves like {@link streamText} from the "ai" package except that
 * it adds context based on the userId and threadId and saves the input and
 * resulting messages to the thread, if specified.
 * Use {@link continueThread} to get a version of this function already scoped
 * to a thread (and optionally userId).
 *
 * @returns The result object of the "ai" package's `streamText`, with
 * `promptMessageId`, `order`, `savedMessages` and `messageId` assigned onto it.
 * @throws Re-throws any error raised while awaiting the stream (only when
 * delta saving is enabled without `returnImmediately`), after marking the
 * delta stream as failed and attempting to save an already-generated final step.
 */
export async function streamText<
  TOOLS extends ToolSet,
  OUTPUT extends Output = never,
>(
  ctx: ActionCtx,
  component: AgentComponent,
  /**
   * The arguments to the streamText function, similar to the ai sdk's
   * {@link streamText} function, along with Agent prompt options.
   */
  streamTextArgs: AgentPrompt &
    Omit<
      Parameters<typeof streamTextAi<TOOLS, OUTPUT>>[0],
      "model" | "prompt" | "messages"
    > & {
      /**
       * The tools to use for the tool calls. This will override tools specified
       * in the Agent constructor or createThread / continueThread.
       */
      tools?: TOOLS;
    },
  /**
   * The {@link ContextOptions} and {@link StorageOptions}
   * options to use for fetching contextual messages and saving input/output messages.
   */
  options: Options & {
    agentName: string;
    userId?: string | null;
    threadId?: string;
    /**
     * Whether to save incremental data (deltas) from streaming responses.
     * Defaults to false.
     * If false, it will not save any deltas to the database.
     * If true, it will save deltas with {@link DEFAULT_STREAMING_OPTIONS}.
     *
     * Regardless of this option, when streaming you are able to use this
     * `streamText` function as you would with the "ai" package's version:
     * iterating over the text, streaming it over HTTP, etc.
     */
    saveStreamDeltas?: boolean | StreamingOptions;
    agentForToolCtx?: Agent;
  },
): Promise<StreamTextResult<TOOLS, OUTPUT> & GenerationOutputMetadata> {
  // NOTE(review): the generic type arguments in this function (`<TOOLS>`,
  // `<TOOLS, OUTPUT>`, `AIUIMessage<TOOLS>`) were reconstructed; confirm them
  // against the "ai" package typings in use.
  const { threadId } = options ?? {};
  const { args, userId, order, stepOrder, promptMessageId, ...call } =
    await startGeneration(ctx, component, streamTextArgs, options);

  const steps: StepResult<TOOLS>[] = [];
  // Track the final step for atomic save with stream finish (issue #181)
  let pendingFinalStep: StepResult<TOOLS> | undefined;

  // True when this function itself waits for the stream to complete before
  // returning: deltas are being saved and the caller did not ask to return
  // immediately. Only then is there code left running after the final step
  // that can pick up a deferred save.
  const awaitsStreamCompletion =
    (typeof options?.saveStreamDeltas === "object" &&
      !options.saveStreamDeltas.returnImmediately) ||
    options?.saveStreamDeltas === true;

  // Deltas are only persisted when there is a thread to attach them to.
  const streamer =
    threadId && options.saveStreamDeltas
      ? new DeltaStreamer(
          component,
          ctx,
          {
            throttleMs:
              typeof options.saveStreamDeltas === "object"
                ? options.saveStreamDeltas.throttleMs
                : undefined,
            onAsyncAbort: call.fail,
            compress: compressUIMessageChunks,
            abortSignal: args.abortSignal,
          },
          {
            threadId,
            userId,
            agentName: options?.agentName,
            model: getModelName(args.model),
            provider: getProviderName(args.model),
            providerOptions: args.providerOptions,
            format: "UIMessageChunk",
            order,
            stepOrder,
          },
        )
      : undefined;

  const result = streamTextAi({
    ...args,
    // Prefer the streamer's signal when present; fall back to the caller's.
    abortSignal: streamer?.abortController.signal ?? args.abortSignal,
    experimental_transform: mergeTransforms(
      options?.saveStreamDeltas,
      streamTextArgs.experimental_transform,
    ),
    onError: async (error) => {
      console.error("onError", error);
      await call.fail(errorToString(error.error));
      await streamer?.fail(errorToString(error.error));
      return streamTextArgs.onError?.(error);
    },
    prepareStep: async (stepOptions) => {
      const prepared = await streamTextArgs.prepareStep?.(stepOptions);
      if (prepared) {
        // A user-provided prepareStep may switch models between steps.
        const model = prepared.model ?? stepOptions.model;
        call.updateModel(model);
        // streamer?.updateMetadata({
        //   model: getModelName(model),
        //   provider: getProviderName(model),
        //   providerOptions: options.messages.at(-1)?.providerOptions,
        // });
        return prepared;
      }
      return undefined;
    },
    onStepFinish: async (step) => {
      steps.push(step);
      const createPendingMessage = await willContinue(steps, args.stopWhen);
      if (!createPendingMessage && streamer && awaitsStreamCompletion) {
        // This is the final step with streaming enabled.
        // Defer saving until stream consumption completes for atomic finish (issue #181).
        streamer.markFinishedExternally();
        pendingFinalStep = step;
      } else {
        // Either more steps follow, deltas are not being streamed, or the
        // caller asked to return immediately. In the last case this function
        // has already returned by the time the final step arrives, so a
        // deferred step would never be saved; persist it here instead.
        await call.save({ step }, createPendingMessage);
      }
      return args.onStepFinish?.(step);
    },
  }) as StreamTextResult<TOOLS, OUTPUT>;

  const stream = streamer?.consumeStream(
    result.toUIMessageStream<AIUIMessage<TOOLS>>(),
  );

  if (awaitsStreamCompletion) {
    try {
      await stream;
      await result.consumeStream();
    } catch (e) {
      // If the stream errored (e.g. onStepFinish threw), the DeltaStreamer's
      // finish() was never called, leaving the streaming message stuck in
      // "streaming" state. Clean it up by marking it as aborted.
      await streamer?.fail(e instanceof Error ? e.message : String(e));
      // Save the deferred final step if it was already generated but not yet persisted
      if (pendingFinalStep) {
        try {
          await call.save({ step: pendingFinalStep }, false);
        } catch (saveError) {
          // Best effort: the original stream error is the one re-thrown below.
          console.error("Failed to save deferred final step:", saveError);
        }
        pendingFinalStep = undefined;
      }
      throw e;
    }
  }

  // If we deferred the final step save, do it now with atomic stream finish.
  if (pendingFinalStep && streamer) {
    const finishStreamId = await streamer.getOrCreateStreamId();
    await call.save({ step: pendingFinalStep }, false, finishStreamId);
  }

  const metadata: GenerationOutputMetadata = {
    promptMessageId,
    order,
    savedMessages: call.getSavedMessages(),
    messageId: promptMessageId,
  };
  return Object.assign(result, metadata);
}