{"version":3,"sources":["../../../packages/core/data/websocket-stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,QAAQ,EAAE,aAAa,EAAkB,MAAM,MAAM,CAAC;AAM/D,OAAO,EAAE,oBAAoB,EAAE,MAAM,mCAAmC,CAAC;AACzE,OAAO,EAAE,iBAAiB,EAAE,MAAM,sBAAsB,CAAC;AAIzD;;GAEG;AACH,oBAAY,8BAA8B;IACtC;;OAEG;IACH,YAAY,IAAI;IAEhB;;OAEG;IACH,SAAS,IAAA;IAET;;OAEG;IACH,YAAY,IAAA;IAEZ;;OAEG;IACH,MAAM,IAAA;IAEN;;OAEG;IACH,aAAa,IAAA;CAChB;AAED;;GAEG;AACH,oBAAY,oBAAoB;IAC5B;;OAEG;IACH,IAAI,IAAI;IAER;;OAEG;IACH,IAAI,IAAA;IAEJ;;OAEG;IACH,KAAK,IAAA;CACR;AAED;;GAEG;AACH,oBAAY,+BAA+B;IACvC;;OAEG;IACH,IAAI,IAAI;IAER;;OAEG;IACH,MAAM,IAAA;IAEN;;OAEG;IACH,MAAM,IAAA;CACT;AAED;;GAEG;AACH,oBAAY,wBAAwB;IAChC;;OAEG;IACH,IAAI,IAAI;IAER;;OAEG;IACH,SAAS,IAAA;IAET;;OAEG;IACH,IAAI,IAAA;IAEJ;;OAEG;IACH,KAAK,IAAA;IAEL;;OAEG;IACH,SAAS,IAAA;CACZ;AAED;;GAEG;AACH,MAAM,WAAW,yBAAyB;IACtC;;OAEG;IACH,QAAQ,EAAE,MAAM,CAAC;IAEjB;;OAEG;IACH,OAAO,EAAE,OAAO,CAAC,SAAS,CAAC,MAAM,CAAC,CAAC;CACtC;AAED,oBAAY,mBAAmB;IAC3B,aAAa,YAAY;IACzB,oBAAoB,mBAAmB;IACvC,aAAa,YAAY;IACzB,gBAAgB,WAAW;CAC9B;AAED;;GAEG;AACH,MAAM,WAAW,qBAAqB;IAClC;;;OAGG;IACH,UAAU,EAAE,mBAAmB,CAAC;IAEhC;;OAEG;IACH,KAAK,EAAE,oBAAoB,CAAC;IAE5B;;OAEG;IACH,IAAI,EAAE,GAAG,CAAC;IAEV;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACjB;AAED;;GAEG;AACH,MAAM,WAAW,sBAAsB;IACnC;;OAEG;IACH,KAAK,IAAI,IAAI,CAAC;IAEd;;OAEG;IACH,OAAO,CAAC,OAAO,EAAE,GAAG,GAAG,IAAI,CAAC;CAC/B;AAED;;GAEG;AACH,qBAAa,wBAAwB,CAAC,KAAK,EAAE,QAAQ,GAAG,IAAI;IAgCrC,QAAQ,EAAE,QAAQ,CAAC,KAAK,CAAC;IAAS,MAAM,EAAE,yBAAyB;IAAS,OAAO,CAAC,EAAE,QAAQ;IA/BjH;;OAEG;IACI,QAAQ,CAAC,EAAE,GAAG,CAAC;IAEtB;;OAEG;IACI,OAAO,CAAC,EAAE,OAAO,CAAC;IAEzB;;OAEG;IACI,MAAM,CAAC,EAAE,OAAO,CAAC;IAExB;;OAEG;IACI,GAAG,CAAC,EAAE,OAAO,CAAC;IAErB;;OAEG;IACI,QAAQ,CAAC,EAAE,OAAO,CAAC;IAE1B;;;;;OAKG;gBACgB,QAAQ,EAAE,QAAQ,CAAC,KAAK,CAAC,EAAS,MAAM,EAAE,yBAAyB,EAAS,OAAO,CAAC,EAAE,QAAQ;IAGjH;;;OAGG;IACI,IAAI,CAAC,MAAM,EAAE,KAAK,GAAG,IAAI;IAKhC;;OAEG;IACI,QAAQ,IAAI,IAAI;IAOvB;;OAEG;IACI,KAAK,CAAC,KAAK,EAAE,GAAG,GAAG,IAAI;CAKjC;AAED;;GAEG;AACH,qBAAa,eAAe;IAgBL,OAAO,EAAE,iBAAiB;IAf7C,OAAO,CAAC,MAAM,CAAC,QAAQ,CAAC,aAAa,CAAqB;IAC1D,OAAO,CAAC,MAAM,CAAC,oBAAoB,CAAM;IACzC,OAAO,CAAC,MAAM,CAAC,iBAAiB,CAAO;IAChC,cAAc,EAAE,8BAA8B,CAA+C;IAC7F,WAAW,gDAAuD;IACzE,OAAO,CAAC,MAAM,CAAmD;IACjE,OAAO,CAAC,iBAAiB,CAAwC;IACjE,OAAO,CAAC,QAAQ,CAA6C;IAC7D,OAAO,CAAC,OAAO,CAAmE;IAElF;;;;OAIG;gBACgB,OAAO,EAAE,iBAAiB;IAiB7C;;;;OAIG;IACI,iBAAiB,CAAC,IAAI,EAAE,mBAAmB,EAAE,OAAO,EAAE,sBAAsB,GAAG,IAAI;IAI1F;;;;;;OAMG;IACI,QAAQ,CAAC,UAAU,EAAE,mBAAmB,EAAE,IAAI,EAAE,GAAG,EAAE,OAAO,CAAC,EAAE,GAAG,GAAG,IAAI;IAUhF;;;;;;OAMG;IACI,SAAS,CAAC,UAAU,EAAE,mBAAmB,EAAE,KAAK,EAAE,MAAM,EAAE,OAAO,CAAC,EAAE,GAAG,GAAG,IAAI;IAUrF;;;;;;OAMG;IACI,SAAS,CAAC,oBAAoB,EAAE,oBAAoB,EAAE,QAAQ,EAAE,MAAM,EAAE,QAAQ,CAAC,EAAE,MAAM,GAAG,yBAAyB;IAU5H,OAAO,CAAC,UAAU;IAiElB,OAAO,CAAC,OAAO;IAOf,OAAO,CAAC,SAAS;IAUjB,OAAO,CAAC,QAAQ;CAMnB","file":"websocket-stream.d.ts","sourcesContent":["import { Observer, ReplaySubject, mergeMap, take } from 'rxjs';\r\nimport { webSocket, WebSocketSubject } from 'rxjs/webSocket';\r\nimport { LogLevel } from '../diagnostics/log-level';\r\nimport { Logging } from '../diagnostics/logging';\r\nimport { Strings } from '../generated/strings';\r\nimport { EnvironmentModule } from '../manifest/environment-modules';\r\nimport { AuthorizationManager } from '../security/authorization-manager';\r\nimport { GatewayConnection } from './gateway-connection';\r\nimport { headerConstants } from './http-constants';\r\nimport { Net } from './net';\r\n\r\n/**\r\n * The state of Websocket connection.\r\n */\r\nexport enum WebsocketStreamConnectionState {\r\n    /**\r\n     * Initializing.\r\n     */\r\n    Initializing = 1,\r\n\r\n    /**\r\n     * Connected.\r\n     */\r\n    Connected,\r\n\r\n    /**\r\n     * Disconnected.\r\n     */\r\n    Disconnected,\r\n\r\n    /**\r\n     * Failed.\r\n     */\r\n    Failed,\r\n\r\n    /**\r\n     * Not configured.\r\n     */\r\n    NotConfigured\r\n}\r\n\r\n/**\r\n * The state of Websocket stream packet.\r\n */\r\nexport enum WebsocketStreamState {\r\n    /**\r\n     * Empty packet.\r\n     */\r\n    Noop = 1,\r\n\r\n    /**\r\n     * Data packet.\r\n     */\r\n    Data,\r\n\r\n    /**\r\n     * Error packet. (reserved for socket level error communication if any)\r\n     */\r\n    Error\r\n}\r\n\r\n/**\r\n * The request state of data such as CIM and PowerShell stream.\r\n */\r\nexport enum WebsocketStreamDataRequestState {\r\n    /**\r\n     * empty packet.\r\n     */\r\n    Noop = 1,\r\n\r\n    /**\r\n     * Data packet.\r\n     */\r\n    Normal,\r\n\r\n    /**\r\n     * Cancel\r\n     */\r\n    Cancel\r\n}\r\n\r\n/**\r\n * The response state of data such as CIM and PowerShell stream.\r\n */\r\nexport enum WebsocketStreamDataState {\r\n    /**\r\n     * empty packet.\r\n     */\r\n    Noop = 1,\r\n\r\n    /**\r\n     * Completed packet.\r\n     */\r\n    Completed,\r\n\r\n    /**\r\n     * Data packet.\r\n     */\r\n    Data,\r\n\r\n    /**\r\n     * Error\r\n     */\r\n    Error,\r\n\r\n    /**\r\n     * Cancelled\r\n     */\r\n    Cancelled\r\n}\r\n\r\n/**\r\n * Websocket Stream data target object.\r\n */\r\nexport interface WebsocketStreamDataTarget {\r\n    /**\r\n     * The node name.\r\n     */\r\n    nodeName: string;\r\n\r\n    /**\r\n     * The headers equivalent to REST API.\r\n     */\r\n    headers: MsftSme.StringMap<string>;\r\n}\r\n\r\nexport enum WebsocketStreamName {\r\n    CimStreamName = 'SME-CIM',\r\n    PowerShellStreamName = 'SME-PowerShell',\r\n    SshStreamName = 'SME-SSH',\r\n    SystemStreamName = 'System'\r\n};\r\n\r\n/**\r\n * The request/response packet of Websocket Stream to the gateway.\r\n */\r\nexport interface WebsocketStreamPacket {\r\n    /**\r\n     * The identification string of protocol.\r\n     *\r\n     */\r\n    streamName: WebsocketStreamName;\r\n\r\n    /**\r\n     * The state of packet.\r\n     */\r\n    state: WebsocketStreamState;\r\n\r\n    /**\r\n     * The request/response/error data.\r\n     */\r\n    data: any;\r\n\r\n    /**\r\n     * The options (reserved)\r\n     */\r\n    options?: any;\r\n}\r\n\r\n/**\r\n * Websocket stream handler.\r\n */\r\nexport interface WebsocketStreamHandler {\r\n    /**\r\n     * Call reset() when connection was lost to reset context.\r\n     */\r\n    reset(): void;\r\n\r\n    /**\r\n     * Call process() when message was received for the data type.\r\n     */\r\n    process(message: any): void;\r\n}\r\n\r\n/**\r\n * Websocket Stream Processor class.\r\n */\r\nexport class WebsocketStreamProcessor<TData, TOptions = void> {\r\n    /**\r\n     * Holding result if waitCompleted option is specified for multiple instances.\r\n     */\r\n    public response?: any;\r\n\r\n    /**\r\n     * Track closing state.\r\n     */\r\n    public closing?: boolean;\r\n\r\n    /**\r\n     * Track closed state.\r\n     */\r\n    public closed?: boolean;\r\n\r\n    /**\r\n     * Track observer end call by unsubscribe or observer completion.\r\n     */\r\n    public end?: boolean;\r\n\r\n    /**\r\n     * Sent once.\r\n     */\r\n    public sendOnce?: boolean;\r\n\r\n    /**\r\n     * Initializes a new instance of the CimProcessor class.\r\n     * @param observer Observer to send back result to caller.\r\n     * @param target Stream Target object.\r\n     * @param options Options for Cim stream query.\r\n     */\r\n    constructor(public observer: Observer<TData>, public target: WebsocketStreamDataTarget, public options?: TOptions) {\r\n    }\r\n\r\n    /**\r\n     * Push the result to the observer.\r\n     * @param result the result of TData.\r\n     */\r\n    public next(result: TData): void {\r\n        this.observer?.next(result);\r\n        this.sendOnce = true;\r\n    }\r\n\r\n    /**\r\n     * Complete the observer.\r\n     */\r\n    public complete(): void {\r\n        this.closing = true;\r\n        this.observer?.complete();\r\n\r\n        this.closed = true;\r\n    }\r\n\r\n    /**\r\n     * Error the observer.\r\n     */\r\n    public error(error: any): void {\r\n        this.closing = true;\r\n        this.observer?.error(error);\r\n        this.closed = true;\r\n    }\r\n}\r\n\r\n/**\r\n * The Websocket stream class.\r\n */\r\nexport class WebsocketStream {\r\n    private static readonly logSourceName = 'WebsocketStream';\r\n    private static maxConnectionRetries = 10;\r\n    private static reconnectWaitTime = 500;\r\n    public socketStateRaw: WebsocketStreamConnectionState = WebsocketStreamConnectionState.Disconnected;\r\n    public socketState = new ReplaySubject<WebsocketStreamConnectionState>();\r\n    private socket: WebSocketSubject<WebsocketStreamPacket | string>;\r\n    private connectionRetries = WebsocketStream.maxConnectionRetries;\r\n    private handlers = new Map<string, WebsocketStreamHandler>();\r\n    private strings = MsftSme.getStrings<Strings>().MsftSmeShell.Core.WebsocketStream;\r\n\r\n    /**\r\n     * Initializes a new instance of the WebsocketStream class.\r\n     *\r\n     * @param gateway the gateway connection object.\r\n     */\r\n    constructor(public gateway: GatewayConnection) {\r\n        // initialize only after gateway data was populated via RPC.\r\n        this.gateway.initialize().pipe(\r\n            mergeMap(() => this.gateway.navigationReadyObservable),\r\n            take(1)\r\n        ).subscribe(() => {\r\n            // enable websocket stream only when the module added the options at initialization.\r\n            const global = MsftSme.self();\r\n            if (!global.Init.websocket && !global.Init.sshWebsocket) {\r\n                this.socketState.next(WebsocketStreamConnectionState.NotConfigured);\r\n                this.socketStateRaw = WebsocketStreamConnectionState.NotConfigured;\r\n                return;\r\n            }\r\n            this.initialize(true);\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Register the processor for the stream name.\r\n     * @param name the name of stream.\r\n     * @param handler the handler to process packet.\r\n     */\r\n    public registerProcessor(name: WebsocketStreamName, handler: WebsocketStreamHandler): void {\r\n        this.handlers.set(name, handler);\r\n    }\r\n\r\n    /**\r\n     * Send next stream data to websocket.\r\n     *\r\n     * @param streamName the stream name.\r\n     * @param data the data to send.\r\n     * @param options the options.\r\n     */\r\n    public sendNext(streamName: WebsocketStreamName, data: any, options?: any): void {\r\n        if (!this.socket) {\r\n            throw new Error('WebsocketStream: socket is not ready.');\r\n        }\r\n\r\n        const packet: WebsocketStreamPacket = { streamName, state: WebsocketStreamState.Data, data, options };\r\n        this.debugLog('Socket sending data.', packet);\r\n        this.socket.next(JSON.stringify(packet));\r\n    }\r\n\r\n    /**\r\n     * Send error stream data to websocket.\r\n     *\r\n     * @param streamName the stream name.\r\n     * @param error the error to send.\r\n     * @param options the options.\r\n     */\r\n    public sendError(streamName: WebsocketStreamName, error: string, options?: any): void {\r\n        if (!this.socket) {\r\n            throw new Error('WebsocketStream: socket is not ready.');\r\n        }\r\n\r\n        const packet: WebsocketStreamPacket = { streamName, state: WebsocketStreamState.Error, data: error, options };\r\n        this.debugLog('Socket sending error.', packet);\r\n        this.socket.next(JSON.stringify(packet));\r\n    }\r\n\r\n    /**\r\n     * Get target data.\r\n     * @param authorizationManager the authorization manager.\r\n     * @param nodeName the node Name\r\n     * @param endpoint the endpoint data.\r\n     * @return  WebsocketStreamDataTarget target data.\r\n     */\r\n    public getTarget(authorizationManager: AuthorizationManager, nodeName: string, endpoint?: string): WebsocketStreamDataTarget {\r\n        const headers = authorizationManager.createTokenHeaders(nodeName);\r\n        if (endpoint) {\r\n            headers[headerConstants.POWERSHELL_ENDPOINT] = endpoint;\r\n        }\r\n\r\n        const target = <WebsocketStreamDataTarget>{ nodeName, headers };\r\n        return target;\r\n    }\r\n\r\n    private initialize(firstTime: boolean): void {\r\n        // get gateway socket url.\r\n        const gatewaySocketUrl = this.gateway.gatewayUrl.replace('http', 'ws');\r\n        const moduleName = MsftSme.self().Init.moduleName;\r\n        let url = Net.streamSocket.format(gatewaySocketUrl, moduleName);\r\n        if (EnvironmentModule.isGatewayV200) {\r\n            url = Net.streamSocketV200.format(gatewaySocketUrl, moduleName);\r\n        }\r\n        const isSsh = MsftSme.self().Init.sshWebsocket;\r\n        if (isSsh) {\r\n            url = Net.sshStreamSocket.format(gatewaySocketUrl);\r\n        }\r\n        this.debugLog('Socket initializing...: {0}'.format(url));\r\n        if (!firstTime) {\r\n            this.handlers.forEach((value) => value.reset());\r\n        }\r\n\r\n        // create stream socket.\r\n        this.socketState.next(WebsocketStreamConnectionState.Initializing);\r\n        this.socket = webSocket<WebsocketStreamPacket | string>({\r\n            url: url,\r\n            openObserver: {\r\n                next: () => {\r\n                    this.debugLog('Socket opened: {0}'.format(url));\r\n                    this.socketState.next(WebsocketStreamConnectionState.Connected);\r\n                    this.socketStateRaw = WebsocketStreamConnectionState.Connected;\r\n                    this.connectionRetries = WebsocketStream.maxConnectionRetries;\r\n                }\r\n            },\r\n            closeObserver: {\r\n                next: () => {\r\n                    this.debugLog('Socket closed: {0}'.format(url));\r\n                    this.socketState.next(WebsocketStreamConnectionState.Disconnected);\r\n                    this.socketStateRaw = WebsocketStreamConnectionState.Disconnected;\r\n                    this.reconnect(new Error(this.strings.Common.ConnectionRetiesError.message));\r\n                }\r\n            },\r\n            // for compatibility with older version of rxjs websocket\r\n            serializer: (value: string) => value\r\n        });\r\n        this.socket.subscribe({\r\n            next: received => {\r\n                const message = <WebsocketStreamPacket>received;\r\n                this.debugLog('Socket received data.', message);\r\n                if (message.state === WebsocketStreamState.Data) {\r\n                    const handler = this.handlers.get(message.streamName);\r\n                    if (handler) {\r\n                        handler.process(message.data);\r\n                    } else {\r\n                        throw new Error(this.strings.Common.HandlerRegistrationError.message.format(message.streamName));\r\n                    }\r\n                } else if (message.state === WebsocketStreamState.Error) {\r\n                    let errorMessage = this.strings.Common.CommunicationError.message;\r\n                    if (message.data && message.data.error && message.data.error.message) {\r\n                        errorMessage = this.strings.Common.CommunicationErrorDetail.message.format(message.data.error.message);\r\n                    }\r\n\r\n                    Logging.log({ level: LogLevel.Error, source: WebsocketStream.logSourceName, message: errorMessage });\r\n                    this.reconnect(new Error(errorMessage));\r\n                }\r\n            },\r\n            error: error => this.reconnect(error)\r\n        });\r\n    }\r\n\r\n    private dispose(): void {\r\n        if (this.socket) {\r\n            this.socket.unsubscribe();\r\n            this.socket = null;\r\n        }\r\n    }\r\n\r\n    private reconnect(error: any): void {\r\n        if (this.connectionRetries-- > 0) {\r\n            this.dispose();\r\n            setTimeout(() => this.initialize(false), WebsocketStream.reconnectWaitTime);\r\n        } else {\r\n            this.socketState.next(WebsocketStreamConnectionState.Failed);\r\n            throw error;\r\n        }\r\n    }\r\n\r\n    private debugLog(message: string, object?: any) {\r\n        Logging.log({ level: LogLevel.Debug, source: WebsocketStream.logSourceName, message: message });\r\n        if (object) {\r\n            Logging.debug(object);\r\n        }\r\n    }\r\n}\r\n"]}