{"version":3,"sources":["../../../packages/core/data/ssh-stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,UAAU,EAAwB,MAAM,MAAM,CAAC;AAKxD,OAAO,EAAE,oBAAoB,EAAE,MAAM,mCAAmC,CAAC;AACzE,OAAO,EACH,eAAe,EAEf,+BAA+B,EAC/B,wBAAwB,EACxB,yBAAyB,EACzB,sBAAsB,EAGzB,MAAM,oBAAoB,CAAC;AAE5B;;GAEG;AACH,oBAAY,WAAW;IACnB,YAAY,iBAAiB;IAC7B,oBAAoB,yBAAyB;IAC7C,mBAAmB,wBAAwB;IAC3C,mBAAmB,wBAAwB;IAC3C,0BAA0B,+BAA+B;IACzD,oBAAoB,yBAAyB;CAChD;AAED;;GAEG;AACH,oBAAY,cAAc;IACtB,IAAI,SAAS;IACb,KAAK,UAAU;IACf,IAAI,SAAS;CAChB;AAED;;GAEG;AACH,oBAAY,oBAAoB;IAC5B,KAAK,UAAU;IACf,MAAM,WAAW;IACjB,IAAI,SAAS;CAChB;AAED;;GAEG;AACH,MAAM,WAAW,aAAa,CAAC,CAAC,GAAG,GAAG;IAClC,SAAS,CAAC,EAAE,MAAM,CAAC;IACnB,QAAQ,EAAE,WAAW,CAAC;IACtB,OAAO,CAAC,EAAE,MAAM,CAAC;IACjB,IAAI,CAAC,EAAE,IAAI,CAAC;IACZ,OAAO,CAAC,EAAE;QACN,WAAW,EAAE,cAAc,CAAC;QAC5B,IAAI,EAAE,CAAC,CAAC;KACX,CAAC;CACL;AAED;;;;;GAKG;AACH,MAAM,WAAW,oBAAqB,SAAQ,aAAa;IACvD,UAAU,CAAC,EAAE,MAAM,EAAE,CAAC;IACtB,OAAO,CAAC,EAAE,gBAAgB,CAAC;IAC3B,MAAM,CAAC,EAAE,OAAO,CAAC;CACpB;AAED;;;;GAIG;AACH,MAAM,WAAW,gBAAgB;IAC7B,MAAM,EAAE,oBAAoB,CAAC;IAC7B,QAAQ,EAAE,MAAM,CAAC;IACjB,KAAK,EAAE,MAAM,CAAC;IACd,aAAa,CAAC,EAAE,MAAM,CAAC;CAC1B;AAED;;GAEG;AACH,MAAM,WAAW,gBAAgB;IAC7B;;OAEG;IACH,MAAM,EAAE,yBAAyB,CAAC;IAElC;;OAEG;IACH,YAAY,EAAE,+BAA+B,CAAC;IAE9C;;OAEG;IACH,OAAO,EAAE,oBAAoB,CAAC;CACjC;AAED;;GAEG;AACH,MAAM,WAAW,iBAAiB;IAC9B;;;OAGG;IACH,EAAE,CAAC,EAAE,MAAM,CAAC;IAEZ;;OAEG;IACH,KAAK,EAAE,wBAAwB,CAAC;IAEhC;;;OAGG;IACH,QAAQ,EAAE,MAAM,CAAC;CACpB;AAgCD;;GAEG;AACH,qBAAa,SAAU,YAAW,sBAAsB;IAahD,OAAO,CAAC,eAAe;IACvB,OAAO,CAAC,oBAAoB;IAbhC,OAAO,CAAC,MAAM,CAAC,QAAQ,CAAC,aAAa,CAAe;IACpD,OAAO,CAAC,MAAM,CAAC,QAAQ,CAAC,eAAe,CAAc;IACrD,OAAO,CAAC,UAAU,CAAiD;IACnE,OAAO,CAAC,OAAO,CAA6E;IAE5F;;;;;OAKG;gBAES,eAAe,EAAE,eAAe,EAChC,oBAAoB,EAAE,oBAAoB;IAItD;;;;;;;;OAQG;IACI,iBAAiB,CAAC,CAAC,GAAG,GAAG,EAAE,QAAQ,EAAE,MAAM,EAAE,OAAO,EAAE,oBAAoB,GAAG,UAAU,CAAC,aAAa,CAAC,CAAC,CAAC,CAAC;IAKhH;;;;;;OAMG;IACI,mBAAmB,CAAC,QAAQ,EAAE,MAAM,EAAE,OAAO,EAAE,oBAAoB,GAAG,IAAI;IAajF;;;;;;OAMG;IACI,MAAM,CAAC,QAAQ,EAAE,MAAM,EAAE,SAAS,EAAE,MAAM,EAAE,QAAQ,EAAE,MAAM,EAAE,UAAU,EAAE,MAAM,EAAE,GAAG,IAAI;IAUhG;;OAEG;IACI,KAAK,IAAI,IAAI;IAUpB;;;;OAIG;IACI,OAAO,CAAC,OAAO,EAAE,iBAAiB,GAAG,IAAI;IAiChD,OAAO,CAAC,aAAa;IAQrB,OAAO,CAAC,iBAAiB;IAKzB,OAAO,CAAC,cAAc;IAItB,OAAO,CAAC,YAAY;IAIpB,OAAO,CAAC,aAAa;IAyCrB,OAAO,CAAC,mBAAmB;IAmB3B,OAAO,CAAC,WAAW;IAYnB,OAAO,CAAC,WAAW;CAWtB","file":"ssh-stream.d.ts","sourcesContent":["import { Observable, Observer, throwError } from 'rxjs';\r\nimport { catchError, filter, mergeMap, take } from 'rxjs/operators';\r\nimport { LogLevel } from '../diagnostics/log-level';\r\nimport { Logging } from '../diagnostics/logging';\r\nimport { Strings } from '../generated/strings';\r\nimport { AuthorizationManager } from '../security/authorization-manager';\r\nimport {\r\n    WebsocketStream,\r\n    WebsocketStreamConnectionState,\r\n    WebsocketStreamDataRequestState,\r\n    WebsocketStreamDataState,\r\n    WebsocketStreamDataTarget,\r\n    WebsocketStreamHandler,\r\n    WebsocketStreamName,\r\n    WebsocketStreamProcessor\r\n} from './websocket-stream';\r\n\r\n/**\r\n * The SSH stream provider.\r\n */\r\nexport enum SshProvider {\r\n    BeatProvider = 'beatProvider',\r\n    ShellCommandProvider = 'shellCommandProvider',\r\n    FileContentProvider = 'fileContentProvider',\r\n    ProcessStatProvider = 'processStatProvider',\r\n    PerformanceReadingProvider = 'performanceReadingProvider',\r\n    OverviewInfoProvider = 'overviewInfoProvider',\r\n}\r\n\r\n/**\r\n * The SSH content type.\r\n */\r\nexport enum SshContentType {\r\n    Text = \"text\",\r\n    Error = \"error\",\r\n    Json = \"json\",\r\n}\r\n\r\n/**\r\n * The SSH stream request type.\r\n */\r\nexport enum SshStreamRequestType {\r\n    Start = \"start\",\r\n    Update = \"update\",\r\n    Stop = \"stop\",\r\n}\r\n\r\n/**\r\n * SSH stream data.\r\n */\r\nexport interface SshStreamData<T = any> {\r\n    requestId?: string;\r\n    provider: SshProvider;\r\n    version?: string;\r\n    time?: Date;\r\n    payload?: {\r\n        contentType: SshContentType;\r\n        data: T;\r\n    };\r\n}\r\n\r\n/**\r\n * SSH stream request data.\r\n * Use options for persistent queries to dynamic providers such as @see SshProvider.PerformanceReadingProvider and @see SshProvider.ProcessStatProvider.\r\n * If specifying options, update: true will auto update the stream request with existing parameters.\r\n * For manual update, use @see SshStream.updateStreamRequest.\r\n */\r\nexport interface SshStreamRequestData extends SshStreamData {\r\n    parameters?: string[];\r\n    options?: SshStreamOptions;\r\n    update?: boolean;\r\n}\r\n\r\n/**\r\n * SSH request options. These are only needed for dynamic/persistent queries.\r\n * Specifying update: true  while starting a stream request will auto update the stream request with existing options.\r\n * For manual update, use @see SshStream.updateStreamRequest.\r\n */\r\nexport interface SshStreamOptions {\r\n    action: SshStreamRequestType;\r\n    interval: number;\r\n    count: number;\r\n    historyLength?: number;\r\n}\r\n\r\n/**\r\n * The request packet of the SSH Stream to the gateway.\r\n */\r\nexport interface SshStreamRequest {\r\n    /**\r\n     * The stream target.\r\n     */\r\n    target: WebsocketStreamDataTarget;\r\n\r\n    /**\r\n     * The date request state.\r\n     */\r\n    requestState: WebsocketStreamDataRequestState;\r\n\r\n    /**\r\n     * The request data.\r\n     */\r\n    request: SshStreamRequestData;\r\n}\r\n\r\n/**\r\n * SSH stream response.\r\n */\r\nexport interface SshStreamResponse {\r\n    /**\r\n     * The identification string (same as the requestId of the request/response data)\r\n     * Used to map the response to the request in case the response doesn't contain the requestId.\r\n     */\r\n    id?: string;\r\n\r\n    /**\r\n     * The state of response data.\r\n     */\r\n    state: WebsocketStreamDataState;\r\n\r\n    /**\r\n     * The response data.\r\n     * Stringified JSON representing @see SshStreamData.\r\n     */\r\n    response: string;\r\n}\r\n\r\n/**\r\n * SSH processor interface. Each SSH query creates new observable.\r\n */\r\nclass SshProcessor extends WebsocketStreamProcessor<SshStreamData> {\r\n    private dataCount = 0;\r\n\r\n    /**\r\n     * Initializes a new instance of the SshProcessor class.\r\n     * @param observer Observer to send back result to caller.\r\n     * @param target Stream target object.\r\n     */\r\n    constructor(\r\n        observer: Observer<SshStreamData>,\r\n        target: WebsocketStreamDataTarget,\r\n        public readonly streamRequest: SshStreamRequestData) {\r\n        super(observer, target);\r\n    }\r\n\r\n    public shouldUpdateRequest(): boolean {\r\n        this.dataCount++;\r\n        const streamOptions = this.streamRequest.options;\r\n        // Threshold is 80% of the count. Update the request with existing parameters if update is true.\r\n        if (!MsftSme.isNullOrUndefined(streamOptions) && streamOptions.count > 0 && this.streamRequest.update && this.dataCount >= streamOptions.count * 0.8) {\r\n            this.dataCount = 0;\r\n            return true;\r\n        }\r\n        return false;\r\n    }\r\n}\r\n\r\n/**\r\n * The SSH stream class.\r\n */\r\nexport class SshStream implements WebsocketStreamHandler {\r\n    private static readonly logSourceName = 'SshStream';\r\n    private static readonly sshAgentVersion = '06132023';\r\n    private processors: Map<string /* id */, SshProcessor> = new Map();\r\n    private strings = MsftSme.getStrings<Strings>().MsftSmeShell.Core.WebsocketStream.SshStream;\r\n\r\n    /**\r\n     * Initializes a new instance of the SshStream class.\r\n     *\r\n     * @param websocketStream the websocket stream object.\r\n     * @param authorizationManager the authorization manager object.\r\n     */\r\n    constructor(\r\n        private websocketStream: WebsocketStream,\r\n        private authorizationManager: AuthorizationManager) {\r\n        websocketStream.registerProcessor(WebsocketStreamName.SshStreamName, this);\r\n    }\r\n\r\n    /**\r\n     * Starts/sends a request to the SSH stream.\r\n     *\r\n     * @param nodeName the name of the node to use for this request\r\n     * @param request The @see SshStreamRequestData to send.\r\n     * The @see SshStreamOptions.update will determine whether the stream auto-updates. @see SshStream.updateStreamRequest can be used to manually update the stream.\r\n     * If the stream request is not updated, the stream will end after all data has been emitted.\r\n     * @return Observable<SshStreamData<T>> the query observable.\r\n     */\r\n    public sendStreamRequest<T = any>(nodeName: string, request: SshStreamRequestData): Observable<SshStreamData<T>> {\r\n        const requestState = WebsocketStreamDataRequestState.Normal;\r\n        return this.createRequest(nodeName, requestState, request);\r\n    }\r\n\r\n    /**\r\n     * Updates a previous SSH stream request.\r\n     *\r\n     * @param nodeName the name of the node to use for this request\r\n     * @param request The @see SshStreamRequestData to send.\r\n     * @return Observable<SshStreamData<T>> the query observable.\r\n     */\r\n    public updateStreamRequest(nodeName: string, request: SshStreamRequestData): void {\r\n        const target = this.websocketStream.getTarget(this.authorizationManager, nodeName);\r\n        const requestState = WebsocketStreamDataRequestState.Normal;\r\n        const processor = this.processors.get(request.requestId);\r\n        if (!processor || processor.streamRequest.provider !== request.provider) {\r\n            Logging.log({ level: LogLevel.Warning, message: this.strings.UpdateError.message, source: SshStream.logSourceName });\r\n            return;\r\n        }\r\n        const updateRequestData = this.fullRequest(request);\r\n        const updateRequest = <SshStreamRequest>{ target, requestState, request: updateRequestData };\r\n        this.websocketStream.sendNext(WebsocketStreamName.SshStreamName, updateRequest);\r\n    }\r\n\r\n    /**\r\n     * Cancel active SSH query.\r\n     * Result response comes back to the original query to end.\r\n     *\r\n     * @param nodeName the node name.\r\n     * @param id the id of original request specified as options.queryId.\r\n     */\r\n    public cancel(nodeName: string, requestId: string, provider: string, parameters: string[]): void {\r\n        const target = this.websocketStream.getTarget(this.authorizationManager, nodeName);\r\n        const requestState = WebsocketStreamDataRequestState.Cancel;\r\n        const cancelRequestData = <SshStreamRequestData>{\r\n            requestId, provider, parameters, options: { action: SshStreamRequestType.Stop, interval: 0, count: 0 }\r\n        };\r\n        const cancelRequest = <SshStreamRequest>{ target, requestState, request: this.fullRequest(cancelRequestData) };\r\n        this.websocketStream.sendNext(WebsocketStreamName.SshStreamName, cancelRequest);\r\n    }\r\n\r\n    /**\r\n     * Reset data for connection cleanup.\r\n     */\r\n    public reset(): void {\r\n        Logging.log({ level: LogLevel.Warning, message: this.strings.ResetError.message, source: SshStream.logSourceName });\r\n        const processors: SshProcessor[] = [];\r\n        this.processors.forEach((value, key, map) => processors.push(value));\r\n        this.processors.clear();\r\n        processors.forEach((processor, key, map) => {\r\n            processor.error(new Error(this.strings.ResetError.message));\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Process the socket message.\r\n     *\r\n     * @param message the socket message.\r\n     */\r\n    public process(message: SshStreamResponse): void {\r\n        if (!message) {\r\n            throw new Error(this.strings.NoContentError.message);\r\n        }\r\n\r\n        const innerMessage: SshStreamData = JSON.parse(message.response);\r\n\r\n        const processor = this.processors.get(innerMessage.requestId ?? message.id);\r\n        if (!processor) {\r\n            Logging.log({ level: LogLevel.Debug, message: this.strings.UnexpectedReceivedError.message, source: SshStream.logSourceName });\r\n            return;\r\n        }\r\n\r\n        switch (message.state) {\r\n            case WebsocketStreamDataState.Data:\r\n                this.operationNext(processor, innerMessage);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Completed:\r\n                this.operationComplete(processor, innerMessage);\r\n                this.operationEnd(innerMessage.requestId);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Error:\r\n                this.operationError(processor, { xhr: message });\r\n                this.operationEnd(innerMessage.requestId);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Noop:\r\n                break;\r\n        }\r\n    }\r\n\r\n    private operationNext(processor: SshProcessor, response: SshStreamData): void {\r\n        processor.next(response);\r\n        if (processor.shouldUpdateRequest()) {\r\n            processor.streamRequest.options.action = SshStreamRequestType.Update;\r\n            this.updateStreamRequest(processor.target.nodeName, processor.streamRequest);\r\n        }\r\n    }\r\n\r\n    private operationComplete(processor: SshProcessor, response: SshStreamData): void {\r\n        processor.next(response);\r\n        processor.complete();\r\n    }\r\n\r\n    private operationError(processor: SshProcessor, error: any): void {\r\n        processor.error(error);\r\n    }\r\n\r\n    private operationEnd(id: string): void {\r\n        this.processors.delete(id);\r\n    }\r\n\r\n    private createRequest<T>(\r\n        nodeName: string,\r\n        requestState: WebsocketStreamDataRequestState,\r\n        request: SshStreamRequestData): Observable<T> {\r\n        // publish object is created in two ways.\r\n        // 1) socket is connected so submit the request immediately with simple observable.\r\n        //   (if-block and this is the most of cases.)\r\n        // 2) socket is not connected so wait for the socket to be ready and submit the request with\r\n        //    complex observable.\r\n        //   (else-block and this is a few cases.)\r\n        let publish: Observable<T>;\r\n        if (this.websocketStream.socketStateRaw === WebsocketStreamConnectionState.Connected) {\r\n            publish = this.createRequestSimple(nodeName, requestState, request);\r\n        } else {\r\n            publish = this.websocketStream.socketState\r\n                .pipe(\r\n                    filter(state => state === WebsocketStreamConnectionState.Connected\r\n                        || state === WebsocketStreamConnectionState.Failed\r\n                        || state === WebsocketStreamConnectionState.NotConfigured),\r\n                    take(1),\r\n                    mergeMap(state => {\r\n                        if (state === WebsocketStreamConnectionState.Connected) {\r\n                            return this.createRequestSimple<T>(nodeName, requestState, request);\r\n                        }\r\n\r\n                        return throwError(new Error(this.strings.ConnectionError.message));\r\n                    }));\r\n        }\r\n\r\n        return publish\r\n            .pipe(\r\n                catchError((error, _) => {\r\n                    // retry if reset connection of socket was observed.\r\n                    if (error && error.message === this.strings.ResetError.message) {\r\n                        return <Observable<T>>this.createRequestSimple(nodeName, requestState, request);\r\n                    }\r\n\r\n                    return throwError(error);\r\n                }));\r\n    }\r\n\r\n    private createRequestSimple<T>(\r\n        nodeName: string,\r\n        requestState: WebsocketStreamDataRequestState,\r\n        request: SshStreamRequestData): Observable<T> {\r\n        return new Observable(observer => {\r\n            const target = this.websocketStream.getTarget(this.authorizationManager, nodeName);\r\n            const id = this.sendRequest(observer, target, requestState, request);\r\n            return () => {\r\n                const processor = this.processors.get(id);\r\n                if (processor) {\r\n                    processor.end = true;\r\n                    if (!processor.closed && !processor.closing) {\r\n                        this.cancel(processor.target.nodeName, id, processor.streamRequest.provider, processor.streamRequest.parameters ?? [\"\"]);\r\n                    }\r\n                }\r\n            };\r\n        });\r\n    }\r\n\r\n    private sendRequest(\r\n        observer: Observer<any>,\r\n        target: WebsocketStreamDataTarget,\r\n        requestState: WebsocketStreamDataRequestState,\r\n        request: SshStreamRequestData): string {\r\n        const fullRequest = this.fullRequest(request);\r\n        const processor = new SshProcessor(observer, target, fullRequest);\r\n        this.processors.set(fullRequest.requestId, processor);\r\n        this.websocketStream.sendNext(WebsocketStreamName.SshStreamName, <SshStreamRequest>{ target, requestState, request: fullRequest });\r\n        return fullRequest.requestId;\r\n    }\r\n\r\n    private fullRequest(request: SshStreamRequestData): SshStreamRequestData {\r\n        const requestId = request.requestId ?? MsftSme.getUniqueId();\r\n        const fullRequest: SshStreamRequestData = {\r\n            version: SshStream.sshAgentVersion,\r\n            time: new Date(),\r\n            requestId,\r\n            parameters: [\"\"],\r\n            ...request\r\n        };\r\n        return fullRequest;\r\n    }\r\n}\r\n"]}