{"version":3,"sources":["../../../packages/core/rpc/rpc-observable-client.ts"],"names":[],"mappings":"AAAA,OAAO,EAAQ,UAAU,EAA4B,MAAM,MAAM,CAAC;AAGlE,OAAO,EAAE,GAAG,EAAE,MAAM,OAAO,CAAC;AAE5B,OAAO,EAAwB,2BAA2B,EAAE,MAAM,0BAA0B,CAAC;AAC7F,OAAO,EAAuB,0BAA0B,EAAE,MAAM,yBAAyB,CAAC;AAC1F,OAAO,EAAE,WAAW,EAAE,MAAM,gBAAgB,CAAC;AAG7C;;GAEG;AACH,qBAAa,mBAAmB,CAC5B,QAAQ,SAAS,0BAA0B,EAC3C,OAAO,SAAS,2BAA2B,EAC3C,MAAM,SAAS,2BAA2B;IAe9B,SAAS,CAAC,GAAG,EAAE,GAAG;IAAE,SAAS,CAAC,OAAO,EAAE,MAAM;IAAE,SAAS,CAAC,OAAO,EAAE,MAAM;IAbpF,OAAO,CAAC,EAAE,CAAK;IACf,OAAO,CAAC,cAAc,CAAS;IAC/B,OAAO,CAAC,aAAa,CAAS;IAC9B,OAAO,CAAC,YAAY,CAA8C;IAClE,OAAO,CAAC,eAAe,CAAwD;IAE/E;;;;;;OAMG;gBACmB,GAAG,EAAE,GAAG,EAAY,OAAO,EAAE,MAAM,EAAY,OAAO,EAAE,MAAM;IAQpF;;;;;;OAMG;IACH,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,QAAQ,EAAE,QAAQ,CAAC,EAAE,WAAW,GAAG,UAAU,CAAC,OAAO,CAAC;IAuCxF;;;;;;OAMG;IACH,OAAO,CAAC,OAAO;IAQf;;OAEG;IACH,OAAO,CAAC,YAAY;IAoBpB;;;;OAIG;IACH,OAAO,CAAC,OAAO;CAWlB","file":"rpc-observable-client.d.ts","sourcesContent":["import { from, Observable, Observer, of, throwError } from 'rxjs';\r\nimport { filter, mergeMap, take } from 'rxjs/operators';\r\nimport { Logging } from '../diagnostics/logging';\r\nimport { Rpc } from './rpc';\r\nimport { RpcInboundClient } from './rpc-inbound-client';\r\nimport { RpcObservableRequest, RpcObservableRequestContext } from './rpc-observable-request';\r\nimport { RpcObservableResult, RpcObservableResultContext } from './rpc-observable-result';\r\nimport { RpcOutbound } from './rpc-outbound';\r\nimport { RpcOutboundClient } from './rpc-outbound-client';\r\n\r\n/**\r\n * RPC Observable Client class.\r\n */\r\nexport class RpcObservableClient<\r\n    TRequest extends RpcObservableResultContext,\r\n    TResult extends RpcObservableRequestContext,\r\n    TError extends RpcObservableRequestContext> {\r\n\r\n    private id = 0;\r\n    private requestCommand: string;\r\n    private resultCommand: string;\r\n    private resultBuffer: RpcObservableResult<TResult, TError>[] = [];\r\n    private resultObservers: Observer<RpcObservableResult<TResult, TError>>[] = [];\r\n\r\n    /**\r\n     * Initializes a new instance of the RpcObservableClient class.\r\n     *\r\n     * @param rpc The rpc object.\r\n     * @param command the command string.\r\n     * @param version the version string.\r\n     */\r\n    constructor(protected rpc: Rpc, protected command: string, protected version: string) {\r\n        this.requestCommand = `${command}-Request`;\r\n        this.resultCommand = `${command}-Result`;\r\n        this.rpc.register(this.resultCommand, this.handler.bind(this));\r\n        this.rpc.registerInboundHandler(this.resultCommand, this.handler.bind(this));\r\n        this.rpc.registerOutboundHandler(this.resultCommand, this.handler.bind(this));\r\n    }\r\n\r\n    /**\r\n     * Make observable call.\r\n     *\r\n     * @param request The the request data.\r\n     * @param outbound The outbound RPC channel. (request initiated from Shell.)\r\n     * @return An observable for the response message\r\n     */\r\n    protected observableCall(request: TRequest, outbound?: RpcOutbound): Observable<TResult> {\r\n        const id = this.id++;\r\n        const observableRequest: RpcObservableRequest<TRequest> = { id, request };\r\n        Logging.logDebug(this.requestCommand, `Sending request: \"${this.requestCommand}\"`, observableRequest);\r\n        const replayResult = this.replayResult();\r\n\r\n        return from(this.rpcCall(observableRequest, outbound))\r\n            .pipe(\r\n                mergeMap(() => replayResult),\r\n                filter(observableResult => {\r\n                    if (observableResult.id !== observableRequest.id) {\r\n                        return false;\r\n                    }\r\n\r\n                    // clean up the buffer.\r\n                    const index = this.resultBuffer.findIndex(item => item.id === observableResult.id);\r\n                    if (index >= 0) {\r\n                        this.resultBuffer.splice(index, 1);\r\n                    }\r\n\r\n                    return true;\r\n                }),\r\n                take(1),\r\n                mergeMap((result) => {\r\n                    if (result.error) {\r\n                        result.error.sourceName = result.sourceName;\r\n                        result.error.sourceSubName = result.sourceSubName;\r\n                        result.error.sourceVersion = result.sourceVersion;\r\n                        return throwError(() => result.error);\r\n                    }\r\n\r\n                    result.result = result.result || <TResult>{};\r\n                    result.result.sourceName = result.sourceName;\r\n                    result.result.sourceSubName = result.sourceSubName;\r\n                    result.result.sourceVersion = result.sourceVersion;\r\n                    return of(result.result);\r\n                }));\r\n    }\r\n\r\n    /**\r\n     * Making rpc call to shell or module.\r\n     *\r\n     * @param request the request packet data.\r\n     * @param outbound the outbound object from Shell to module request.\r\n     * @return Promise<void> the promise object.\r\n     */\r\n    private rpcCall(request: RpcObservableRequest<TRequest>, outbound?: RpcOutbound): Promise<void> {\r\n        if (outbound) {\r\n            return RpcOutboundClient.callOutbound(this.rpc.rpcManager.rpcChannel, outbound, this.requestCommand, this.version, request);\r\n        }\r\n\r\n        return RpcInboundClient.call(this.rpc, this.requestCommand,  this.version, request);\r\n    }\r\n\r\n    /**\r\n     * Create a replay observable. Replays result data if results exists.\r\n     */\r\n    private replayResult(): Observable<RpcObservableResult<TResult, TError>> {\r\n        return new Observable(observer => {\r\n            this.resultObservers.push(observer);\r\n\r\n            // replay.\r\n            for (const item of this.resultBuffer) {\r\n                observer.next(item);\r\n            }\r\n\r\n            return () => {\r\n                const index = this.resultObservers.findIndex(item => item === observer);\r\n                if (index >= 0) {\r\n                    this.resultObservers.splice(index, 1);\r\n                }\r\n\r\n                observer.complete();\r\n            };\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Handles rpc response messages.\r\n     *\r\n     * @param result the result of observable request.\r\n     */\r\n    private handler(result: RpcObservableResult<TResult, TError>): Promise<any> {\r\n        // store.\r\n        this.resultBuffer.push(result);\r\n\r\n        // distributed to all active observers.\r\n        for (const observer of this.resultObservers) {\r\n            observer.next(result);\r\n        }\r\n\r\n        return Promise.resolve();\r\n    }\r\n}\r\n"]}