import { GraphBuilder } from "./GraphBuilder";
import { Graph, GraphFunctions, GraphNode, GraphNodeID, GraphNodeInput, GraphNodeOutput, isGraphReference, StringKeyOf } from "./GraphTypes";

// NOTE(review): the generic parameter lists in this file were reconstructed from how the
// types are used here; confirm them against the declarations in ./GraphTypes and ./GraphBuilder.

/** Set to true locally to trace node scheduling on the console. */
const DEBUG = false;

/**
 * Lifecycle of a single node within one execution of the graph.
 */
export enum GraphExecutionNodeState {
    NotStarted = "NotStarted",
    Started = "Started",
    Finished = "Finished",
    Error = "Error",
}

/**
 * Runtime bookkeeping for one graph node: its definition, the resolved inputs it was last
 * executed with, the output it produced, and the nodes which consume that output.
 */
export class GraphExecutionNode<GFS extends GraphFunctions, Type extends StringKeyOf<GFS> = StringKeyOf<GFS>> {

    public id: GraphNodeID;
    public node: GraphNode<GFS, Type>;
    /** Resolved inputs of the last execution. `undefined` means the node must be (re)executed. */
    public input?: GraphNodeInput<GFS, Type>;
    /** Value the handler resolved with on the last successful execution. */
    public output?: GraphNodeOutput<GFS, Type>;
    /** Rejection value of the handler when `state` is `Error`. */
    public error?: Error;
    public state = GraphExecutionNodeState.NotStarted;
    /** Nodes whose input references this node. Rebuilt at the start of every `execute()`. */
    public dependents = new Set<GraphExecutionNode<GFS>>();

    constructor(id: GraphNodeID, node: GraphNode<GFS, Type>) {
        this.id = id;
        this.node = node;
    }

}

/**
 * Structural equality used to decide whether a node definition or an input value changed.
 *
 * Values are equal when they are identical or when their JSON serializations match. The JSON
 * comparison is sensitive to property order and throws for values `JSON.stringify` cannot
 * serialize (for example cyclic structures).
 *
 * @returns true if both values are considered equal.
 */
export function equals(a: unknown, b: unknown): boolean {
    return a === b || JSON.stringify(a) === JSON.stringify(b);
}

/**
 * @returns true if this node is not dependent upon other nodes outputs.
 */
function isIndependent<GFS extends GraphFunctions, Type extends StringKeyOf<GFS> = StringKeyOf<GFS>>(node: GraphNode<GFS, Type>): boolean {
    for (const arg of node.input) {
        if (isGraphReference(arg)) {
            return false;
        }
    }
    return true;
}

/**
 * Executes a graph of asynchronous function calls, starting every node as soon as all of the
 * nodes it references have finished, and reusing results of earlier executions for nodes whose
 * definition and resolved inputs did not change.
 */
export class GraphExecutor<GFS extends GraphFunctions> {

    public readonly nodes = new Map<GraphNodeID, GraphExecutionNode<GFS>>();

    /**
     * @param functions handlers keyed by node type; each is called with the node's resolved inputs.
     * @param graph optional initial graph, applied through `update`.
     */
    constructor(private readonly functions: GFS, graph?: Graph<GFS>) {
        if (graph) {
            this.update(graph);
        }
    }

    /**
     * @returns a new empty GraphBuilder with the same GraphFunctions type.
     */
    builder() {
        return new GraphBuilder<GFS>({});
    }

