import { ReadableStream } from 'node:stream/web'; import type { MastraPrimitives } from '../action/index.js'; import { Agent } from '../agent/index.js'; import type { AgentExecutionOptions, AgentStreamOptions } from '../agent/index.js'; import { MastraBase } from '../base.js'; import { RequestContext } from '../di/index.js'; import type { MastraScorers } from '../evals/index.js'; import type { PubSub } from '../events/pubsub.js'; import type { IMastraLogger } from '../logger/index.js'; import type { Mastra } from '../mastra/index.js'; import type { ObservabilityContext, TracingOptions, TracingPolicy } from '../observability/index.js'; import type { Processor } from '../processors/index.js'; import { ProcessorStepOutputSchema, ProcessorStepInputSchema } from '../processors/step-schema.js'; import type { InferPublicSchema, InferStandardSchemaOutput, PublicSchema, StandardSchemaWithJSON } from '../schema/index.js'; import type { StorageListWorkflowRunsInput } from '../storage/index.js'; import { WorkflowRunOutput } from '../stream/RunOutput.js'; import { Tool } from '../tools/index.js'; import type { ToolExecutionContext } from '../tools/index.js'; import type { DynamicArgument } from '../types/index.js'; import { PUBSUB_SYMBOL } from './constants.js'; import type { ExecutionEngine, ExecutionGraph } from './execution-engine.js'; import type { ConditionFunction, ExecuteFunction, InnerOutput, LoopConditionFunction, Step, SuspendOptions } from './step.js'; import type { DefaultEngineType, DynamicMapping, ExtractSchemaFromStep, ExtractSchemaType, PathsToStringProps, SerializedStepFlowEntry, StepFlowEntry, StepResult, StepsRecord, StepWithComponent, StreamEvent, SubsetOf, TimeTravelContext, WorkflowConfig, WorkflowEngineType, WorkflowOptions, WorkflowResult, WorkflowType, WorkflowRunStatus, WorkflowState, WorkflowStateField, WorkflowStreamEvent, StepParams, OutputWriter, StepMetadata, WorkflowRunStartOptions } from './types.js'; export type AgentStepOptions = Omit & AgentStreamOptions, 'format' | 'tracingContext' | 'requestContext' | 'abortSignal' | 'context' | 'onStepFinish' | 'output' | 'experimental_output' | 'resourceId' | 'threadId' | 'scorers'>; export declare function mapVariable>({ step, path, }: { step: TStep; path: PathsToStringProps>> | '.'; }): { step: TStep; path: PathsToStringProps>> | '.'; }; export declare function mapVariable({ initData: TWorkflow, path, }: { initData: TWorkflow; path: PathsToStringProps>> | '.'; }): { initData: TWorkflow; path: PathsToStringProps>> | '.'; }; /** * Creates a step from explicit params (IMPORTANT: FIRST overload for best error messages when using .then in workflows) * @param params Configuration parameters for the step * @param params.id Unique identifier for the step * @param params.description Optional description of what the step does * @param params.inputSchema Zod schema defining the input structure * @param params.outputSchema Zod schema defining the output structure * @param params.execute Function that performs the step's operations * @returns A Step object that can be added to the workflow */ export declare function createStep(params: StepParams): Step : unknown, InferPublicSchema, InferPublicSchema, TResumeSchema extends PublicSchema ? InferPublicSchema : unknown, TSuspendSchema extends PublicSchema ? InferPublicSchema : unknown, DefaultEngineType, TRequestContextSchema extends PublicSchema ? InferPublicSchema : unknown>; /** * Creates a step from an agent (defaults to { text: string } output) */ export declare function createStep(agent: Agent, agentOptions?: Omit, 'structuredOutput'> & { structuredOutput?: never; retries?: number; scorers?: DynamicArgument; }): Step; /** * Creates a step from an agent with structured output */ export declare function createStep(agent: Agent, agentOptions: Omit, 'structuredOutput'> & { structuredOutput: { schema: StandardSchemaWithJSON; }; retries?: number; scorers?: DynamicArgument; metadata?: StepMetadata; }): Step; /** * Creates a step from a tool */ export declare function createStep, TId extends string, TRequestContext extends Record | unknown = unknown>(tool: Tool, toolOptions?: { retries?: number; scorers?: DynamicArgument; metadata?: StepMetadata; }): Step; /** * Creates a step from a Processor - wraps a Processor as a workflow step * Note: We require at least one processor method to distinguish from StepParams */ export declare function createStep(processor: (Processor & { processInput: Function; }) | (Processor & { processInputStream: Function; }) | (Processor & { processInputStep: Function; }) | (Processor & { processOutputStream: Function; }) | (Processor & { processOutputResult: Function; }) | (Processor & { processOutputStep: Function; })): Step<`processor:${TProcessorId}`, unknown, InferPublicSchema, InferPublicSchema, unknown, unknown, DefaultEngineType>; /** * IMPORTANT: Fallback overload - provides better error messages when StepParams doesn't match * This should be LAST and will show clearer errors about what's wrong * This is a copy of first one, KEEP THIS IN SYNC! */ export declare function createStep(params: StepParams): Step : unknown, InferPublicSchema, InferPublicSchema, TResumeSchema extends PublicSchema ? InferPublicSchema : unknown, TSuspendSchema extends PublicSchema ? InferPublicSchema : unknown, DefaultEngineType, TRequestContextSchema extends PublicSchema ? InferPublicSchema : unknown>; export declare function cloneStep(step: Step, opts: { id: TStepId; }): Step; /** * Type guard to check if an object is a Processor. * A Processor must have an 'id' property and at least one processor method. */ export declare function isProcessor(obj: unknown): obj is Processor; /** * A Workflow with all type parameters erased. * Use this instead of manually specifying `Workflow` so that * adding or removing type parameters only requires updating one place. */ export type AnyWorkflow = Workflow; export declare function createWorkflow[] = Step[], TRequestContext extends Record | unknown = unknown>(params: WorkflowConfig): Workflow; export declare function cloneWorkflow[] = Step[], TPrevSchema = TInput>(workflow: Workflow, opts: { id: TWorkflowId; }): Workflow; export declare class Workflow[] = Step[], TWorkflowId extends string = string, TState = unknown, TInput = unknown, TOutput = unknown, TPrevSchema = TInput, TRequestContext extends Record | unknown = unknown> extends MastraBase implements Step { #private; id: TWorkflowId; description?: string | undefined; inputSchema: StandardSchemaWithJSON; outputSchema: StandardSchemaWithJSON; stateSchema?: StandardSchemaWithJSON; requestContextSchema?: StandardSchemaWithJSON; steps: Record; stepDefs?: TSteps; engineType: WorkflowEngineType; /** Type of workflow - 'processor' for processor workflows, 'default' otherwise */ type: WorkflowType; committed: boolean; protected stepFlow: StepFlowEntry[]; protected serializedStepFlow: SerializedStepFlowEntry[]; protected executionEngine: ExecutionEngine; protected executionGraph: ExecutionGraph; retryConfig: { attempts?: number; delay?: number; }; constructor({ mastra, id, inputSchema, outputSchema, stateSchema, requestContextSchema, description, executionEngine, retryConfig, steps, options, type, }: WorkflowConfig); get runs(): Map>; get mastra(): Mastra>, Record, Record>, Record, IMastraLogger, Record>, Record>, Record>, Record>, Record> | undefined; get options(): Omit & Required>; __registerMastra(mastra: Mastra): void; __registerPrimitives(p: MastraPrimitives): void; __setLogger(logger: IMastraLogger): void; setStepFlow(stepFlow: StepFlowEntry[]): void; /** * Adds a step to the workflow * @param step The step to add to the workflow * @returns The workflow instance for chaining * * The step's inputSchema must be satisfied by the previous step's output (or workflow input for first step). * This means: TPrevSchema must be assignable to TStepInput */ then(step: Step, TPrevSchema extends TStepInput ? TStepInput : TPrevSchema, TSchemaOut, any, any, TEngineType, any>): Workflow; /** * Adds a sleep step to the workflow * @param duration The duration to sleep for * @returns The workflow instance for chaining */ sleep(duration: number | ExecuteFunction): Workflow; /** * Adds a sleep until step to the workflow * @param date The date to sleep until * @returns The workflow instance for chaining */ sleepUntil(date: Date | ExecuteFunction): Workflow; /** * @deprecated waitForEvent has been removed. Please use suspend/resume instead. */ waitForEvent(_event: string, _step: Step, TStepInputSchema, TSchemaOut, any, any, TEngineType>, _opts?: { timeout?: number; }): void; map(mappingConfig: { [k: string]: { step: Step | Step[]; path: string; } | { value: any; schema: PublicSchema; } | { initData: Workflow; path: string; } | { requestContextPath: string; schema: PublicSchema; } | DynamicMapping; } | ExecuteFunction, stepOptions?: { id?: string | null; }): Workflow; parallel[]>(steps: TParallelSteps & { [K in keyof TParallelSteps]: TParallelSteps[K] extends Step ? Step, TPrevSchema, O, any, any, TEngineType> : `Error: Expected Step with state schema that is a subset of workflow state`; }): Workflow]: InferStandardSchemaOutput[K]["outputSchema"]>; }, TRequestContext>; branch, Step ]>>(steps: TBranchSteps): Workflow]?: InferStandardSchemaOutput[K]["outputSchema"]>; }, TRequestContext>; dowhile(step: Step, TStepInputSchema, TSchemaOut, any, any, TEngineType>, condition: LoopConditionFunction): Workflow; dountil(step: Step, TStepInputSchema, TSchemaOut, any, any, TEngineType>, condition: LoopConditionFunction): Workflow; foreach(step: TPrevIsArray extends true ? Step, TStepInputSchema, TSchemaOut, any, any, TEngineType> : 'Previous step must return an array type', opts?: { concurrency: number; }): Workflow; /** * Builds the execution graph for this workflow * @returns The execution graph that can be used to execute the workflow */ buildExecutionGraph(): ExecutionGraph; /** * Finalizes the workflow definition and prepares it for execution * This method should be called after all steps have been added to the workflow * @returns A built workflow instance ready for execution */ commit(): Workflow; get stepGraph(): StepFlowEntry[]; get serializedStepGraph(): SerializedStepFlowEntry[]; /** * Creates a new workflow run instance and stores a snapshot of the workflow in the storage * @param options Optional configuration for the run * @param options.runId Optional custom run ID, defaults to a random UUID * @param options.resourceId Optional resource ID to associate with this run * @param options.disableScorers Optional flag to disable scorers for this run * @returns A Run instance that can be used to execute the workflow */ createRun(options?: { runId?: string; resourceId?: string; disableScorers?: boolean; }): Promise>; listScorers({ requestContext, }?: { requestContext?: RequestContext; }): Promise; execute({ runId, inputData, resumeData, state, setState, suspend, restart, resume, timeTravel, [PUBSUB_SYMBOL]: pubsub, mastra, requestContext, abort, abortSignal, retryCount, outputWriter, validateInputs, perStep, engine: _engine, bail: _bail, ...rest }: { runId?: string; inputData: TInput; resumeData?: unknown; state: TState; setState: (state: TState) => Promise; suspend: (suspendPayload: any, suspendOptions?: SuspendOptions) => InnerOutput | Promise; restart?: boolean; timeTravel?: { inputData?: TInput; steps: string[]; nestedStepResults?: Record>>; resumeData?: any; }; resume?: { steps: string[]; resumePayload: any; runId?: string; label?: string; forEachIndex?: number; }; [PUBSUB_SYMBOL]: PubSub; mastra: Mastra; requestContext?: RequestContext; engine: DefaultEngineType; abortSignal: AbortSignal; bail: (result: any) => any; abort: () => any; retryCount?: number; outputWriter?: OutputWriter; validateInputs?: boolean; perStep?: boolean; } & Partial): Promise; listWorkflowRuns(args?: StorageListWorkflowRunsInput): Promise; listActiveWorkflowRuns(): Promise<{ runs: import("../storage").WorkflowRun[]; total: number; }>; restartAllActiveWorkflowRuns(): Promise; deleteWorkflowRunById(runId: string): Promise; protected getWorkflowRunSteps({ runId, workflowId }: { runId: string; workflowId: string; }): Promise>>; /** * Get a workflow run by ID with processed execution state and metadata. * * @param runId - The unique identifier of the workflow run * @param options - Configuration options for the result * @param options.withNestedWorkflows - Whether to include nested workflow steps (default: true) * @param options.fields - Specific fields to return (for performance optimization) * @returns The workflow run result with metadata and processed execution state, or null if not found */ getWorkflowRunById(runId: string, options?: { withNestedWorkflows?: boolean; fields?: WorkflowStateField[]; }): Promise; } /** * Represents a workflow run that can be executed */ export declare class Run[] = Step[], TState = unknown, TInput = unknown, TOutput = unknown, TRequestContext extends Record | unknown = unknown> { #private; protected pubsub: PubSub; /** * Unique identifier for this workflow */ readonly workflowId: string; /** * Unique identifier for this run */ readonly runId: string; /** * Unique identifier for the resource this run is associated with */ readonly resourceId?: string; /** * Whether to disable scorers for this run */ readonly disableScorers?: boolean; /** * Options around how to trace this run */ readonly tracingPolicy?: TracingPolicy; /** * Options around how to trace this run */ readonly validateInputs?: boolean; /** * Internal state of the workflow run */ protected state: Record; /** * The execution engine for this run */ executionEngine: ExecutionEngine; /** * The execution graph for this run */ executionGraph: ExecutionGraph; /** * The serialized step graph for this run */ serializedStepGraph: SerializedStepFlowEntry[]; /** * The steps for this workflow */ readonly workflowSteps: Record; workflowRunStatus: WorkflowRunStatus; readonly workflowEngineType: WorkflowEngineType; get mastra(): Mastra>, Record, Record>, Record, IMastraLogger, Record>, Record>, Record>, Record>, Record> | undefined; streamOutput?: WorkflowRunOutput>; protected closeStreamAction?: () => Promise; protected executionResults?: Promise>; protected stateSchema?: StandardSchemaWithJSON; protected inputSchema?: StandardSchemaWithJSON; protected requestContextSchema?: StandardSchemaWithJSON; protected cleanup?: () => void; protected retryConfig?: { attempts?: number; delay?: number; }; constructor(params: { workflowId: string; runId: string; resourceId?: string; stateSchema?: StandardSchemaWithJSON; inputSchema?: StandardSchemaWithJSON; requestContextSchema?: StandardSchemaWithJSON; executionEngine: ExecutionEngine; executionGraph: ExecutionGraph; mastra?: Mastra; retryConfig?: { attempts?: number; delay?: number; }; cleanup?: () => void; serializedStepGraph: SerializedStepFlowEntry[]; disableScorers?: boolean; tracingPolicy?: TracingPolicy; workflowSteps: Record; validateInputs?: boolean; workflowEngineType: WorkflowEngineType; }); get abortController(): AbortController; /** * Cancels the workflow execution. * This aborts any running execution and updates the workflow status to 'canceled' in storage. */ cancel(): Promise; protected _validateInput(inputData?: TInput): Promise; protected _validateInitialState(initialState?: TState): Promise; protected _validateRequestContext(requestContext?: RequestContext): Promise; protected _validateResumeData(resumeData: TResume, suspendedStep?: StepWithComponent): Promise; protected _validateTimetravelInputData(inputData: TInput, step: Step): Promise; protected _start({ inputData, initialState, requestContext, outputWriter, tracingOptions, format, outputOptions, perStep, ...rest }: (TInput extends unknown ? { inputData?: TInput; } : { inputData: TInput; }) & (TState extends unknown ? { initialState?: TState; } : { initialState: TState; }) & { requestContext?: RequestContext; outputWriter?: OutputWriter; tracingOptions?: TracingOptions; format?: 'legacy' | 'vnext' | undefined; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): Promise>; /** * Starts the workflow execution with the provided input * @param input The input data for the workflow * @returns A promise that resolves to the workflow output */ start(args: (TInput extends unknown ? { inputData?: TInput; } : { inputData: TInput; }) & (TState extends unknown ? { initialState?: TState; } : { initialState: TState; }) & { requestContext?: RequestContext; } & WorkflowRunStartOptions): Promise>; /** * Starts the workflow execution without waiting for completion (fire-and-forget). * Returns immediately with the runId. The workflow executes in the background. * Use this when you don't need to wait for the result or want to avoid polling failures. * @param args The input data and configuration for the workflow * @returns A promise that resolves immediately with the runId */ startAsync(args: (TInput extends unknown ? { inputData?: TInput; } : { inputData: TInput; }) & (TState extends unknown ? { initialState?: TState; } : { initialState: TState; }) & { requestContext?: RequestContext; } & WorkflowRunStartOptions): Promise<{ runId: string; }>; /** * Starts the workflow execution with the provided input as a stream * @param input The input data for the workflow * @returns A promise that resolves to the workflow output */ streamLegacy({ inputData, requestContext, onChunk, tracingOptions, ...rest }?: (TInput extends unknown ? { inputData?: TInput; } : { inputData: TInput; }) & { requestContext?: RequestContext; onChunk?: (chunk: StreamEvent) => Promise; tracingOptions?: TracingOptions; } & Partial): { stream: ReadableStream; getWorkflowState: () => Promise>; }; /** * Observe the workflow stream * @returns A readable stream of the workflow events */ observeStreamLegacy(): { stream: ReadableStream; }; /** * Observe the workflow stream * @returns A readable stream of the workflow events */ observeStream(): ReadableStream; /** * Starts the workflow execution with the provided input as a stream * @param input The input data for the workflow * @returns A promise that resolves to the workflow output */ stream({ inputData, requestContext, tracingOptions, closeOnSuspend, initialState, outputOptions, perStep, ...rest }: (TInput extends unknown ? { inputData?: TInput; } : { inputData: TInput; }) & (TState extends unknown ? { initialState?: TState; } : { initialState: TState; }) & { requestContext?: RequestContext; tracingOptions?: TracingOptions; closeOnSuspend?: boolean; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): WorkflowRunOutput>; /** * Resumes the workflow execution with the provided input as a stream * @param input The input data for the workflow * @returns A promise that resolves to the workflow output */ resumeStream({ step, resumeData, requestContext, tracingOptions, forEachIndex, outputOptions, perStep, ...rest }?: { resumeData?: TResume; step?: Step | [ ...Step[], Step ] | string | string[]; requestContext?: RequestContext; tracingOptions?: TracingOptions; forEachIndex?: number; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): WorkflowRunOutput>; /** * @internal */ watch(cb: (event: WorkflowStreamEvent) => void): () => void; /** * @internal */ watchAsync(cb: (event: WorkflowStreamEvent) => void): Promise<() => void>; resume(params: { resumeData?: TResume; step?: Step | [ ...Step[], Step ] | string | string[]; label?: string; requestContext?: RequestContext; retryCount?: number; tracingOptions?: TracingOptions; outputWriter?: OutputWriter; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; forEachIndex?: number; perStep?: boolean; } & Partial): Promise>; /** * Restarts the workflow execution that was previously active * @returns A promise that resolves to the workflow output */ restart(args?: { requestContext?: RequestContext; outputWriter?: OutputWriter; tracingOptions?: TracingOptions; } & Partial): Promise>; protected _resume(params: { resumeData?: TResume; step?: Step | [ ...Step[], Step ] | string | string[]; label?: string; requestContext?: RequestContext; retryCount?: number; tracingOptions?: TracingOptions; outputWriter?: OutputWriter; format?: 'legacy' | 'vnext' | undefined; isVNext?: boolean; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; forEachIndex?: number; perStep?: boolean; } & Partial): Promise>; protected _restart({ requestContext, outputWriter, tracingOptions, ...rest }: { requestContext?: RequestContext; outputWriter?: OutputWriter; tracingOptions?: TracingOptions; } & Partial): Promise>; protected _timeTravel({ inputData, resumeData, initialState, step: stepParam, context, nestedStepsContext, requestContext, outputWriter, tracingOptions, outputOptions, perStep, ...rest }: { inputData?: TInput; resumeData?: any; initialState?: TState; step: Step | [ ...Step[], Step ] | string | string[]; context?: TimeTravelContext; nestedStepsContext?: Record>; requestContext?: RequestContext; outputWriter?: OutputWriter; tracingOptions?: TracingOptions; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): Promise>; timeTravel(args: { inputData?: TInput; resumeData?: any; initialState?: TState; step: Step | [ ...Step[], Step ] | string | string[]; context?: TimeTravelContext; nestedStepsContext?: Record>; requestContext?: RequestContext; outputWriter?: OutputWriter; tracingOptions?: TracingOptions; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): Promise>; timeTravelStream({ inputData, resumeData, initialState, step, context, nestedStepsContext, requestContext, tracingOptions, outputOptions, perStep, ...rest }: { inputData?: TTravelInput; initialState?: TState; resumeData?: any; step: Step | [ ...Step[], Step ] | string | string[]; context?: TimeTravelContext; nestedStepsContext?: Record>; requestContext?: RequestContext; tracingOptions?: TracingOptions; outputOptions?: { includeState?: boolean; includeResumeLabels?: boolean; }; perStep?: boolean; } & Partial): WorkflowRunOutput>; /** * @access private * @returns The execution results of the workflow run */ _getExecutionResults(): Promise> | undefined; } //# sourceMappingURL=workflow.d.ts.map