import { processBatches } from './batchUtil.js'; import { AxFlowDependencyAnalyzer } from './dependencyAnalyzer.js'; import type { AxFlowExecutionStep, AxFlowParallelGroup, AxFlowStepFunction, } from './types.js'; /** * Builds and manages the execution plan with automatic parallelization. * * This class is the core of AxFlow's performance optimization system. * It analyzes the dependency relationships between steps and creates * an optimized execution plan that maximizes parallelism while ensuring * correct execution order. * * Key responsibilities: * 1. **Dependency Analysis**: Tracks what fields each step depends on and produces * 2. **Parallel Grouping**: Groups independent steps that can run simultaneously * 3. **Execution Optimization**: Creates optimized execution functions that * run parallel groups concurrently * 4. **Signature Inference**: Provides data for automatic signature generation * * The planner works by building a directed acyclic graph (DAG) of dependencies * and then creating execution levels where all steps in a level can run in parallel. */ export class AxFlowExecutionPlanner { private steps: AxFlowExecutionStep[] = []; private parallelGroups: AxFlowParallelGroup[] = []; private readonly analyzer = new AxFlowDependencyAnalyzer(); private initialFields: Set = new Set(); /** * Adds an execution step to the plan for analysis and optimization. * * This method is called for every operation in the flow (execute, map, merge, etc.) * and performs dependency analysis to understand what the step needs and produces. * This information is crucial for building the parallel execution plan. * * The method handles different types of steps: * - **Execute steps**: LLM node operations that depend on specific state fields * - **Map steps**: Transformations that modify the state object * - **Merge steps**: Operations that combine results from branches or parallel operations * - **Other steps**: Generic operations that don't fit other categories * * @param stepFunction - The actual function to execute for this step * @param nodeName - Name of the node (for execute steps) * @param mapping - Function that maps state to node inputs (for execute steps) * @param stepType - Type of step for specialized analysis * @param mapTransform - Transformation function (for map steps) * @param mergeOptions - Options for merge operations (result key, merge function) */ addExecutionStep( stepFunction: AxFlowStepFunction, nodeName?: string, mapping?: (state: any) => any, stepType?: | 'execute' | 'map' | 'merge' | 'parallel-map' | 'parallel' | 'derive', mapTransform?: (state: any) => any, mergeOptions?: { resultKey?: string; mergeFunction?: (...args: any[]) => any; }, deriveOptions?: { inputFieldName: string; outputFieldName: string; batchSize?: number; } ): void { let dependencies: string[] = []; let produces: string[] = []; let type: | 'execute' | 'map' | 'merge' | 'parallel-map' | 'parallel' | 'derive' = stepType || 'map'; if (nodeName && mapping) { type = 'execute'; dependencies = this.analyzer.analyzeMappingDependencies( mapping, nodeName ); // For execute steps, we know the result will be stored as `${nodeName}Result` produces = [`${nodeName}Result`]; } else if (type === 'map' && mapTransform) { // Analyze map transformation to determine what fields it produces const mapOutputFields = this.analyzeMapTransformation(mapTransform); produces = mapOutputFields; dependencies = this.getAllProducedFields(); } else if (type === 'parallel-map') { // Parallel map operations produce fields from all transforms if (Array.isArray(mapTransform)) { // Multiple transforms - analyze each one const allFields = new Set(); for (const transform of mapTransform) { const fields = this.analyzeMapTransformation(transform); fields.forEach((f) => allFields.add(f)); } produces = Array.from(allFields); } else if (mapTransform) { // Single transform produces = this.analyzeMapTransformation(mapTransform); } else { // No transform provided, use default produces = ['_parallelMapResult']; } dependencies = this.getAllProducedFields(); } else if (type === 'merge') { // Merge operations produce their result key or merge all previous results if (mergeOptions?.resultKey) { produces = [mergeOptions.resultKey]; } else { // Branch merge - analyze what fields the branches produce const branchFields = this.analyzeBranchMergeFields(); produces = branchFields.length > 0 ? branchFields : ['_mergedResult']; } // Check if this is a parallel merge step by looking at the function code const funcCode = stepFunction.toString(); if (funcCode.includes('_parallelResults')) { dependencies = ['_parallelResults']; } else { dependencies = this.getAllProducedFields(); } } else if (type === 'parallel') { // Parallel operations produce _parallelResults for merge step produces = ['_parallelResults']; dependencies = this.getAllProducedFields(); } else if (type === 'derive') { // Derive operations produce the specified output field and depend on the input field if (deriveOptions?.outputFieldName && deriveOptions?.inputFieldName) { produces = [deriveOptions.outputFieldName]; // Also analyze the transform function for other dependencies const otherDependencies = mapTransform ? this.analyzer.analyzeMappingDependencies(mapTransform, 'derive') : []; dependencies = [ deriveOptions.inputFieldName, ...otherDependencies, ].filter((v, i, a) => a.indexOf(v) === i); // Ensure uniqueness } else { // Fallback if options not provided properly produces = ['_deriveResult']; dependencies = this.getAllProducedFields(); } } else if (stepFunction.toString().includes('transform(')) { type = 'map'; // Fallback: Map steps are harder to analyze statically, assume they depend on all previous steps dependencies = this.getAllProducedFields(); produces = ['_mapResult']; } else if (stepFunction.toString().includes('_parallelResults')) { // This is likely a parallel step that produces _parallelResults produces = ['_parallelResults']; dependencies = this.getAllProducedFields(); } // Infer initial fields from dependencies // A field is an initial field if it's consumed by a step but not produced by any previous step for (const dep of dependencies) { if (!this.getAllProducedFields().includes(dep)) { this.initialFields.add(dep); } } const step: AxFlowExecutionStep = { type, nodeName, dependencies, produces, stepFunction, stepIndex: this.steps.length, }; this.steps.push(step); // Don't rebuild parallel groups during construction - only after initial fields are set // this.rebuildParallelGroups() } /** * Analyzes a step function to determine what fields it produces. * * This method analyzes the step function to understand what new fields * it adds to the state. It uses a mock state approach: * 1. Creates a mock state with sample data * 2. Runs the step function on the mock state * 3. Compares the result to see what fields were added * * @param stepFunction - The step function to analyze * @returns Array of field names that the step function produces */ private analyzeStepFunctionProduction( stepFunction: AxFlowStepFunction ): string[] { // For step functions, we should primarily rely on source analysis // since they might contain complex AI operations that we can't mock safely try { const sourceAnalysis = this.analyzeStepFunctionSource(stepFunction); if ( sourceAnalysis.length > 0 && !sourceAnalysis.includes('_stepResult') ) { return sourceAnalysis; } } catch (error) { console.debug('Step function source analysis failed:', error); } // Only try dynamic analysis for simple functions try { const mockState = this.createMockState(); const originalKeys = Object.keys(mockState); // Create a mock context for the step function const mockContext = { mainAi: { getOptions: () => ({ trace: false }), forward: () => Promise.resolve({ text: 'mock' }), } as any, mainOptions: undefined, }; const result = stepFunction(mockState, mockContext); // Handle async functions if (result && typeof result === 'object' && 'then' in result) { // For async functions, we can't easily determine the output // Fall back to analyzing the function source code return this.analyzeStepFunctionSource(stepFunction); } if (result && typeof result === 'object' && !Array.isArray(result)) { const newKeys = Object.keys(result); const addedFields = newKeys.filter( (key) => !originalKeys.includes(key) ); if (addedFields.length > 0) { return addedFields; } } } catch (error) { // If analysis fails, fall back to source analysis console.debug('Step function dynamic analysis failed:', error); } return this.analyzeStepFunctionSource(stepFunction); } /** * Analyzes step function source code to determine what fields it produces. * * @param stepFunction - The step function to analyze * @returns Array of field names that the step function produces */ private analyzeStepFunctionSource( stepFunction: AxFlowStepFunction ): string[] { try { const source = stepFunction.toString(); // Look for patterns like: { ...state, fieldName: value } const fieldAssignments = source.match( /\{\s*\.\.\.state\s*,\s*(\w+)\s*:/g ); if (fieldAssignments) { const fields = fieldAssignments .map((assignment) => { const match = assignment.match(/(\w+)\s*:/); return match ? match[1] : null; }) .filter(Boolean); if (fields.length > 0) { return fields as string[]; } } // Look for direct property assignments like: state.fieldName = value const propertyAssignments = source.match(/state\.(\w+)\s*=/g); if (propertyAssignments) { const fields = propertyAssignments .map((assignment) => { const match = assignment.match(/state\.(\w+)\s*=/); return match ? match[1] : null; }) .filter(Boolean); if (fields.length > 0) { return fields as string[]; } } } catch (error) { console.debug('Step function source analysis failed:', error); } // Fallback to a generic field name return ['_stepResult']; } /** * Analyzes a map transformation function to determine what fields it produces. * * This is a challenging problem because map transformations can produce arbitrary * new fields based on complex logic. The method uses a mock state approach: * 1. Creates a mock state with sample data * 2. Runs the transformation on the mock state * 3. Analyzes the result to see what fields were produced * * This approach works for most common transformation patterns but may miss * edge cases where the transformation behavior depends on specific data values. * * @param mapTransform - The map transformation function to analyze * @returns Array of field names that the transformation produces */ private analyzeMapTransformation( mapTransform: (state: any) => any ): string[] { // Never execute user transforms here to avoid side effects during planning. // Fallback to source analysis which is safe and conservative. try { const src = mapTransform.toString(); const _fields = src .split(/\{[\s\S]*?\}/) // crude block split .flatMap(() => []); // If we can’t infer safely, return generic field to keep planner robust } catch { /* ignore */ } return ['_mapResult']; } /** * Creates a mock state with sample data for transformation analysis. * * This method builds a representative state object that includes: * - Initial fields from the flow input * - Result fields from previous steps with realistic structure * - Sample data that allows transformations to execute * * The mock state is used to run map transformations in a controlled * environment to determine what fields they produce. * * @returns Mock state object with sample data */ private createMockState(): any { const mockState: any = {}; // Add initial fields for (const field of this.initialFields) { mockState[field] = this.createMockValue(field); } // Add produced fields from previous steps for (const step of this.steps) { for (const field of step.produces) { if (field.endsWith('Result')) { mockState[field] = { // Add common result field patterns text: 'mockText', value: 'mockValue', result: 'mockResult', data: 'mockData', // Add specific field names that tests might expect processedText: 'mockProcessedText', sentimentValue: 'mockSentiment', confidenceScore: 0.8, isComplex: false, mockValue: 'mockValue', responseText: 'mockResponseText', inputText: 'mockInputText', }; } else { mockState[field] = this.createMockValue(field); } } } return mockState; } /** * Creates appropriate mock values based on field names and patterns. */ private createMockValue(fieldName: string): any { // Handle array fields if ( fieldName.includes('List') || fieldName.includes('Array') || fieldName.endsWith('s') ) { return ['mockItem1', 'mockItem2']; } // Handle numeric fields if ( fieldName.includes('count') || fieldName.includes('Count') || fieldName.includes('index') || fieldName.includes('Index') ) { return 0; } // Handle boolean fields if ( fieldName.includes('is') || fieldName.includes('has') || fieldName.includes('can') ) { return false; } // Default to string return 'mockValue'; } /** * Analyzes what fields are produced by conditional merge operations. * * Conditional merges are complex because they don't transform data like map operations, * but instead select which branch's results to use based on a condition. * The challenge is determining what fields will be available after the merge * without knowing which branch will be taken at runtime. * * This method uses heuristics to determine the likely output fields: * 1. Look at recent execute steps (likely branch operations) * 2. If found, use their output fields as potential merge results * 3. Fallback to all execute step fields if no recent pattern is found * * The analysis assumes that branches in a conditional merge will produce * similar types of fields, so we can use any branch's fields as representative * of what the merge might produce. * * @returns string[] - Array of field names that the merge operation might produce */ private analyzeBranchMergeFields(): string[] { // Look at the last few steps to find execute steps that would be merged // We focus on recent steps because they're more likely to be part of the // current branch structure being merged const recentExecuteSteps = this.steps .slice(-5) // Look at last 5 steps .filter((step) => step.type === 'execute' && step.nodeName) .flatMap((step) => step.produces); if (recentExecuteSteps.length > 0) { return recentExecuteSteps; } // Fallback: return all execute step fields // This is a broader approach when we can't identify recent branch patterns // It includes all possible fields that could be produced by any node return this.steps .filter((step) => step.type === 'execute' && step.nodeName) .flatMap((step) => step.produces); } /** * Sets the initial fields and triggers parallel group rebuilding. * * This method is called once the flow knows what input fields are available. * It triggers the parallel group analysis which determines the optimal * execution strategy for the entire flow. * * @param fields - Array of field names available at the start of execution */ setInitialFields(fields: string[]): void { this.initialFields = new Set(fields); this.rebuildParallelGroups(); } /** * Rebuilds the parallel execution groups based on step dependencies. * * This is the core algorithm that creates the parallel execution plan. * It uses a level-by-level approach: * * 1. **Level 0**: Steps with no dependencies (can run immediately) * 2. **Level 1**: Steps that depend only on Level 0 outputs * 3. **Level N**: Steps that depend on outputs from previous levels * * Steps within the same level can run in parallel because they don't * depend on each other's outputs. * * The algorithm ensures: * - Correct execution order (dependencies are satisfied) * - Maximum parallelism (independent steps run simultaneously) * - Deadlock prevention (circular dependencies are detected) * * Time complexity: O(n²) where n is the number of steps * Space complexity: O(n) for tracking processed steps and available fields */ private rebuildParallelGroups(): void { this.parallelGroups = []; const processedSteps = new Set(); const availableFields = new Set(this.initialFields); let currentLevel = 0; while (processedSteps.size < this.steps.length) { const currentLevelSteps: AxFlowExecutionStep[] = []; // Find all steps that can run at this level for (const step of this.steps) { if (processedSteps.has(step.stepIndex)) continue; // Check if all dependencies are available const canRun = step.dependencies.length === 0 || step.dependencies.every((dep) => availableFields.has(dep)); if (canRun) { // Special handling for merge steps - they should run in their own group // to ensure they see the results from the previous parallel step if (step.type === 'merge' && currentLevelSteps.length > 0) { // Don't add merge step to current level if there are already steps // It will be picked up in the next iteration continue; } currentLevelSteps.push(step); processedSteps.add(step.stepIndex); // If this is a merge step, don't add any more steps to this level if (step.type === 'merge') { break; } } } if (currentLevelSteps.length > 0) { // Add all produced fields from this level to available fields for (const step of currentLevelSteps) { step.produces.forEach((field) => availableFields.add(field)); } this.parallelGroups.push({ level: currentLevel, steps: currentLevelSteps, }); currentLevel++; } else { // No progress made - try to add steps that haven't been processed yet // This handles cases where dependencies might not be perfectly resolved const remainingSteps = this.steps.filter( (step) => !processedSteps.has(step.stepIndex) ); if (remainingSteps.length > 0) { // Add the first remaining step to make progress const nextStep = remainingSteps[0]; processedSteps.add(nextStep.stepIndex); // Add produced fields to available fields nextStep.produces.forEach((field) => availableFields.add(field)); this.parallelGroups.push({ level: currentLevel, steps: [nextStep], }); currentLevel++; } else { // No remaining steps, we're done break; } } } } /** * Gets all fields produced by previous steps. * * This is used by steps that depend on "everything produced so far" * such as map transformations and merge operations. * * @returns Array of all field names produced by previous steps */ private getAllProducedFields(): string[] { const fields: string[] = []; for (const step of this.steps) { fields.push(...step.produces); } return fields; } /** * Creates optimized execution functions that implement the parallel execution plan. * * This method converts the parallel groups into actual executable functions. * It creates a series of steps where: * - Single-step groups execute directly * - Multi-step groups execute in parallel with batch size control * - Results are properly merged to maintain state consistency * * The optimized execution can significantly improve performance for flows * with independent operations, especially I/O-bound operations like LLM calls. * * Performance benefits: * - Reduces total execution time for independent operations * - Maximizes CPU and I/O utilization * - Maintains correctness through dependency management * - Controls resource usage through batch size limiting * * @param batchSize - Maximum number of concurrent operations (optional) * @returns Array of optimized step functions ready for execution */ createOptimizedExecution(batchSize?: number): AxFlowStepFunction[] { const optimizedSteps: AxFlowStepFunction[] = []; for (const group of this.parallelGroups) { if (group.steps.length === 1) { // Single step - execute directly const step = group.steps[0]; if (step) { optimizedSteps.push(step.stepFunction); } } else if (group.steps.length > 1) { // Multiple steps - execute in parallel with batch size control const parallelStep: AxFlowStepFunction = async (state, context) => { const results = await processBatches( group.steps, async (step) => { return await step.stepFunction(state, context); }, batchSize ); // Check if any step produces _parallelResults (indicates this is a parallel flow) const hasParallelResults = results.some( (result) => result && typeof result === 'object' && '_parallelResults' in result ); if (hasParallelResults) { // Find the step that produced _parallelResults and return it directly const parallelResult = results.find( (result) => result && typeof result === 'object' && '_parallelResults' in result ); if (parallelResult) { // Return the parallel result directly - the merge step will handle cleanup return parallelResult; } return state; } // Merge all results for regular parallel execution let mergedState = state; for (const result of results) { mergedState = { ...mergedState, ...result }; } return mergedState; }; optimizedSteps.push(parallelStep); } } return optimizedSteps; } /** * Gets optimized execution steps for the flow. * * This method provides the optimized execution steps that can be used * to execute the flow with maximum parallelism while maintaining * dependency order. * * @returns Array of optimized step functions ready for execution */ getOptimizedExecutionSteps(): AxFlowStepFunction[] { // If parallel groups haven't been built yet, build them with empty initial fields if (this.parallelGroups.length === 0 && this.steps.length > 0) { this.rebuildParallelGroups(); } return this.createOptimizedExecution(); } /** * Gets detailed execution plan information for debugging and analysis. * * This method provides comprehensive information about the execution plan, * including step counts, parallel grouping details, and the complete * dependency structure. It's particularly useful for: * - Debugging execution flow issues * - Performance analysis and optimization * - Understanding parallelization effectiveness * - Monitoring execution plan complexity * * @returns Object containing detailed execution plan metrics and data */ getExecutionPlan(): { totalSteps: number; parallelGroups: number; maxParallelism: number; steps: AxFlowExecutionStep[]; groups: AxFlowParallelGroup[]; } { // If parallel groups haven't been built yet, build them with empty initial fields if (this.parallelGroups.length === 0 && this.steps.length > 0) { this.rebuildParallelGroups(); } return { totalSteps: this.steps.length, parallelGroups: this.parallelGroups.length, maxParallelism: this.steps.length === 0 ? 1 : Math.max(...this.parallelGroups.map((g) => g.steps.length), 0), steps: this.steps, groups: this.parallelGroups, }; } }