import type { StepResult } from '../_types/@internal_ai-sdk-v5/dist/index.js';
import type { MastraDBMessage } from '../agent/message-list/index.js';
import { MessageList } from '../agent/message-list/index.js';
import type { TripWireOptions } from '../agent/trip-wire.js';
import type { IMastraLogger } from '../logger/index.js';
import { SpanType } from '../observability/index.js';
import type { ObservabilityContext, Span } from '../observability/index.js';
import type { RequestContext } from '../request-context/index.js';
import type { ChunkType } from '../stream/index.js';
import type { MastraModelOutput } from '../stream/base/output.js';
import type { ErrorProcessorOrWorkflow, OutputResult, ProcessInputStepResult, Processor, ProcessorMessageResult, ProcessorStreamWriter, ProcessorWorkflow, RunProcessInputStepArgs, RunProcessInputStepResult, ToolCallInfo } from './index.js';
/**
 * Implementation of processor state management.
 *
 * NOTE(review): this is a declaration file (signatures only). Several generic
 * type arguments below look like they were stripped somewhere upstream: bare
 * `Record`, `Partial`, `Map`, `Promise`, `ReturnType`, `Awaited`, and dangling
 * `>` after `Map`, `Array` and `Promise`. The intended arguments are not
 * recoverable from this file alone; regenerate the declarations from the
 * implementation source before relying on these types.
 */
/**
 * Tracks state for stream processing across chunks.
 * Used by both legacy processors and workflow processors.
 */
export declare class ProcessorState {
    // Private members: declaration files omit their types, so only the names are visible.
    private inputAccumulatedText;
    private outputAccumulatedText;
    private outputChunkCount;
    /** Public state bag. NOTE(review): key/value types of `Record` are missing here. */
    customState: Record;
    /** Stream chunks held by this state (presumably the parts seen so far — confirm). */
    streamParts: ChunkType[];
    /** Optional observability span. NOTE(review): the span type argument is missing here. */
    span?: Span;
    /**
     * @param options - Optional settings: `processorName`, `processorIndex`, `createSpan`,
     *   intersected with a `Partial` of a type whose argument is missing
     *   (presumably an observability context — TODO confirm).
     */
    constructor(options?: {
        processorName?: string;
        processorIndex?: number;
        createSpan?: boolean;
    } & Partial);
    /** Track incoming chunk (before processor transformation) */
    addInputPart(part: ChunkType): void;
    /** Track outgoing chunk (after processor transformation); accepts `null`/`undefined`. */
    addOutputPart(part: ChunkType | null | undefined): void;
    /**
     * Get final output for span.
     *
     * @returns An object carrying `totalChunks` and `accumulatedText`.
     */
    getFinalOutput(): {
        totalChunks: number;
        accumulatedText: string;
    };
}
/**
 * Union type for processor or workflow that can be used as a processor
 */
type ProcessorOrWorkflow = Processor | ProcessorWorkflow;
/**
 * Holds the input, output and error processors configured for an agent and
 * exposes the methods that run them at each stage (input, per-step input,
 * per-step output, streamed output parts, final output, API errors).
 */