    /**
     * Updates the execution graph while retaining all previous calculations.
     * This allows a second execution to take place much more quickly.
     *
     * A node keeps its cached input (and therefore its cached output) only when its definition
     * is unchanged and its previous run completed. Every node is reset to `NotStarted`.
     * Nodes absent from `graph` are removed.
     */
    public update(graph: Graph<GFS>): void {
        // update or add all new nodes
        for (const [id, node] of Object.entries(graph)) {
            const previous = this.nodes.get(id);
            if (!previous) {
                this.nodes.set(id, new GraphExecutionNode(id, node));
                continue;
            }
            if (!equals(node, previous.node)) {
                // the definition changed: adopt it and hard reset the node by deleting the
                // previous inputs, which forces a new execution.
                previous.node = node;
                previous.input = undefined;
            } else if (
                previous.state === GraphExecutionNodeState.Error ||
                previous.state === GraphExecutionNodeState.Started
            ) {
                // the last run never produced an output, so there is nothing valid to reuse.
                previous.input = undefined;
            }
            previous.error = undefined;
            previous.state = GraphExecutionNodeState.NotStarted;
            previous.dependents.clear();
        }
        // delete any missing nodes (deleting from a Map while iterating its keys is well defined)
        for (const id of this.nodes.keys()) {
            if (!graph[id]) {
                this.nodes.delete(id);
            }
        }
    }

    /**
     * @returns the execution node with the given id. The id is assumed to exist; the result is
     * `undefined` at runtime if it does not.
     */
    public getNode(id: GraphNodeID) {
        return this.nodes.get(id)!;
    }

    /** @returns true when every node is in the `Finished` state (also true for an empty graph). */
    private areAllFinished(): boolean {
        return this.getNodesByState(GraphExecutionNodeState.Finished).length === this.nodes.size;
    }

    /** @returns all execution nodes currently in the given state, in insertion order. */
    public getNodesByState(state: GraphExecutionNodeState): GraphExecutionNode<GFS>[] {
        return [...this.nodes.values()].filter(node => node.state === state);
    }

    /** @returns all execution nodes whose definition has the given type. */
    public getNodesByType<Type extends StringKeyOf<GFS>>(type: Type): GraphExecutionNode<GFS, Type>[] {
        return [...this.nodes.values()].filter(node => node.node.type === type) as GraphExecutionNode<GFS, Type>[];
    }

    /**
     * @returns the outputs of all nodes with the given type. Entries are `undefined` at runtime
     * for nodes which have not produced an output yet.
     */
    public getOutputsByType<Type extends StringKeyOf<GFS>>(type: Type): GraphNodeOutput<GFS, Type>[] {
        return this.getNodesByType(type).map(node => node.output!);
    }

    /**
     * Resolves the inputs of a node by replacing every graph reference with the referenced
     * node's output.
     *
     * @returns the resolved inputs, or `undefined` if any referenced node has not finished.
     */
    private getInputsIfFinished<Type extends StringKeyOf<GFS>>(node: GraphNode<GFS, Type>): GraphNodeInput<GFS, Type> | undefined {
        const inputs: any[] = [];
        for (const input of node.input as any[]) {
            if (isGraphReference(input)) {
                const source = this.nodes.get(input.ref)!;
                if (source.state !== GraphExecutionNodeState.Finished) {
                    return undefined;
                }
                inputs.push(source.output!);
            } else {
                inputs.push(input);
            }
        }
        return inputs as GraphNodeInput<GFS, Type>;
    }

    /**
     * Registers every node in the `dependents` set of each node it references.
     *
     * @throws Error if a node references an id which is not part of the graph.
     */
    private calculateDependents(): void {
        for (const node of this.nodes.values()) {
            for (const arg of node.node.input) {
                if (isGraphReference(arg)) {
                    const source = this.nodes.get(arg.ref);
                    if (!source) {
                        throw new Error(`Graph node ${String(node.id)} references missing node ${String(arg.ref)}`);
                    }
                    source.dependents.add(node);
                }
            }
        }
    }

    /**
     * Returns a promise that will resolve when the entire graph is executed or reject if any node throws.
     * The promise also rejects when the graph can never complete (for example a reference cycle).
     */
    public async execute(): Promise<boolean> {
        this.calculateDependents();
        return new Promise<boolean>((resolve, reject) => {
            this.executeFrame(resolve, reject);
        });
    }

