// NOTE(review): this looks like compiler-generated declaration output (see the
// trailing sourceMappingURL comment), and the generic type-argument lists
// (`<...>`) appear to have been stripped from it. Symptoms visible below:
// bare `Promise` return types, unbalanced `>` (e.g. `Promise>`), `ReturnType`
// with no argument, and `TAgentId` / `TTools` used without any visible
// declaration. The missing arguments cannot be recovered from this file alone;
// regenerate the declarations from the implementation rather than hand-patching.
import type { MastraServerCache } from '../../cache/base.js';
import type { PubSub } from '../../events/pubsub.js';
import type { Mastra } from '../../mastra/index.js';
import type { MastraModelOutput } from '../../stream/base/output.js';
import type { ChunkType } from '../../stream/types.js';
import { Agent } from '../agent.js';
import type { AgentExecutionOptions } from '../agent.types.js';
import type { MessageListInput } from '../message-list/index.js';
import type { ToolsInput } from '../types.js';
import { ExtendedRunRegistry } from './run-registry.js';
import type { AgentFinishEventData, AgentStepFinishEventData, AgentSuspendedEventData, DurableAgenticWorkflowInput } from './types.js';
import { createDurableAgenticWorkflow } from './workflows/index.js';
/**
 * Options for DurableAgent.stream()
 *
 * Most fields are typed by indexing into `AgentExecutionOptions`, so their
 * shapes follow whatever that type declares for the same key.
 */
export interface DurableAgentStreamOptions {
    /** Custom instructions that override the agent's default instructions for this execution */
    instructions?: AgentExecutionOptions['instructions'];
    /** Additional context messages to provide to the agent */
    context?: AgentExecutionOptions['context'];
    /** Memory configuration for conversation persistence and retrieval */
    memory?: AgentExecutionOptions['memory'];
    /** Unique identifier for this execution run */
    runId?: string;
    /** Request Context containing dynamic configuration and state */
    requestContext?: AgentExecutionOptions['requestContext'];
    /** Maximum number of steps to run */
    maxSteps?: number;
    /** Additional tool sets that can be used for this execution */
    toolsets?: AgentExecutionOptions['toolsets'];
    /** Client-side tools available during execution */
    clientTools?: AgentExecutionOptions['clientTools'];
    /** Tool selection strategy */
    toolChoice?: AgentExecutionOptions['toolChoice'];
    /** Tool names enabled for this execution */
    activeTools?: AgentExecutionOptions['activeTools'];
    /** Model-specific settings like temperature */
    modelSettings?: AgentExecutionOptions['modelSettings'];
    /** Require approval for all tool calls */
    requireToolApproval?: boolean;
    /** Automatically resume suspended tools */
    autoResumeSuspendedTools?: boolean;
    /** Maximum number of tool calls to execute concurrently */
    toolCallConcurrency?: number;
    /** Whether to include raw chunks in the stream output */
    includeRawChunks?: boolean;
    /** Maximum processor retries */
    maxProcessorRetries?: number;
    /** Structured output configuration */
    structuredOutput?: AgentExecutionOptions['structuredOutput'];
    /** Version overrides for sub-agent delegation */
    versions?: AgentExecutionOptions['versions'];
    // NOTE(review): the five callbacks below return `void | Promise` with no
    // type argument on `Promise` — presumably a stripped `<void>`; confirm
    // against the implementation file.
    /** Callback when chunk is received */
    onChunk?: (chunk: ChunkType) => void | Promise;
    /** Callback when step finishes */
    onStepFinish?: (result: AgentStepFinishEventData) => void | Promise;
    /** Callback when execution finishes */
    onFinish?: (result: AgentFinishEventData) => void | Promise;
    /** Callback on error */
    onError?: (error: Error) => void | Promise;
    /** Callback when workflow suspends (e.g., for tool approval) */
    onSuspended?: (data: AgentSuspendedEventData) => void | Promise;
    /** When true, the in-loop background task check step skips waiting (streamUntilIdle sets this) */
    _skipBgTaskWait?: boolean;
}
/**
 * Result from DurableAgent.stream()
 */
export interface DurableAgentStreamResult {
    /** The streaming output */
    output: MastraModelOutput;
    /** The full stream - delegates to output.fullStream for server compatibility */
    readonly fullStream: ReadableStream;
    /** The unique run ID for this execution */
    runId: string;
    /** Thread ID if using memory */
    threadId?: string;
    /** Resource ID if using memory */
    resourceId?: string;
    /** Cleanup function to call when done (unsubscribes from pubsub) */
    cleanup: () => void;
}
/**
 * Configuration for DurableAgent - wraps an existing Agent with durable execution
 *
 * NOTE(review): `id` is typed as `TAgentId`, but no type parameter of that
 * name is declared on this interface as written — presumably the interface's
 * type-parameter list was stripped; confirm against the implementation file.
 */