export declare class ProcessorRunner {
    readonly inputProcessors: ProcessorOrWorkflow[];
    readonly outputProcessors: ProcessorOrWorkflow[];
    readonly errorProcessors: ErrorProcessorOrWorkflow[];
    private readonly logger;
    private readonly agentName;
    /**
     * Shared processor state that persists across loop iterations.
     * Used by all processor methods (input and output) to share state.
     * Keyed by processor ID.
     */
    private readonly processorStates;
    /**
     * All processor lists and `processorStates` are optional; `logger` and
     * `agentName` are required.
     *
     * NOTE(review): the type arguments of `processorStates?: Map` are missing
     * (presumably processor ID -> ProcessorState, per the field comment — confirm).
     */
    constructor({ inputProcessors, outputProcessors, errorProcessors, logger, agentName, processorStates, }: {
        inputProcessors?: ProcessorOrWorkflow[];
        outputProcessors?: ProcessorOrWorkflow[];
        errorProcessors?: ErrorProcessorOrWorkflow[];
        logger: IMastraLogger;
        agentName: string;
        processorStates?: Map;
    });
    /**
     * Get or create ProcessorState for the given processor ID.
     * This state persists across loop iterations and is shared between
     * all processor methods (input and output).
     */
    private getProcessorState;
    /**
     * Execute a workflow as a processor and handle the result.
     * Returns the processed messages and any tripwire information.
     */
    private executeWorkflowAsProcessor;
    /**
     * Run the output processors against a message list.
     *
     * NOTE(review): the resolved type of the returned `Promise` is missing here.
     */
    runOutputProcessors(messageList: MessageList, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, retryCount?: number, writer?: ProcessorStreamWriter, result?: OutputResult): Promise;
    /**
     * Process a stream part through all output processors with state management.
     *
     * @returns The resulting part (possibly `null`/`undefined`), a `blocked` flag,
     *   and optional `reason`, `tripwireOptions` and `processorId` fields.
     *
     * NOTE(review): `processorStates: Map>` is malformed as written; its type
     * arguments were lost (presumably processor ID -> ProcessorState — confirm).
     */
    processPart(part: ChunkType, processorStates: Map>, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, messageList?: MessageList, retryCount?: number, writer?: ProcessorStreamWriter): Promise<{
        part: ChunkType | null | undefined;
        blocked: boolean;
        reason?: string;
        tripwireOptions?: TripWireOptions;
        processorId?: string;
    }>;
    /**
     * Stream-oriented variant taking a `MastraModelOutput`.
     *
     * NOTE(review): the return type `Promise>` is malformed as written; the
     * resolved type is missing (presumably a stream type — TODO confirm).
     */
    runOutputProcessorsForStream(streamResult: MastraModelOutput, observabilityContext?: ObservabilityContext, writer?: ProcessorStreamWriter): Promise>;
    /**
     * Run the input processors against a message list.
     *
     * NOTE(review): the resolved type of the returned `Promise` is missing here.
     */
    runInputProcessors(messageList: MessageList, observabilityContext?: ObservabilityContext, requestContext?: RequestContext, retryCount?: number): Promise;
    /**
     * Run processInputStep for all processors that implement it.
     * Called at each step of the agentic loop, before the LLM is invoked.
     *
     * Unlike processInput which runs once at the start, this runs at every step
     * (including tool call continuations). This is useful for:
     * - Transforming message types between steps (e.g., AI SDK 'reasoning' -> Anthropic 'thinking')
     * - Modifying messages based on step context
     * - Implementing per-step message transformations
     *
     * @param args.messages - The current messages to be sent to the LLM (MastraDBMessage format)
     * @param args.messageList - MessageList instance for managing message sources
     * @param args.stepNumber - The current step number (0-indexed)
     * @param args.tracingContext - Optional tracing context for observability
     * @param args.requestContext - Optional runtime context with execution metadata
     *
     * @returns The processed MessageList
     *
     * NOTE(review): the fields above live in `RunProcessInputStepArgs`, declared
     * elsewhere; the resolved type of the returned `Promise` is missing here, so
     * the `@returns` line cannot be checked against this file.
     */
    runProcessInputStep(args: RunProcessInputStepArgs): Promise;
    /**
     * Type guard to check if result is { messages, systemMessages }
     */
    private isProcessInputResultWithSystemMessages;
    /**
     * Run processOutputStep for all processors that implement it.
     * Called after each LLM response in the agentic loop, before tool execution.
     *
     * Unlike processOutputResult which runs once at the end, this runs at every step.
     * This is the ideal place to implement guardrails that can trigger retries.
     *
     * @param args.steps - Steps so far (NOTE(review): element type lost; `Array>` is malformed)
     * @param args.messages - The current messages including the LLM response
     * @param args.messageList - MessageList instance for managing message sources
     * @param args.stepNumber - The current step number (0-indexed)
     * @param args.finishReason - The finish reason from the LLM
     * @param args.toolCalls - Tool calls made in this step (if any)
     * @param args.text - Generated text from this step
     * @param args.tracingContext - Optional tracing context for observability
     *   (not among the visible fields; presumably supplied by the intersected
     *   `Partial` type whose argument is missing — confirm)
     * @param args.requestContext - Optional runtime context with execution metadata
     * @param args.retryCount - Number of times processors have triggered retry
     * @param args.writer - Optional processor stream writer
     *
     * @returns The processed MessageList
     */
    runProcessOutputStep(args: {
        steps: Array>;
        messages: MastraDBMessage[];
        messageList: MessageList;
        stepNumber: number;
        finishReason?: string;
        toolCalls?: ToolCallInfo[];
        text?: string;
        requestContext?: RequestContext;
        retryCount?: number;
        writer?: ProcessorStreamWriter;
    } & Partial): Promise;
    /**
     * Run processAPIError on all processors that implement it.
     * Called when an LLM API call fails with a non-retryable error.
     * Iterates through both input and output processors.
     *
     * @param args.error - The failure, typed `unknown`
     * @param args.steps - Steps so far (NOTE(review): element type lost; `Array>` is malformed)
     * @param args.abortSignal - Optional abort signal
     * @param args.rotateResponseMessageId - Optional callback returning a string ID
     *
     * @returns { retry: boolean } indicating whether to retry the LLM call
     */
    runProcessAPIError(args: {
        error: unknown;
        messages: MastraDBMessage[];
        messageList: MessageList;
        stepNumber: number;
        steps: Array>;
        messageId?: string;
        requestContext?: RequestContext;
        retryCount?: number;
        writer?: ProcessorStreamWriter;
        abortSignal?: AbortSignal;
        rotateResponseMessageId?: () => string;
    } & Partial): Promise<{
        retry: boolean;
    }>;
    /**
     * Static helper taking processed messages, the target MessageList, the message
     * IDs captured before processing, a `check` value and an optional default
     * source ('input' or 'response'). Returns nothing.
     *
     * NOTE(review): the type argument of `check: ReturnType` is missing, and the
     * exact update rules are not visible in this declaration — see the implementation.
     */
    static applyMessagesToMessageList(messages: MastraDBMessage[], messageList: MessageList, idsBeforeProcessing: string[], check: ReturnType, defaultSource?: 'input' | 'response'): void;
    /**
     * Static helper that takes a raw processInputStep result (which may be
     * `undefined`/`void`) together with the message list, the processor and the
     * step number, and resolves asynchronously.
     *
     * NOTE(review): the type arguments of `Awaited` and of the returned `Promise`
     * are missing here (presumably a `RunProcessInputStepResult` — TODO confirm).
     */
    static validateAndFormatProcessInputStepResult(result: ProcessInputStepResult | Awaited | undefined | void, { messageList, processor, stepNumber, }: {
        messageList: MessageList;
        processor: Processor;
        stepNumber: number;
    }): Promise;
}
export {};
//# sourceMappingURL=runner.d.ts.map