{"version":3,"sources":["../../../packages/core/notification/work-item-connection.ts"],"names":[],"mappings":"AAAA,OAAO,EAAe,UAAU,EAAkB,MAAM,MAAM,CAAC;AAE/D,OAAO,EAAE,iBAAiB,EAAE,MAAM,4BAA4B,CAAC;AAC/D,OAAO,EAAE,cAAc,EAAE,MAAM,yBAAyB,CAAC;AACzD,OAAO,EAAE,GAAG,EAAE,MAAM,YAAY,CAAC;AAKjC,OAAO,EAAE,sBAAsB,EAAE,MAAM,2BAA2B,CAAC;AAEnE,OAAO,EAAE,eAAe,EAAE,MAAM,qBAAqB,CAAC;AACtD,OAAO,EAAE,kBAAkB,EAAuB,cAAc,EAAE,qBAAqB,EAAE,MAAM,qBAAqB,CAAC;AAErH;;GAEG;AACH,qBAAa,kBAAkB;IAWvB,OAAO,CAAC,GAAG;IAVR,eAAe,EAAE,eAAe,CAAC;IAExC;;;;;;OAMG;gBAES,GAAG,EAAE,GAAG,EAChB,iBAAiB,EAAE,iBAAiB,EACpC,cAAc,EAAE,cAAc,EAC9B,sBAAsB,EAAE,sBAAsB;IAMlD;;;;;;OAMG;IACI,MAAM,CAAC,QAAQ,EAAE,MAAM,EAAE,OAAO,EAAE,qBAAqB,GAAG,UAAU,CAAC,cAAc,CAAC;IA6C3F;;;;;OAKG;IACI,KAAK,CAAC,EAAE,EAAE,MAAM,GAAG,UAAU,CAAC,cAAc,CAAC;IAepD;;;;;;;OAOG;IACI,IAAI,CAAC,QAAQ,EAAE,MAAM,EAAE,UAAU,EAAE,MAAM,EAAE,MAAM,EAAE,MAAM,GAAG,UAAU,CAAC,kBAAkB,CAAC;IAejG;;;;;;;;OAQG;IACI,aAAa,CACZ,QAAQ,EAAE,MAAM,EAChB,OAAO,EAAE,qBAAqB,EAC9B,OAAO,GAAE,MAAU,EACnB,QAAQ,GAAE,MAAa,GAAG,UAAU,CAAC,cAAc,CAAC;IAI5D;;;;;;;OAOG;IACI,IAAI,CAAC,cAAc,EAAE,cAAc,EAAE,OAAO,GAAE,MAAU,EAAE,QAAQ,GAAE,MAAa,GAAG,UAAU,CAAC,cAAc,CAAC;IAQrH;;;;;OAKG;IACI,UAAU,CAAC,MAAM,EAAE,cAAc,GAAG,OAAO;IAQlD;;;;;;;OAOG;IACH,OAAO,CAAC,cAAc;CAiBzB","file":"work-item-connection.d.ts","sourcesContent":["import { EMPTY, from, Observable, of, throwError } from 'rxjs';\r\nimport { delay, expand, filter, mergeMap } from 'rxjs/operators';\r\nimport { GatewayConnection } from '../data/gateway-connection';\r\nimport { NodeConnection } from '../data/node-connection';\r\nimport { Rpc } from '../rpc/rpc';\r\nimport { RpcWorkItemClient } from '../rpc/work-item/rpc-work-item-client';\r\nimport { RpcWorkItem } from '../rpc/work-item/rpc-work-item-model';\r\nimport { RpcWorkItemFindClient } from '../rpc/work-item-find/rpc-work-item-find-client';\r\nimport { RpcWorkItemFind } from '../rpc/work-item-find/rpc-work-item-find-model';\r\nimport { NotificationConnection } from './notification-connection';\r\nimport { NotificationState } from './notification-state';\r\nimport { WorkItemManager } from './work-item-manager';\r\nimport { WorkItemFindResult, WorkItemRequestType, WorkItemResult, WorkItemSubmitRequest } from './work-item-request';\r\n\r\n/**\r\n * Work item connection to submit a powershell work item, and to query its state.\r\n */\r\nexport class WorkItemConnection {\r\n    public workItemManager: WorkItemManager;\r\n\r\n    /**\r\n     * Initializes a new instance of the WorkItemConnection class.\r\n     *\r\n     * @param rpc the RPC.\r\n     * @param gatewayConnection the gateway connection.\r\n     * @param notificationConnection the notification connection.\r\n     */\r\n    constructor(\r\n        private rpc: Rpc,\r\n        gatewayConnection: GatewayConnection,\r\n        nodeConnection: NodeConnection,\r\n        notificationConnection: NotificationConnection) {\r\n        if (notificationConnection.notificationManager) {\r\n            this.workItemManager = new WorkItemManager(this.rpc, gatewayConnection, nodeConnection, notificationConnection);\r\n        }\r\n    }\r\n\r\n    /**\r\n     * Submit a work item either directly to NotificationManager or through RPC.\r\n     *\r\n     * @param nodeName the name of the node to submit the item against.\r\n     * @param request the work item request.\r\n     * @return Observable the observable of WorkItemResult object.\r\n     */\r\n    public submit(nodeName: string, request: WorkItemSubmitRequest): Observable<WorkItemResult> {\r\n        const data: RpcWorkItem = <any>{\r\n            ...request,\r\n            ...{\r\n                type: WorkItemRequestType.WorkItemSubmit,\r\n                sourceName: MsftSme.self().Environment.name,\r\n                nodeName: nodeName,\r\n                timestamp: Date.now()\r\n            }\r\n        };\r\n\r\n        if (this.workItemManager) {\r\n            return this.workItemManager.submitWorkItem(data);\r\n        }\r\n\r\n        // configure PowerShell module name before sending to RPC.\r\n        if (data.powerShellCommand && data.powerShellCommand.module == null && MsftSme.self().Init.powerShellModuleName) {\r\n            data.powerShellCommand.module = MsftSme.self().Init.powerShellModuleName;\r\n        }\r\n\r\n        const interval = 100;\r\n        return from(RpcWorkItemClient.submitOrQueryWorkItem(this.rpc, data))\r\n            .pipe(\r\n                expand((result) => {\r\n                    if (result.id) {\r\n                        return EMPTY;\r\n                    }\r\n\r\n                    return of(result)\r\n                        .pipe(\r\n                            delay(interval),\r\n                            mergeMap(() => {\r\n                                const pollingData = <RpcWorkItem>{\r\n                                    type: WorkItemRequestType.StateQuery,\r\n                                    sourceName: MsftSme.self().Environment.name,\r\n                                    timestamp: Date.now(),\r\n                                    sequenceId: result.sequenceId\r\n                                };\r\n\r\n                                return RpcWorkItemClient.submitOrQueryWorkItem(this.rpc, pollingData);\r\n                            }));\r\n                }),\r\n                filter((result) => !!result.id));\r\n    }\r\n\r\n    /**\r\n     * Query a work item either directly to NotificationManager or through RPC.\r\n     *\r\n     * @param id the ID of the work item.\r\n     * @return Observable the observable of WorkItemResult object.\r\n     */\r\n    public query(id: string): Observable<WorkItemResult> {\r\n        const data = <RpcWorkItem>{\r\n            type: WorkItemRequestType.StateQuery,\r\n            sourceName: MsftSme.self().Environment.name,\r\n            timestamp: Date.now(),\r\n            id: id\r\n        };\r\n\r\n        if (this.workItemManager) {\r\n            return this.workItemManager.queryWorkItem(data);\r\n        }\r\n\r\n        return from(RpcWorkItemClient.submitOrQueryWorkItem(this.rpc, data));\r\n    }\r\n\r\n    /**\r\n     * Find existing work item with state.\r\n     *\r\n     * @param nodeName the node name.\r\n     * @param moduleName the module name.\r\n     * @param typeId the type ID.\r\n     * @return Observable<WorkItemFindResult> the observable of WorkItemFindResult.\r\n     */\r\n    public find(nodeName: string, moduleName: string, typeId: string): Observable<WorkItemFindResult> {\r\n        const data = <RpcWorkItemFind>{\r\n            sourceName: MsftSme.self().Environment.name,\r\n            nodeName: nodeName,\r\n            moduleName: moduleName,\r\n            typeId: typeId\r\n        };\r\n\r\n        if (this.workItemManager) {\r\n            return of<WorkItemFindResult>(this.workItemManager.notificationConnection.notificationManager.workItemFind(data));\r\n        }\r\n\r\n        return from(RpcWorkItemFindClient.workItemFind(this.rpc, data));\r\n    }\r\n\r\n    /**\r\n     * Submit a work item either directly to NotificationManager or through RPC, and wait for completion.\r\n     *\r\n     * @param nodeName the name of the node to submit the work item against\r\n     * @param request the work item request.\r\n     * @param timeout the timeout milliseconds. (optional, default forever until unsubscribe)\r\n     * @param interval the interval period milliseconds. (optional, default 1 sec)\r\n     * @return Observable the observable of WorkItemResult object.\r\n     */\r\n    public submitAndWait(\r\n            nodeName: string,\r\n            request: WorkItemSubmitRequest,\r\n            timeout: number = 0,\r\n            interval: number = 1000): Observable<WorkItemResult> {\r\n        return this.waitObservable(this.submit(nodeName, request), timeout, interval);\r\n    }\r\n\r\n    /**\r\n     * Wait for existing work item with state.\r\n     *\r\n     * @param workItemResult the work item result to wait for the final result.\r\n     * @param timeout the timeout milliseconds. (optional, default forever until unsubscribe)\r\n     * @param interval the interval period milliseconds. (optional, default 1 sec)\r\n     * @return Observable<WorkItemFindResult> the observable of WorkItemFindResult.\r\n     */\r\n    public wait(workItemResult: WorkItemResult, timeout: number = 0, interval: number = 1000): Observable<WorkItemResult> {\r\n        if (!workItemResult) {\r\n            return of(null);\r\n        }\r\n\r\n        return this.waitObservable(of(workItemResult), timeout, interval);\r\n    }\r\n\r\n    /**\r\n     * Check if work item result was finished.\r\n     *\r\n     * @param result the work item result.\r\n     * @return boolean true if work item was completed with success or error.\r\n     */\r\n    public isFinished(result: WorkItemResult): boolean {\r\n        if (result.state !== NotificationState.Started && result.state !== NotificationState.InProgress) {\r\n            return true;\r\n        }\r\n\r\n        return false;\r\n    }\r\n\r\n    /**\r\n     * Wait for existing work item with state.\r\n     *\r\n     * @param workItemResult the work item result to wait for the final result.\r\n     * @param timeout the timeout milliseconds. (optional, default forever until unsubscribe)\r\n     * @param interval the interval period milliseconds. (optional, default 1 sec)\r\n     * @return Observable<WorkItemFindResult> the observable of WorkItemFindResult.\r\n     */\r\n    private waitObservable(observable: Observable<any>, timeout: number = 0, interval: number = 1000): Observable<WorkItemResult> {\r\n         return observable\r\n            .pipe(\r\n                expand((result, index) => {\r\n                    if (this.isFinished(result)) {\r\n                        return EMPTY;\r\n                    } else if (timeout > 0 && interval * index > timeout) {\r\n                        return throwError(() => new Error('timeout'));\r\n                    }\r\n\r\n                    return of(result)\r\n                        .pipe(\r\n                            delay(interval),\r\n                            mergeMap(() => this.query(result.id)));\r\n                }),\r\n                filter((result) => this.isFinished(result)));\r\n    }\r\n}\r\n"]}