import type { LanguageModelV2 } from '@ai-sdk/provider-v5';
import type { CoreMessage as CoreMessageV4 } from '../_types/@internal_ai-sdk-v4/dist/index.js';
import type { CallSettings, StepResult, ToolChoice } from '../_types/@internal_ai-sdk-v5/dist/index.js';
import type { MessageList, MastraDBMessage } from '../agent/message-list/index.js';
import type { TripWireOptions } from '../agent/trip-wire.js';
import type { ModelRouterModelId } from '../llm/model/index.js';
import type { MastraLanguageModel, OpenAICompatibleConfig, SharedProviderOptions } from '../llm/model/shared.types.js';
import type { Mastra } from '../mastra/index.js';
import type { ObservabilityContext } from '../observability/index.js';
import type { RequestContext } from '../request-context/index.js';
import type { InferStandardSchemaOutput, StandardSchemaWithJSON } from '../schema/index.js';
import type { ChunkType } from '../stream/index.js';
import type { DataChunkType, LanguageModelUsage, LLMStepResult } from '../stream/types.js';
import type { Workflow } from '../workflows/index.js';
import type { StructuredOutputOptions } from './processors/index.js';
import type { ProcessorStepOutput } from './step-schema.js';

// NOTE(review): This declaration file previously contained constructs that are not valid
// TypeScript (e.g. `Promise;`, `Record;`, `Array>;`, `Omit & { ... }`): its generic
// type-parameter lists and type arguments were missing. They have been restored below.
// Where the argument could not be derived from the surrounding declarations, imports or doc
// comments, the line carries its own NOTE(review) and must be confirmed against the source
// this .d.ts is generated from. In particular, every `TTripwireMetadata = unknown` parameter
// and the type arguments on `Processor<...>` inside the *Processor aliases are assumptions
// modelled on the `@template` documentation of `Processor`.

/**
 * Writer interface for processors to emit custom data chunks to the stream.
 * This enables real-time streaming of processor-specific data (e.g., observation markers).
 */
export interface ProcessorStreamWriter {
    /**
     * Emit a custom data chunk to the stream.
     * The chunk type must start with the 'data-' prefix.
     * @param data - The data chunk to emit
     */
    // NOTE(review): the constraint on T and the `void` resolution type are assumed — confirm.
    custom<T extends { type: string }>(
        data: T extends { type: `data-${string}` } ? DataChunkType : T,
    ): Promise<void>;
}

/**
 * Base context shared by all processor methods.
 *
 * @template TTripwireMetadata - The type of metadata passed when calling abort()
 */
export interface ProcessorContext<TTripwireMetadata = unknown> extends Partial<ObservabilityContext> {
    /**
     * Function to abort processing with an optional reason and options.
     * @param reason - The reason for aborting
     * @param options - Options including retry flag and metadata
     */
    // NOTE(review): assumes TripWireOptions is generic over the metadata type — confirm.
    abort: (reason?: string, options?: TripWireOptions<TTripwireMetadata>) => never;
    /** Optional runtime context with execution metadata */
    requestContext?: RequestContext;
    /**
     * Number of times processors have triggered retry for this generation.
     * Use this to implement retry limits within your processor.
     */
    retryCount: number;
    /**
     * Optional stream writer for emitting custom data chunks.
     * Available when the agent is streaming and outputWriter is provided.
     * Use writer.custom() to emit data-* chunks that will be streamed to the client.
     */
    writer?: ProcessorStreamWriter;
    /**
     * Optional abort signal from the parent agent execution.
     * Processors should pass this to any long-running operations (e.g., LLM calls)
     * so they can be canceled when the parent agent is aborted.
     */
    abortSignal?: AbortSignal;
}

/**
 * Context for message-based processor methods (processInput, processOutputResult, processInputStep).
 */
export interface ProcessorMessageContext<TTripwireMetadata = unknown> extends ProcessorContext<TTripwireMetadata> {
    /** The current messages being processed */
    messages: MastraDBMessage[];
    /** MessageList instance for managing message sources */
    messageList: MessageList;
}

/**
 * Return type for processInput that includes modified system messages.
 */
export interface ProcessInputResultWithSystemMessages {
    messages: MastraDBMessage[];
    systemMessages: CoreMessageV4[];
}

/**
 * Return type for message-based processor methods
 * - MessageList: Return the same messageList instance passed in (indicates you've mutated it)
 * - MastraDBMessage[]: Return transformed messages array (for simple transformations)
 *
 * Either form may be returned directly or wrapped in a Promise.
 */
export type ProcessorMessageResult = Promise<MessageList | MastraDBMessage[]> | MessageList | MastraDBMessage[];

