# Processor interface

The `Processor` interface defines the contract for all processors in Mastra. Processors can implement one or more methods to handle different stages of the agent execution pipeline.

## When processor methods run

The six processor methods run at different points in the agent execution lifecycle:

```text
┌─────────────────────────────────────────────────────────────────┐
│                     Agent Execution Flow                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  User Input                                                     │
│      │                                                          │
│      ▼                                                          │
│  ┌─────────────────┐                                            │
│  │  processInput   │  ← Runs ONCE at start                      │
│  └────────┬────────┘                                            │
│           │                                                     │
│           ▼                                                     │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                   Agentic Loop                          │    │
│  │  ┌─────────────────────┐                                │    │
│  │  │  processInputStep   │  ← Runs at EACH step           │    │
│  │  └──────────┬──────────┘                                │    │
│  │             │                                           │    │
│  │             ▼                                           │    │
│  │       LLM Execution ──── API Error? ──┐                 │    │
│  │             │                          │                 │    │
│  │             │               ┌───────────────────┐       │    │
│  │             │               │  processAPIError  │       │    │
│  │             │               └─────────┬─────────┘       │    │
│  │             │                         └── retry LLM call│    │
│  │             ▼                                           │    │
│  │  ┌──────────────────────┐                               │    │
│  │  │ processOutputStream  │  ← Runs on EACH stream chunk  │    │
│  │  └──────────┬───────────┘                               │    │
│  │             │                                           │    │
│  │             ▼                                           │    │
│  │  ┌──────────────────────┐                               │    │
│  │  │  processOutputStep   │  ← Runs after EACH LLM step   │    │
│  │  └──────────┬───────────┘                               │    │
│  │             │                                           │    │
│  │             ▼                                           │    │
│  │     Tool Execution (if needed)                          │    │
│  │             │                                           │    │
│  │             └────────── Loop back if tools called       │    │
│  └─────────────────────────────────────────────────────────┘    │
│           │                                                     │
│           ▼                                                     │
│  ┌─────────────────────┐                                        │
│  │ processOutputResult │  ← Runs ONCE after completion          │
│  └─────────────────────┘                                        │
│           │                                                     │
│           ▼                                                     │
│     Final Response                                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

| Method                | When it runs                                           | Use case                                                                      |
| --------------------- | ------------------------------------------------------ | ----------------------------------------------------------------------------- |
| `processInput`        | Once at the start, before the agentic loop             | Validate/transform initial user input, add context                            |
| `processInputStep`    | At each step of the agentic loop, before each LLM call | Transform messages between steps, handle tool results                         |
| `processAPIError`     | When an LLM API call fails                             | Inspect API rejections, optionally mutate state/messages, and request a retry |
| `processOutputStream` | On each streaming chunk during LLM response            | Filter/modify streaming content, detect patterns in real-time                 |
| `processOutputStep`   | After each LLM response, before tool execution         | Validate output quality, implement guardrails with retry                      |
| `processOutputResult` | Once after generation completes                        | Post-process final response, log results                                      |

## Interface definition

```typescript
interface Processor<TId extends string = string> {
  readonly id: TId
  readonly name?: string
  readonly processDataParts?: boolean

  processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult
  processInputStep?(
    args: ProcessInputStepArgs,
  ): Promise<ProcessInputStepResult> | ProcessInputStepResult
  processAPIError?(
    args: ProcessAPIErrorArgs,
  ): Promise<ProcessAPIErrorResult | void> | ProcessAPIErrorResult | void
  processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>
  processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult
  processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult
}
```

## Properties

**id** (`string`): Unique identifier for the processor. Used for tracing and debugging.

**name** (`string`): Optional display name for the processor. Falls back to id if not provided.

**processDataParts** (`boolean`): When `true`, the `processOutputStream` method also receives `data-*` chunks emitted by tools via `writer.custom()`. Defaults to `false`.

## Methods

### `processInput`

Processes input messages before they're sent to the LLM. Runs once at the start of agent execution.

```typescript
processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
```

#### `ProcessInputArgs`

**messages** (`MastraDBMessage[]`): User and assistant messages to process (excludes system messages).

**systemMessages** (`CoreMessage[]`): All system messages (agent instructions, memory context, user-provided). Can be modified and returned.

**messageList** (`MessageList`): Full MessageList instance for advanced message management.

**abort** (`(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never`): Function to abort processing. Throws a TripWire error that stops execution. Pass `retry: true` to request the LLM retry the step with feedback.

**retryCount** (`number`): Number of times processors have triggered retry for this generation. Use this to limit retry attempts.

**tracingContext** (`TracingContext`): Tracing context for observability.

**requestContext** (`RequestContext`): Request-scoped context with execution metadata like threadId and resourceId.

#### `ProcessInputResult`

The method can return one of three types:

**MastraDBMessage\[]** (`array`): Transformed messages array. System messages remain unchanged.

**MessageList** (`MessageList`): The same messageList instance passed in. Indicates you've mutated it directly.

**{ messages, systemMessages }** (`object`): Object with both transformed messages and modified system messages.
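For illustration, here is a sketch of the object form: conversation messages pass through untouched while a system message is appended. The `Msg` type below is a simplified local stand-in, not the real `MastraDBMessage`/`CoreMessage` definitions:

```typescript
// Illustrative sketch only: Msg stands in for Mastra's message types so
// the shape of the { messages, systemMessages } return value is clear.
type Msg = { role: string; content: string }

class ContextInjector {
  id = 'context-injector'

  processInput({ messages, systemMessages }: { messages: Msg[]; systemMessages: Msg[] }) {
    // Return both: messages pass through, system messages gain one entry
    return {
      messages,
      systemMessages: [...systemMessages, { role: 'system', content: 'Answer concisely.' }],
    }
  }
}
```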

***

### `processInputStep`

Processes input messages at each step of the agentic loop, before they're sent to the LLM. Unlike `processInput` which runs once at the start, this runs at every step including tool call continuations.

```typescript
processInputStep?(args: ProcessInputStepArgs): Promise<ProcessInputStepResult> | ProcessInputStepResult;
```

#### Execution order in the agentic loop

1. `processInput` (once at start)
2. `processInputStep` from inputProcessors (at each step, before LLM call)
3. `prepareStep` callback (runs as part of the processInputStep pipeline, after inputProcessors)
4. LLM execution
5. Tool execution (if needed)
6. Repeat from step 2 if tools were called
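The ordering above can be sketched as a simplified control loop. Everything here is an illustrative stand-in that only traces method names, not Mastra's actual implementation:

```typescript
// Traces the execution order for a run where the model calls tools
// `toolSteps` times before producing a final answer.
function simulateRun(toolSteps: number): string[] {
  const trace = ['processInput'] // 1. once at start
  let step = 0
  while (true) {
    trace.push('processInputStep') // 2. before each LLM call
    trace.push('prepareStep') // 3. after the input processors
    trace.push('llm') // 4. LLM execution
    if (step >= toolSteps) break // model answered without tools: loop ends
    trace.push('tools') // 5. tool execution
    step++ // 6. repeat from step 2
  }
  trace.push('processOutputResult') // runs once after completion
  return trace
}
```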

#### `ProcessInputStepArgs`

**messages** (`MastraDBMessage[]`): All messages including tool calls and results from previous steps (read-only snapshot).

**messageList** (`MessageList`): MessageList instance for managing messages. Can mutate directly or return in result.

**stepNumber** (`number`): Current step number (0-indexed). Step 0 is the initial LLM call.

**steps** (`StepResult[]`): Results from previous steps, including text, toolCalls, and toolResults.

**systemMessages** (`CoreMessage[]`): All system messages (read-only snapshot). Return in result to replace.

**model** (`MastraLanguageModelV2`): Current model being used. Return a different model in result to switch.

**toolChoice** (`ToolChoice`): Current tool choice setting ('auto', 'none', 'required', or specific tool).

**activeTools** (`string[]`): Currently active tool names. Return filtered array to limit tools.

**tools** (`ToolSet`): Current tools available for this step. Return in result to add/replace tools.

**providerOptions** (`SharedV2ProviderOptions`): Provider-specific options (e.g., Anthropic cacheControl, OpenAI reasoningEffort).

**modelSettings** (`CallSettings`): Model settings like temperature, maxTokens, topP.

**structuredOutput** (`StructuredOutputOptions`): Structured output configuration (schema, output mode). Return in result to modify.

**abort** (`(reason?: string) => never`): Function to abort processing.

**tracingContext** (`TracingContext`): Tracing context for observability.

**requestContext** (`RequestContext`): Request-scoped context with execution metadata.

#### `ProcessInputStepResult`

The method can return any combination of these properties:

**model** (`LanguageModelV2 | string`): Change the model for this step. Can be a model instance or router ID like 'openai/gpt-5.4'.

**toolChoice** (`ToolChoice`): Change tool selection behavior for this step.

**activeTools** (`string[]`): Filter which tools are available for this step.

**tools** (`ToolSet`): Replace or modify tools for this step. Use spread to merge: `{ tools: { ...tools, newTool } }`.

**messages** (`MastraDBMessage[]`): Replace all messages. Cannot be used with messageList.

**messageList** (`MessageList`): Return the same messageList instance (indicates you mutated it). Cannot be used with messages.

**systemMessages** (`CoreMessage[]`): Replace all system messages for this step only.

**providerOptions** (`SharedV2ProviderOptions`): Change provider-specific options for this step.

**modelSettings** (`CallSettings`): Change model settings for this step.

**structuredOutput** (`StructuredOutputOptions`): Change structured output configuration for this step.

#### Processor chaining

When multiple processors implement `processInputStep`, they run in order and changes chain through:

```text
Processor 1: receives { model: 'gpt-5.4' } → returns { model: 'gpt-5.4-mini' }
Processor 2: receives { model: 'gpt-5.4-mini' } → returns { toolChoice: 'none' }
Final: model = 'gpt-5.4-mini', toolChoice = 'none'
```
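Conceptually, this merge behaves like a reduce over partial results: each processor sees the settings as already modified by earlier processors, and its returned fields are layered on top. A self-contained sketch (local stand-in types, not Mastra internals):

```typescript
type StepSettings = { model: string; toolChoice: string }
type StepProcessor = (current: StepSettings) => Partial<StepSettings>

// Each processor receives the effective settings so far; its partial
// result is merged on top before the next processor runs.
function chainProcessors(initial: StepSettings, processors: StepProcessor[]): StepSettings {
  return processors.reduce((settings, proc) => ({ ...settings, ...proc(settings) }), initial)
}

const final = chainProcessors({ model: 'gpt-5.4', toolChoice: 'auto' }, [
  () => ({ model: 'gpt-5.4-mini' }),
  () => ({ toolChoice: 'none' }),
])
// final combines both changes: model from processor 1, toolChoice from processor 2
```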

#### System message isolation

System messages are **reset to their original values** at the start of each step. Modifications made in `processInputStep` only affect the current step, not subsequent steps.

#### Use cases

- Dynamic model switching based on step number or context
- Disabling tools after a certain number of steps
- Dynamically adding or replacing tools based on conversation context
- Transforming message part types between providers (e.g., `reasoning` → `thinking` for Anthropic)
- Modifying messages based on step number or accumulated context
- Adding step-specific system instructions
- Adjusting provider options per step (e.g., cache control)
- Modifying structured output schema based on step context

***

### `processAPIError`

Handles LLM API rejection errors before they surface as final errors. This runs when the API call fails with a non-retryable error (such as a 400 or 422 status code). Unlike `processOutputStep` which runs after successful responses, this runs when the API rejects the request.

Add processors that implement `processAPIError` to an agent's `errorProcessors` array.

Processors can inspect the error, modify the request (for example, by appending messages to the `messageList`), and return `{ retry: true }` to signal a retry with the modified state.

```typescript
processAPIError?(args: ProcessAPIErrorArgs): Promise<ProcessAPIErrorResult | void> | ProcessAPIErrorResult | void;
```

#### `ProcessAPIErrorArgs`

**error** (`unknown`): The error that occurred during the LLM API call.

**messages** (`MastraDBMessage[]`): All messages at the time of the error.

**messageList** (`MessageList`): MessageList instance for managing messages. Modify this to change the request before retry.

**stepNumber** (`number`): Current step number (0-indexed).

**steps** (`StepResult[]`): All completed steps so far.

**state** (`Record<string, unknown>`): Per-processor state that persists across all method calls within this request.

**retryCount** (`number`): The current retry count for error handlers. Use this to limit retry attempts.

**abort** (`(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never`): Function to abort processing.

**writer** (`ProcessorStreamWriter`): Stream writer for emitting custom data chunks during streaming. Use `writer.custom()` to send transient UI signals.

**requestContext** (`RequestContext`): Request context passed through from the agent call.

**abortSignal** (`AbortSignal`): Signal for cancelling the operation.

#### `ProcessAPIErrorResult`

**retry** (`boolean`): Whether to retry the LLM call after applying modifications.

#### Use cases

- Handling API-specific rejections by modifying the request and retrying
- Converting non-retryable errors into retryable ones with request modifications
- Implementing model-specific error recovery strategies

#### Example: Custom error recovery

```typescript
import { APICallError } from '@ai-sdk/provider'
import type { Processor, ProcessAPIErrorArgs, ProcessAPIErrorResult } from '@mastra/core/processors'

export class ErrorRecoveryProcessor implements Processor {
  id = 'error-recovery'

  processAPIError({
    error,
    messageList,
    retryCount,
  }: ProcessAPIErrorArgs): ProcessAPIErrorResult | void {
    // Only retry once
    if (retryCount > 0) return

    // Check for a specific API error
    if (APICallError.isInstance(error) && error.message.includes('context length exceeded')) {
      // Trim older messages to fit within context
      const messages = messageList.get.all.db()
      if (messages.length > 4) {
        messageList.removeByIds([messages[1]!.id, messages[2]!.id])
        return { retry: true }
      }
    }
  }
}
```

***

### `processOutputStream`

Processes streaming output chunks with built-in state management. Allows processors to accumulate chunks and make decisions based on larger context.

```typescript
processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
```

#### `ProcessOutputStreamArgs`

**part** (`ChunkType`): The current stream chunk being processed.

**streamParts** (`ChunkType[]`): All chunks seen so far in the stream.

**state** (`Record<string, unknown>`): Mutable state object that persists across chunks within a single stream.

**abort** (`(reason?: string) => never`): Function to abort the stream.

**messageList** (`MessageList`): MessageList instance for accessing conversation history.

**tracingContext** (`TracingContext`): Tracing context for observability.

**requestContext** (`RequestContext`): Request-scoped context with execution metadata.

**writer** (`ProcessorStreamWriter`): Stream writer for emitting custom data chunks back to the client. Call `writer.custom()` to emit `data-*` typed chunks. Available during streaming.

#### Return value

- Return the `ChunkType` to emit it (possibly modified)
- Return `null` or `undefined` to skip emitting the chunk
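A processor body can exercise all three outcomes, for example masking digits in text chunks and dropping reasoning chunks entirely. `Chunk` below is a simplified local stand-in for Mastra's `ChunkType`:

```typescript
type Chunk = { type: string; textDelta?: string }

// Demonstrates the three outcomes: modify, drop (null), or pass through.
function handleChunk(part: Chunk): Chunk | null {
  if (part.type === 'text-delta' && part.textDelta !== undefined) {
    // Modify: mask digits before they reach the client
    return { ...part, textDelta: part.textDelta.replace(/\d/g, '*') }
  }
  if (part.type === 'reasoning-delta') {
    return null // Drop: this chunk is never emitted
  }
  return part // Pass through unchanged
}
```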

***

### `processOutputResult`

Processes the complete output result after streaming or generation is finished.

```typescript
processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
```

#### `ProcessOutputResultArgs`

**messages** (`MastraDBMessage[]`): The generated response messages.

**messageList** (`MessageList`): MessageList instance for managing messages.

**state** (`Record<string, unknown>`): Per-processor state that persists across all method calls within this request. Shared with processOutputStream and other methods.

**result** (`OutputResult`): Resolved generation result containing `text` (accumulated text), `usage` (token usage with inputTokens, outputTokens, totalTokens), `finishReason` (why generation ended), and `steps` (all LLM step results, each with toolCalls, toolResults, reasoning, sources, files, etc.).

**abort** (`(reason?: string) => never`): Function to abort processing.

**tracingContext** (`TracingContext`): Tracing context for observability.

**requestContext** (`RequestContext`): Request-scoped context with execution metadata.

**writer** (`ProcessorStreamWriter`): Stream writer for emitting custom data chunks back to the client. Call `writer.custom()` to emit `data-*` typed chunks. Available during streaming.
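For illustration, a minimal logging processor might read the resolved result and pass the messages through unchanged. `OutputResult` is reduced here to the fields described above; the real type carries more:

```typescript
// Simplified stand-in for the OutputResult shape documented above.
type OutputResult = {
  text: string
  usage: { inputTokens: number; outputTokens: number; totalTokens: number }
  finishReason: string
}

class UsageLogger {
  id = 'usage-logger'

  processOutputResult({ messages, result }: { messages: object[]; result: OutputResult }) {
    // Log usage, then return the messages untouched
    console.log(`finish=${result.finishReason} totalTokens=${result.usage.totalTokens}`)
    return messages
  }
}
```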

***

### `processOutputStep`

Processes output after each LLM response in the agentic loop, before tool execution. Unlike `processOutputResult` which runs once at the end, this runs at every step. This is the ideal method for implementing guardrails that can trigger retries.

```typescript
processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
```

#### `ProcessOutputStepArgs`

**messages** (`MastraDBMessage[]`): All messages including the latest LLM response.

**messageList** (`MessageList`): MessageList instance for managing messages.

**stepNumber** (`number`): Current step number (0-indexed).

**finishReason** (`string`): The finish reason from the LLM (stop, tool-use, length, etc.).

**toolCalls** (`ToolCallInfo[]`): Tool calls made in this step (if any).

**text** (`string`): Generated text from this step.

**systemMessages** (`CoreMessage[]`): All system messages for read/modify access.

**abort** (`(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never`): Function to abort processing. Pass `retry: true` to request the LLM retry the step.

**retryCount** (`number`): Number of times processors have triggered retry. Use this to limit retry attempts.

**tracingContext** (`TracingContext`): Tracing context for observability.

**requestContext** (`RequestContext`): Request-scoped context with execution metadata.

#### Use cases

- Implementing quality guardrails that can request retries
- Validating LLM output before tool execution
- Adding per-step logging or metrics
- Implementing output moderation with retry capability

#### Example: Quality guardrail with retry

```typescript
import type { Processor } from '@mastra/core'

export class QualityGuardrail implements Processor {
  id = 'quality-guardrail'

  async processOutputStep({ text, abort, retryCount }) {
    // evaluateResponseQuality is a placeholder for your own scoring logic
    const score = await evaluateResponseQuality(text)

    if (score < 0.7) {
      if (retryCount < 3) {
        // Request retry with feedback for the LLM
        abort('Response quality too low. Please provide more detail.', {
          retry: true,
          metadata: { qualityScore: score },
        })
      } else {
        // Max retries reached, block the response
        abort('Response quality too low after multiple attempts.')
      }
    }

    return []
  }
}
```

## Processor types

Mastra provides type aliases to ensure processors implement the required methods:

```typescript
// Must implement processInput OR processInputStep (or both)
type InputProcessor = Processor &
  (Required<Pick<Processor, 'processInput'>> | Required<Pick<Processor, 'processInputStep'>>)

// Must implement processOutputStream, processOutputStep, OR processOutputResult (or any combination)
type OutputProcessor = Processor &
  (
    | Required<Pick<Processor, 'processOutputStream'>>
    | Required<Pick<Processor, 'processOutputStep'>>
    | Required<Pick<Processor, 'processOutputResult'>>
  )

// Must implement processAPIError
type ErrorProcessor = Processor & Required<Pick<Processor, 'processAPIError'>>
```

Configure processors that implement `processAPIError` in `errorProcessors`:

```typescript
const agent = new Agent({
  // ...
  errorProcessors: [new PrefillErrorHandler()],
})
```

## Usage examples

### Basic input processor

```typescript
import type { Processor, MastraDBMessage } from '@mastra/core'

export class LowercaseProcessor implements Processor {
  id = 'lowercase'

  async processInput({ messages }): Promise<MastraDBMessage[]> {
    return messages.map(msg => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map(part =>
          part.type === 'text' ? { ...part, text: part.text.toLowerCase() } : part,
        ),
      },
    }))
  }
}
```

### Per-step processor with `processInputStep`

```typescript
import type { Processor, ProcessInputStepArgs, ProcessInputStepResult } from '@mastra/core'

export class DynamicModelProcessor implements Processor {
  id = 'dynamic-model'

  async processInputStep({
    stepNumber,
    steps,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: 'none' }
    }

    // Use a fast model for the initial response
    if (stepNumber === 0) {
      return { model: 'openai/gpt-5-mini' }
    }

    // Switch to a more powerful model after tool calls
    if (steps.length > 0 && steps[steps.length - 1].toolCalls?.length) {
      return { model: 'openai/gpt-5.4' }
    }

    return {}
  }
}
```

### Message transformer with `processInputStep`

```typescript
import type { Processor, MastraDBMessage } from '@mastra/core'

export class ReasoningTransformer implements Processor {
  id = 'reasoning-transformer'

  async processInputStep({ messages, messageList }) {
    // Transform reasoning parts to thinking parts at each step
    // This is useful when switching between model providers
    for (const msg of messages) {
      if (msg.role === 'assistant' && msg.content.parts) {
        for (const part of msg.content.parts) {
          if (part.type === 'reasoning') {
            ;(part as any).type = 'thinking'
          }
        }
      }
    }
    return messageList
  }
}
```

### Hybrid processor (input and output)

```typescript
import type { Processor, MastraDBMessage, ChunkType } from '@mastra/core'

export class ContentFilter implements Processor {
  id = 'content-filter'
  private blockedWords: string[]

  constructor(blockedWords: string[]) {
    this.blockedWords = blockedWords
  }

  async processInput({ messages, abort }): Promise<MastraDBMessage[]> {
    for (const msg of messages) {
      const text = msg.content.parts
        ?.filter(p => p.type === 'text')
        .map(p => p.text)
        .join(' ')

      if (this.blockedWords.some(word => text?.includes(word))) {
        abort('Blocked content detected in input')
      }
    }
    return messages
  }

  async processOutputStream({ part, abort }): Promise<ChunkType | null> {
    if (part.type === 'text-delta') {
      if (this.blockedWords.some(word => part.textDelta.includes(word))) {
        abort('Blocked content detected in output')
      }
    }
    return part
  }
}
```

### Stream accumulator with state

```typescript
import type { Processor, ChunkType } from '@mastra/core'

export class WordCounter implements Processor {
  id = 'word-counter'

  async processOutputStream({ part, state }): Promise<ChunkType> {
    // Initialize state on first chunk
    if (!state.wordCount) {
      state.wordCount = 0
    }

    // Count words in text chunks
    if (part.type === 'text-delta') {
      const words = part.textDelta.split(/\s+/).filter(Boolean)
      state.wordCount += words.length
    }

    // Log word count on finish
    if (part.type === 'finish') {
      console.log(`Total words: ${state.wordCount}`)
    }

    return part
  }
}
```

## Related

- [Processors overview](https://mastra.ai/docs/agents/processors): Conceptual guide to processors
- [Guardrails](https://mastra.ai/docs/agents/guardrails): Security and validation processors
- [Memory Processors](https://mastra.ai/docs/memory/memory-processors): Memory-specific processors