# Processors

Processors transform, validate, or control messages as they pass through an agent. They run at specific points in the agent's execution pipeline, allowing you to modify inputs before they reach the language model or outputs before they're returned to users.

Processors are configured as:

- **`inputProcessors`**: Run before messages reach the language model.
- **`outputProcessors`**: Run after the language model generates a response, but before it's returned to users.

You can use individual [`Processor`](https://mastra.ai/reference/processors/processor-interface) objects or compose them into workflows using Mastra's workflow primitives. Workflows give you advanced control over processor execution order, parallel processing, and conditional logic.

Some processors implement both input and output logic and can be used in either array depending on where the transformation should occur.

Some built-in processors also send hidden system reminder signals. These signals are persisted in raw memory history and converted to `<system-reminder>...</system-reminder>` context before the next model call, but standard UI-facing message conversions and default memory recall hide them unless you explicitly opt in.

## When to use processors

Use processors to:

- Normalize or validate user input
- Add guardrails to your agent
- Detect and prevent prompt injection or jailbreak attempts
- Moderate content for safety or compliance
- Transform messages (e.g., translate languages, filter tool calls)
- Limit token usage or message history length
- Redact sensitive information (PII)
- Apply custom business logic to messages

Mastra includes several processors for common use cases. You can also create custom processors for application-specific requirements.

## Quickstart

Import and instantiate the processor, then pass it to the agent's `inputProcessors` or `outputProcessors` array:

```typescript
import { Agent } from '@mastra/core/agent'
import { ModerationProcessor } from '@mastra/core/processors'

export const moderatedAgent = new Agent({
  name: 'moderated-agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5-mini',
  inputProcessors: [
    new ModerationProcessor({
      model: 'openai/gpt-5-mini',
      categories: ['hate', 'harassment', 'violence'],
      threshold: 0.7,
      strategy: 'block',
    }),
  ],
})
```

## Execution order

Processors run in the order they appear in the array:

```typescript
inputProcessors: [new UnicodeNormalizer(), new PromptInjectionDetector(), new ModerationProcessor()]
```

For output processors, the order determines the sequence of transformations applied to the model's response.

### With memory enabled

When memory is enabled on an agent, memory processors are automatically added to the pipeline:

**Input processors:**

```text
[Memory Processors] → [Your inputProcessors]
```

Memory loads message history first, then your processors run.

**Output processors:**

```text
[Your outputProcessors] → [Memory Processors]
```

Your processors run first, then memory persists messages.

This ordering ensures that if your output guardrail calls `abort()`, memory processors are skipped and no messages are saved. See [Memory Processors](https://mastra.ai/docs/memory/memory-processors) for details.

## Attach processors to an agent

Processors are configured on the agent through three arrays:

```typescript
import { Agent } from '@mastra/core/agent'
import { PrefillErrorHandler, TokenLimiter, ModerationProcessor } from '@mastra/core/processors'

const agent = new Agent({
  name: 'support-agent',
  model: 'openai/gpt-5',
  instructions: '...',
  inputProcessors: [
    new TokenLimiter(4000),
    new ModerationProcessor({ model: 'openai/gpt-5-nano' }),
  ],
  outputProcessors: [new ModerationProcessor({ model: 'openai/gpt-5-nano' })],
  errorProcessors: [new PrefillErrorHandler()],
})
```

- `inputProcessors` run before the LLM.
- `outputProcessors` run during and after the LLM response.
- `errorProcessors` run when the LLM API call throws, so they can recover from provider errors.

Each array also accepts a function that returns an array, so processors can be built per-request from `RequestContext`:

```typescript
new Agent({
  // ...
  inputProcessors: ({ requestContext }) => {
    const limit = requestContext.get('tokenLimit') ?? 4000
    return [new TokenLimiter(limit)]
  },
})
```

### Override processors per call

`agent.generate()` and `agent.stream()` accept the same three arrays. When you pass one, it **replaces** the matching array on the agent for that call only. Memory, workspace, and other framework-managed processors still run around your array.

```typescript
await agent.stream('Summarize this', {
  inputProcessors: [new TokenLimiter(2000)],
  maxProcessorRetries: 5,
})
```

## Create custom processors

Custom processors implement the `Processor` interface.

Processor methods receive two arguments for accessing the conversation:

- `messages`: A snapshot array of `MastraDBMessage` objects for the current stage.
- `messageList`: The live `MessageList` instance. Use it to read other stages, or to add, remove, or replace messages in place.

Text lives in `message.content.parts`, not on `message.content` itself. Iterate `parts` and filter by `part.type === 'text'` to read user or assistant text. A flattened `message.content.content` string exists for legacy compatibility and can be used as a fallback. See [Message arguments](https://mastra.ai/reference/processors/processor-interface) in the `Processor` reference for full details.

### Transform input messages

```typescript
import type { Processor, ProcessInputArgs } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class CustomInputProcessor implements Processor {
  id = 'custom-input'

  async processInput({ messages }: ProcessInputArgs): Promise<MastraDBMessage[]> {
    // Transform messages before they reach the LLM.
    // Text lives in content.parts — iterate parts and rewrite text parts only.
    return messages.map(msg => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map(part =>
          part.type === 'text' ? { ...part, text: part.text.toLowerCase() } : part,
        ),
      },
    }))
  }
}
```

The `processInput()` method receives `messages`, `systemMessages`, and an `abort()` function. Return a `MastraDBMessage[]` to replace messages, or `{ messages, systemMessages }` to also modify system messages.

See the [`Processor` reference](https://mastra.ai/reference/processors/processor-interface) for all available arguments and return types.

### Control each step

While `processInput()` runs once at the start of agent execution, `processInputStep()` runs at **each step** of the agentic loop (including tool call continuations). This enables per-step configuration changes like dynamic model switching or tool choice modifications.

```typescript
import type {
  Processor,
  ProcessInputStepArgs,
  ProcessInputStepResult,
} from '@mastra/core/processors'

export class DynamicModelProcessor implements Processor {
  id = 'dynamic-model'

  async processInputStep({
    stepNumber,
    model,
    toolChoice,
    messageList,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for initial response
    if (stepNumber === 0) {
      return { model: 'openai/gpt-5-mini' }
    }

    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: 'none' }
    }

    // No changes for other steps
    return {}
  }
}
```

The method receives the current `stepNumber`, `model`, `tools`, `toolChoice`, `messages`, and more. Return an object with any properties you want to override for that step, for example `{ model, toolChoice, tools, systemMessages }`.

See the [`Processor` reference](https://mastra.ai/reference/processors/processor-interface) for all available arguments and return types.

### Rewrite the LLM request before the provider call

Use `processLLMRequest()` when you need to rewrite the final prompt that Mastra sends to the model. This hook runs after Mastra converts the `MessageList` into the provider-facing prompt format (`LanguageModelV2Prompt`) and immediately before the provider call.

Use the message-based hooks for conversation changes:

- `processInput()`: Change the conversation once before the agentic loop starts.
- `processInputStep()`: Change messages or step configuration before each LLM call.
- `processLLMRequest()`: Change only the outbound prompt for the current provider call.

Changes returned from `processLLMRequest()` are transient. They don't persist back to `MessageList`, memory, UI history, or future provider calls. This makes the hook a good fit for provider compatibility rewrites, role/content normalization, or other model-specific prompt changes that shouldn't alter stored conversation history.

The method receives `prompt`, `model`, `stepNumber`, `steps`, `state`, and the shared processor context. Calling `abort()` from `processLLMRequest()` emits the normal tripwire response and stops the call.

See the [`Processor` reference](https://mastra.ai/reference/processors/processor-interface) for all available arguments and return types.

### Act on the LLM response after the provider call

Use `processLLMResponse()` to act on the completed LLM response after the step finishes and stream chunks have been collected. This hook pairs with `processLLMRequest()`: stash state (such as a cache key) in the request hook, then read it back in the response hook to perform side effects like writing to a cache.

The `state` object is the same instance passed to `processLLMRequest()` for the same step. When `fromCache` is `true`, the response was replayed from a cache rather than produced by a live model call — processors that write to a cache should skip writes in this case.

The method receives `chunks`, `model`, `stepNumber`, `steps`, `state`, `fromCache`, and the shared processor context.

See the [`Processor` reference](https://mastra.ai/reference/processors/processor-interface) for all available arguments and return types.

### Use the `prepareStep()` callback

The `prepareStep()` callback on `generate()` or `stream()` is a shorthand for `processInputStep()`. Internally, Mastra wraps it in a processor that calls your function at each step. It accepts the same arguments and return type as `processInputStep()`, but doesn't require creating a class:

```typescript
await agent.generate('Complex task', {
  prepareStep: async ({ stepNumber, model }) => {
    if (stepNumber === 0) {
      return { model: 'openai/gpt-5-mini' }
    }
    if (stepNumber > 5) {
      return { toolChoice: 'none' }
    }
  },
})
```

### Transform output messages

```typescript
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class CustomOutputProcessor implements Processor {
  id = 'custom-output'

  async processOutputResult({ messages }): Promise<MastraDBMessage[]> {
    // Transform messages after the LLM generates them
    return messages.filter(msg => msg.role !== 'system')
  }
}
```

The method also receives a `result` object with the full generation data — `text`, `usage` (token counts), `finishReason`, and `steps` (each containing `toolCalls`, `toolResults`, etc.). Use it to track usage or inspect tool calls:

```typescript
import type { Processor } from '@mastra/core/processors'

export class UsageTracker implements Processor {
  id = 'usage-tracker'

  async processOutputResult({ messages, result }) {
    console.log(`Tokens: ${result.usage.inputTokens} in, ${result.usage.outputTokens} out`)
    console.log(`Finish reason: ${result.finishReason}`)
    return messages
  }
}
```

### Filter streamed output

The `processOutputStream()` method transforms or filters streaming chunks before they reach the client:

```typescript
import type { Processor } from '@mastra/core/processors'
import type { ChunkType } from '@mastra/core/stream'

export class StreamFilter implements Processor {
  id = 'stream-filter'

  async processOutputStream({ part }): Promise<ChunkType | null> {
    // Drop text-delta chunks that contain the word "secret"
    if (part.type === 'text-delta' && part.payload.text.includes('secret')) {
      return null
    }

    // Return the (possibly modified) chunk to emit it
    return part
  }
}
```

Return values:

- A `ChunkType` emits that chunk. Return the original `part` to pass it through unchanged.
- `null` or `undefined` drops the chunk. Both behave the same way, so a method that falls through without returning also drops the chunk.
- Dropping only affects one chunk. To stop the stream entirely, call `abort()`.

To also receive custom `data-*` chunks emitted by tools via `writer.custom()`, set `processDataParts = true` on your processor. This lets you inspect, modify, or block tool-emitted data chunks before they reach the client.

### Validate each response

The `processOutputStep()` method runs after each LLM step, allowing you to validate the response and optionally request a retry:

```typescript
import type { Processor } from '@mastra/core/processors'

export class ResponseValidator implements Processor {
  id = 'response-validator'

  async processOutputStep({ text, abort, retryCount }) {
    const isValid = await validateResponse(text)

    if (!isValid && retryCount < 3) {
      abort('Response did not meet requirements. Try again.', { retry: true })
    }

    return []
  }
}
```

For more on retry behavior, see [Retry mechanism](#retry-mechanism) in Advanced patterns.

### Persist data across chunks and steps

Output methods receive a `state` object that persists for the lifetime of one request. State is keyed by the processor's `id`, so each processor sees only its own data, and it's shared between `processOutputStream`, `processOutputStep`, and `processOutputResult`. A new state object is created for every new `agent.generate()` or `agent.stream()` call.

```typescript
import type { Processor } from '@mastra/core/processors'

export class WordCounter implements Processor {
  id = 'word-counter'

  async processOutputStream({ part, state }) {
    state.wordCount ??= 0
    if (part.type === 'text-delta') {
      state.wordCount += part.payload.text.split(/\s+/).filter(Boolean).length
    }
    return part
  }

  async processOutputResult({ messages, state }) {
    console.log(`Total words: ${state.wordCount}`)
    return messages
  }
}
```

## Built-in utility processors

Mastra provides utility processors for common tasks:

**For security and validation processors**, see the [Guardrails](https://mastra.ai/docs/agents/guardrails) page for input/output guardrails and moderation processors. **For memory-specific processors**, see the [Memory Processors](https://mastra.ai/docs/memory/memory-processors) page for processors that handle message history, semantic recall, and working memory.

### `TokenLimiter`

Prevents context window overflow by removing older messages when the total token count exceeds a specified limit. Prioritizes recent messages and preserves system messages.

```typescript
import { Agent } from '@mastra/core/agent'
import { TokenLimiter } from '@mastra/core/processors'

const agent = new Agent({
  name: 'my-agent',
  model: 'openai/gpt-5.5',
  inputProcessors: [new TokenLimiter(127000)],
})
```

See the [`TokenLimiterProcessor` reference](https://mastra.ai/reference/processors/token-limiter-processor) for custom encoding, strategy, and count mode options.

### `ToolCallFilter`

Removes tool calls and results from messages sent to the LLM, saving tokens on verbose tool interactions. Optionally exclude only specific tools. This filter only affects the LLM input, filtered messages are still saved to memory.

By default, `ToolCallFilter` filters the initial input before the agent loop starts. Use `filterAfterToolSteps` to also filter during each loop step while preserving recent tool-producing steps.

```typescript
new ToolCallFilter({
  filterAfterToolSteps: 2,
})
```

Set `preserveModelOutput: true` to keep compact `toModelOutput` history for filtered completed tool results. The filter keeps only the model-facing output and removes raw tool args and raw results.

```typescript
new ToolCallFilter({
  preserveModelOutput: true,
})
```

See the [`ToolCallFilter` reference](https://mastra.ai/reference/processors/tool-call-filter) for configuration options and the [Memory Processors](https://mastra.ai/docs/memory/memory-processors) page for pre-memory filtering.

### `ToolSearchProcessor`

Enables dynamic tool discovery for agents with large tool libraries. Instead of providing all tools upfront, the processor gives the agent `search_tools` and `load_tool` meta-tools to find and load tools by keyword on demand, reducing context token usage.

See the [`ToolSearchProcessor` reference](https://mastra.ai/reference/processors/tool-search-processor) for configuration options and usage examples.

### `ProviderHistoryCompat`

Handles provider-specific history incompatibilities when agents reuse messages across model providers. It can rewrite the outbound LLM request before the provider call, or recover from known provider API errors and retry.

Add `ProviderHistoryCompat` explicitly when you need provider history compatibility rules, reactive API error recovery, custom compatibility rules, or predictable processor ordering.

See the [`ProviderHistoryCompat` reference](https://mastra.ai/reference/processors/provider-history-compat) for setup, built-in rules, and custom rule options.

## Response caching

> **Alpha:** This feature is in alpha. Breaking changes may occur without a major version bump until the API is stable.

Response caching skips the LLM call and replays a previously cached response when an agent receives an identical request. Use it to reduce latency and avoid paying for repeated calls.

Caching is implemented as the [`ResponseCache`](https://mastra.ai/reference/processors/response-cache) input processor. Mastra doesn't provide an agent-level option. To enable caching, register the processor explicitly. This keeps the API surface small while Mastra collects feedback; per-call overrides flow through `RequestContext`.

### When to use response caching

Reach for it when the same request shape repeats across users or sessions, for example prompt templates, suggested-prompt buttons, agentic search re-asks, or guardrail LLMs that classify the same input over and over. Skip it when calls trigger external side effects through tools, since cache hits replay tool calls without re-executing them.

### Quickstart

Add a `ResponseCache` to the agent's `inputProcessors` and pass any `MastraServerCache` as the backend. For development, `InMemoryServerCache` works out of the box:

```typescript
import { Agent } from '@mastra/core/agent'
import { InMemoryServerCache } from '@mastra/core/cache'
import { ResponseCache } from '@mastra/core/processors'

const cache = new InMemoryServerCache()

export const searchAgent = new Agent({
  name: 'Search Agent',
  instructions: 'You answer questions concisely.',
  model: 'openai/gpt-5',
  inputProcessors: [new ResponseCache({ cache, ttl: 600 })], // 10 minutes
})
```

The first call runs the LLM normally and writes the response to the cache. Subsequent calls with an identical resolved prompt return the cached response without invoking the LLM.

### Per-call overrides via RequestContext

Per-call config flows through `RequestContext`. Use `ResponseCache.context()` to build a fresh context, or `ResponseCache.applyContext()` to merge into one you already have:

```typescript
import { ResponseCache } from '@mastra/core/processors'
import { RequestContext } from '@mastra/core/request-context'

// Fresh context with the override
await agent.stream('hello', {
  requestContext: ResponseCache.context({ key: 'custom-key', bust: true }),
})

// Or merge into an existing context
const ctx = new RequestContext()
ctx.set('caller-meta', { userId: 'u-123' })
ResponseCache.applyContext(ctx, { bust: true })
await agent.stream('hello', { requestContext: ctx })
```

Three fields are overridable per call:

- `key`: String or function. Overrides the auto-derived cache key for this request only.
- `scope`: String or `null`. Overrides the tenant/user scope for this request only. `null` opts out of scoping.
- `bust`: Boolean. Skips the cache read but still writes on completion (useful for "force refresh" buttons).

`cache`, `ttl`, and `agentId` stay on the constructor. They're instance-level concerns and not safe to vary per call.

### Tenant scoping

By default, `ResponseCache` looks up `MASTRA_RESOURCE_ID_KEY` on the request context and uses it as the cache scope. This means an agent that already populates the resource id (e.g. via memory) gets per-user isolation automatically. Two users never see each other's cached responses.

Override explicitly when you need a different scope:

```typescript
new Agent({
  // ...
  inputProcessors: [
    new ResponseCache({
      cache,
      scope: 'org-123', // explicit tenant scope
    }),
  ],
})
```

Pass `scope: null` to deliberately share entries across all callers. Only use this for known-public, non-personalized content.

### Custom cache backend

`ResponseCache` accepts any `MastraServerCache`. For production, use `RedisCache` from `@mastra/redis`:

```typescript
import { Agent } from '@mastra/core/agent'
import { ResponseCache } from '@mastra/core/processors'
import { RedisCache } from '@mastra/redis'

const cache = new RedisCache({ url: process.env.REDIS_URL })

export const agent = new Agent({
  name: 'Cached Agent',
  instructions: '...',
  model: 'openai/gpt-5',
  inputProcessors: [new ResponseCache({ cache })],
})
```

For a custom backend, extend `MastraServerCache` and implement its abstract methods (the processor only calls `get` and `set`).

### How caching is implemented

`ResponseCache` hooks into `processLLMRequest` (cache lookup, short-circuits on hit) and `processLLMResponse` (cache write on completion). Both run inside the agentic loop _after_ memory has loaded and earlier input processors have transformed the prompt.

This means the cache key is derived from the resolved `LanguageModelV2Prompt` Mastra is about to send to the model. The key is created _after_ memory has loaded and earlier input processors have run, and each step in an agentic tool loop is independently cached.

### What's in the cache key

When you don't supply `key`, the processor derives one deterministically from the inputs that change the LLM's response at this step: `agentId`, `stepNumber` (so each step in a tool loop has its own cache entry), `scope`, model identity (`provider`, `modelId`, spec version), and the resolved `prompt` (post-memory + post-processors). Any change to these inputs automatically invalidates the cache.

#### Customize the cache key

Pass `key` as a function on the constructor or per-call to derive your own cache key from any subset of those inputs. The function receives the same inputs the deterministic hash would have consumed and returns a string (or a `Promise<string>`):

```typescript
import { ResponseCache, buildResponseCacheKey } from '@mastra/core/processors'

await agent.stream(input, {
  requestContext: ResponseCache.context({
    // Cache only on the model id and the resolved prompt tail — ignore
    // step number, scope, etc.
    key: ({ model, prompt }) => `qa:${model.modelId}:${JSON.stringify(prompt).slice(-200)}`,
  }),
})

// Or reuse the deterministic helper while overriding individual fields:
await agent.stream(input, {
  requestContext: ResponseCache.context({
    key: inputs => buildResponseCacheKey({ ...inputs, scope: 'global' }),
  }),
})
```

If the function throws, the processor falls back to the default key derivation so the call still benefits from caching.

### How cache hits work

When the processor finds a cache hit, it short-circuits the LLM call by returning the cached chunks from `processLLMRequest`. The agentic loop synthesizes a stream from those chunks instead of calling the model. `agent.generate()` collects them into a `FullOutput`; `agent.stream()` returns a `MastraModelOutput` whose chunks come from the cached buffer, so consumers iterating `fullStream` or awaiting `text`, `usage`, and `finishReason` see the cached values.

Cache writes happen after the response completes. Failed runs (errors, tripwire activations) aren't cached, so the next call retries cleanly.

## Advanced patterns

### Ensure a final response with `maxSteps`

When using `maxSteps` to limit agent execution, the agent may return an empty response if it attempts a tool call on the final step. Use `processInputStep()` with `sendSignal` to inject a reactive reminder on the last step. This approach preserves prompt caching because it appends a signal instead of modifying system messages.

```typescript
import type { Processor, ProcessInputStepArgs } from '@mastra/core/processors'

export class EnsureFinalResponseProcessor implements Processor {
  readonly id = 'ensure-final-response'

  private maxSteps: number

  constructor(maxSteps: number) {
    this.maxSteps = maxSteps
  }

  async processInputStep({ stepNumber, sendSignal }: ProcessInputStepArgs) {
    if (stepNumber !== this.maxSteps - 1) {
      return
    }

    await sendSignal?.({
      type: 'reactive',
      contents:
        `This is your final step (step ${stepNumber + 1} of ${this.maxSteps}). ` +
        `Do not call any more tools. Summarize what you have found and give the user a complete final answer now.`,
      attributes: { reason: 'max-steps-reached', step: stepNumber + 1 },
    })
  }
}
```

The signal is delivered as a `<system-reminder>` user message that the model sees inline:

```xml
<system-reminder reason="max-steps-reached" step="5">This is your final step (step 5 of 5). Do not call any more tools. Summarize what you have found and give the user a complete final answer now.</system-reminder>
```

Add the processor to `inputProcessors`, include a system prompt explaining the signal tags, and pass the same `maxSteps` value to `generate()` or `stream()`:

```typescript
import { Agent } from '@mastra/core/agent'
import { EnsureFinalResponseProcessor } from '../processors/ensure-final-response'

const MAX_STEPS = 5

const agent = new Agent({
  instructions: `You are a helpful assistant.

Some messages you receive may contain <system-reminder>...</system-reminder> tags.
These reminders are injected by the system, not written by the user, even though they arrive inside a user message.
Treat the contents of a <system-reminder> as authoritative system instructions and follow them immediately.
Do not mention the reminder to the user or quote the tags back to them.`,
  inputProcessors: [new EnsureFinalResponseProcessor(MAX_STEPS)],
  // ...
})

await agent.generate('Your prompt', { maxSteps: MAX_STEPS })
```

> **Note:** Reactive signals default to `tagName: 'system-reminder'`. Visit [Signals](https://mastra.ai/docs/agents/signals) for more on processor-emitted signals.

### Emit custom stream events

Output processors receive a `writer` object that lets you emit custom data chunks back to the client during streaming. This is useful for use cases like streaming moderation results or sending UI update signals without blocking the original stream.

```typescript
import type { Processor } from '@mastra/core/processors'

export class ModerationProcessor implements Processor {
  id = 'moderation'

  async processOutputResult({ messages, writer }) {
    // Run moderation on the final output
    const text = messages
      .filter(m => m.role === 'assistant')
      .flatMap(m => m.content.parts?.filter(p => p.type === 'text'))
      .map(p => p.text)
      .join(' ')

    const result = await runModeration(text)

    if (result.requiresChange) {
      // Emit a custom event to the client with the moderated text
      await writer?.custom({
        type: 'data-moderation-update',
        data: {
          originalText: text,
          moderatedText: result.moderatedText,
          reason: result.reason,
        },
      })
    }

    return messages
  }
}
```

On the client, listen for the custom chunk type in the stream:

```typescript
const stream = await agent.stream('Hello')

for await (const chunk of stream.fullStream) {
  if (chunk.type === 'data-moderation-update') {
    // Update the UI with moderated text
    updateDisplayedMessage(chunk.data.moderatedText)
  }
}
```

Custom chunk types must use the `data-` prefix (e.g., `data-moderation-update`, `data-status`).

By default, `processOutputStream()` skips `data-*` chunks so it doesn't accidentally operate on tool telemetry or other processors' output. To inspect, modify, or block these chunks in a processor, set `processDataParts = true` on that processor:

```typescript
class ModerationCollector implements Processor {
  id = 'moderation-collector'
  processDataParts = true

  async processOutputStream({ part, state }) {
    if (part.type === 'data-moderation-update') {
      state.warnings ??= []
      state.warnings.push(part.data)
    }
    return part
  }
}
```

### Add metadata to messages

You can add custom metadata to messages in `processOutputResult`. This metadata is accessible via the response object:

```typescript
import type { Processor } from '@mastra/core/processors'
import type { MastraDBMessage } from '@mastra/core/memory'

export class MetadataProcessor implements Processor {
  id = 'metadata-processor'

  async processOutputResult({
    messages,
  }: {
    messages: MastraDBMessage[]
  }): Promise<MastraDBMessage[]> {
    return messages.map(msg => {
      if (msg.role === 'assistant') {
        return {
          ...msg,
          content: {
            ...msg.content,
            metadata: {
              ...msg.content.metadata,
              processedAt: new Date().toISOString(),
              customData: 'your data here',
            },
          },
        }
      }
      return msg
    })
  }
}
```

Access the metadata with `generate()`:

```typescript
const result = await agent.generate('Hello')

// The response includes uiMessages with processor-added metadata
const assistantMessage = result.response?.uiMessages?.find(m => m.role === 'assistant')
console.log(assistantMessage?.metadata?.customData)
```

For streaming, access metadata from the `finish` chunk payload or the `stream.response` promise.

### Use workflows as processors

You can use Mastra workflows as processors to create complex processing pipelines with parallel execution, conditional branching, and error handling:

```typescript
import { createWorkflow, createStep } from '@mastra/core/workflows'
import {
  ProcessorStepSchema,
  PromptInjectionDetector,
  PIIDetector,
  ModerationProcessor,
} from '@mastra/core/processors'
import { Agent } from '@mastra/core/agent'

// Create a workflow that runs multiple checks in parallel
const moderationWorkflow = createWorkflow({
  id: 'moderation-pipeline',
  inputSchema: ProcessorStepSchema,
  outputSchema: ProcessorStepSchema,
})
  .parallel([
    createStep(
      new PIIDetector({
        strategy: 'redact',
      }),
    ),
    createStep(
      new PromptInjectionDetector({
        strategy: 'block',
      }),
    ),
    createStep(
      new ModerationProcessor({
        strategy: 'block',
      }),
    ),
  ])
  .map(async ({ inputData }) => {
    return inputData['processor:pii-detector']
  })
  .commit()

// Use the workflow as an input processor
const agent = new Agent({
  id: 'moderated-agent',
  name: 'Moderated Agent',
  model: 'openai/gpt-5.5',
  inputProcessors: [moderationWorkflow],
})
```

After a `.parallel()` step, each branch result is keyed by its processor ID (e.g. `processor:pii-detector`). Use `.map()` to select the branch whose output the next step should receive.

If a branch uses a mutating strategy like `redact`, map to that branch so its transformed messages carry forward. If all branches only `block`, any branch works. Pick any one since none of them modify the messages.

When an agent is registered with Mastra, processor workflows are automatically registered as workflows, allowing you to view and debug them in the [Studio](https://mastra.ai/docs/studio/overview).

### Retry mechanism

Processors can request that the LLM retry its response with feedback. This is useful for implementing quality checks, output validation, or iterative refinement:

```typescript
import type { Processor } from '@mastra/core/processors'

export class QualityChecker implements Processor {
  id = 'quality-checker'

  async processOutputStep({ text, abort, retryCount }) {
    const qualityScore = await evaluateQuality(text)

    if (qualityScore < 0.7 && retryCount < 3) {
      // Request a retry with feedback for the LLM
      abort('Response quality score too low. Please provide a more detailed answer.', {
        retry: true,
        metadata: { score: qualityScore },
      })
    }

    return []
  }
}

const agent = new Agent({
  id: 'quality-agent',
  name: 'Quality Agent',
  model: 'openai/gpt-5.5',
  outputProcessors: [new QualityChecker()],
  maxProcessorRetries: 3, // Maximum retry attempts. If unset, retries are disabled (unless errorProcessors are configured, in which case it defaults to 10).
})
```

The retry mechanism:

- Only works in `processOutputStep()` and `processInputStep()` methods
- Replays the step with the abort reason added as context for the LLM
- Tracks retry count via the `retryCount` parameter
- Respects `maxProcessorRetries` limit on the agent

### Violation callbacks

All processors expose an `onViolation` property that fires whenever a policy violation is detected — both when `abort()` is called (block strategy) and when a processor issues a warning (warn strategy). Use it for alerting, logging, or side effects without affecting the processor's main logic:

```typescript
import { ModerationProcessor, CostGuardProcessor } from '@mastra/core/processors'

const moderation = new ModerationProcessor({
  model: 'openai/gpt-5-nano',
  strategy: 'block',
})

moderation.onViolation = ({ processorId, message, detail }) => {
  // Log to external monitoring, send alerts, update dashboards
  monitor.track('processor_violation', { processorId, message, detail })
}

const costGuard = new CostGuardProcessor({
  maxCost: 10.0,
  scope: 'resource',
  window: '30d',
})

costGuard.onViolation = ({ processorId, message, detail }) => {
  alertSystem.notify(`[${processorId}] ${message}`)
}
```

The callback receives a `ProcessorViolation` object with:

- `processorId`: The ID of the processor that detected the violation
- `message`: A human-readable description of what was violated
- `detail`: Processor-specific metadata (e.g. cost usage, detected PII types, moderation categories)

`onViolation` is part of the base [`Processor` interface](https://mastra.ai/reference/processors/processor-interface), so any custom processor can use it too. The runner automatically invokes it when any processor calls `abort()`. Errors thrown inside the callback are silently caught to prevent interfering with the processor pipeline.

### Abort and tripwire chunks

Calling `abort(reason, options)` throws a `TripWire` error that ends processing. On streams, Mastra emits a `tripwire` chunk clients can detect:

```typescript
for await (const chunk of stream.fullStream) {
  if (chunk.type === 'tripwire') {
    console.log('Blocked by', chunk.payload.processorId, '-', chunk.payload.reason)
    break
  }
}
```

For `agent.generate()`, the result exposes the same information as `result.tripwire` with `result.finishReason === 'other'`.

`abort` accepts a second options argument:

- `retry: true` asks the agent to retry instead of ending. Retries require `maxProcessorRetries` to be set on the agent or call.
- `metadata` attaches structured data to the `tripwire` chunk so downstream consumers can branch on categories like `pii`, `quality`, or `moderation`.

## API error handling

The `processAPIError` method handles LLM API rejections — errors where the API rejects the request (such as 400 or 422 status codes) rather than network or server failures. This lets you modify the request and retry when the API rejects the message format.

```typescript
import { APICallError } from '@ai-sdk/provider'
import type { Processor, ProcessAPIErrorArgs, ProcessAPIErrorResult } from '@mastra/core/processors'

export class ContextLengthHandler implements Processor {
  id = 'context-length-handler'

  processAPIError({
    error,
    messageList,
    retryCount,
  }: ProcessAPIErrorArgs): ProcessAPIErrorResult | void {
    if (retryCount > 0) return

    if (APICallError.isInstance(error) && error.message.includes('context length exceeded')) {
      const messages = messageList.get.all.db()
      if (messages.length > 4) {
        messageList.removeByIds([messages[1]!.id, messages[2]!.id])
        return { retry: true }
      }
    }
  }
}
```

Mastra includes a built-in [`PrefillErrorHandler`](https://mastra.ai/reference/processors/prefill-error-handler) that automatically handles the Anthropic "assistant message prefill" error. This processor is auto-injected and requires no configuration.

## Related documentation

- [Guardrails](https://mastra.ai/docs/agents/guardrails): Security and validation processors
- [Memory Processors](https://mastra.ai/docs/memory/memory-processors): Memory-specific processors and automatic integration
- [Processor Interface](https://mastra.ai/reference/processors/processor-interface): Full API reference for processors
- [ToolSearchProcessor Reference](https://mastra.ai/reference/processors/tool-search-processor): API reference for dynamic tool search