export interface DurableAgentConfig {
    /**
     * The Agent to wrap with durable execution capabilities.
     * All agent methods (getModel, listTools, etc.) delegate to this agent.
     */
    agent: Agent;
    /**
     * Optional ID override. Defaults to agent.id.
     */
    id?: TAgentId;
    /**
     * Optional name override. Defaults to agent.name.
     */
    name?: string;
    /**
     * PubSub instance for streaming events.
     * Optional - if not provided, defaults to EventEmitterPubSub.
     */
    pubsub?: PubSub;
    /**
     * Cache instance for storing stream events.
     * Enables resumable streams - clients can disconnect and reconnect
     * without missing events.
     *
     * - If not provided: Inherits from Mastra instance, or uses InMemoryServerCache
     * - If provided: Uses the provided cache backend (e.g., Redis)
     * - If set to `false`: Disables caching (streams are not resumable)
     */
    cache?: MastraServerCache | false;
    /**
     * Maximum steps for the agentic loop.
     * Defaults to the workflow default if not specified.
     */
    maxSteps?: number;
    /**
     * Timeout in milliseconds before automatic cleanup of registry entries
     * after a stream finishes or errors. This provides a grace period for
     * late observers to access the stream.
     *
     * Defaults to 30000 (30 seconds).
     * Set to 0 to disable auto-cleanup (manual cleanup() required).
     */
    cleanupTimeoutMs?: number;
}
/**
 * DurableAgent wraps an existing Agent with durable execution capabilities.
 *
 * Key features:
 * 1. Resumable streams - clients can disconnect and reconnect without missing events
 * 2. Serializable workflow inputs - works with durable execution engines
 * 3. PubSub-based streaming - events flow through pubsub for distribution
 *
 * DurableAgent extends Agent, delegating most methods to the wrapped agent.
 * It overrides stream() to use durable execution with the agentic workflow.
 *
 * Subclasses (EventedAgent, InngestAgent) override executeWorkflow() to
 * customize how the workflow is executed.
 *
 * NOTE(review): `listTools()` below returns `TTools`, yet the class header as
 * written declares no type parameters — presumably they were stripped along
 * with the other type-argument lists; confirm against the implementation file.
 *
 * @example
 * ```typescript
 * import { Agent } from '@mastra/core/agent';
 * import { DurableAgent } from '@mastra/core/agent/durable';
 *
 * const agent = new Agent({
 *   id: 'my-agent',
 *   instructions: 'You are a helpful assistant',
 *   model: openai('gpt-4'),
 * });
 *
 * const durableAgent = new DurableAgent({ agent });
 *
 * const { output, runId, cleanup } = await durableAgent.stream('Hello!');
 * const text = await output.text;
 * cleanup();
 * ```
 */
