{"version":3,"sources":["../../../packages/core/workflow/persistent-workflow-runner.ts"],"names":[],"mappings":"AAAA,OAAO,EAAgB,UAAU,EAAkB,MAAM,MAAM,CAAC;AAMhE,OAAO,EAAE,yBAAyB,EAAE,MAAM,+BAA+B,CAAC;AAC1E,OAAO,EAA6B,qBAAqB,EAAE,MAAM,+BAA+B,CAAC;AACjG,OAAO,EAAE,uBAAuB,EAAE,MAAM,6BAA6B,CAAC;AAOtE;;GAEG;AACH,MAAM,WAAW,wBAAwB,CAAC,OAAO;IAC7C;;OAEG;IACH,UAAU,EAAE,MAAM,CAAC;IAEnB;;OAEG;IACH,MAAM,EAAE,OAAO,CAAC;IAEhB;;OAEG;IACH,IAAI,EAAE,qBAAqB,EAAE,CAAC;IAE9B;;OAEG;IACH,KAAK,CAAC,EAAE,GAAG,CAAC;IAEZ;;;OAGG;IACH,KAAK,IAAI,UAAU,CAAC,IAAI,CAAC,CAAC;CAC7B;AAWD;;GAEG;AACH,qBAAa,wBAAwB;IAiBrB,OAAO,CAAC,eAAe;IAA2B,OAAO,CAAC,OAAO;IAhB7E;;OAEG;IACH,gBAAuB,UAAU,KAAK;IAEtC;;OAEG;IACH,OAAO,CAAC,kBAAkB,CAAoD;IAE9E;;;;;OAKG;gBACiB,eAAe,EAAE,uBAAuB,EAAU,OAAO,EAAE,yBAAyB;IAGxG;;;;OAIG;IACI,KAAK,CAAC,OAAO,EAAE,WAAW,EAAE,GAAG,GAAG,UAAU,CAAC,wBAAwB,CAAC,OAAO,CAAC,CAAC;IAwBtF;;OAEG;IACI,cAAc,CAAC,OAAO,KAAK,UAAU,CAAC,wBAAwB,CAAC,OAAO,CAAC,CAAC;IA0C/E,OAAO,CAAC,OAAO;IAqDf,OAAO,CAAC,aAAa;IAgErB,OAAO,CAAC,WAAW;IA6DnB,OAAO,CAAC,WAAW;IAYnB,OAAO,CAAC,UAAU;IAelB,OAAO,CAAC,IAAI;CAGf","file":"persistent-workflow-runner.d.ts","sourcesContent":["import { EMPTY, merge, Observable, of, throwError } from 'rxjs';\r\nimport { catchError, expand, filter, map, switchMap, take, tap } from 'rxjs/operators';\r\nimport { PersistentWorkItem } from './persistent-work-item';\r\nimport { PersistentWorkItemApplyState } from './persistent-work-item-apply-state';\r\nimport { PersistentWorkItemState } from './persistent-work-item-state';\r\nimport { PersistentWorkflow } from './persistent-workflow';\r\nimport { PersistentWorkflowBuilder } from './persistent-workflow-builder';\r\nimport { PersistentWorkflowContext, PersistentWorkflowLog } from './persistent-workflow-context';\r\nimport { PersistentWorkflowStore } from './persistent-workflow-store';\r\n\r\ntype PersistentDataType = any;\r\ntype TransitDataType = any;\r\ntype PersistentContextType = PersistentWorkflowContext<TransitDataType, PersistentDataType>;\r\ntype PersistentWorkItemType = PersistentWorkItem<PersistentContextType>;\r\n\r\n/**\r\n * The result data structure of start() and stateFromStore().\r\n */\r\nexport interface PersistentWorkflowResult<TResult> {\r\n    /**\r\n     * The instance ID.\r\n     */\r\n    instanceId: number;\r\n\r\n    /**\r\n     * The result object which is the last transitData defined at finalize() function.\r\n     */\r\n    result: TResult;\r\n\r\n    /**\r\n     * The record of logs.\r\n     */\r\n    logs: PersistentWorkflowLog[];\r\n\r\n    /**\r\n     * The error object produced from any of calls.\r\n     */\r\n    error?: any;\r\n\r\n    /**\r\n     * Clear the snapshot of the context.\r\n     * Must call this function to stop tracking the workflow.\r\n     */\r\n    clear(): Observable<void>;\r\n}\r\n\r\n/**\r\n * Internal looping context.\r\n */\r\ninterface LoopContext {\r\n    completed: boolean;\r\n    context: PersistentContextType;\r\n    workItem: PersistentWorkItemType;\r\n}\r\n\r\n/**\r\n * The runner of workflow.\r\n */\r\nexport class PersistentWorkflowRunner {\r\n    /**\r\n     * Workflow always starts from startingId which is \"1\".\r\n     */\r\n    public static readonly startingId = 1;\r\n\r\n    /**\r\n     * The collection of workflows. It allows multiple instances of workflows to track.\r\n     */\r\n    private workflowCollection: { [instanceId: number]: PersistentWorkflow } = {};\r\n\r\n    /**\r\n     * Initializes a new instance of the PersistentWorkflowRunner class.\r\n     *\r\n     * @param persistentStore The persistent store instance.\r\n     * @param builder The builder of the workflow.\r\n     */\r\n    constructor(private persistentStore: PersistentWorkflowStore, private builder: PersistentWorkflowBuilder) {\r\n    }\r\n\r\n    /**\r\n     * Start a new single workflow.\r\n     *\r\n     * @param transitData the transit data for initialization for the first workflow.\r\n     */\r\n    public start<TResult>(transitData: any): Observable<PersistentWorkflowResult<TResult>> {\r\n        const workflow = this.builder.build();\r\n        const firstWorkItem = workflow.collection.find(item => item.id === PersistentWorkflowRunner.startingId);\r\n        if (!firstWorkItem || firstWorkItem.state !== PersistentWorkItemState.NotStarted) {\r\n            return throwError(() => new Error('Couldn\\'t find first work item.'));\r\n        }\r\n\r\n        const context: PersistentContextType = {\r\n            transitData,\r\n            persistentData: null,\r\n            applyState: PersistentWorkItemApplyState.Required,\r\n            store: {\r\n                save: this.save.bind(this)\r\n            },\r\n            logs: [],\r\n            workflow\r\n        };\r\n        return this.checkpoint(firstWorkItem, context, PersistentWorkItemState.NotStarted)\r\n            .pipe(switchMap(instanceId => {\r\n                this.workflowCollection[instanceId] = workflow;\r\n                return this.process(firstWorkItem, context);\r\n            }));\r\n    }\r\n\r\n    /**\r\n     * Start workflows from existing snapshots data from the store.\r\n     */\r\n    public startFromStore<TResult>(): Observable<PersistentWorkflowResult<TResult>> {\r\n        return this.persistentStore.restore()\r\n        .pipe(switchMap(list => {\r\n            if (list.length === 0) {\r\n                return EMPTY;\r\n            }\r\n\r\n            const observables: Observable<PersistentWorkflowResult<TransitDataType>>[] = [];\r\n            for (const snapshot of list) {\r\n                const workflow = this.builder.build();\r\n                const firstWorkItem = workflow.collection.find(item => item.id === snapshot.workItemId);\r\n                if (!firstWorkItem) {\r\n                    return throwError(() => new Error('Couldn\\'t locate the work item to restart.'));\r\n                }\r\n\r\n                const context: PersistentContextType = {\r\n                    transitData: null,\r\n                    persistentData: snapshot.workItemData,\r\n                    applyState: PersistentWorkItemApplyState.Required,\r\n                    instanceId: snapshot.instanceId,\r\n                    store: {\r\n                        save: this.save.bind(this)\r\n                    },\r\n                    logs: [],\r\n                    workflow,\r\n                    restoreVersion: snapshot.version\r\n                };\r\n                firstWorkItem.state = snapshot.workItemState;\r\n                if (firstWorkItem.state === PersistentWorkItemState.Applying) {\r\n                    // at default apply() call will be omitted.\r\n                    context.applyState = PersistentWorkItemApplyState.Skip;\r\n                    firstWorkItem.state = PersistentWorkItemState.PreValidateByRestore;\r\n                }\r\n\r\n                this.workflowCollection[snapshot.instanceId] = workflow;\r\n                observables.push(this.process(firstWorkItem, context));\r\n            }\r\n\r\n            return merge(...observables);\r\n        }));\r\n    }\r\n\r\n    private process(\r\n        workItem: PersistentWorkItemType, context: PersistentContextType): Observable<PersistentWorkflowResult<TransitDataType>> {\r\n        return this.processNext(workItem, context)\r\n            .pipe(\r\n                catchError((error) => this.processResult(workItem, context, error)),\r\n                switchMap(innerContext => {\r\n                    const loopContext: LoopContext = {\r\n                        completed: !!context.error,\r\n                        context: innerContext,\r\n                        workItem\r\n                    };\r\n                    return this.processResult(loopContext.workItem, loopContext.context)\r\n                        .pipe(map(() => loopContext));\r\n                }),\r\n                expand((loopContext) => {\r\n                    if (loopContext.completed) {\r\n                        return EMPTY;\r\n                    }\r\n\r\n                    return this.processNext(loopContext.workItem, loopContext.context)\r\n                        .pipe(\r\n                            tap(() => {\r\n                                if (loopContext.workItem.state === PersistentWorkItemState.Completed) {\r\n                                    loopContext.workItem = this.processStep(loopContext.workItem, loopContext.context);\r\n                                    if (!loopContext.workItem) {\r\n                                        loopContext.completed = true;\r\n                                    }\r\n                                }\r\n                            }),\r\n                            catchError((error) => this.processResult(loopContext.workItem, loopContext.context, error)),\r\n                            switchMap(inner2Context => {\r\n                                loopContext.completed = loopContext.completed || !!inner2Context.error;\r\n                                loopContext.context = inner2Context;\r\n                                return this.processResult(loopContext.workItem, loopContext.context)\r\n                                        .pipe(map(() => loopContext));\r\n                            })\r\n                        );\r\n                }),\r\n                filter(loopContext => loopContext.completed),\r\n                take(1),\r\n                map(innerContext => {\r\n                    const result: PersistentWorkflowResult<TransitDataType> = {\r\n                        logs: innerContext.context.logs,\r\n                        result: innerContext.context.transitData,\r\n                        instanceId: innerContext.context.instanceId,\r\n                        clear: () => this.persistentStore.clear(innerContext.context.instanceId),\r\n                        error: innerContext.context.error\r\n                    };\r\n                    return result;\r\n                })\r\n            );\r\n    }\r\n\r\n    private processResult(\r\n        workItem: PersistentWorkItemType, context: PersistentContextType, error?: any): Observable<PersistentContextType> {\r\n        if (!workItem) {\r\n            return of(context);\r\n        }\r\n\r\n        if (context.error) {\r\n            return of(context);\r\n        }\r\n\r\n        let checkpoint: () => Observable<number> = () => of(null);\r\n        if (!error) {\r\n            switch (workItem.state) {\r\n                case PersistentWorkItemState.PreValidatingByRestore:\r\n                    checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidateByRestoreSuccess);\r\n                    break;\r\n                case PersistentWorkItemState.PreValidating:\r\n                    checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidateSuccess);\r\n                    break;\r\n                case PersistentWorkItemState.Applying:\r\n                    checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.ApplySuccess);\r\n                    break;\r\n                case PersistentWorkItemState.PostValidating:\r\n                    checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PostValidateSuccess);\r\n                    break;\r\n                case PersistentWorkItemState.NotStarted:\r\n                case PersistentWorkItemState.Completed:\r\n                    break;\r\n                default:\r\n                    checkpoint = () => throwError(() => new Error('Unexpected success handling state.'));\r\n            }\r\n\r\n            return of(null)\r\n                .pipe(\r\n                    switchMap(checkpoint),\r\n                    map(() => context)\r\n                );\r\n        }\r\n\r\n        context.error = error;\r\n        switch (workItem.state) {\r\n            case PersistentWorkItemState.PreValidatingByRestore:\r\n                checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidateByRestoreError);\r\n                break;\r\n            case PersistentWorkItemState.PreValidating:\r\n                checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidateError);\r\n                break;\r\n            case PersistentWorkItemState.Applying:\r\n                checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.ApplyError);\r\n                break;\r\n            case PersistentWorkItemState.PostValidating:\r\n                checkpoint = () => this.checkpoint(workItem, context, PersistentWorkItemState.PostValidateError);\r\n                break;\r\n            default:\r\n                return throwError(() => new Error('Unexpected error handling state.'));\r\n        }\r\n\r\n        return of(null)\r\n            .pipe(\r\n                switchMap(checkpoint),\r\n                map(() => context)\r\n            );\r\n    }\r\n\r\n    private processNext(workItem: PersistentWorkItemType, context: PersistentContextType): Observable<PersistentContextType> {\r\n        let checkpointFunction: () => Observable<number>;\r\n        let observableFunction: () => Observable<PersistentContextType> = null;\r\n        switch (workItem.state) {\r\n            case PersistentWorkItemState.PreValidateByRestore:\r\n            case PersistentWorkItemState.PreValidatingByRestore:\r\n                checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidatingByRestore);\r\n                observableFunction = () => workItem.preValidate(context);\r\n                break;\r\n\r\n            case PersistentWorkItemState.NotStarted:\r\n            case PersistentWorkItemState.PreValidating:\r\n                workItem.init(context);\r\n                checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.PreValidating);\r\n                observableFunction = () => workItem.preValidate(context);\r\n                break;\r\n\r\n            case PersistentWorkItemState.PreValidateByRestoreSuccess:\r\n            case PersistentWorkItemState.PreValidateSuccess:\r\n                if (context.applyState === PersistentWorkItemApplyState.Skip) {\r\n                    checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.PostValidating);\r\n                    observableFunction = () => workItem.postValidate(context);\r\n                } else {\r\n                    checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.Applying);\r\n                    observableFunction = () => workItem.apply(context);\r\n                }\r\n\r\n                break;\r\n\r\n            case PersistentWorkItemState.PostValidating:\r\n            case PersistentWorkItemState.ApplySuccess:\r\n                checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.PostValidating);\r\n                observableFunction = () => workItem.postValidate(context);\r\n                break;\r\n\r\n            case PersistentWorkItemState.PostValidateSuccess:\r\n            case PersistentWorkItemState.Completed:\r\n                workItem.finalize(context);\r\n                context.applyState = PersistentWorkItemApplyState.Required;\r\n                checkpointFunction = () => this.checkpoint(workItem, context, PersistentWorkItemState.Completed);\r\n                break;\r\n\r\n            default:\r\n                return throwError(() => new Error('Unexpected process next state.'));\r\n        }\r\n\r\n        if (!observableFunction) {\r\n            return of(null)\r\n                .pipe(\r\n                    switchMap(checkpointFunction),\r\n                    map(() => context)\r\n                );\r\n        }\r\n\r\n        return of(null)\r\n            .pipe(\r\n                switchMap(checkpointFunction),\r\n                switchMap(observableFunction)\r\n            );\r\n    }\r\n\r\n    private processStep(workItem: PersistentWorkItemType, context: PersistentContextType): PersistentWorkItemType {\r\n        if (workItem.state === PersistentWorkItemState.ApplyError || workItem.state === PersistentWorkItemState.PostValidateError) {\r\n            return null;\r\n        }\r\n\r\n        if (workItem.state === PersistentWorkItemState.Completed && workItem.nextId < 0) {\r\n            return null;\r\n        }\r\n\r\n        return this.workflowCollection[context.instanceId].collection.find(item => item.id === workItem.nextId);\r\n    }\r\n\r\n    private checkpoint(\r\n        workItem: PersistentWorkItemType, context: PersistentContextType, state: PersistentWorkItemState): Observable<number> {\r\n        return this.persistentStore.save(\r\n            context.workflow.version,\r\n            workItem.id,\r\n            state,\r\n            context.persistentData,\r\n            context.instanceId)\r\n            .pipe(map(snapshot => {\r\n                workItem.state = state;\r\n                context.logs.push({ time: Date.now(), name: workItem.name, state: PersistentWorkItemState[state] });\r\n                return context.instanceId = snapshot.instanceId;\r\n            }));\r\n    }\r\n\r\n    private save(workItem: PersistentWorkItemType, context: PersistentContextType): Observable<number> {\r\n        return this.checkpoint(workItem, context, workItem.state);\r\n    }\r\n}\r\n"]}