    /**
     * Records a failed node and rejects the execution with the original failure value.
     */
    private failNode(node: GraphExecutionNode<GFS>, e: unknown, error: (e: any) => void): void {
        // the rejection value is stored as-is, exactly as the handler produced it.
        node.error = e as Error;
        node.state = GraphExecutionNodeState.Error;
        error(e);
    }

    /**
     * Deletes the cached input of every dependent whose cached value for `node` differs from
     * the output `node` just produced, which forces that dependent to execute again.
     * (only necessary when re-executing an updated graph)
     */
    private invalidateStaleDependents(node: GraphExecutionNode<GFS>): void {
        for (const dependent of node.dependents) {
            dependent.node.input.forEach((arg, index) => {
                if (dependent.input) {
                    if (isGraphReference(arg) && arg.ref === node.id) {
                        const changed = !equals(dependent.input[index], node.output);
                        if (changed) {
                            // delete the dependent input which forces a new execution since input has changed.
                            dependent.input = undefined;
                        }
                    }
                }
            });
        }
    }

    /**
     * start execution of all operations which are not awaiting other operations.
     *
     * Called once by `execute()` and again each time a handler resolves. Any failure raised
     * while scheduling is passed to `error` so the promise returned by `execute()` always settles.
     *
     * @param finished resolves the promise returned by `execute()`.
     * @param error rejects the promise returned by `execute()`.
     */
    private executeFrame(finished: (value: boolean) => void, error: (e: any) => void): void {
        try {
            let reusedCachedResult = false;
            do {
                if (this.areAllFinished()) {
                    finished(true);
                    return;
                }
                reusedCachedResult = false;
                for (const node of this.getNodesByState(GraphExecutionNodeState.NotStarted)) {
                    const inputs = this.getInputsIfFinished(node.node);
                    if (!inputs) {
                        continue;
                    }
                    if (node.input) {
                        // this node did not need to be recalculated.
                        node.state = GraphExecutionNodeState.Finished;
                        // no handler will call back for this node, so another pass is needed
                        // to start its dependents and to notice completion.
                        reusedCachedResult = true;
                        continue;
                    }
                    node.input = inputs;
                    node.state = GraphExecutionNodeState.Started;
                    if (DEBUG) {
                        console.log("Started " + JSON.stringify(node.node.type));
                    }
                    try {
                        const handler = this.functions[node.node.type];
                        handler(...inputs)
                            .then(
                                (result) => {
                                    node.output = result;
                                    node.state = GraphExecutionNodeState.Finished;
                                    this.invalidateStaleDependents(node);
                                    if (DEBUG) {
                                        console.log("STATE", [...this.nodes.values()].map(o => JSON.stringify({ input: o.node.input, id: o.id, type: o.node.type }).slice(0, 140) + " ... -> " + o.state));
                                        console.log("Finished " + JSON.stringify(node.node.type));
                                    }
                                    this.executeFrame(finished, error);
                                },
                                (e) => {
                                    if (DEBUG) {
                                        console.log("====> ERROR: " + JSON.stringify(node.node.type), e);
                                    }
                                    this.failNode(node, e, error);
                                },
                            )
                            // a throw inside the callbacks above must reject the execution
                            // instead of becoming an unhandled rejection.
                            .catch(error);
                    } catch (e) {
                        // the handler is missing or threw before returning a promise.
                        this.failNode(node, e, error);
                    }
                }
            } while (reusedCachedResult);

            // Nothing is running and nothing failed, yet unfinished nodes remain: they wait on
            // each other and no future callback can ever unblock them.
            const running = this.getNodesByState(GraphExecutionNodeState.Started).length;
            const failed = this.getNodesByState(GraphExecutionNodeState.Error).length;
            if (running === 0 && failed === 0) {
                const blocked = this.getNodesByState(GraphExecutionNodeState.NotStarted).map(node => String(node.id));
                error(new Error("Graph execution stalled, unresolved dependencies (cycle?) for nodes: " + blocked.join(", ")));
            }
        } catch (e) {
            error(e);
        }
    }

}