/**
 * Possible return types from processInput.
 */
export type ProcessInputResult = MessageList | MastraDBMessage[] | ProcessInputResultWithSystemMessages;

/**
 * Arguments for processInput method.
 */
export interface ProcessInputArgs<TTripwireMetadata = unknown> extends ProcessorMessageContext<TTripwireMetadata> {
    /** All system messages (agent instructions, user-provided, memory) for read/modify access */
    systemMessages: CoreMessageV4[];
    /** Per-processor state that persists across all method calls within this request */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
}

/**
 * Resolved generation result passed to processOutputResult.
 * Contains the same data available in the onFinish callback.
 */
export interface OutputResult {
    /** The accumulated text from all steps */
    text: string;
    /** Token usage (cumulative across all steps) */
    usage: LanguageModelUsage;
    /** Why the generation finished (e.g. 'stop', 'tool-calls', 'length') */
    finishReason: string;
    /** All LLM step results (each contains text, toolCalls, toolResults, usage, sources, files, reasoning, etc.) */
    steps: LLMStepResult[];
}

/**
 * Arguments for processOutputResult method.
 */
export interface ProcessOutputResultArgs<TTripwireMetadata = unknown> extends ProcessorMessageContext<TTripwireMetadata> {
    /** Per-processor state that persists across all method calls within this request */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
    /** Resolved generation result with usage, text, steps, and finish reason */
    result: OutputResult;
}

/**
 * Arguments for processInputStep method.
 *
 * Note: structuredOutput.schema is typed as StandardSchemaWithJSON (not the specific OUTPUT type) because
 * processors run in a chain and any previous processor may have modified structuredOutput.
 * The actual schema type is only known at the generate()/stream() call site.
 */
export interface ProcessInputStepArgs<TTripwireMetadata = unknown> extends ProcessorMessageContext<TTripwireMetadata> {
    /** The current step number (0-indexed) */
    stepNumber: number;
    // NOTE(review): the type argument of StepResult is assumed — confirm.
    steps: Array<StepResult<any>>;
    /** The active assistant response message ID for this step, when this processor is running inside an agent loop */
    messageId?: string;
    /** Mark the current assistant response message ID as complete and rotate to a fresh one, when supported by the caller */
    rotateResponseMessageId?: () => string;
    /** All system messages (agent instructions, user-provided, memory) for read/modify access */
    systemMessages: CoreMessageV4[];
    /** Per-processor state that persists across all method calls within this request */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
    /**
     * Current model for this step.
     * Can be a resolved MastraLanguageModelV2 or an unresolved config (string, OpenAI-compatible config).
     */
    model: MastraLanguageModel;
    /** Current tools available for this step */
    // NOTE(review): key/value types of the tools record are assumed — confirm.
    tools?: Record<string, unknown>;
    // NOTE(review): the type argument of ToolChoice is assumed — confirm.
    toolChoice?: ToolChoice<any>;
    activeTools?: string[];
    providerOptions?: SharedProviderOptions;
    // NOTE(review): the omitted CallSettings key(s) are assumed — confirm.
    modelSettings?: Omit<CallSettings, 'abortSignal'>;
    /**
     * Structured output configuration. The schema type is StandardSchemaWithJSON (not the specific OUTPUT)
     * because processors can modify it, and the actual type is only known at runtime.
     */
    structuredOutput?: StructuredOutputOptions<InferStandardSchemaOutput<StandardSchemaWithJSON>>;
    /**
     * Number of times processors have triggered retry for this generation.
     * Use this to implement retry limits within your processor.
     */
    retryCount: number;
}

/**
 * Step arguments with the per-call context fields removed and `retryCount` made optional.
 */
// NOTE(review): the exact set of omitted keys is assumed — confirm. `retryCount` has to be
// omitted for the optional re-declaration below to have any effect.
export type RunProcessInputStepArgs = Omit<
    ProcessInputStepArgs,
    'messages' | 'systemMessages' | 'abort' | 'state' | 'retryCount'
> & {
    messageId?: string;
    rotateResponseMessageId?: () => string;
    retryCount?: number;
};

/**
 * Result from processInputStep method.
 *
 * Note: structuredOutput.schema is typed as StandardSchemaWithJSON (not the specific OUTPUT type) because
 * processors can modify it dynamically, and the actual type is only known at runtime.
 */
