import { RequestContext } from '../di/index.js';
import type { SerializedError } from '../error/index.js';
import type { PubSub } from '../events/pubsub.js';
import type { ObservabilityContext, Span, SpanType, TracingPolicy } from '../observability/index.js';
import type { ExecutionGraph } from './execution-engine.js';
import { ExecutionEngine } from './execution-engine.js';
import type { ExecuteConditionalParams, ExecuteForeachParams, ExecuteLoopParams, ExecuteParallelParams } from './handlers/control-flow.js';
import type { ExecuteEntryParams, PersistStepUpdateParams } from './handlers/entry.js';
import type { ExecuteSleepParams, ExecuteSleepUntilParams } from './handlers/sleep.js';
import type { ExecuteStepParams } from './handlers/step.js';
import type { ConditionFunction, ConditionFunctionParams, Step } from './step.js';
import type { DefaultEngineType, EntryExecutionResult, ExecutionContext, MutableContext, OutputWriter, RestartExecutionParams, SerializedStepFlowEntry, StepExecutionResult, StepFlowEntry, StepResult, StepTripwireInfo, TimeTravelExecutionParams } from './types.js';
export type { ExecutionContext } from './types.js';
/**
 * Step result with every type argument widened to `any`.
 *
 * NOTE(review): the generic argument lists of this declaration file had been
 * stripped and were reconstructed by hand. The arity of `StepResult` (four
 * arguments) is an assumption - confirm against `./types.js`, or regenerate
 * this file with `tsc`.
 */
type AnyStepResult = StepResult<any, any, any, any>;
/**
 * Span of any span type.
 *
 * NOTE(review): assumes `Span` takes a single `SpanType` argument - confirm
 * against `../observability/index.js`. The original declaration may have used
 * narrower span types at individual call sites.
 */
type AnySpan = Span<SpanType>;
/**
 * Default implementation of the ExecutionEngine.
 *
 * NOTE(review): `Step`, `ConditionFunction` and `ConditionFunctionParams` are
 * left without explicit type arguments because the stripped arguments could not
 * be recovered from this file. `DefaultEngineType` is imported but no longer
 * referenced, which suggests it was one of those arguments - confirm against
 * the implementation in `default.ts`.
 */
