import type { LanguageModelV2Prompt, LanguageModelV2CallWarning } from '../_types/@ai-sdk_provider-v5/dist/index.d.ts'; import type { StepResult } from '../_types/@internal_ai-sdk-v5/dist/index.d.ts'; import type { MastraDBMessage } from '../agent/message-list/index.js'; import { MessageList } from '../agent/message-list/index.js'; import type { AgentSignalInput, CreatedAgentSignal } from '../agent/signals.js'; import type { TripWireOptions } from '../agent/trip-wire.js'; import type { IMastraLogger } from '../logger/index.js'; import { SpanType } from '../observability/index.js'; import type { ObservabilityContext, Span } from '../observability/index.js'; import type { TracingContext } from '../observability/types/index.js'; import type { RequestContext } from '../request-context/index.js'; import type { ChunkType } from '../stream/index.js'; import type { MastraModelOutput } from '../stream/base/output.js'; import type { LanguageModelUsage } from '../stream/types.js'; import type { CachedLLMStepChunk, CachedLLMStepResponse, ErrorProcessorOrWorkflow, OutputResult, ProcessInputStepResult, Processor, ProcessorMessageResult, ProcessorStreamWriter, ProcessorWorkflow, RunProcessInputStepArgs, RunProcessInputStepResult, ToolCallInfo } from './index.js'; /** * Implementation of processor state management */ /** * Tracks state for stream processing across chunks. * Used by both legacy processors and workflow processors. */ export declare class ProcessorState { private inputAccumulatedText; private outputAccumulatedText; private outputChunkCount; customState: Record; streamParts: ChunkType[]; span?: Span; constructor(options?: { processorName?: string; processorIndex?: number; createSpan?: boolean; } & Partial); /** Track incoming chunk (before processor transformation) */ addInputPart(part: ChunkType): void; /** Track outgoing chunk (after processor transformation) */ addOutputPart(part: ChunkType | null | undefined): void; /** Get final output for span */ getFinalOutput(): { totalChunks: number; accumulatedText: string; }; } /** * Union type for processor or workflow that can be used as a processor */ type ProcessorOrWorkflow = Processor | ProcessorWorkflow; export declare function createProcessorSendSignal(args: { messageList: MessageList; writer?: ProcessorStreamWriter; rotateResponseMessageId?: () => string; }): (signalInput: AgentSignalInput) => Promise; export declare class ProcessorRunner { readonly inputProcessors: ProcessorOrWorkflow[]; readonly outputProcessors: ProcessorOrWorkflow[]; readonly errorProcessors: ErrorProcessorOrWorkflow[]; private readonly logger; private readonly agentName; /** * Shared processor state that persists across loop iterations. * Used by all processor methods (input and output) to share state. * Keyed by processor ID. */ private readonly processorStates; constructor({ inputProcessors, outputProcessors, errorProcessors, logger, agentName, processorStates, }: { inputProcessors?: ProcessorOrWorkflow[]; outputProcessors?: ProcessorOrWorkflow[]; errorProcessors?: ErrorProcessorOrWorkflow[]; logger: IMastraLogger; agentName: string; processorStates?: Map; }); /** * Get or create ProcessorState for the given processor ID. * This state persists across loop iterations and is shared between * all processor methods (input and output). */ private getProcessorState; private runComputeStateSignal; private runWorkflowComputeStateSignals; /** * Execute a workflow as a processor and handle the result. * Returns the processed messages and any tripwire information. */ private executeWorkflowAsProcessor; runOutputProcessors(messageList: MessageList, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, retryCount?: number, writer?: ProcessorStreamWriter, result?: OutputResult): Promise; /** * Process a stream part through all output processors with state management */ processPart(part: ChunkType, processorStates: Map>, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, messageList?: MessageList, retryCount?: number, writer?: ProcessorStreamWriter): Promise<{ part: ChunkType | null | undefined; blocked: boolean; reason?: string; tripwireOptions?: TripWireOptions; processorId?: string; }>; /** * Re-drive any parts that stream processors stashed for reprocessing through * the full output processor chain. * * A stream processor can only return one part from `processOutputStream`, but * some processors (e.g. `BatchPartsProcessor`) need to emit a second part for * one input — it returns the first part and stashes the second under * `REPROCESS_PART_KEY` on its state. After the primary part has been emitted, * callers invoke this to push each stashed part back through the whole chain * (so it receives downstream processing) and emit the results in order. * * Returns the processed results in emission order. Reprocessing can itself * stash more parts, so this drains until none remain. */ drainReprocessParts(processorStates: Map>, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, messageList?: MessageList, retryCount?: number, writer?: ProcessorStreamWriter): Promise | null | undefined; blocked: boolean; reason?: string; tripwireOptions?: TripWireOptions; processorId?: string; }>>; runOutputProcessorsForStream(streamResult: MastraModelOutput, observabilityContext?: ObservabilityContext, writer?: ProcessorStreamWriter): Promise>; runInputProcessors(messageList: MessageList, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, retryCount?: number): Promise; /** * Run processInputStep for all processors that implement it. * Called at each step of the agentic loop, before the LLM is invoked. * * Unlike processInput which runs once at the start, this runs at every step * (including tool call continuations). This is useful for: * - Transforming message types between steps (e.g., AI SDK 'reasoning' -> Anthropic 'thinking') * - Modifying messages based on step context * - Implementing per-step message transformations * * @param args.messages - The current messages to be sent to the LLM (MastraDBMessage format) * @param args.messageList - MessageList instance for managing message sources * @param args.stepNumber - The current step number (0-indexed) * @param args.tracingContext - Optional tracing context for observability * @param args.requestContext - Optional runtime context with execution metadata * * @returns The processed MessageList */ runProcessInputStep(args: RunProcessInputStepArgs): Promise; /** * Run processLLMRequest for all processors that implement it. * * Called *after* `MessageList` has been converted to `LanguageModelV2Prompt` * and immediately *before* the prompt is forwarded to the provider. * Mutations are scoped to this single call — they do not affect the * persisted message list, memory, UI, or future model swaps. */ runProcessLLMRequest(args: { prompt: LanguageModelV2Prompt; model: unknown; stepNumber: number; steps: Array>; requestContext?: RequestContext; retryCount?: number; abortSignal?: AbortSignal; tracingContext?: TracingContext; writer?: ProcessorStreamWriter; }): Promise<{ prompt: LanguageModelV2Prompt; response?: CachedLLMStepResponse; }>; /** * Run processLLMResponse for all processors that implement it. * * Called *after* the LLM step completes (or after a cached response is * replayed) and *after* output processors have collected the response * chunks. The shared `state` object is the same instance passed to * `processLLMRequest` for the same step, allowing processors to correlate * pre- and post-call work (e.g. cache key stash, then cache write). */ runProcessLLMResponse(args: { chunks: CachedLLMStepChunk[]; model: unknown; stepNumber: number; steps: Array>; warnings?: LanguageModelV2CallWarning[]; request?: unknown; rawResponse?: unknown; fromCache: boolean; requestContext?: RequestContext; retryCount?: number; abortSignal?: AbortSignal; tracingContext?: TracingContext; writer?: ProcessorStreamWriter; }): Promise; /** * Type guard to check if result is { messages, systemMessages } */ private isProcessInputResultWithSystemMessages; /** * Run processOutputStep for all processors that implement it. * Called after each LLM response in the agentic loop, before tool execution. * * Unlike processOutputResult which runs once at the end, this runs at every step. * This is the ideal place to implement guardrails that can trigger retries. * * @param args.messages - The current messages including the LLM response * @param args.messageList - MessageList instance for managing message sources * @param args.stepNumber - The current step number (0-indexed) * @param args.finishReason - The finish reason from the LLM * @param args.toolCalls - Tool calls made in this step (if any) * @param args.text - Generated text from this step * @param args.tracingContext - Optional tracing context for observability * @param args.requestContext - Optional runtime context with execution metadata * @param args.retryCount - Number of times processors have triggered retry * * @returns The processed MessageList */ runProcessOutputStep(args: { steps: Array>; messages: MastraDBMessage[]; messageList: MessageList; stepNumber: number; finishReason?: string; toolCalls?: ToolCallInfo[]; text?: string; usage?: LanguageModelUsage; requestContext?: RequestContext; retryCount?: number; writer?: ProcessorStreamWriter; } & Partial): Promise; /** * Run processAPIError on all processors that implement it. * Called when an LLM API call fails with a non-retryable error. * Iterates through both input and output processors. * * @returns { retry: boolean } indicating whether to retry the LLM call */ runProcessAPIError(args: { error: unknown; messages: MastraDBMessage[]; messageList: MessageList; stepNumber: number; steps: Array>; messageId?: string; requestContext?: RequestContext; retryCount?: number; writer?: ProcessorStreamWriter; abortSignal?: AbortSignal; rotateResponseMessageId?: () => string; } & Partial): Promise<{ retry: boolean; }>; static applyMessagesToMessageList(messages: MastraDBMessage[], messageList: MessageList, idsBeforeProcessing: string[], check: ReturnType, defaultSource?: 'input' | 'response'): void; static validateAndFormatProcessInputStepResult(result: ProcessInputStepResult | Awaited | undefined | void, { messageList, processor, stepNumber, }: { messageList: MessageList; processor: Processor; stepNumber: number; }): Promise; } export {}; //# sourceMappingURL=runner.d.ts.map