export type ProcessInputStepResult = {
    model?: LanguageModelV2 | ModelRouterModelId | OpenAICompatibleConfig | MastraLanguageModel;
    /** Override the active assistant response message ID for this step */
    messageId?: string;
    /** Replace tools for this step - accepts both AI SDK tools and Mastra createTool results */
    // NOTE(review): key/value types of the tools record are assumed — confirm.
    tools?: Record<string, unknown>;
    // NOTE(review): the type argument of ToolChoice is assumed — confirm.
    toolChoice?: ToolChoice<any>;
    activeTools?: string[];
    messages?: MastraDBMessage[];
    messageList?: MessageList;
    /** Replace all system messages with these */
    systemMessages?: CoreMessageV4[];
    providerOptions?: SharedProviderOptions;
    // NOTE(review): the omitted CallSettings key(s) are assumed — confirm.
    modelSettings?: Omit<CallSettings, 'abortSignal'>;
    /**
     * Structured output configuration. The schema type is StandardSchemaWithJSON (not the specific OUTPUT)
     * because processors can modify it, and the actual type is only known at runtime.
     */
    structuredOutput?: StructuredOutputOptions<InferStandardSchemaOutput<StandardSchemaWithJSON>>;
    /**
     * Number of times processors have triggered retry for this generation.
     * Use this to implement retry limits within your processor.
     */
    retryCount?: number;
};

/**
 * Step result whose `model` field is narrowed to MastraLanguageModel.
 */
export type RunProcessInputStepResult = Omit<ProcessInputStepResult, 'model'> & {
    model?: MastraLanguageModel;
};

/**
 * Arguments for processOutputStream method.
 */
export interface ProcessOutputStreamArgs<TTripwireMetadata = unknown> extends ProcessorContext<TTripwireMetadata> {
    /** The current chunk being processed */
    part: ChunkType;
    /** All chunks seen so far */
    streamParts: ChunkType[];
    /** Mutable state object that persists across chunks */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
    /** Optional MessageList instance for accessing conversation history */
    messageList?: MessageList;
}

/**
 * Tool call information for processOutputStep.
 */
export interface ToolCallInfo {
    toolName: string;
    toolCallId: string;
    args: unknown;
}

/**
 * Arguments for processOutputStep method.
 * Called after each LLM response in the agentic loop, before tool execution.
 */
export interface ProcessOutputStepArgs<TTripwireMetadata = unknown> extends ProcessorMessageContext<TTripwireMetadata> {
    /** The current step number (0-indexed) */
    stepNumber: number;
    /** The finish reason from the LLM (stop, tool-use, length, etc.) */
    finishReason?: string;
    /** Tool calls made in this step (if any) */
    toolCalls?: ToolCallInfo[];
    /** Generated text from this step */
    text?: string;
    /** All system messages */
    systemMessages: CoreMessageV4[];
    /** All completed steps so far (including the current step) */
    // NOTE(review): the type argument of StepResult is assumed — confirm.
    steps: Array<StepResult<any>>;
    /** Mutable state object that persists across steps */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
}

/**
 * Arguments for processAPIError method.
 * Called when the LLM API call fails with a non-retryable error (API rejection).
 * This is distinct from network errors or retryable server errors (which are handled by p-retry).
 */
export interface ProcessAPIErrorArgs<TTripwireMetadata = unknown> extends ProcessorMessageContext<TTripwireMetadata> {
    /** The error that occurred during the LLM API call */
    error: unknown;
    /** The current step number (0-indexed) */
    stepNumber: number;
    /** All completed steps so far */
    // NOTE(review): the type argument of StepResult is assumed — confirm.
    steps: Array<StepResult<any>>;
    /** The active assistant response message ID for this step, when this processor is running inside an agent loop */
    messageId?: string;
    /** Mark the current assistant response message ID as complete and rotate to a fresh one, when supported by the caller */
    rotateResponseMessageId?: () => string;
    /** Per-processor state that persists across all method calls within this request */
    // NOTE(review): key/value types of the state record are assumed — confirm.
    state: Record<string, unknown>;
    /** The current retry count for this error handler */
    retryCount: number;
}

/**
 * Result from processAPIError method.
 */
export type ProcessAPIErrorResult = {
    /** Whether to retry the LLM call after applying modifications */
    retry: boolean;
};

/**
 * Processor interface for transforming messages and stream chunks.
 *
 * @template TId - The processor's unique identifier type
 * @template TTripwireMetadata - The type of metadata passed when calling abort()
 */
