/* eslint-disable @typescript-eslint/no-explicit-any, functional/prefer-immutable-types */ import type { Meter, Tracer } from '@opentelemetry/api'; import { context, type Context as OtelContext, SpanKind, trace, } from '@opentelemetry/api'; import type { AxAIService } from '../ai/types.js'; import type { AxGen } from '../dsp/generate.js'; import type { AxOptimizedProgram } from '../dsp/optimizer.js'; import { AxProgram } from '../dsp/program.js'; import type { AxFieldType } from '../dsp/sig.js'; import { type AxField, AxSignature, f } from '../dsp/sig.js'; import { ax } from '../dsp/template.js'; import type { AxGenIn, AxGenOut, AxGenStreamingOut, AxMessage, AxProgramDemos, AxProgramExamples, AxProgramForwardOptions, AxProgramForwardOptionsWithModels, AxProgrammable, AxProgramStreamingForwardOptionsWithModels, AxProgramTrace, AxProgramUsage, AxSetExamplesOptions, } from '../dsp/types.js'; import { mergeProgramUsage } from '../dsp/util.js'; import { processBatches } from './batchUtil.js'; import { AxFlowExecutionPlanner } from './executionPlanner.js'; import { type AxFlowLoggerFunction, axCreateFlowColorLogger, createTimingLogger, } from './logger.js'; import { AxFlowSubContextImpl } from './subContext.js'; import type { AddNodeResult, AxFlowAutoParallelConfig, AxFlowable, AxFlowBranchContext, AxFlowDynamicContext, AxFlowExecutionStep, AxFlowNodeDefinition, AxFlowParallelBranch, AxFlowParallelGroup, AxFlowState, AxFlowStepFunction, AxFlowSubContext, AxFlowTypedParallelBranch, AxFlowTypedSubContext, GetGenIn, GetGenOut, InferAxGen, } from './types.js'; /** * AxFlow - A fluent, chainable API for building and orchestrating complex, stateful AI programs. * * Now with advanced type-safe chaining where each method call evolves the type information, * providing compile-time type safety and superior IntelliSense. 
* * @example * ``` * const flow = new AxFlow<{ topic: string }, { finalAnswer: string }>() * .node('summarizer', 'text:string -> summary:string') * .node('critic', 'summary:string -> critique:string') * .execute('summarizer', state => ({ text: `About ${state.topic}` })) // state is { topic: string } * .execute('critic', state => ({ summary: state.summarizerResult.summary })) // state evolves! * .map(state => ({ finalAnswer: state.criticResult.critique })) // fully typed! * * const result = await flow.forward(ai, { topic: "AI safety" }) * ``` */ export class AxFlow< IN extends Record, OUT, // NOTE: The `any` here is necessary because TNodes must accommodate AxProgrammable instances with various input/output types TNodes extends Record> = Record< string, never >, // Node registry for type tracking TState extends AxFlowState = IN, // Current evolving state type > implements AxFlowable { private static _ctorWarned = false; private readonly nodes: Map = new Map(); private readonly flowDefinition: AxFlowStepFunction[] = []; private readonly nodeGenerators: Map< string, AxProgrammable > = new Map(); private readonly loopStack: number[] = []; private readonly stepLabels: Map = new Map(); private branchContext: AxFlowBranchContext | null = null; // Automatic parallelization components private readonly autoParallelConfig: AxFlowAutoParallelConfig; private readonly executionPlanner = new AxFlowExecutionPlanner(); // Program field that gets initialized when something is added to the graph private program?: AxProgram; // Node-level usage tracking private nodeUsage: Map = new Map(); // Node-level trace tracking private nodeTraces: Map[]> = new Map(); // Verbose logging support private readonly flowLogger?: AxFlowLoggerFunction; private readonly timingLogger?: ReturnType; // Default AI options to propagate to node forwards (e.g., tracer, meter) private readonly defaultAIOptions?: Readonly<{ tracer?: Tracer; meter?: Meter; }>; /** * Converts a string to camelCase for valid field 
names */ private toCamelCase(str: string): string { return str.replace(/_([a-z])/g, (_, letter) => letter.toUpperCase()); } /** * Executes a list of steps with comprehensive logging */ private async executeStepsWithLogging( steps: AxFlowStepFunction[], initialState: AxFlowState, context: Readonly<{ mainAi: AxAIService; mainOptions?: AxProgramForwardOptions; }>, _isOptimized: boolean ): Promise<{ finalState: AxFlowState; stepsExecuted: number }> { let state = { ...initialState }; let stepsExecuted = 0; for (let i = 0; i < steps.length; i++) { const step = steps[i]; if (!step) continue; // Determine step type and metadata for logging const stepType = this.getStepType(step, i); const stepMetadata = this.getStepMetadata(step, i); const previousFields = Object.keys(state); // Log step start if (this.flowLogger) { this.flowLogger({ name: 'StepStart', timestamp: Date.now(), stepIndex: i, stepType: stepType, nodeName: stepMetadata.nodeName, dependencies: stepMetadata.dependencies, produces: stepMetadata.produces, state: { ...state }, }); } // Execute step with timing const stepStartTime = Date.now(); this.timingLogger?.startTiming(`step-${i}`); try { const result = await step(state, context); state = result; stepsExecuted++; // Calculate execution time const executionTime = this.timingLogger?.endTiming(`step-${i}`) ?? 
Date.now() - stepStartTime; // Identify new fields const currentFields = Object.keys(state); const newFields = currentFields.filter( (field) => !previousFields.includes(field) ); // Extract result for execute steps with node names let nodeResult: any; if ( stepType === 'execute' && stepMetadata.nodeName && newFields.length > 0 ) { // For execute steps, try to find the result field that matches the node name const resultFieldName = `${stepMetadata.nodeName}Result`; nodeResult = state[resultFieldName]; } // Log step completion if (this.flowLogger) { this.flowLogger({ name: 'StepComplete', timestamp: Date.now(), stepIndex: i, stepType: stepType, nodeName: stepMetadata.nodeName, executionTime, state: { ...state }, newFields, result: nodeResult, }); } } catch (error) { // Log step error if (this.flowLogger) { this.flowLogger({ name: 'FlowError', timestamp: Date.now(), error: error instanceof Error ? error.message : String(error), stepIndex: i, stepType: stepType, nodeName: stepMetadata.nodeName, state: { ...state }, }); } throw error; } } return { finalState: state, stepsExecuted }; } /** * Determines the type of a step function for logging purposes */ private getStepType( step: AxFlowStepFunction, _index: number ): | 'execute' | 'map' | 'merge' | 'parallel-map' | 'parallel' | 'derive' | 'branch' | 'feedback' | 'while' | 'other' { const source = step.toString(); // Check for execute steps (contain nodeName references) if (source.includes('nodeName') || source.includes('nodeProgram')) { return 'execute'; } // Check for parallel operations if ( source.includes('_parallelResults') || source.includes('processBatches') ) { return 'parallel'; } // Check for merge operations if ( source.includes('branchValue') || source.includes('branches.get') || source.includes('mergeFunction') ) { return 'merge'; } // Check for map operations if (source.includes('transform(') || source.includes('...state,')) { return 'map'; } // Check for derive operations if (source.includes('inputValue') 
&& source.includes('transformFn')) { return 'derive'; } // Check for control flow operations if (source.includes('condition(') && source.includes('iterations')) { if (source.includes('while')) { return 'while'; } return 'feedback'; } // Check for branch operations if (source.includes('branchSteps') || source.includes('currentState')) { return 'branch'; } return 'other'; } /** * Gets metadata about a step for logging purposes */ private getStepMetadata( step: AxFlowStepFunction, index: number ): { nodeName?: string; dependencies: string[]; produces: string[]; } { // Try to get metadata from execution planner if available const executionPlan = this.executionPlanner.getExecutionPlan(); const stepInfo = executionPlan.steps.find((s) => s.stepIndex === index); if (stepInfo) { return { nodeName: stepInfo.nodeName, dependencies: stepInfo.dependencies, produces: stepInfo.produces, }; } // Fallback to source analysis const source = step.toString(); const nodeName = this.extractNodeNameFromSource(source); return { nodeName, dependencies: [], produces: [], }; } /** * Extracts node name from step function source code */ private extractNodeNameFromSource(source: string): string | undefined { // Look for patterns like 'nodeName' variable references const nodeNameMatch = source.match(/nodeName['"]?\s*[=:]\s*['"](\w+)['"]/); if (nodeNameMatch) { return nodeNameMatch[1]; } // Look for patterns in node execution const nodeExecMatch = source.match(/nodeProgram\.get\(['"](\w+)['"]\)/); if (nodeExecMatch) { return nodeExecMatch[1]; } return undefined; } /** * Infers the signature of the flow based on the execution plan and node definitions. * This is the core method that determines what input/output fields the flow should have * based on the nodes and operations defined in the flow. * * The inference process follows these steps: * 1. If no nodes are defined, return a default signature * 2. Analyze the execution plan to find all produced and consumed fields * 3. 
Determine input fields (consumed but not produced by any step) * 4. Determine output fields with special handling for final map/merge operations * 5. If no clear pattern is found, create a comprehensive signature from all nodes * * Special handling for final operations: * - Map operations: Use the fields produced by the map transformation * - Merge operations: Use fields from the merged branches or merge result * - Conditional merges: Analyze what fields the branches actually produce * * @returns AxSignature - The inferred signature for this flow */ private inferSignatureFromFlow(): AxSignature { // Get execution plan to identify dependencies and field flow const executionPlan = this.executionPlanner.getExecutionPlan(); // If no nodes are defined AND no execution steps, return a default signature if (this.nodeGenerators.size === 0 && executionPlan.steps.length === 0) { // Create a default signature for flows without nodes or steps return f() .input('userInput', f.string('User input to the flow')) .output('flowOutput', f.string('Output from the flow')) .build(); } // This gives us a structured view of what each step consumes and produces const allProducedFields = new Set(); const allConsumedFields = new Set(); // Collect all produced and consumed fields from the execution plan // This helps us understand the data flow through the entire workflow for (const step of executionPlan.steps) { step.produces.forEach((field) => allProducedFields.add(field)); step.dependencies.forEach((field) => allConsumedFields.add(field)); } // Find input fields (consumed but not produced by any step) // These are fields that the flow needs from external input const inputFieldNames = new Set(); for (const consumed of Array.from(allConsumedFields)) { if (!allProducedFields.has(consumed)) { inputFieldNames.add(consumed); } } // Find output fields (produced but not consumed by subsequent steps) // These are the final results that the flow produces const outputFieldNames = new Set(); // 
Special handling for final map/merge operations // When a flow ends with a transformation or merge, we want to use those results // as the output rather than intermediate node results // Note: For derive operations, use standard logic to handle multiple derives properly const lastStep = executionPlan.steps[executionPlan.steps.length - 1]; if (lastStep && (lastStep.type === 'map' || lastStep.type === 'merge')) { // If the last step is a map/merge, use its produced fields as outputs lastStep.produces.forEach((field) => { // Skip internal fields like _mapResult, _mergedResult if (!field.startsWith('_')) { outputFieldNames.add(field); } }); // For conditional merges that produce _mergedResult, // use all fields from previous steps as potential outputs // This handles cases where the merge doesn't transform the data // but just selects which branch's results to use if ( lastStep.type === 'merge' && lastStep.produces.includes('_mergedResult') ) { // Find all node result fields from previous steps for (const step of executionPlan.steps) { if (step.type === 'execute' && step.produces.length > 0) { step.produces.forEach((field) => outputFieldNames.add(field)); } } } } else { // Standard logic: fields produced but not consumed by subsequent steps // This finds the "leaf" fields that aren't used by any other step for (const produced of Array.from(allProducedFields)) { // Check if this field is consumed by any step let isConsumed = false; for (const step of executionPlan.steps) { if (step.dependencies.includes(produced)) { isConsumed = true; break; } } if (!isConsumed) { // If this is a node result field (ends with "Result"), extract the actual output field names if (produced.endsWith('Result')) { const nodeName = produced.replace('Result', ''); const nodeGen = this.nodeGenerators.get(nodeName); if (nodeGen) { const sig = nodeGen.getSignature(); const outputFields = sig.getOutputFields(); // Add the actual output field names from the node signature for (const field of 
outputFields) { outputFieldNames.add(field.name); } } else { // Fallback to the original field name if node not found outputFieldNames.add(produced); } } else { outputFieldNames.add(produced); } } } } // If no clear input/output pattern, create a comprehensive signature // This is a fallback that includes all possible fields from all nodes // It's used when the execution plan analysis doesn't give clear results if (inputFieldNames.size === 0 && outputFieldNames.size === 0) { // Extract fields from node signatures const inputFields: AxField[] = []; const outputFields: AxField[] = []; // Go through each node and extract its input/output fields for (const [nodeName, nodeGen] of Array.from(this.nodeGenerators)) { const sig = nodeGen.getSignature(); // Add node's input fields as potential flow inputs // These are prefixed with the node name to avoid conflicts for (const field of sig.getInputFields()) { // Convert to camelCase to avoid validation issues const camelCaseName = this.toCamelCase(`${nodeName}_${field.name}`); inputFields.push({ name: camelCaseName, type: field.type, description: field.description, isOptional: field.isOptional, isInternal: field.isInternal, }); } // Add node's output fields as potential flow outputs // These are also prefixed with the node name for (const field of sig.getOutputFields()) { // Convert to camelCase to avoid validation issues const camelCaseName = this.toCamelCase(`${nodeName}_${field.name}`); outputFields.push({ name: camelCaseName, type: field.type, description: field.description, isOptional: field.isOptional, isInternal: field.isInternal, }); } } // Create signature from collected fields const inferredSignature = new AxSignature(); // Add input fields or default if (inputFields.length > 0) { inferredSignature.setInputFields(inputFields); } else { inferredSignature.addInputField({ name: 'userInput', type: { name: 'string' }, description: 'User input to the flow', }); } // Add output fields or default if (outputFields.length > 0) 
{ inferredSignature.setOutputFields(outputFields); } else { inferredSignature.addOutputField({ name: 'flowOutput', type: { name: 'string' }, description: 'Output from the flow', }); } return inferredSignature; } // Build signature from identified input/output fields // This is the main path when we have clear input/output patterns const inferredSignature = new AxSignature(); // Add input fields const inputFields: AxField[] = []; for (const fieldName of Array.from(inputFieldNames)) { inputFields.push({ name: fieldName, type: { name: 'string' }, description: `Input field: ${fieldName}`, }); } // Add default input if none found if (inputFields.length === 0) { inputFields.push({ name: 'userInput', type: { name: 'string' }, description: 'User input to the flow', }); } // Add output fields const outputFields: AxField[] = []; for (const fieldName of Array.from(outputFieldNames)) { // Skip internal fields that start with underscore if (fieldName.startsWith('_')) { continue; } outputFields.push({ name: fieldName, type: { name: 'string' }, description: `Output field: ${fieldName}`, }); } // Add default output if none found if (outputFields.length === 0) { outputFields.push({ name: 'flowOutput', type: { name: 'string' }, description: 'Output from the flow', }); } inferredSignature.setInputFields(inputFields); inferredSignature.setOutputFields(outputFields); return inferredSignature; } constructor(options?: { autoParallel?: boolean; batchSize?: number; logger?: AxFlowLoggerFunction; debug?: boolean; tracer?: Tracer; meter?: Meter; }) { if (!AxFlow._ctorWarned) { // eslint-disable-next-line no-console console.warn( '[AxFlow] new AxFlow() is deprecated. Use flow() factory instead.' 
); AxFlow._ctorWarned = true; } // Initialize configuration with defaults this.autoParallelConfig = { enabled: options?.autoParallel !== false, // Default to true batchSize: options?.batchSize || 10, // Default batch size of 10 }; // Initialize logging based on options if (options?.logger) { // Explicit logger provided this.flowLogger = options.logger; } else if (options?.debug === true) { // Debug mode enabled - use default color logger this.flowLogger = axCreateFlowColorLogger(); } else { // No logging this.flowLogger = undefined; } this.timingLogger = this.flowLogger ? createTimingLogger(this.flowLogger) : undefined; // Capture default AI options to propagate to node executions if (options?.tracer || options?.meter) { this.defaultAIOptions = { tracer: options.tracer, meter: options.meter, }; } } /** * Static factory method to create a new AxFlow instance with proper type safety * @param options - Optional configuration for the flow * @returns New AxFlow instance with type-safe defaults */ public static create< IN extends Record = Record, OUT = {}, TNodes extends Record> = Record< string, never >, TState extends AxFlowState = IN, >(options?: { autoParallel?: boolean; batchSize?: number; logger?: AxFlowLoggerFunction; debug?: boolean; }): AxFlow { return new AxFlow(options); } /** * Initializes the program field every time something is added to the graph */ private ensureProgram(): void { const signature = this.inferSignatureFromFlow(); if (!this.program) { this.program = new AxProgram(signature); for (const [_nodeName, nodeProgram] of Array.from(this.nodeGenerators)) { this.program.register(nodeProgram as any); } return; } this.program.setSignature(signature); } public setExamples( examples: Readonly>, options?: Readonly ): void { this.ensureProgram(); this.program!.setExamples(examples, options); } public setId(id: string): void { this.ensureProgram(); this.program!.setId(id); } public setParentId(parentId: string): void { this.ensureProgram(); 
this.program!.setParentId(parentId); } public getTraces(): AxProgramTrace[] { // Collect traces from all nodes const allTraces: AxProgramTrace[] = []; for (const [_nodeName, nodeTraces] of Array.from(this.nodeTraces)) { // Cast the traces to the expected type since they should be compatible allTraces.push(...(nodeTraces as AxProgramTrace[])); } return allTraces; } public setDemos(demos: readonly AxProgramDemos[]): void { this.ensureProgram(); this.program!.setDemos(demos); } public getUsage(): AxProgramUsage[] { // Collect usage from all nodes and merge const allUsage: AxProgramUsage[] = []; for (const [_nodeName, nodeUsage] of Array.from(this.nodeUsage)) { allUsage.push(...nodeUsage); } return mergeProgramUsage(allUsage); } public resetUsage(): void { // Clear node-level usage tracking this.nodeUsage.clear(); // Also reset usage on all node generators for (const [_nodeName, nodeProgram] of Array.from(this.nodeGenerators)) { if (nodeProgram && 'resetUsage' in nodeProgram) { nodeProgram.resetUsage(); } } } /** * Resets trace tracking for the flow. * This is called automatically on each forward/streamingForward call. */ public resetTraces(): void { // Clear node-level trace tracking this.nodeTraces.clear(); // Note: Individual node programs don't have resetTraces method, // so we only clear the flow-level trace collection } /** * Gets a detailed usage report broken down by node name. * This provides visibility into which nodes are consuming the most tokens. 
* * @returns Object mapping node names to their usage statistics */ public getUsageReport(): Record { const report: Record = {}; for (const [nodeName, nodeUsage] of Array.from(this.nodeUsage)) { report[nodeName] = mergeProgramUsage(nodeUsage); } return report; } /** * Expose node programs for system-level operations (optimization, inspection) */ public getNodePrograms(): ReadonlyArray<{ name: string; program: AxProgrammable; }> { return Array.from(this.nodeGenerators).map(([name, program]) => ({ name, program, })); } /** * Attempt to set instruction on a node if supported (AxGen. * setInstruction is optional; returns true if applied) */ public setNodeInstruction(name: string, instruction: string): boolean { const prog = this.nodeGenerators.get(name); if (!prog) return false; const anyProg = prog as any; if (typeof anyProg.setInstruction === 'function') { try { anyProg.setInstruction(instruction); return true; } catch { return false; } } return false; } /** * Bulk-apply instructions to nodes; ignores names that don’t exist or nodes without instruction setter */ public setAllNodeInstructions(map: Readonly>): void { for (const [name, instr] of Object.entries(map)) { this.setNodeInstruction(name, instr); } } /** * Gets a detailed trace report broken down by node name. * This provides visibility into the execution traces for each node. 
* * @returns Object mapping node names to their trace data */ public getTracesReport(): Record[]> { const report: Record[]> = {}; for (const [nodeName, nodeTraces] of Array.from(this.nodeTraces)) { report[nodeName] = nodeTraces; } return report; } public async *streamingForward>( ai: T, values: IN | AxMessage[], options?: Readonly> ): AxGenStreamingOut { // For now, we'll implement streaming by converting the regular forward result // This is a simplified implementation - full streaming would require more work // Note: forward() will handle the resetUsage() call const result = await this.forward(ai, values, options); // Yield the final result with correct AxGenDeltaOut structure yield { version: 1, index: 0, delta: result, }; } /** * Executes the flow with the given AI service and input values. * * This is the main execution method that orchestrates the entire flow execution. * It handles several complex aspects: * * 1. **Dynamic Signature Inference**: If the flow was created with a default signature * but has nodes defined, it will infer the actual signature from the flow structure. * * 2. **Execution Mode Selection**: Chooses between optimized parallel execution * (when auto-parallel is enabled) or sequential execution based on configuration. * * 3. **State Management**: Maintains the evolving state object as it flows through * each step, accumulating results and transformations. * * 4. **Performance Optimization**: Uses the execution planner to identify * independent operations that can run in parallel, reducing total execution time. 
* * Execution Flow: * - Initialize state with input values * - Infer signature if needed (based on nodes and current signature) * - Choose execution strategy (parallel vs sequential) * - Execute all steps while maintaining state consistency * - Return final state cast to expected output type * * @param ai - The AI service to use as the default for all steps * @param values - The input values for the flow * @param options - Optional forward options to use as defaults (includes autoParallel override) * @returns Promise that resolves to the final output */ public async forward>( ai: T, values: IN | AxMessage[], options?: Readonly< AxProgramForwardOptionsWithModels & { autoParallel?: boolean } > ): Promise { // Start flow timing const flowStartTime = Date.now(); this.timingLogger?.startTiming('flow-execution'); // Initialize state early so it's accessible in catch block let state: AxFlowState = {}; try { // Reset usage and trace tracking at the start of each forward call this.resetUsage(); this.resetTraces(); // Extract values from input - handle both IN and AxMessage[] cases let inputValues: IN; if (Array.isArray(values)) { // If values is an array of messages, find the most recent user message const lastUserMessage = values .filter((msg) => msg.role === 'user') .pop(); if (!lastUserMessage) { throw new Error('No user message found in values array'); } inputValues = lastUserMessage.values; } else { // If values is a single IN object inputValues = values; } // Dynamic signature inference - only if using default signature and have nodes // This allows flows to be created with a simple signature and then have it // automatically refined based on the actual nodes and operations defined if (this.nodeGenerators.size > 0) { // Initialize program with inferred signature this.ensureProgram(); } // Initialize state with input values // This creates the initial state object that will flow through all steps state = { ...inputValues }; // Log flow start if (this.flowLogger) { 
const executionPlan = this.getExecutionPlan(); this.flowLogger({ name: 'FlowStart', timestamp: flowStartTime, inputFields: Object.keys(inputValues), totalSteps: executionPlan.totalSteps, parallelGroups: executionPlan.parallelGroups, maxParallelism: executionPlan.maxParallelism, autoParallelEnabled: executionPlan.autoParallelEnabled, }); } // Determine tracer and create a parent span/context if available const tracer: Tracer | undefined = (options as any)?.tracer ?? this.defaultAIOptions?.tracer; const providedCtx: OtelContext | undefined = (options as any) ?.traceContext; let parentSpan: | ReturnType['startSpan']> | undefined; let parentCtx: OtelContext | undefined = providedCtx; if (tracer) { const execPlan = this.getExecutionPlan(); const spanName = (options as any)?.traceLabel ? `AxFlow > ${(options as any).traceLabel}` : 'AxFlow'; parentSpan = tracer.startSpan(spanName, { kind: SpanKind.INTERNAL, attributes: { total_steps: execPlan.totalSteps, parallel_groups: execPlan.parallelGroups, max_parallelism: execPlan.maxParallelism, auto_parallel_enabled: execPlan.autoParallelEnabled, }, }); const baseCtx = providedCtx ?? context.active(); parentCtx = trace.setSpan(baseCtx, parentSpan); } // Create execution context object // This provides consistent access to AI service and options for all steps const execContext = { mainAi: ai, mainOptions: ((): AxProgramForwardOptions | undefined => { const merged: AxProgramForwardOptions = { ...(this.defaultAIOptions ?? {}), ...(options as any), } as AxProgramForwardOptions; if ((options as any)?.model) merged.model = String((options as any).model); if (tracer) merged.tracer = tracer; if (parentCtx) (merged as any).traceContext = parentCtx; // If nothing to merge and no defaults, return undefined return Object.keys(merged).length > 0 ? 
merged : undefined; })(), } as const; // Determine execution strategy based on configuration // Auto-parallel can be disabled globally or overridden per execution const useAutoParallel = options?.autoParallel !== false && this.autoParallelConfig.enabled; let stepsExecuted = 0; if (useAutoParallel) { // OPTIMIZED PARALLEL EXECUTION PATH // This path uses the execution planner to identify independent operations // and execute them in parallel for better performance // Set initial fields for dependency analysis // This tells the planner what fields are available at the start this.executionPlanner.setInitialFields(Object.keys(inputValues)); // Get optimized execution plan with parallel groups and batch size control const optimizedSteps = this.executionPlanner.createOptimizedExecution( this.autoParallelConfig.batchSize ); // Execute optimized steps with logging const result = await this.executeStepsWithLogging( optimizedSteps, state, execContext, true ); state = result.finalState; stepsExecuted = result.stepsExecuted; } else { // SEQUENTIAL EXECUTION PATH // This path executes all steps in the order they were defined // It's simpler but potentially slower for independent operations // Execute all steps sequentially with logging const result = await this.executeStepsWithLogging( this.flowDefinition, state, execContext, false ); state = result.finalState; stepsExecuted = result.stepsExecuted; } // Log flow completion if (this.flowLogger) { const totalExecutionTime = this.timingLogger?.endTiming('flow-execution') ?? 
Date.now() - flowStartTime; this.flowLogger({ name: 'FlowComplete', timestamp: Date.now(), totalExecutionTime, finalState: state, outputFields: Object.keys(state), stepsExecuted, }); } // End parent span if created if (parentSpan) parentSpan.end(); // Return the final state cast to the expected output type // The type system ensures this is safe based on the signature inference return state as any; } catch (error) { // Log flow error if (this.flowLogger) { this.flowLogger({ name: 'FlowError', timestamp: Date.now(), error: error instanceof Error ? error.message : String(error), state, }); } // End parent span on error (if created) // @ts-expect-error runtime-scoped variable defined above if (typeof parentSpan !== 'undefined' && parentSpan) parentSpan.end(); throw error; } } /** * Declares a reusable computational node using a signature string. * Returns a new AxFlow type that tracks this node in the TNodes registry. * * @param name - The name of the node * @param signature - Signature string in the same format as AxSignature * @returns New AxFlow instance with updated TNodes type * * @example * ``` * flow.node('summarizer', 'text:string -> summary:string') * flow.node('analyzer', 'text:string -> analysis:string, confidence:number', { debug: true }) * ``` */ public node( name: TName, signature: TSig ): AxFlow< IN, OUT, TNodes & { [K in TName]: InferAxGen }, // Add new node to registry TState // State unchanged >; /** * Declares a reusable computational node using an AxSignature instance. * This allows using pre-configured signatures in the flow. 
*
   * @param name - The name of the node
   * @param signature - AxSignature instance to use for this node
   * @returns This flow (same instance), re-typed with the new node added to TNodes
   *
   * @example
   * ```
   * const sig = s('text:string -> summary:string')
   * flow.node('summarizer', sig)
   * ```
   */
  public node(
    name: TName,
    signature: AxSignature
  ): AxFlow<
    IN,
    OUT,
    TNodes & { [K in TName]: AxGen }, // Add new node to registry
    TState // State unchanged
  >;

  /**
   * Declares a reusable computational node using a class that extends AxProgram.
   * This allows using custom program classes in the flow. The class is
   * instantiated once, with no constructor arguments, when the node is declared.
   *
   * @param name - The name of the node
   * @param programClass - Class that extends AxProgram to use for this node
   * @returns This flow (same instance), re-typed with the new node added to TNodes
   *
   * @example
   * ```
   * class CustomProgram extends AxProgram<{ input: string }, { output: string }> {
   *   async forward(ai, values) { return { output: values.input.toUpperCase() } }
   * }
   * flow.node('custom', CustomProgram)
   * ```
   */
  public node<
    TName extends string,
    TProgram extends new () => AxProgrammable,
  >(
    name: TName,
    programClass: TProgram
  ): AxFlow<
    IN,
    OUT,
    TNodes & { [K in TName]: InstanceType }, // Add new node to registry with exact type
    TState // State unchanged
  >;

  /**
   * Declares a reusable computational node using an AxProgrammable instance.
   * This allows using pre-configured AxGen instances or other programmable objects in the flow.
   * The instance is used as-is (it is not copied).
   *
   * @param name - The name of the node
   * @param programInstance - The AxProgrammable instance to use for this node
   * @returns This flow (same instance), re-typed with the new node added to TNodes
   */
  public node>(
    name: TName,
    programInstance: TProgram
  ): AxFlow<
    IN,
    OUT,
    TNodes & { [K in TName]: TProgram }, // Add new node to registry with exact type
    TState // State unchanged
  >;

  // Implementation (not visible to callers; the overloads above are the public API).
  // Resolves the argument into a program instance, records the node, and
  // registers the program with the lazily created flow-level program.
  public node(
    name: TName,
    nodeValue:
      | string
      | AxSignature
      | AxProgrammable
      | (new () => AxProgrammable)
  ): AxFlow<
    IN,
    OUT,
    TNodes & { [K in TName]: any }, // Using any here as the implementation handles all cases
    TState
  > {
    // Each branch records the node definition *before* building the program,
    // preserving the original side-effect order if construction throws.
    let nodeProgram: AxProgrammable;

    if (typeof nodeValue === 'string' || nodeValue instanceof AxSignature) {
      // Signature string or AxSignature: build a fresh AxGen via ax().
      // Only the empty string can fail this check here.
      if (!nodeValue) {
        throw new Error(
          `Invalid signature for node '${name}': signature cannot be empty`
        );
      }
      this.nodes.set(name, { inputs: {}, outputs: {} });
      // NOTE(review): an AxSignature instance reaches ax() through a string
      // cast; presumably ax() accepts both forms — confirm.
      nodeProgram = ax(nodeValue as string);
    } else if (typeof nodeValue === 'function') {
      // Program class: instantiate it with no constructor arguments.
      this.nodes.set(name, { inputs: {}, outputs: {} });
      nodeProgram = new nodeValue() as AxProgrammable;
    } else if (
      nodeValue &&
      typeof nodeValue === 'object' &&
      'forward' in nodeValue
    ) {
      // Existing AxGen / AxProgrammable instance: use it directly.
      this.nodes.set(name, { inputs: {}, outputs: {} });
      nodeProgram = nodeValue as AxProgrammable;
    } else {
      // Invalid argument type
      throw new Error(
        `Invalid second argument for node '${name}': expected string, AxSignature, AxProgrammable instance, or constructor function`
      );
    }

    this.nodeGenerators.set(name, nodeProgram);

    // Register the node with the flow-level program (created on demand).
    this.ensureProgram();
    this.program!.register(nodeProgram as any);

    // NOTE: This type assertion is necessary for the type-level programming pattern
    // The runtime value is the same object, but TypeScript can't track the evolving generic types
    return this as any;
  }

  /**
   * Short alias for node() - supports signature strings, AxSignature instances, AxGen instances, and program classes
   */
  public n(
    name: TName,
    signature: TSig
  ): AxFlow }, TState>;
  public n(
    name: TName,
    signature: AxSignature
  ): AxFlow<
    IN,
    OUT,
    TNodes & { [K in TName]: AxGen },
    TState
  >;
  public n<
    TName extends string,
    TProgram extends new () => AxProgrammable,
  >(
    name: TName,
    programClass: TProgram
  ): AxFlow }, TState>;
  public n>(
    name: TName,
    programInstance: TProgram
  ): AxFlow;
  public n(
    name: TName,
    signatureOrAxGenOrClass:
      | string
      | AxSignature
      | AxProgrammable
      | (new () => AxProgrammable)
  ): any {
    return this.node(name, signatureOrAxGenOrClass as any);
  }

  /**
   * Applies a synchronous transformation to the state object.
   * Returns the same flow re-typed with the evolved state.
   *
   * @param transform - Function that takes the current state and returns a new state
   * @returns This flow (same instance), re-typed with the updated TState
   *
   * @example
   * ```
   * flow.map(state => ({ ...state, processedText: state.text.toLowerCase() }))
   * ```
   */
  public map(
    transform: (_state: TState) => TNewState
  ): AxFlow;
  /**
   * Applies an asynchronous transformation to the state object.
   * Returns a new AxFlow type with the evolved state.
*
   * @param transform - Async function that takes the current state and returns a promise of new state
   * @returns This flow (same instance), re-typed with the updated TState
   *
   * @example
   * ```
   * flow.map(async state => ({
   *   ...state,
   *   apiResult: await fetchDataFromAPI(state.query)
   * }))
   * ```
   */
  public map(
    transform: (_state: TState) => Promise
  ): AxFlow;
  /**
   * Applies several transformations to the same input state with parallel execution.
   * Each transform receives the current state, and the results are shallow-merged
   * onto that state in array order (later results win on key collisions).
   *
   * @param transforms - Array of transformation functions to apply in parallel
   * @param options - Must be `{ parallel: true }`; an array without it is rejected
   * @returns This flow (same instance), re-typed with the updated TState
   *
   * @example
   * ```
   * // Parallel map with multiple transforms
   * flow.map([
   *   state => ({ ...state, result1: processA(state.data) }),
   *   state => ({ ...state, result2: processB(state.data) }),
   *   state => ({ ...state, result3: processC(state.data) })
   * ], { parallel: true })
   * ```
   */
  public map(
    transforms: Array<(_state: TState) => TNewState>,
    options: { parallel: true }
  ): AxFlow;
  /**
   * Applies async transformations to the state object with parallel execution.
   * The async transforms run concurrently, limited by the configured batch size.
   *
   * @param transforms - Array of async transformation functions to apply in parallel
   * @param options - Must be `{ parallel: true }`; an array without it is rejected
   * @returns This flow (same instance), re-typed with the updated TState
   *
   * @example
   * ```
   * // Parallel async map with multiple transforms
   * flow.map([
   *   async state => ({ ...state, result1: await apiCall1(state.data) }),
   *   async state => ({ ...state, result2: await apiCall2(state.data) }),
   *   async state => ({ ...state, result3: await apiCall3(state.data) })
   * ], { parallel: true })
   * ```
   */
  public map(
    transforms: Array<(_state: TState) => Promise>,
    options: { parallel: true }
  ): AxFlow;
  public map(
    transform: (_state: TState) => TNewState | Promise,
    options?: { parallel?: boolean }
  ): AxFlow;
  public map(
    transformOrTransforms: any,
    options?: { parallel?: boolean }
  ): AxFlow {
    // Validate eagerly: an array of transforms is only meaningful in parallel
    // mode. Checking here (instead of when the flow runs) reports the error at
    // the offending map() call. It also keeps a mis-shaped array out of the
    // execution planner.
    if (Array.isArray(transformOrTransforms) && !options?.parallel) {
      throw new Error('Array of transforms requires parallel: true option');
    }

    if (options?.parallel) {
      // A single transform is accepted too; normalize to an array.
      const transforms = Array.isArray(transformOrTransforms)
        ? transformOrTransforms
        : [transformOrTransforms];

      const parallelMapStep = async (state: AxFlowState) => {
        // Every transform sees the same input state; concurrency is bounded
        // by the configured batch size.
        const orderedResults = await processBatches(
          transforms,
          async (transform, _index) => {
            const result = transform(state as TState);
            return Promise.resolve(result);
          },
          this.autoParallelConfig.batchSize
        );
        // Shallow-merge each result onto the input state in order.
        const merged = orderedResults.reduce(
          (acc, res) => ({ ...acc, ...res }),
          state
        );
        return merged;
      };

      // Add the parallel step to the active branch case, or to the main flow
      if (this.branchContext?.currentBranchValue !== undefined) {
        const currentBranch =
          this.branchContext.branches.get(
            this.branchContext.currentBranchValue
          ) || [];
        currentBranch.push(parallelMapStep);
        this.branchContext.branches.set(
          this.branchContext.currentBranchValue,
          currentBranch
        );
      } else {
        this.flowDefinition.push(parallelMapStep);
        if (this.autoParallelConfig.enabled) {
          this.executionPlanner.addExecutionStep(
            parallelMapStep,
            undefined,
            undefined,
            'parallel-map',
            transforms as any
          );
        }
      }
    } else {
      // Regular map (sync or async transform). The input was validated above,
      // so transformOrTransforms is a single function here.
      const step = async (state: AxFlowState) => {
        const result = transformOrTransforms(state as TState);
        // If the result is a promise, it is adopted; otherwise it is wrapped.
        return Promise.resolve(result);
      };

      if (this.branchContext?.currentBranchValue !== undefined) {
        const currentBranch =
          this.branchContext.branches.get(
            this.branchContext.currentBranchValue
          ) || [];
        currentBranch.push(step);
        this.branchContext.branches.set(
          this.branchContext.currentBranchValue,
          currentBranch
        );
      } else {
        this.flowDefinition.push(step);
        // Add to execution planner for automatic parallelization
        if (this.autoParallelConfig.enabled) {
          this.executionPlanner.addExecutionStep(
            step,
            undefined,
            undefined,
            'map',
            transformOrTransforms
          );
        }
      }
    }

    // Initialize program when flow structure is updated (only if we have nodes)
    if (this.nodeGenerators.size > 0) {
      this.ensureProgram();
    }

    // NOTE: This type assertion is necessary for the type-level programming pattern
    return this as unknown as AxFlow;
  }

  /**
   * Short alias for map() - supports parallel option and async functions
   */
  public m(
    transform: (_state: TState) => TNewState
  ): AxFlow;
  public m(
    transform: (_state: TState) => Promise
  ): AxFlow;
  public m(
    transforms: Array<(_state: TState) => TNewState>,
    options: { parallel: true }
  ): AxFlow;
  public m(
    transforms: Array<(_state: TState) => Promise>,
    options: { parallel: true }
  ): AxFlow;
  public m(
    transformOrTransforms:
      | ((_state: TState) => TNewState | Promise)
      | Array<(_state: TState) => TNewState | Promise>,
    options?: { parallel?: boolean }
  ): AxFlow {
    return this.map(transformOrTransforms as any, options);
  }

  /**
   * Terminal transformation that sets the final output type of the flow.
   * Use this as the last transformation to get proper type inference for the flow result.
   *
   * @param transform - Function that transforms the current state to the final output
   * @returns This flow (same instance), re-typed so its output type is the transform result
   *
   * @example
   * ```typescript
   * const pipeline = flow<{ input: string }>()
   *   .map(state => ({ ...state, processed: true }))
   *   .returns(state => ({
   *     result: state.processed ?
"done" : "pending"
   *   })) // TypeScript now knows the output is { result: string }
   * ```
   */
  public returns>(
    transform: (_state: TState) => TNewOut
  ): AxFlow {
    // Wrap the terminal transform as an ordinary flow step. The engine stores
    // every intermediate value as an AxFlowState record, hence the cast.
    const terminalStep: AxFlowStepFunction = async (state: AxFlowState) =>
      transform(state as TState) as AxFlowState;

    const ctx = this.branchContext;
    if (ctx && ctx.currentBranchValue !== undefined) {
      // Inside a branch()/when() block: queue the step under the active case.
      const caseSteps = ctx.branches.get(ctx.currentBranchValue) || [];
      caseSteps.push(terminalStep);
      ctx.branches.set(ctx.currentBranchValue, caseSteps);
    } else {
      this.flowDefinition.push(terminalStep);
      if (this.autoParallelConfig.enabled) {
        // The planner sees returns() as a plain 'map' step.
        this.executionPlanner.addExecutionStep(
          terminalStep,
          undefined,
          undefined,
          'map',
          transform
        );
      }
    }

    // The flow-level program is only materialized once nodes exist.
    if (this.nodeGenerators.size > 0) {
      this.ensureProgram();
    }

    // Same runtime object; only the OUT type parameter changes.
    return this as unknown as AxFlow;
  }

  /**
   * Short alias for returns() - r() is to returns() as m() is to map()
   *
   * @param transform - Function that transforms the current state to the final output
   * @returns This flow (same instance), re-typed so its output type is the transform result
   */
  public r>(
    transform: (_state: TState) => TNewOut
  ): AxFlow {
    return this.returns(transform);
  }

  /**
   * Labels a step for later reference (useful for feedback loops).
   *
   * @param label - The label to assign to the current step position
   * @returns this (for chaining, no type change)
   *
   * @example
   * ```typescript
   * flow.label('retry-point')
   *   .execute('queryGen', ...)
* ```
   */
  public label(label: string): this {
    // Steps added inside a branch are stored in the branch context rather
    // than in flowDefinition, so labels are rejected there.
    const insideBranch = this.branchContext?.currentBranchValue !== undefined;
    if (insideBranch) {
      throw new Error('Cannot create labels inside branch blocks');
    }
    // The label records the index the next main-flow step will occupy.
    this.stepLabels.set(label, this.flowDefinition.length);
    return this;
  }

  /**
   * Short alias for label()
   */
  public l(label: string): this {
    return this.label(label);
  }

  /**
   * Executes a previously defined node with full type safety.
   * The node name must exist in TNodes, and the mapping function is typed based on the node's signature.
   * The node's output is stored on the state under the key `${nodeName}Result`.
   *
   * @param nodeName - The name of the node to execute (must exist in TNodes)
   * @param mapping - Typed function that takes the current state and returns the input for the node
   * @param dynamicContext - Optional object to override the AI service or options for this specific step
   * @returns This flow (same instance), re-typed with TState augmented by the node's result
   * @throws Error if the node has not been declared with node()
   *
   * @example
   * ```typescript
   * flow.execute('summarizer', state => ({ text: state.originalText }), { ai: cheapAI })
   * ```
   */
  public execute<
    TNodeName extends keyof TNodes & string,
    TAI extends Readonly,
  >(
    nodeName: TNodeName,
    mapping: (_state: TState) => GetGenIn,
    dynamicContext?: AxFlowDynamicContext
  ): AxFlow<
    IN,
    OUT,
    TNodes,
    AddNodeResult>
  > {
    if (!this.nodes.has(nodeName)) {
      throw new Error(
        `Node '${nodeName}' not found. Make sure to define it with .node() first.`
      );
    }
    const nodeProgram = this.nodeGenerators.get(nodeName);
    if (!nodeProgram) {
      throw new Error(`Node program for '${nodeName}' not found.`);
    }

    // Key under which this node's output is merged into the state.
    const resultKey = `${nodeName}Result`;

    const step = async (
      state: AxFlowState,
      context: Readonly<{
        mainAi: AxAIService;
        mainOptions?: AxProgramForwardOptions;
      }>
    ) => {
      // Per-step overrides (dynamicContext) take precedence over the
      // flow-wide AI service and options.
      const ai = dynamicContext?.ai ?? context.mainAi;
      const options = {
        ...(context.mainOptions ?? {}),
        ...(dynamicContext?.options ?? {}),
      } as AxProgramForwardOptions | undefined;

      const nodeInputs = mapping(state as TState);

      // Prefix any caller-supplied trace label with the node name.
      const baseLabel = `Node:${nodeName}`;
      const traceLabel = options?.traceLabel
        ? `${baseLabel} (${options.traceLabel})`
        : baseLabel;

      const canForward =
        'forward' in nodeProgram && typeof nodeProgram.forward === 'function';
      if (!canForward) {
        throw new Error(
          `Node program for '${nodeName}' does not have a forward method`
        );
      }

      const result: any = await nodeProgram.forward(ai, nodeInputs, {
        ...options,
        traceLabel,
      });

      // Append whatever usage/traces the node reports to the flow's per-node
      // stores. NOTE(review): if getUsage()/getTraces() return cumulative
      // totals rather than per-call data, repeated executions of the same node
      // would be double-counted here — confirm.
      if (
        'getUsage' in nodeProgram &&
        typeof nodeProgram.getUsage === 'function'
      ) {
        const reportedUsage = nodeProgram.getUsage();
        if (reportedUsage && reportedUsage.length > 0) {
          const soFar = this.nodeUsage.get(nodeName) || [];
          this.nodeUsage.set(nodeName, [...soFar, ...reportedUsage]);
        }
      }
      if (
        'getTraces' in nodeProgram &&
        typeof nodeProgram.getTraces === 'function'
      ) {
        const reportedTraces = nodeProgram.getTraces();
        if (reportedTraces && reportedTraces.length > 0) {
          const soFar = this.nodeTraces.get(nodeName) || [];
          this.nodeTraces.set(nodeName, [...soFar, ...reportedTraces]);
        }
      }

      return {
        ...state,
        [resultKey]: result,
      };
    };

    const ctx = this.branchContext;
    if (ctx && ctx.currentBranchValue !== undefined) {
      // Inside a branch()/when() block: record under the active case.
      const caseSteps = ctx.branches.get(ctx.currentBranchValue) || [];
      caseSteps.push(step);
      ctx.branches.set(ctx.currentBranchValue, caseSteps);
    } else {
      this.flowDefinition.push(step);
      if (this.autoParallelConfig.enabled) {
        this.executionPlanner.addExecutionStep(step, nodeName, mapping);
      }
    }

    this.ensureProgram();

    // Same runtime object; the cast only advances the tracked state type.
    return this as AxFlow<
      IN,
      OUT,
      TNodes,
      AddNodeResult>
    >;
  }

  /**
   * Apply optimized configuration to this flow and all node programs.
   * Programs that do not expose an applyOptimization() method are skipped.
   */
  public applyOptimization(optimizedProgram: AxOptimizedProgram): void {
    const flowProgram = this.program;
    if (flowProgram && 'applyOptimization' in flowProgram) {
      (flowProgram as any).applyOptimization(optimizedProgram);
    }

    // Snapshot the node programs before iterating.
    for (const nodeProgram of Array.from(this.nodeGenerators.values())) {
      const candidate = nodeProgram as any;
      if (
        candidate &&
        'applyOptimization' in candidate &&
        typeof candidate.applyOptimization === 'function'
      ) {
        candidate.applyOptimization(optimizedProgram);
      }
    }
  }

  /**
   * Short alias for execute()
   */
  public e<
    TNodeName extends keyof TNodes & string,
    TAI extends Readonly,
  >(
    nodeName: TNodeName,
    mapping: (_state: TState) => GetGenIn,
    dynamicContext?: AxFlowDynamicContext
  ): AxFlow<
    IN,
    OUT,
    TNodes,
    AddNodeResult>
  > {
    return this.execute(nodeName, mapping, dynamicContext);
  }

  /**
   * Starts a conditional branch based on a predicate function.
   *
   * @param predicate - Function that takes state and returns a value to branch on
   * @returns this (for chaining)
   *
   * @example
   * ```typescript
   * flow.branch(state => state.qualityResult.needsMoreInfo)
   *   .when(true)
   *   .execute('queryGen', ...)
   *   .when(false)
   *   .execute('answer', ...)
*   .merge()
   * ```
   *
   * @throws Error if another branch() is already open (nesting is not supported)
   */
  public branch(predicate: (_state: TState) => unknown): this {
    if (this.branchContext) {
      throw new Error('Nested branches are not supported');
    }
    // Adapt the typed predicate to the untyped AxFlowState used at runtime.
    const evaluate = (state: AxFlowState) => predicate(state as TState);
    this.branchContext = {
      predicate: evaluate,
      branches: new Map(),
      // No when() case is active until when() is called.
      currentBranchValue: undefined,
    };
    return this;
  }

  /**
   * Short alias for branch()
   */
  public b(predicate: (_state: TState) => unknown): this {
    return this.branch(predicate);
  }

  /**
   * Defines a branch case for the current branch context.
   * Subsequent steps are recorded under this case until the next when() or merge().
   * Calling when() again with the same value (Map key equality) starts that
   * case over with an empty step list.
   *
   * @param value - The value to match against the branch predicate result
   * @returns this (for chaining)
   * @throws Error if no branch() is open
   */
  public when(value: unknown): this {
    const ctx = this.branchContext;
    if (!ctx) {
      throw new Error('when() called without matching branch()');
    }
    ctx.currentBranchValue = value;
    ctx.branches.set(value, []);
    return this;
  }

  /**
   * Short alias for when()
   */
  public w(value: unknown): this {
    return this.when(value);
  }

  /**
   * Merges the results of conditional branches into a single execution path.
   *
   * This method is called after defining conditional branches with branch() and when() methods.
   * It creates a merge point where the flow continues with the results from whichever
   * branch was executed based on the branch condition.
   *
   * How conditional merging works:
   * 1. The branch predicate is evaluated against the current state
   * 2. Its result is looked up among the when() values (Map key equality), and the
   *    matching branch's steps are executed sequentially
   * 3. If no branch matches, the state is returned unchanged
   * 4. The merged result becomes the new state for subsequent steps
   *
   * Type safety note:
   * The TMergedState generic allows for type-level tracking of what fields
   * will be available after the merge, though runtime behavior depends on
   * which branch actually executes.
*
   * @returns AxFlow with updated state type reflecting the merged result
   * @throws Error if called without a preceding branch()
   *
   * @example
   * ```typescript
   * flow
   *   .branch(state => state.complexity > 0.5)
   *   .when(true)
   *   .execute('complexProcessor', state => ({ input: state.text }))
   *   .when(false)
   *   .execute('simpleProcessor', state => ({ input: state.text }))
   *   .merge() // Combines results from either branch
   * ```
   */
  public merge(): AxFlow<
    IN,
    OUT,
    TNodes,
    TMergedState
  > {
    if (!this.branchContext) {
      throw new Error('merge() called without matching branch()');
    }

    // Capture the branch context, then clear it so that steps added after
    // merge() go to the main flow again.
    const branchContext = this.branchContext;
    this.branchContext = null;

    // The merge step is the single main-flow step that runs the whole branch.
    const mergeStep = async (state: AxFlowState, context: any) => {
      // Evaluate the predicate and look up the matching when() case.
      const branchValue = branchContext.predicate(state);
      const branchSteps = branchContext.branches.get(branchValue);

      if (this.flowLogger) {
        this.flowLogger({
          name: 'BranchEvaluation',
          timestamp: Date.now(),
          branchValue,
          hasMatchingBranch: !!branchSteps,
          branchStepsCount: branchSteps?.length ?? 0,
        } as any);
      }

      if (!branchSteps) {
        // No when() case matched the predicate result: the state is
        // returned unchanged.
        return state;
      }

      // Execute the matched case's steps in order, each receiving the
      // previous step's output.
      let currentState = state;
      for (const step of branchSteps) {
        currentState = await step(currentState, context);
      }
      return currentState;
    };

    this.flowDefinition.push(mergeStep);

    // Only the merge step itself is registered with the planner; the branch
    // case steps are not.
    if (this.autoParallelConfig.enabled) {
      this.executionPlanner.addExecutionStep(
        mergeStep,
        undefined,
        undefined,
        'merge'
      );
    }

    // Initialize program when flow structure is updated
    this.ensureProgram();

    // Type-level cast to update the state type while preserving the runtime object
    return this as unknown as AxFlow;
  }

  /**
   * Short alias for merge()
   */
  public mg(): AxFlow<
    IN,
    OUT,
    TNodes,
    TMergedState
  > {
    return this.merge();
  }

  /**
   * Executes multiple operations in parallel and provides a merge method for combining results.
   *
   * This method enables concurrent execution of independent operations, which is particularly
   * useful for operations like:
   * - Multiple document retrievals
   * - Parallel processing of different data sources
   * - Independent LLM calls that can run simultaneously
   *
   * How parallel execution works:
   * 1. Each branch function receives its own sub-context for defining operations
   * 2. Branches are run through processBatches with the configured batch size
   *    (autoParallelConfig.batchSize), which bounds how many execute at once,
   *    rather than all at once via an unbounded Promise.all()
   * 3. Results are stored in _parallelResults for the merge operation
   * 4.
The merge function combines the results into a single field
   *
   * Performance benefits:
   * - Reduces total execution time for independent operations
   * - Maximizes throughput for I/O-bound operations (like LLM calls)
   * - Maintains type safety through the merge operation
   *
   * Branch support: when called between when() and merge() of a branch()
   * block, both the parallel step and its merge step are recorded under the
   * active branch case, the same way map(), execute() and derive() behave.
   *
   * @param branches - Array of functions that define parallel operations
   * @returns Object with merge method for combining results
   *
   * @example
   * ```typescript
   * flow.parallel([
   *   subFlow => subFlow.execute('retrieve1', state => ({ query: state.query1 })),
   *   subFlow => subFlow.execute('retrieve2', state => ({ query: state.query2 })),
   *   subFlow => subFlow.execute('retrieve3', state => ({ query: state.query3 }))
   * ]).merge('documents', (docs1, docs2, docs3) => [...docs1, ...docs2, ...docs3])
   * ```
   */
  public parallel(
    branches: (
      | AxFlowParallelBranch
      | AxFlowTypedParallelBranch
    )[]
  ): {
    merge(
      resultKey: TResultKey,
      mergeFunction: (..._results: unknown[]) => T
    ): AxFlow;
  } {
    // Places a step in the active branch case when inside branch()/when().
    // Otherwise it appends the step to the main flow and, if automatic
    // parallelization is enabled, calls `register` to inform the planner.
    const appendStep = (
      step: AxFlowStepFunction,
      register: () => void
    ): void => {
      const ctx = this.branchContext;
      if (ctx && ctx.currentBranchValue !== undefined) {
        const caseSteps = ctx.branches.get(ctx.currentBranchValue) || [];
        caseSteps.push(step);
        ctx.branches.set(ctx.currentBranchValue, caseSteps);
        return;
      }
      this.flowDefinition.push(step);
      if (this.autoParallelConfig.enabled) {
        register();
      }
    };

    // Create the parallel execution step
    const parallelStep = async (
      state: AxFlowState,
      context: Readonly<{
        mainAi: AxAIService;
        mainOptions?: AxProgramForwardOptions;
      }>
    ) => {
      // Execute branches with batch size control
      const results = await processBatches(
        branches,
        async (branchFn, _index) => {
          // Each branch gets its own sub-context, isolating its operations
          // from the others.
          const subContext = new AxFlowSubContextImpl(this.nodeGenerators);

          // Type assertion needed because we support both typed and untyped branch functions
          const populatedSubContext = branchFn(
            subContext as AxFlowSubContext & AxFlowTypedSubContext
          );

          // Execute the sub-context steps and return the result
          return await populatedSubContext.executeSteps(state, context);
        },
        this.autoParallelConfig.batchSize
      );

      // Temporary storage consumed (and removed) by the merge step below
      return {
        ...state,
        _parallelResults: results,
      };
    };

    // Registered with the planner as a 'parallel' step
    appendStep(parallelStep, () => {
      this.executionPlanner.addExecutionStep(
        parallelStep,
        undefined,
        undefined,
        'parallel',
        undefined,
        undefined
      );
    });

    // Initialize program when flow structure is updated
    this.ensureProgram();

    // Return an object with the merge method for combining parallel results
    return {
      /**
       * Merges the results of parallel operations into a single field.
       *
       * @param resultKey - Name of the field to store the merged result
       * @param mergeFunction - Function that combines the parallel results
       * @returns AxFlow with the merged result added to the state
       */
      merge: (
        resultKey: TResultKey,
        mergeFunction: (...results: unknown[]) => T
      ): AxFlow => {
        // Create the merge step that combines parallel results
        const parallelMergeStep = (state: AxFlowState) => {
          const results = state._parallelResults;
          if (!Array.isArray(results)) {
            throw new Error('No parallel results found for merge');
          }

          // Apply the merge function to combine all parallel results
          const mergedValue = mergeFunction(...results);

          // Store the merged value and drop the temporary field
          const newState = { ...state };
          delete newState._parallelResults;
          newState[resultKey] = mergedValue;

          return newState;
        };

        // Placed right after the parallel step (same branch case or main flow)
        appendStep(parallelMergeStep, () => {
          this.executionPlanner.addExecutionStep(
            parallelMergeStep,
            undefined,
            undefined,
            'merge',
            undefined,
            { resultKey, mergeFunction }
          );
        });

        // Initialize program when flow structure is updated
        this.ensureProgram();

        // Type-level cast to include the new merged field in the state type
        return this as AxFlow<
          IN,
          OUT,
          TNodes,
          TState & { [K in TResultKey]: T }
        >;
      },
    };
  }

  /**
   * Short
alias for parallel()
   */
  public p(
    branches: (
      | AxFlowParallelBranch
      | AxFlowTypedParallelBranch
    )[]
  ): {
    merge(
      resultKey: TResultKey,
      mergeFunction: (..._results: unknown[]) => T
    ): AxFlow;
  } {
    return this.parallel(branches);
  }

  /**
   * Creates a feedback loop that jumps back to a labeled step if a condition is met.
   *
   * The steps between the label and this point are re-run while `condition`
   * holds, up to `maxIterations` total passes (the initial pass counts as 1).
   * When the limit is reached the loop simply stops; no error is thrown.
   * The current pass number is kept on the state under
   * `_feedback_<targetLabel>_iterations`.
   *
   * @param condition - Function that returns true to trigger the feedback loop
   * @param targetLabel - The label to jump back to (must already be defined)
   * @param maxIterations - Maximum number of iterations to prevent infinite loops (default: 10)
   * @returns this (for chaining)
   * @throws Error if `targetLabel` has not been defined with label()
   *
   * @example
   * ```typescript
   * flow.label('retry-point')
   *   .execute('answer', ...)
   *   .execute('qualityCheck', ...)
   *   .feedback(state => state.qualityCheckResult.confidence < 0.7, 'retry-point')
   * ```
   */
  public feedback(
    condition: (_state: TState) => boolean,
    targetLabel: string,
    maxIterations = 10
  ): this {
    if (!this.stepLabels.has(targetLabel)) {
      throw new Error(
        `Label '${targetLabel}' not found. Make sure to define it with .label() before the feedback point.`
      );
    }

    const targetIndex = this.stepLabels.get(targetLabel)!;

    // Capture the current flow definition length before adding the feedback step
    // This prevents the feedback step from executing itself recursively
    const feedbackStepIndex = this.flowDefinition.length;

    this.flowDefinition.push(async (state, context) => {
      let currentState = state;
      let iterations = 1; // Start at 1 since we've already executed once before reaching feedback

      // Add iteration tracking to state if not present
      const iterationKey = `_feedback_${targetLabel}_iterations`;
      if (typeof currentState[iterationKey] !== 'number') {
        currentState = { ...currentState, [iterationKey]: 1 }; // Initial execution counts as iteration 1
      }

      // Check if we should loop back (iterations < maxIterations since initial execution counts as 1)
      while (condition(currentState as TState) && iterations < maxIterations) {
        iterations++;
        currentState = { ...currentState, [iterationKey]: iterations };

        // Execute steps from target index to just before the feedback step
        // Use feedbackStepIndex to avoid including the feedback step itself
        for (let i = targetIndex; i < feedbackStepIndex; i++) {
          const step = this.flowDefinition[i];
          if (step) {
            currentState = await step(currentState, context);
          }
        }
      }

      return currentState;
    });

    // Initialize program when flow structure is updated (only if we have nodes)
    if (this.nodeGenerators.size > 0) {
      this.ensureProgram();
    }

    return this;
  }

  /**
   * Short alias for feedback()
   */
  public fb(
    condition: (_state: TState) => boolean,
    targetLabel: string,
    maxIterations = 10
  ): this {
    return this.feedback(condition, targetLabel, maxIterations);
  }

  /**
   * Marks the beginning of a loop block.
   *
   * A placeholder step is recorded now; endWhile() later replaces it with the
   * real loop and moves the steps added in between into the loop body.
   *
   * @param condition - Function that takes the current state and returns a boolean
   * @param maxIterations - Maximum number of iterations to prevent infinite loops (default: 100)
   * @returns this (for chaining)
   *
   * @example
   * ```typescript
   * flow.while(state => state.iterations < 3, 10)
   *   .map(state => ({ ...state, iterations: (state.iterations || 0) + 1 }))
   *   .endWhile()
   * ```
   */
  public while(
    condition: (state: TState) => boolean,
    maxIterations = 100
  ): this {
    // Store the condition and mark the start of the loop
    const loopStartIndex = this.flowDefinition.length;
    this.loopStack.push(loopStartIndex);

    // Add a placeholder step that will be replaced in endWhile()
    // We store the condition and maxIterations in the placeholder for later use
    interface LoopPlaceholder extends AxFlowStepFunction {
      _condition: (state: TState) => boolean;
      _maxIterations: number;
      _isLoopStart: boolean;
    }

    const placeholderStep: LoopPlaceholder = Object.assign(
      (state: AxFlowState) => state,
      {
        _condition: condition,
        _maxIterations: maxIterations,
        _isLoopStart: true,
      }
    );

    this.flowDefinition.push(placeholderStep);

    // Initialize program when flow structure is updated (only if we have nodes)
    if (this.nodeGenerators.size > 0) {
      this.ensureProgram();
    }

    return this;
  }

  /**
   * Short alias for while()
   */
  public wh(condition: (_state: TState) => boolean, maxIterations = 100): this {
    return this.while(condition, maxIterations);
  }

  /**
   * Marks the end of a loop block.
   *
   * At run time the loop body repeats while the condition holds. If the
   * condition is still true after `maxIterations` passes, an error is thrown.
   *
   * @returns this (for chaining)
   * @throws Error if there is no matching while()
   */
  public endWhile(): this {
    if (this.loopStack.length === 0) {
      throw new Error('endWhile() called without matching while()');
    }

    const loopStartIndex = this.loopStack.pop()!;

    // Recover the condition and limit stashed on the placeholder by while()
    const placeholderStep = this.flowDefinition[loopStartIndex];
    if (!placeholderStep || !('_isLoopStart' in placeholderStep)) {
      throw new Error('Loop start step not found or invalid');
    }

    const { _condition: condition, _maxIterations: maxIterations } =
      placeholderStep as unknown as {
        _condition: (state: TState) => boolean;
        _maxIterations: number;
      };

    // Extract the loop body steps (everything between while and endWhile)
    const loopBodySteps = this.flowDefinition.splice(loopStartIndex + 1);

    // Replace the placeholder with the actual loop implementation
    this.flowDefinition[loopStartIndex] = async (state, context) => {
      let currentState = state;
      let iterations = 0;

      // Execute the loop while condition is true and within iteration limit
      while (condition(currentState as TState) && iterations < maxIterations) {
        iterations++;

        // Execute all steps in the loop body
        for (const step of loopBodySteps) {
          currentState = await step(currentState, context);
        }
      }

      // Check if we exceeded the maximum iterations
      if (iterations >= maxIterations && condition(currentState as TState)) {
        throw new Error(
          `While loop exceeded maximum iterations (${maxIterations}). Consider increasing maxIterations or ensuring the loop condition eventually becomes false.`
        );
      }

      return currentState;
    };

    // Initialize program when flow structure is updated (only if we have nodes)
    if (this.nodeGenerators.size > 0) {
      this.ensureProgram();
    }

    return this;
  }

  /**
   * Short alias for endWhile()
   */
  public end(): this {
    return this.endWhile();
  }

  /**
   * Derives a new field from an existing field by applying a transform function.
   *
   * If the input field contains an array, the transform function is applied to each
   * array element. When automatic parallelization is enabled, this happens concurrently
   * with batch size control; otherwise it happens sequentially. If the input field
   * contains a scalar value, the transform function is applied directly.
   * Async transforms are awaited on every path, so the stored field always holds
   * resolved values.
   *
   * @param outputFieldName - Name of the field to store the result
   * @param inputFieldName - Name of the existing field to transform
   * @param transformFn - Function to apply to each element (for arrays) or the value directly (for scalars)
   * @param options - Options including batch size for parallel processing
   * @returns this (for chaining)
   * @throws Error at run time if `inputFieldName` is undefined on the state
   *
   * @example
   * ```typescript
   * // Parallel processing of array items
   * flow.derive('processedItems', 'items', (item, index) => processItem(item), { batchSize: 5 })
   *
   * // Direct transformation of scalar value
   * flow.derive('upperText', 'text', (text) => text.toUpperCase())
   * ```
   */
  public derive(
    outputFieldName: string,
    inputFieldName: string,
    transformFn: (value: any, index?: number, state?: TState) => T,
    options?: { batchSize?: number }
  ): this {
    const step = async (state: AxFlowState) => {
      const inputValue = state[inputFieldName];
      if (inputValue === undefined) {
        throw new Error(`Input field '${inputFieldName}' not found in state`);
      }

      // `unknown` because async transforms are awaited, so the stored value
      // may be the resolved type rather than T itself.
      let result: unknown;

      if (Array.isArray(inputValue)) {
        if (this.autoParallelConfig.enabled) {
          // Array input - concurrent processing with batch control
          const batchSize =
            options?.batchSize || this.autoParallelConfig.batchSize;
          result = await processBatches(
            inputValue,
            async (item, index) => {
              return transformFn(item, index, state as TState);
            },
            batchSize
          );
        } else {
          // Sequential processing when parallel is disabled. Each result is
          // awaited so async transforms yield values rather than pending
          // promises, matching the parallel path above.
          const collected: unknown[] = [];
          for (let index = 0; index < inputValue.length; index++) {
            collected.push(
              await transformFn(inputValue[index], index, state as TState)
            );
          }
          result = collected;
        }
      } else {
        // Scalar input - apply transform directly (awaiting async transforms)
        result = await transformFn(inputValue, undefined, state as TState);
      }

      return {
        ...state,
        [outputFieldName]: result,
      };
    };

    if (this.branchContext?.currentBranchValue !== undefined) {
      // We're inside a branch - add to current branch
      const currentBranch =
        this.branchContext.branches.get(
          this.branchContext.currentBranchValue
        ) || [];
      currentBranch.push(step);
      this.branchContext.branches.set(
        this.branchContext.currentBranchValue,
        currentBranch
      );
    } else {
      // Normal execution - add to main flow
      this.flowDefinition.push(step);

      // Register with execution planner for signature inference and automatic parallelization
      if (this.autoParallelConfig.enabled) {
        this.executionPlanner.addExecutionStep(
          step,
          undefined,
          undefined,
          'derive',
          transformFn as any,
          undefined,
          {
            inputFieldName,
            outputFieldName,
            batchSize: options?.batchSize,
          }
        );
      }
    }

    // Initialize program when flow structure is updated
    this.ensureProgram();

    return this;
  }

  /**
   * Gets execution plan information for debugging automatic parallelization
   *
   * @returns Object with execution plan details
   */
  public getExecutionPlan(): {
    totalSteps: number;
    parallelGroups: number;
    maxParallelism: number;
    autoParallelEnabled: boolean;
    steps?: AxFlowExecutionStep[];
    groups?: AxFlowParallelGroup[];
  } {
    const planInfo = this.executionPlanner.getExecutionPlan();
    return {
      totalSteps: planInfo.totalSteps,
      parallelGroups: planInfo.parallelGroups,
      maxParallelism: planInfo.maxParallelism,
      autoParallelEnabled: this.autoParallelConfig.enabled,
      steps: planInfo.steps,
      groups: planInfo.groups,
    };
  }

  /**
   * Returns the signature of the flow-level program, creating that program
   * first if it does not exist yet.
   */
  public getSignature(): AxSignature {
    this.ensureProgram();
    return this.program!.getSignature();
  }

  /**
   * Creates a new AxFlow node from an existing signature by extending it with additional fields.
* * @param name - The name of the new node * @param baseSignature - The base signature to extend (string or AxSignature) * @param extensions - Object defining how to extend the signature * @returns New AxFlow instance with the extended node * * @example * ```typescript * // Create a chain-of-thought node * flow.nodeExtended('reasoner', 'question:string -> answer:string', { * prependOutputs: [ * { name: 'reasoning', type: f.internal(f.string('Step-by-step reasoning')) } * ] * }) * * // Create a node with context and confidence * flow.nodeExtended('analyzer', 'input:string -> output:string', { * appendInputs: [{ name: 'context', type: f.optional(f.string('Context')) }], * appendOutputs: [{ name: 'confidence', type: f.number('Confidence score') }] * }) * ``` */ public nodeExtended( name: TName, baseSignature: string | AxSignature, extensions: { prependInputs?: Array<{ name: string; type: AxFieldType }>; appendInputs?: Array<{ name: string; type: AxFieldType }>; prependOutputs?: Array<{ name: string; type: AxFieldType }>; appendOutputs?: Array<{ name: string; type: AxFieldType }>; } ): AxFlow< IN, OUT, TNodes & { [K in TName]: AxGen }, TState > { // Create base signature const sig = typeof baseSignature === 'string' ? 
AxSignature.create(baseSignature) : baseSignature; // Apply extensions in the specified order let extendedSig = sig; // Apply prepend inputs first if (extensions.prependInputs) { for (const input of extensions.prependInputs) { extendedSig = extendedSig.prependInputField(input.name, input.type); } } // Apply append inputs if (extensions.appendInputs) { for (const input of extensions.appendInputs) { extendedSig = extendedSig.appendInputField(input.name, input.type); } } // Apply prepend outputs if (extensions.prependOutputs) { for (const output of extensions.prependOutputs) { extendedSig = extendedSig.prependOutputField(output.name, output.type); } } // Apply append outputs if (extensions.appendOutputs) { for (const output of extensions.appendOutputs) { extendedSig = extendedSig.appendOutputField(output.name, output.type); } } // Create the node using the extended signature return this.node(name, extendedSig); } /** * Short alias for nodeExtended() - creates nodes with extended signatures */ public nx( name: TName, baseSignature: string | AxSignature, extensions: { prependInputs?: Array<{ name: string; type: AxFieldType }>; appendInputs?: Array<{ name: string; type: AxFieldType }>; prependOutputs?: Array<{ name: string; type: AxFieldType }>; appendOutputs?: Array<{ name: string; type: AxFieldType }>; } ): AxFlow< IN, OUT, TNodes & { [K in TName]: AxGen }, TState > { return this.nodeExtended(name, baseSignature, extensions); } /** * Applies a final transformation to the state object that updates both state and output type. * This is specifically designed for terminal transformations that shape the final output. 
* * @param transform - Function that takes the current state and returns the final output * @returns New AxFlow instance with updated OUT and TState types * * @example * ``` * const result = await flow * .node('analyzer', 'userQuestion:string -> analysisResult:string') * .execute('analyzer', state => ({ userQuestion: state.userQuestion })) * .mapOutput(state => ({ * // Note: Node results are typed as AxFieldValue, so you may need to cast * finalAnswer: state.analyzerResult.analysisResult as string * })) * .forward(ai, { userQuestion: 'test' }); * * // result is typed as { finalAnswer: string } * ``` */ public mapOutput( transform: (_state: TState) => TOutput ): AxFlow { // Add the transformation as a regular map step const step = async (state: AxFlowState) => { const result = transform(state as TState); return { ...state, ...result }; }; if (this.branchContext?.currentBranchValue !== undefined) { const currentBranch = this.branchContext.branches.get( this.branchContext.currentBranchValue ) || []; currentBranch.push(step); this.branchContext.branches.set( this.branchContext.currentBranchValue, currentBranch ); } else { this.flowDefinition.push(step); // Add to execution planner for automatic parallelization if (this.autoParallelConfig.enabled) { this.executionPlanner.addExecutionStep( step, undefined, undefined, 'map', transform ); } } // Initialize program when flow structure is updated (only if we have nodes) if (this.nodeGenerators.size > 0) { this.ensureProgram(); } // Return with updated OUT type return this as unknown as AxFlow; } /** * Short alias for mapOutput() */ public mo( transform: (_state: TState) => TOutput ): AxFlow { return this.mapOutput(transform); } } /** * Factory function to create a new AxFlow instance * Similar to ai() for AI services, this creates a fluent flow * * @param options - Optional configuration for the flow * @returns New AxFlow instance * * @example * ```typescript * // Input type is required - provides type safety throughout the 
flow * const workflow = flow<{ userInput: string }>() * .node('summarizer', 'documentText:string -> summaryText:string') * .execute('summarizer', state => ({ documentText: state.userInput })); * * // Complex input types work great for multi-field workflows * const complexFlow = flow<{ userQuestion: string; context: string }>() * .map(state => ({ * ...state, * processedQuestion: state.userQuestion.toUpperCase() // TypeScript knows this exists! * })); * ``` */ /** * Creates a new AxFlow instance with required input type specification. * * The input type must be specified upfront to enable proper type inference * throughout the flow construction and execution. * * @example * ```typescript * // ✅ Define input type upfront for better type safety * const workflow = flow<{ userInput: string, options?: any }>() * .map(state => ({ ...state, processedInput: state.userInput.toUpperCase() })) * .node('analyzer', 'processedInput:string -> result:string') * * // ✅ Simple input types work too * const simpleFlow = flow<{ documentText: string }>() * .node('summarizer', 'documentText:string -> summary:string') * ``` */ export function flow< TInput extends Record, TOutput = {}, >(options?: { autoParallel?: boolean; batchSize?: number; logger?: AxFlowLoggerFunction; debug?: boolean; }): AxFlow { return AxFlow.create(options); }