export declare class DurableAgent extends Agent {
    #private;
    /**
     * Create a new DurableAgent that wraps an existing Agent
     *
     * @param config - The wrapped agent plus optional id/name/pubsub/cache/limit overrides
     */
    constructor(config: DurableAgentConfig);
    /**
     * Get the resolved cache instance.
     * Lazily initialized to allow inheriting from Mastra.
     */
    get cache(): MastraServerCache | null;
    /**
     * Get the PubSub instance.
     * Returns CachingPubSub if caching is enabled, otherwise the inner pubsub.
     */
    get pubsub(): PubSub;
    /**
     * Get the wrapped agent instance.
     */
    get agent(): Agent;
    /**
     * Get the run registry (for testing and advanced usage)
     */
    get runRegistry(): ExtendedRunRegistry;
    /**
     * Get the max steps configured for this agent
     */
    get maxSteps(): number | undefined;
    /**
     * Get the cleanup timeout in milliseconds.
     * Returns 0 if auto-cleanup is disabled.
     */
    get cleanupTimeoutMs(): number;
    // The next five methods carry no documentation in the declaration output.
    // Per the `DurableAgentConfig.agent` comment they are expected to delegate
    // to the wrapped agent — the implementation is not visible here, so verify.
    // NOTE(review): each has a damaged return type (bare `Promise`, or the
    // unbalanced `Promise>` on getVoice), so the resolved types are unknown.
    getModel(options?: any): import("../../_types/@internal_ai-sdk-v4/dist/index.d.ts").LanguageModelV1 | import("..").MastraLanguageModel | Promise;
    getInstructions(options?: any): import("../../llm").SystemMessage | Promise;
    listTools(options?: any): TTools | Promise;
    getMemory(): Promise;
    getVoice(): Promise>;
    /**
     * Get the PubSub instance for use by subclasses.
     * @internal
     */
    protected get pubsubInternal(): PubSub;
    /**
     * Get the run registry for use by subclasses.
     * @internal
     */
    protected get runRegistryInternal(): ExtendedRunRegistry;
    /**
     * Execute the durable workflow.
     *
     * Subclasses override this method to customize how the workflow is executed:
     * - DurableAgent (this): Runs the workflow directly via createRun + start
     * - EventedAgent: Uses run.startAsync() for fire-and-forget execution
     * - InngestAgent: Uses inngest.send() to trigger Inngest function
     *
     * @param runId - The unique run ID
     * @param workflowInput - The serialized workflow input
     * @internal
     */
    protected executeWorkflow(runId: string, workflowInput: DurableAgenticWorkflowInput): Promise;
    /**
     * Create the durable workflow for this agent.
     *
     * Subclasses can override this method to use a different workflow implementation:
     * - DurableAgent (this): Uses createDurableAgenticWorkflow()
     * - InngestAgent: Uses createInngestDurableAgenticWorkflow()
     *
     * NOTE(review): `ReturnType` has no type argument here; given the otherwise
     * unused `createDurableAgenticWorkflow` import it was presumably
     * `typeof createDurableAgenticWorkflow` — confirm.
     *
     * @internal
     */
    protected createWorkflow(): ReturnType;
    /**
     * Emit an error event to pubsub.
     *
     * @param runId - The run ID
     * @param error - The error to emit
     * @internal
     */
    protected emitError(runId: string, error: Error): Promise;
    /**
     * Stream a response from the agent using durable execution.
     *
     * @param messages - Input messages for this run
     * @param options - Per-run overrides and lifecycle callbacks
     *
     * NOTE(review): the return type reads `Promise>` — the promised type was
     * stripped (presumably a DurableAgentStreamResult; confirm).
     */
    stream(messages: MessageListInput, options?: DurableAgentStreamOptions): Promise>;
    /**
     * Resume a suspended workflow execution.
     *
     * @param runId - ID of the run to resume
     * @param resumeData - Payload handed to the suspended run; typed `unknown` here
     * @param options - Lifecycle callbacks, same shapes as in DurableAgentStreamOptions
     *
     * NOTE(review): return type is the damaged `Promise>`; see stream().
     */
    resume(runId: string, resumeData: unknown, options?: {
        onChunk?: (chunk: ChunkType) => void | Promise;
        onStepFinish?: (result: AgentStepFinishEventData) => void | Promise;
        onFinish?: (result: AgentFinishEventData) => void | Promise;
        onError?: (error: Error) => void | Promise;
        onSuspended?: (data: AgentSuspendedEventData) => void | Promise;
    }): Promise>;
    /**
     * Observe an existing stream.
     * Use this to reconnect to a stream after a network disconnection.
     *
     * **Warning:** The returned `cleanup()` function destroys the run's registry
     * entries and cached PubSub events. Only call it when you are done with the
     * run entirely. If the workflow is suspended and you intend to resume later,
     * do not call cleanup — let the auto-cleanup timer handle it after
     * FINISH/ERROR. Auto-cleanup does not fire on SUSPENDED events.
     *
     * @param runId - ID of the run to observe
     * @param options - Optional `offset` plus lifecycle callbacks
     *   (presumably `offset` is the event position to replay from — confirm)
     *
     * NOTE(review): the return type reads `Promise, 'runId'> & {...}>`, which
     * looks like a stripped `Omit<..., 'runId'>` wrapper — confirm.
     */
    observe(runId: string, options?: {
        offset?: number;
        onChunk?: (chunk: ChunkType) => void | Promise;
        onStepFinish?: (result: AgentStepFinishEventData) => void | Promise;
        onFinish?: (result: AgentFinishEventData) => void | Promise;
        onError?: (error: Error) => void | Promise;
        onSuspended?: (data: AgentSuspendedEventData) => void | Promise;
    }): Promise, 'runId'> & {
        runId: string;
    }>;
    /**
     * Get the workflow instance for direct execution.
     * Lazily creates the workflow and registers Mastra on it (needed for
     * getAgentById in execution steps).
     *
     * NOTE(review): the return type begins `Workflow[], "durable-agentic-loop"`,
     * so the opening `<` and the leading type arguments are missing, and the
     * `Record` occurrences inside have no key/value arguments.
     */
    getWorkflow(): import("../../workflows").Workflow[], "durable-agentic-loop", unknown, {
        __workflowKind: "durable-agent";
        runId: string;
        agentId: string;
        messageListState: any;
        toolsMetadata: any[];
        modelConfig: {
            provider: string;
            modelId: string;
            specificationVersion?: string | undefined;
            settings?: Record | undefined;
            providerOptions?: Record | undefined;
        };
        options: any;
        state: any;
        messageId: string;
        agentName?: string | undefined;
        modelList?: {
            id: string;
            config: {
                provider: string;
                modelId: string;
                specificationVersion?: string | undefined;
                originalConfig?: string | Record | undefined;
                providerOptions?: Record | undefined;
            };
            maxRetries: number;
            enabled: boolean;
        }[] | undefined;
    }, {
        messageListState: any;
        messageId: string;
        stepResult: any;
        output: {
            usage: any;
            steps: any[];
            text?: string | undefined;
        };
        state: any;
    }, {
        messageListState: any;
        messageId: string;
        stepResult: any;
        output: {
            usage: any;
            steps: any[];
            text?: string | undefined;
        };
        state: any;
    }, unknown>;
    /**
     * Stream until all background tasks complete and the agent is idle.
     * Mirrors the regular Agent's streamUntilIdle but adapted for durable execution.
     *
     * @param messages - Input messages for this run
     * @param streamOptions - Stream options plus an optional `maxIdleMs`
     *   (presumably an idle-time limit in milliseconds — confirm)
     *
     * NOTE(review): return type is the damaged `Promise>`; see stream().
     */
    streamUntilIdle(messages: MessageListInput, streamOptions?: DurableAgentStreamOptions & {
        maxIdleMs?: number;
    }): Promise>;
    /**
     * Prepare for durable execution without starting it.
     *
     * @param messages - Input messages for the run being prepared
     * @param options - Agent execution options
     * @returns The run and message IDs, the workflow input, the registry entry,
     *   and the thread/resource IDs (either of which may be undefined)
     */
    prepare(messages: MessageListInput, options?: AgentExecutionOptions): Promise<{
        runId: string;
        messageId: string;
        workflowInput: DurableAgenticWorkflowInput;
        registryEntry: import("./types").RunRegistryEntry;
        threadId: string | undefined;
        resourceId: string | undefined;
    }>;
    /**
     * Get the durable workflows required by this agent.
     * Called by Mastra during agent registration.
     *
     * The declared element type is the same workflow shape that getWorkflow()
     * returns, as an array.
     *
     * NOTE(review): same damage as getWorkflow() — the type begins `Workflow[],`
     * with the opening `<` and leading type arguments missing.
     * @internal
     */
    getDurableWorkflows(): import("../../workflows").Workflow[], "durable-agentic-loop", unknown, {
        __workflowKind: "durable-agent";
        runId: string;
        agentId: string;
        messageListState: any;
        toolsMetadata: any[];
        modelConfig: {
            provider: string;
            modelId: string;
            specificationVersion?: string | undefined;
            settings?: Record | undefined;
            providerOptions?: Record | undefined;
        };
        options: any;
        state: any;
        messageId: string;
        agentName?: string | undefined;
        modelList?: {
            id: string;
            config: {
                provider: string;
                modelId: string;
                specificationVersion?: string | undefined;
                originalConfig?: string | Record | undefined;
                providerOptions?: Record | undefined;
            };
            maxRetries: number;
            enabled: boolean;
        }[] | undefined;
    }, {
        messageListState: any;
        messageId: string;
        stepResult: any;
        output: {
            usage: any;
            steps: any[];
            text?: string | undefined;
        };
        state: any;
    }, {
        messageListState: any;
        messageId: string;
        stepResult: any;
        output: {
            usage: any;
            steps: any[];
            text?: string | undefined;
        };
        state: any;
    }, unknown>[];
    /**
     * Set the Mastra instance.
     * Called by the durable agent registration path in addAgent().
     * Delegates to __registerMastra so the pubsub wiring and agent
     * registration happen regardless of which entry point is called first.
     *
     * @param mastra - The Mastra instance to attach
     * @internal
     */
    __setMastra(mastra: Mastra): void;
    /**
     * Register the Mastra instance.
     * Called by Mastra during agent registration (normal Agent path).
     *
     * Also wires mastra.pubsub as the inner pubsub (if the user didn't provide
     * a custom one), so that the OBSERVE_AGENT_STREAM_ROUTE handler can subscribe
     * to the same PubSub instance that this agent publishes to.
     *
     * @param mastra - The Mastra instance to attach
     * @internal
     */
    __registerMastra(mastra: Mastra): void;
}
//# sourceMappingURL=durable-agent.d.ts.map