export interface Processor<TId extends string = string, TTripwireMetadata = unknown> {
    readonly id: TId;
    readonly name?: string;
    readonly description?: string;
    /** Index of this processor in the workflow (set at runtime when combining processors) */
    processorIndex?: number;
    /** When true, this processor will also receive `data-*` chunks in processOutputStream. Default: false. */
    processDataParts?: boolean;
    /**
     * Process input messages before they are sent to the LLM
     *
     * @returns Either:
     *  - MessageList: The same messageList instance passed in (indicates you've mutated it)
     *  - MastraDBMessage[]: Transformed messages array (for simple transformations)
     *  - { messages, systemMessages }: Object with both messages and modified system messages
     */
    processInput?(args: ProcessInputArgs<TTripwireMetadata>): Promise<ProcessInputResult> | ProcessInputResult;
    /**
     * Process output stream chunks with built-in state management
     * This allows processors to accumulate chunks and make decisions based on larger context
     * Return null or undefined to skip emitting the part
     */
    processOutputStream?(args: ProcessOutputStreamArgs<TTripwireMetadata>): Promise<ChunkType | null | undefined>;
    /**
     * Process the complete output result after streaming/generate is finished
     *
     * @returns Either:
     *  - MessageList: The same messageList instance passed in (indicates you've mutated it)
     *  - MastraDBMessage[]: Transformed messages array (for simple transformations)
     */
    processOutputResult?(args: ProcessOutputResultArgs<TTripwireMetadata>): ProcessorMessageResult;
    /**
     * Process input messages at each step of the agentic loop, before they are sent to the LLM.
     * Unlike processInput which runs once at the start, this runs at every step (including tool call continuations).
     *
     * @returns Either:
     *  - ProcessInputStepResult object with model, toolChoice, messages, etc.
     *  - MessageList: The same messageList instance passed in (indicates you've mutated it)
     *  - MastraDBMessage[]: Transformed messages array (for simple transformations)
     *  - undefined/void: No changes
     */
    processInputStep?(
        args: ProcessInputStepArgs<TTripwireMetadata>,
    ):
        | Promise<ProcessInputStepResult | MessageList | MastraDBMessage[] | void | undefined>
        | ProcessInputStepResult
        | MessageList
        | MastraDBMessage[]
        | void
        | undefined;
    /**
     * Process output after each LLM response in the agentic loop, before tool execution.
     * Unlike processOutputResult which runs once at the end, this runs at every step.
     *
     * This is the ideal place to implement guardrails that can trigger retries:
     * - Validate tone, format, or content of LLM responses
     * - Check for policy violations before tools are executed
     * - Implement self-correction by calling abort({ retry: true })
     *
     * @returns Either:
     *  - MessageList: The same messageList instance passed in (indicates you've mutated it)
     *  - MastraDBMessage[]: Transformed messages array (for simple transformations)
     */
    processOutputStep?(args: ProcessOutputStepArgs<TTripwireMetadata>): ProcessorMessageResult;
    /**
     * Process an LLM API rejection error before it's surfaced as a final error.
     * Only called for non-retryable API rejections (e.g., 400/422 status codes),
     * NOT for network errors or retryable server errors (which are handled by p-retry).
     *
     * This allows processors to inspect the error, modify the request (e.g., append messages),
     * and signal a retry. Unlike processOutputStep which runs after successful responses,
     * this runs when the API call is rejected.
     *
     * @returns ProcessAPIErrorResult indicating whether to retry with the modified state,
     *          or void/undefined to not handle the error
     */
    processAPIError?(
        args: ProcessAPIErrorArgs<TTripwireMetadata>,
    ): Promise<ProcessAPIErrorResult | void> | ProcessAPIErrorResult | void;
    /**
     * Internal method called when the processor is registered with a Mastra instance.
     * This allows processors to access Mastra services like knowledge, storage, etc.
     * @internal
     */
    __registerMastra?(mastra: Mastra): void;
}

/**
 * Base class for processors that need access to Mastra services.
 * Extend this class to automatically get access to the Mastra instance
 * when the processor is registered with an agent.
 *
 * @example
 * ```typescript
 * class MyProcessor extends BaseProcessor<'my-processor'> {
 *   readonly id = 'my-processor';
 *
 *   async processInput(args: ProcessInputArgs) {
 *     // Access Mastra services via this.mastra
 *     const knowledge = this.mastra?.getKnowledge();
 *     // ...
 *   }
 * }
 * ```
 */
export declare abstract class BaseProcessor<TId extends string = string, TTripwireMetadata = unknown>
    implements Processor<TId, TTripwireMetadata>
{
    abstract readonly id: TId;
    readonly name?: string;
    /**
     * The Mastra instance this processor is registered with.
     * Available after the processor is registered via __registerMastra.
     */
    protected mastra?: Mastra;
    /**
     * Called when the processor is registered with a Mastra instance.
     * @internal
     */
    __registerMastra(mastra: Mastra): void;
}