export declare class DefaultExecutionEngine extends ExecutionEngine {
    /**
     * The retryCounts map is used to keep track of the retry count for each step.
     * The step id is used as the key and the retry count is the value.
     */
    protected retryCounts: Map<string, number>;
    /**
     * Get or generate the retry count for a step.
     * If the step id is not in the map, it will be added and the retry count will be 0.
     * If the step id is in the map, it will return the retry count.
     *
     * @param stepId - The id of the step.
     * @returns The retry count for the step.
     */
    getOrGenerateRetryCount(stepId: Step['id']): number;
    /**
     * Check if a step is a nested workflow that requires special handling.
     * Override this in subclasses to detect platform-specific workflow types.
     *
     * @param _step - The step to check
     * @returns true if the step is a nested workflow, false otherwise
     */
    isNestedWorkflowStep(_step: Step): boolean;
    /**
     * Execute the sleep duration. Override to use platform-specific sleep primitives.
     *
     * @param duration - The duration to sleep in milliseconds
     * @param _sleepId - Unique identifier for this sleep operation
     * @param _workflowId - The workflow ID (for constructing platform-specific IDs)
     */
    executeSleepDuration(duration: number, _sleepId: string, _workflowId: string): Promise<void>;
    /**
     * Execute sleep until a specific date. Override to use platform-specific sleep primitives.
     *
     * @param date - The date to sleep until
     * @param _sleepUntilId - Unique identifier for this sleep operation
     * @param _workflowId - The workflow ID (for constructing platform-specific IDs)
     */
    executeSleepUntilDate(date: Date, _sleepUntilId: string, _workflowId: string): Promise<void>;
    /**
     * Wrap a durable operation (like dynamic sleep function evaluation).
     * Override to add platform-specific durability.
     *
     * @param _operationId - Unique identifier for this operation
     * @param operationFn - The function to execute
     * @returns The result of the operation
     */
    wrapDurableOperation<T>(_operationId: string, operationFn: () => Promise<T>): Promise<T>;
    /**
     * Get the engine context to pass to step execution functions.
     * Override to provide platform-specific engine primitives (e.g., Inngest step).
     *
     * @returns An object containing engine-specific context
     */
    getEngineContext(): Record<string, any>;
    /**
     * Evaluate a single condition for conditional execution.
     * Override to add platform-specific durability (e.g., Inngest step.run wrapper).
     *
     * @param conditionFn - The condition function to evaluate
     * @param index - The index of this condition
     * @param context - The execution context for the condition
     * @param operationId - Unique identifier for this operation
     * @returns The index if condition is truthy, null otherwise
     */
    evaluateCondition(conditionFn: ConditionFunction, index: number, context: ConditionFunctionParams, operationId: string): Promise<number | null>;
    /**
     * Handle step execution start - emit events and return start timestamp.
     * Override to add platform-specific durability (e.g., Inngest step.run wrapper).
     *
     * @param params - Parameters for step start
     * @returns The start timestamp (used by some engines like Inngest)
     */
    onStepExecutionStart(params: {
        step: Step;
        inputData: any;
        pubsub: PubSub;
        executionContext: ExecutionContext;
        stepCallId: string;
        stepInfo: Record<string, any>;
        operationId: string;
        skipEmits?: boolean;
    }): Promise<number>;
    /**
     * Execute a nested workflow step. Override to use platform-specific workflow invocation.
     * This hook is called when isNestedWorkflowStep returns true.
     *
     * Default behavior: returns null to indicate the base executeStep should handle it normally.
     * Inngest overrides this to use inngestStep.invoke() for nested workflows.
     *
     * @param _params - Parameters for nested workflow execution
     * @returns StepResult if handled, null if should use default execution
     */
    executeWorkflowStep(_params: ObservabilityContext & {
        step: Step;
        stepResults: Record<string, AnyStepResult>;
        executionContext: ExecutionContext;
        resume?: {
            steps: string[];
            resumePayload: any;
            runId?: string;
        };
        timeTravel?: TimeTravelExecutionParams;
        prevOutput: any;
        inputData: any;
        pubsub: PubSub;
        startedAt: number;
        abortController: AbortController;
        requestContext: RequestContext;
        outputWriter?: OutputWriter;
        stepSpan?: AnySpan;
        perStep?: boolean;
    }): Promise<AnyStepResult | null>;
    /**
     * Create a child span for a workflow step.
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: creates span directly via parent span's createChildSpan.
     *
     * @param params - Parameters for span creation
     * @returns The created span, or undefined if no parent span or tracing disabled
     */
    createStepSpan(params: {
        parentSpan: AnySpan | undefined;
        stepId: string;
        operationId: string;
        options: {
            name: string;
            type: SpanType;
            input?: unknown;
            entityType?: string;
            entityId?: string;
            tracingPolicy?: TracingPolicy;
            requestContext?: RequestContext;
        };
        executionContext: ExecutionContext;
    }): Promise<AnySpan | undefined>;
    /**
     * End a workflow step span.
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: calls span.end() directly.
     *
     * @param params - Parameters for ending the span
     */
    endStepSpan(params: {
        span: AnySpan | undefined;
        operationId: string;
        endOptions: {
            output?: unknown;
            attributes?: Record<string, any>;
        };
    }): Promise<void>;
    /**
     * Record an error on a workflow step span.
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: calls span.error() directly.
     *
     * @param params - Parameters for recording the error
     */
    errorStepSpan(params: {
        span: AnySpan | undefined;
        operationId: string;
        errorOptions: {
            error: Error;
            attributes?: Record<string, any>;
        };
    }): Promise<void>;
    /**
     * Create a generic child span (for control-flow operations like parallel, conditional, loop).
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: creates span directly via parent span's createChildSpan.
     *
     * @param params - Parameters for span creation
     * @returns The created span, or undefined if no parent span or tracing disabled
     */
    createChildSpan(params: {
        parentSpan: AnySpan | undefined;
        operationId: string;
        options: {
            name: string;
            type: SpanType;
            input?: unknown;
            attributes?: Record<string, any>;
            tracingPolicy?: TracingPolicy;
        };
        executionContext: ExecutionContext;
    }): Promise<AnySpan | undefined>;
    /**
     * End a generic child span (for control-flow operations).
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: calls span.end() directly.
     *
     * @param params - Parameters for ending the span
     */
    endChildSpan(params: {
        span: AnySpan | undefined;
        operationId: string;
        endOptions?: {
            output?: unknown;
            attributes?: Record<string, any>;
        };
    }): Promise<void>;
    /**
     * Record an error on a generic child span (for control-flow operations).
     * Override to add durability (e.g., Inngest memoization).
     *
     * Default: calls span.error() directly.
     *
     * @param params - Parameters for recording the error
     */
    errorChildSpan(params: {
        span: AnySpan | undefined;
        operationId: string;
        errorOptions: {
            error: Error;
            attributes?: Record<string, any>;
        };
    }): Promise<void>;
    /**
     * Execute a step with retry logic.
     * Default engine: handles retries internally with a loop.
     * Inngest engine: overrides to throw RetryAfterError for external retry handling.
     *
     * @param stepId - Unique identifier for the step (used for durability)
     * @param runStep - The step execution function to run
     * @param params - Retry parameters and context
     * @returns Discriminated union: { ok: true, result: T } or { ok: false, error: ... }
     */
    executeStepWithRetry<T>(stepId: string, runStep: () => Promise<T>, params: {
        retries: number;
        delay: number;
        stepSpan?: AnySpan;
        workflowId: string;
        runId: string;
    }): Promise<{
        ok: true;
        result: T;
    } | {
        ok: false;
        error: {
            status: 'failed';
            error: Error;
            endedAt: number;
            tripwire?: StepTripwireInfo;
        };
    }>;
    /**
     * Format an error for the workflow result.
     * Override to customize error formatting (e.g., include stack traces).
     *
     * @param error - The error to format
     * @param lastOutput - The result of the last executed step
     * @returns The error in serialized form
     */
    protected formatResultError(error: Error | unknown, lastOutput: AnyStepResult): SerializedError;
    /**
     * Build the value returned for a workflow run from the collected step results.
     *
     * NOTE(review): the `TOutput` type parameter was reconstructed; it defaults
     * to `any` so call sites type-check whether or not they pass it explicitly.
     *
     * @param _pubsub - Pubsub instance (underscore-prefixed; presumably unused by the default engine)
     * @param stepResults - Step results keyed by string
     * @param lastOutput - The result of the last executed step
     * @param error - Optional error raised during the run
     * @param stepExecutionPath - Optional list of strings describing the execution path
     */
    protected fmtReturnValue<TOutput = any>(_pubsub: PubSub, stepResults: Record<string, AnyStepResult>, lastOutput: AnyStepResult, error?: Error | unknown, stepExecutionPath?: string[]): Promise<TOutput>;
    /**
     * Serialize a RequestContext Map to a plain object for JSON serialization.
     * Used by durable execution engines to persist context across step replays.
     */
    serializeRequestContext(requestContext: RequestContext): Record<string, any>;
    /**
     * Deserialize a plain object back to a RequestContext instance.
     * Used to restore context after durable execution replay.
     */
    protected deserializeRequestContext(obj: Record<string, any>): RequestContext;
    /**
     * Whether this engine requires requestContext to be serialized for durable operations.
     * Default engine passes by reference (no serialization needed).
     * Inngest engine overrides to return true (serialization required for memoization).
     */
    requiresDurableContextSerialization(): boolean;
    /**
     * Build MutableContext from current execution state.
     * This extracts only the fields that can change during step execution.
     */
    buildMutableContext(executionContext: ExecutionContext): MutableContext;
    /**
     * Apply mutable context changes back to the execution context.
     */
    applyMutableContext(executionContext: ExecutionContext, mutableContext: MutableContext): void;
    /**
     * Executes a workflow run with the provided execution graph and input.
     *
     * NOTE(review): the type parameter list was reconstructed. `TInput` and
     * `TState` are referenced by `params`; the parameter order and the
     * `TOutput` return type are assumptions - confirm against `default.ts`.
     *
     * @param params - Run configuration, including the execution graph
     *   (`graph`), the workflow input (`input`), optional initial state, and
     *   resume / restart / time-travel data
     * @returns A promise that resolves to the workflow output
     */
    execute<TState, TInput, TOutput = any>(params: {
        workflowId: string;
        runId: string;
        resourceId?: string;
        disableScorers?: boolean;
        graph: ExecutionGraph;
        serializedStepGraph: SerializedStepFlowEntry[];
        input?: TInput;
        initialState?: TState;
        restart?: RestartExecutionParams;
        timeTravel?: TimeTravelExecutionParams;
        resume?: {
            steps: string[];
            stepResults: Record<string, AnyStepResult>;
            resumePayload: any;
            resumePath: number[];
            stepExecutionPath?: string[];
            label?: string;
            forEachIndex?: number;
        };
        pubsub: PubSub;
        retryConfig?: {
            attempts?: number;
            delay?: number;
        };
        requestContext: RequestContext;
        workflowSpan?: AnySpan;
        abortController: AbortController;
        outputWriter?: OutputWriter;
        format?: 'legacy' | 'vnext' | undefined;
        outputOptions?: {
            includeState?: boolean;
            includeResumeLabels?: boolean;
        };
        perStep?: boolean;
        /** Trace IDs for creating child spans in durable execution */
        tracingIds?: {
            traceId: string;
            workflowSpanId: string;
        };
    }): Promise<TOutput>;
    /**
     * Look up an output value from the collected step results.
     *
     * @param stepResults - Step results keyed by string
     * @param step - Optional step-flow entry to look up
     */
    getStepOutput(stepResults: Record<string, any>, step?: StepFlowEntry): any;
    /** Run a sleep entry; parameters are declared in `./handlers/sleep.js`. */
    executeSleep(params: ExecuteSleepParams): Promise<void>;
    /** Run a sleep-until entry; parameters are declared in `./handlers/sleep.js`. */
    executeSleepUntil(params: ExecuteSleepUntilParams): Promise<void>;
    /** Run a single step; parameters are declared in `./handlers/step.js`. */
    executeStep(params: ExecuteStepParams): Promise<StepExecutionResult>;
    /** Run a parallel entry; parameters are declared in `./handlers/control-flow.js`. */
    executeParallel(params: ExecuteParallelParams): Promise<AnyStepResult>;
    /** Run a conditional entry; parameters are declared in `./handlers/control-flow.js`. */
    executeConditional(params: ExecuteConditionalParams): Promise<AnyStepResult>;
    /** Run a loop entry; parameters are declared in `./handlers/control-flow.js`. */
    executeLoop(params: ExecuteLoopParams): Promise<AnyStepResult>;
    /** Run a foreach entry; parameters are declared in `./handlers/control-flow.js`. */
    executeForeach(params: ExecuteForeachParams): Promise<AnyStepResult>;
    /** Persist a step update; parameters are declared in `./handlers/entry.js`. */
    persistStepUpdate(params: PersistStepUpdateParams): Promise<void>;
    /** Run one step-flow entry; parameters are declared in `./handlers/entry.js`. */
    executeEntry(params: ExecuteEntryParams): Promise<EntryExecutionResult>;
}
//# sourceMappingURL=default.d.ts.map