/** Makes the keys `K` of `T` required and non-nullable, leaving every other member untouched. */
type WithRequired<T, K extends keyof T> = T & {
    [P in K]-?: NonNullable<T[P]>;
};

/** A processor that implements at least one input hook (`processInput` or `processInputStep`). */
export type InputProcessor<TTripwireMetadata = unknown> =
    | (WithRequired<Processor<string, TTripwireMetadata>, 'id' | 'processInput'> & Processor<string, TTripwireMetadata>)
    | (WithRequired<Processor<string, TTripwireMetadata>, 'id' | 'processInputStep'> &
          Processor<string, TTripwireMetadata>);

/**
 * A processor that implements at least one output hook
 * (`processOutputStream`, `processOutputResult` or `processOutputStep`).
 */
export type OutputProcessor<TTripwireMetadata = unknown> =
    | (WithRequired<Processor<string, TTripwireMetadata>, 'id' | 'processOutputStream'> &
          Processor<string, TTripwireMetadata>)
    | (WithRequired<Processor<string, TTripwireMetadata>, 'id' | 'processOutputResult'> &
          Processor<string, TTripwireMetadata>)
    | (WithRequired<Processor<string, TTripwireMetadata>, 'id' | 'processOutputStep'> &
          Processor<string, TTripwireMetadata>);

/** A processor that implements `processAPIError`. */
export type ErrorProcessor<TTripwireMetadata = unknown> = WithRequired<
    Processor<string, TTripwireMetadata>,
    'id' | 'processAPIError'
> &
    Processor<string, TTripwireMetadata>;

/** Any processor flavour: input, output or error. */
export type ProcessorTypes<TTripwireMetadata = unknown> =
    | InputProcessor<TTripwireMetadata>
    | OutputProcessor<TTripwireMetadata>
    | ErrorProcessor<TTripwireMetadata>;

/**
 * A Workflow that can be used as a processor.
 * The workflow must accept ProcessorStepInput and return ProcessorStepOutput.
 */
// NOTE(review): the number, order and values of Workflow's type arguments are assumed; only the
// use of ProcessorStepOutput is implied by this file's imports — confirm against the Workflow
// declaration.
export type ProcessorWorkflow = Workflow<any, any, string, any, ProcessorStepOutput, ProcessorStepOutput, any>;

/**
 * Input processor config: can be a Processor or a Workflow.
 */
export type InputProcessorOrWorkflow<TTripwireMetadata = unknown> =
    | InputProcessor<TTripwireMetadata>
    | ProcessorWorkflow;

/**
 * Output processor config: can be a Processor or a Workflow.
 */
export type OutputProcessorOrWorkflow<TTripwireMetadata = unknown> =
    | OutputProcessor<TTripwireMetadata>
    | ProcessorWorkflow;

/**
 * Error processor config: must be a processor with processAPIError.
 * Workflows are not supported because LLM API rejection handling only invokes processor methods.
 */
export type ErrorProcessorOrWorkflow<TTripwireMetadata = unknown> = ErrorProcessor<TTripwireMetadata>;

/**
 * Type guard to check if an object is a Workflow that can be used as a processor.
 * A ProcessorWorkflow must have 'id', 'inputSchema', 'outputSchema', and 'execute' properties.
 */
export declare function isProcessorWorkflow(obj: unknown): obj is ProcessorWorkflow;

export * from './processors/index.js';
export { PrefillErrorHandler } from './prefill-error-handler.js';
export { ProcessorState, ProcessorRunner } from './runner.js';
export * from './memory/index.js';
export type { TripWireOptions } from '../agent/trip-wire.js';
export {
    ProcessorStepSchema,
    ProcessorStepInputSchema,
    ProcessorStepOutputSchema,
    ProcessorInputPhaseSchema,
    ProcessorInputStepPhaseSchema,
    ProcessorOutputStreamPhaseSchema,
    ProcessorOutputResultPhaseSchema,
    ProcessorOutputStepPhaseSchema,
    ProcessorMessageSchema,
    ProcessorMessageContentSchema,
    MessageContentSchema,
    MessagePartSchema,
    TextPartSchema,
    ImagePartSchema,
    FilePartSchema,
    ToolInvocationPartSchema,
    ReasoningPartSchema,
    SourcePartSchema,
    StepStartPartSchema,
} from './step-schema.js';
export type {
    ProcessorStepData,
    ProcessorStepDataFlexible,
    ProcessorStepInput,
    ProcessorStepOutput,
    ProcessorMessage,
    MessageContent,
    MessagePart,
} from './step-schema.js';
//# sourceMappingURL=index.d.ts.map