{"version":3,"sources":["../../../packages/core/data/powershell-stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,UAAU,EAAY,aAAa,EAAc,MAAM,MAAM,CAAC;AAKvE,OAAO,EAAE,oBAAoB,EAAE,MAAM,mCAAmC,CAAC;AACzE,OAAO,EAAE,kBAAkB,EAAE,MAAM,mBAAmB,CAAC;AACvD,OAAO,EAAc,iBAAiB,EAAE,MAAM,cAAc,CAAC;AAC7D,OAAO,EACH,eAAe,EACf,8BAA8B,EAC9B,+BAA+B,EAC/B,wBAAwB,EACxB,yBAAyB,EACzB,sBAAsB,EAGzB,MAAM,oBAAoB,CAAC;AAE5B;;GAEG;AACH,MAAM,WAAW,gBAAgB;IAC7B;;OAEG;IACH,SAAS,EAAE,MAAM,CAAC;IAElB;;OAEG;IACH,SAAS,EAAE,MAAM,GAAG,OAAO,CAAC;IAE5B;;OAEG;IACH,QAAQ,CAAC,EAAE,GAAG,EAAE,CAAC;IAEjB;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,EAAE,CAAC;IAEhB;;OAEG;IACH,MAAM,CAAC,EAAE,GAAG,EAAE,CAAC;IAEf;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,EAAE,CAAC;CACnB;AAED;;GAEG;AACH,MAAM,WAAW,uBAAwB,SAAQ,kBAAkB;IAC/D;;;OAGG;IACH,OAAO,CAAC,EAAE,OAAO,CAAC;IAElB;;OAEG;IACH,KAAK,CAAC,EAAE,OAAO,CAAC;IAEhB;;OAEG;IACH,OAAO,CAAC,EAAE,MAAM,CAAC;IAEjB;;OAEG;IACH,UAAU,CAAC,EAAE,MAAM,CAAC;IAEpB;;OAEG;IACH,WAAW,CAAC,EAAE,MAAM,CAAC;CACxB;AAED;;GAEG;AACH,MAAM,WAAW,uBAAuB;IACpC;;OAEG;IACH,EAAE,EAAE,MAAM,CAAC;IAEX;;OAEG;IACH,MAAM,EAAE,yBAAyB,CAAC;IAElC;;OAEG;IACH,YAAY,EAAE,+BAA+B,CAAC;IAE9C;;OAEG;IACH,MAAM,EAAE,MAAM,CAAC;IAEf;;OAEG;IACH,OAAO,EAAE,MAAM,CAAC;IAEhB;;OAEG;IACH,UAAU,EAAE,GAAG,CAAC;IAEhB;;OAEG;IACH,MAAM,EAAE,MAAM,CAAC;IAEf;;OAEG;IACH,OAAO,EAAE,uBAAuB,CAAC;CACpC;AAED;;GAEG;AACH,MAAM,WAAW,wBAAwB;IACrC;;OAEG;IACH,EAAE,EAAE,MAAM,CAAC;IAEX;;OAEG;IACH,KAAK,EAAE,wBAAwB,CAAC;IAEhC;;OAEG;IACH,KAAK,EAAE,MAAM,CAAC;IAEd;;OAEG;IACH,QAAQ,EAAE,gBAAgB,CAAC;CAC9B;AAED,MAAM,WAAW,iCAAiC,CAAC,CAAC;IAChD,QAAQ,EAAE,MAAM,CAAC;IACjB,OAAO,EAAE,iBAAiB,CAAC;IAC3B,OAAO,CAAC,EAAE,uBAAuB,CAAC;IAClC,IAAI,EAAE,CAAC,CAAC;CACX;AAED;;GAEG;AACH,MAAM,WAAW,0BAA0B,CAAC,CAAC;IACzC,IAAI,EAAE,MAAM,CAAC;IACb,UAAU,EAAE,CAAC,QAAQ,EAAE,MAAM,EAAE,OAAO,EAAE,iBAAiB,EAAE,OAAO,CAAC,EAAE,uBAAuB,KACxF,UAAU,CAAC,iCAAiC,CAAC,GAAG,CAAC,CAAC,CAAC;IACvD,cAAc,EAAE,CAAC,MAAM,EAAE,CAAC,EAAE,OAAO,EAAE,iCAAiC,CAAC,GAAG,CAAC,KAAK,UAAU,CAAC,CAAC,CAAC,CAAC;IAC9F,YAAY,EAAE,CAAC,KAAK,EAAE,GAAG,EAAE,OAAO,EAAE,iCAAiC,CAAC,GAAG,CAAC,KAAK,UAAU,CAAC,CAAC,CAAC,CAAC;CAChG;AA2BD;;GAEG;AACH,qBAAa,gBAAiB,YAAW,sBAAsB;IA+C/C,OAAO,CAAC,eAAe;IAAmB,OAAO,CAAC,oBAAoB;IA9ClF,OAAO,CAAC,MAAM,CAAC,QAAQ,CAAC,aAAa,CAAsB;IAC3D;;OAEG;IACH,OAAO,CAAC,MAAM,CAAC,WAAW,CAAyC;IACnE,OAAO,CAAC,MAAM,CAAC,aAAa,CAAK;IACjC,OAAO,CAAC,UAAU,CAAwD;IAC1E,OAAO,CAAC,MAAM,CAAgE;IAC9E,OAAO,CAAC,OAAO,CAAoF;IAEnG;;;;OAIG;WACW,kBAAkB,CAAC,UAAU,EAAE,0BAA0B,CAAC,GAAG,CAAC,GAAG,IAAI;IASnF;;;;;OAKG;WACW,kBAAkB,CAAC,IAAI,EAAE,MAAM,GAAG,OAAO;IAUvD;;;;;OAKG;gBACiB,eAAe,EAAE,eAAe,EAAU,oBAAoB,EAAE,oBAAoB;IAIxG;;;;;;;OAOG;IACI,GAAG,CACF,QAAQ,EAAE,MAAM,EAChB,eAAe,EAAE,MAAM,GAAG,iBAAiB,EAC3C,OAAO,CAAC,EAAE,uBAAuB,GAAG,UAAU,CAAC,gBAAgB,CAAC;IAKxE;;;;;;OAMG;IACI,MAAM,CAAC,QAAQ,EAAE,MAAM,EAAE,EAAE,EAAE,MAAM,EAAE,OAAO,CAAC,EAAE,uBAAuB,GAAG,IAAI;IAsBpF;;OAEG;IACI,KAAK,IAAI,IAAI;IAUpB;;;;OAIG;IACI,OAAO,CAAC,OAAO,EAAE,wBAAwB,GAAG,IAAI;IA+BvD;;;;OAIG;IACI,cAAc,CAAC,QAAQ,EAAE,MAAM;IAItC;;OAEG;IACI,iCAAiC,IAAI,8BAA8B;IAI1E;;OAEG;IACH,IAAW,cAAc,IAAI,aAAa,CAAC,8BAA8B,CAAC,CAEzE;IAED,OAAO,CAAC,aAAa;IAuCrB,OAAO,CAAC,iBAAiB;IAQzB,OAAO,CAAC,cAAc;IAItB,OAAO,CAAC,YAAY;IAepB,OAAO,CAAC,aAAa;IA+DrB,OAAO,CAAC,mBAAmB;IAoB3B,OAAO,CAAC,WAAW;IAyCnB,OAAO,CAAC,SAAS;IASjB,OAAO,CAAC,oBAAoB;IAc5B,OAAO,CAAC,OAAO;CAiBlB","file":"powershell-stream.d.ts","sourcesContent":["import { Observable, Observer, ReplaySubject, throwError } from 'rxjs';\r\nimport { catchError, filter, mergeMap, switchMap, take } from 'rxjs/operators';\r\nimport { LogLevel } from '../diagnostics/log-level';\r\nimport { Logging } from '../diagnostics/logging';\r\nimport { Strings } from '../generated/strings';\r\nimport { AuthorizationManager } from '../security/authorization-manager';\r\nimport { NodeRequestOptions } from './node-connection';\r\nimport { PowerShell, PowerShellCommand } from './powershell';\r\nimport {\r\n    WebsocketStream,\r\n    WebsocketStreamConnectionState,\r\n    WebsocketStreamDataRequestState,\r\n    WebsocketStreamDataState,\r\n    WebsocketStreamDataTarget,\r\n    WebsocketStreamHandler,\r\n    WebsocketStreamName,\r\n    WebsocketStreamProcessor\r\n} from './websocket-stream';\r\n\r\n/**\r\n * PowerShell result object including an error.\r\n */\r\nexport interface PowerShellResult {\r\n    /**\r\n     * The runspace pool instance ID.\r\n     */\r\n    sessionId: string;\r\n\r\n    /**\r\n     * Completed state.\r\n     */\r\n    completed: 'True' | 'False';\r\n\r\n    /**\r\n     * Progress data.\r\n     */\r\n    progress?: any[];\r\n\r\n    /**\r\n     * Results data.\r\n     */\r\n    results?: any[];\r\n\r\n    /**\r\n     * Errors data.\r\n     */\r\n    errors?: any[];\r\n\r\n    /**\r\n     * Warning data.\r\n     */\r\n    warning?: any[];\r\n}\r\n\r\n/**\r\n * PowerShell Stream options.\r\n */\r\nexport interface PowerShellStreamOptions extends NodeRequestOptions {\r\n    /**\r\n     * Partial data response.\r\n     * (default is waiting for completion.)\r\n     */\r\n    partial?: boolean;\r\n\r\n    /**\r\n     * Close the session each time, so it gets priority.\r\n     */\r\n    close?: boolean;\r\n\r\n    /**\r\n     * Override Query ID so it can request cancel.\r\n     */\r\n    queryId?: string;\r\n\r\n    /**\r\n     * Buffering time period by milliseconds.\r\n     */\r\n    bufferTime?: number;\r\n\r\n    /**\r\n     * Buffering count.\r\n     */\r\n    bufferCount?: number;\r\n}\r\n\r\n/**\r\n * The request packet of Cim Stream to the gateway.\r\n */\r\nexport interface PowerShellStreamRequest {\r\n    /**\r\n     * The identification string (auto generated or supplied as queryId option.)\r\n     */\r\n    id: string;\r\n\r\n    /**\r\n     * The stream target\r\n     */\r\n    target: WebsocketStreamDataTarget;\r\n\r\n    /**\r\n     * The date request state.\r\n     */\r\n    requestState: WebsocketStreamDataRequestState;\r\n\r\n    /**\r\n     * The name of PowerShell module.\r\n     */\r\n    module: string;\r\n\r\n    /**\r\n     * The command.\r\n     */\r\n    command: string;\r\n\r\n    /**\r\n     * The parameters.\r\n     */\r\n    parameters: any;\r\n\r\n    /**\r\n     * The script.\r\n     */\r\n    script: string;\r\n\r\n    /**\r\n     * The Cim stream options.\r\n     */\r\n    options: PowerShellStreamOptions;\r\n}\r\n\r\n/**\r\n * Cim stream response.\r\n */\r\nexport interface PowerShellStreamResponse {\r\n    /**\r\n     * The identification string (auto generated.)\r\n     */\r\n    id: string;\r\n\r\n    /**\r\n     * Web socket data stream state.\r\n     */\r\n    state: WebsocketStreamDataState;\r\n\r\n    /**\r\n     * Index number of response for the original request.\r\n     */\r\n    index: number;\r\n\r\n    /**\r\n     * The PowerShell result.\r\n     */\r\n    response: PowerShellResult;\r\n}\r\n\r\nexport interface PowerShellStreamMonitorSetContext<T> {\r\n    nodeName: string;\r\n    command: PowerShellCommand;\r\n    options?: PowerShellStreamOptions;\r\n    data: T;\r\n}\r\n\r\n/**\r\n * Set of monitors for pre and post process of powershell stream.\r\n */\r\nexport interface PowerShellStreamMonitorSet<T> {\r\n    name: string;\r\n    preMonitor: (nodeName: string, command: PowerShellCommand, options?: PowerShellStreamOptions) =>\r\n        Observable<PowerShellStreamMonitorSetContext<any>>;\r\n    successMonitor: (result: T, context: PowerShellStreamMonitorSetContext<any>) => Observable<T>;\r\n    errorMonitor: (error: any, context: PowerShellStreamMonitorSetContext<any>) => Observable<T>;\r\n}\r\n\r\n/**\r\n * PowerShell Processor interface.\r\n */\r\nclass PowerShellProcessor extends WebsocketStreamProcessor<PowerShellResult, PowerShellStreamOptions>  {\r\n    /**\r\n     * * Holding result if waitCompleted option is specified for multiple instances.\r\n     */\r\n    declare public response?: PowerShellResult;\r\n}\r\n\r\n/**\r\n * PowerShell stream queue.\r\n */\r\ninterface PowerShellStreamQueue {\r\n    /**\r\n     * Number of active request.\r\n     */\r\n    outstandingCount: number;\r\n\r\n    /**\r\n     * Holding requests.\r\n     */\r\n    pendingRequests: PowerShellStreamRequest[];\r\n}\r\n\r\n/**\r\n * The PowerShell stream class.\r\n */\r\nexport class PowerShellStream implements WebsocketStreamHandler {\r\n    private static readonly logSourceName = 'PowerShellStream';\r\n    /**\r\n     * The collection of set of monitors.\r\n     */\r\n    private static monitorSets: PowerShellStreamMonitorSet<any>[] = [];\r\n    private static maxRunPerNode = 5;\r\n    private processors: Map<string /* id */, PowerShellProcessor> = new Map();\r\n    private queues: Map<string /* nodeName */, PowerShellStreamQueue> = new Map();\r\n    private strings = MsftSme.getStrings<Strings>().MsftSmeShell.Core.WebsocketStream.PowerShellStream;\r\n\r\n    /**\r\n     * Register the set of monitors.\r\n     *\r\n     * @param monitorSet The set of monitors.\r\n     */\r\n    public static registerMonitorSet(monitorSet: PowerShellStreamMonitorSet<any>): void {\r\n        const found = PowerShellStream.monitorSets.find(monitors => monitors.name === monitorSet.name);\r\n        if (found) {\r\n            return;\r\n        }\r\n\r\n        PowerShellStream.monitorSets.push(monitorSet);\r\n    }\r\n\r\n    /**\r\n     * Unregister the set of monitors.\r\n     *\r\n     * @param name The name of set of monitors.\r\n     * @returns boolean true if unregistered the named set.\r\n     */\r\n    public static unregisterMonitors(name: string): boolean {\r\n        const found = PowerShellStream.monitorSets.find(monitors => monitors.name === name);\r\n        if (found) {\r\n            PowerShellStream.monitorSets.remove(found);\r\n            return true;\r\n        }\r\n\r\n        return false;\r\n    }\r\n\r\n    /**\r\n     * Initializes a new instance of the PowerShellStream class.\r\n     *\r\n     * @param websocketStream the websocket stream object.\r\n     * @param authorizationManager the authorization manager object.\r\n     */\r\n    constructor(private websocketStream: WebsocketStream, private authorizationManager: AuthorizationManager) {\r\n        websocketStream.registerProcessor(WebsocketStreamName.PowerShellStreamName, this);\r\n    }\r\n\r\n    /**\r\n     * PowerShell script run.\r\n     *\r\n     * @param nodeName the node name.\r\n     * @param script the script to run.\r\n     * @param options the options for this request.\r\n     * @return Observable<PowerShellResult> the query observable.\r\n     */\r\n    public run(\r\n            nodeName: string,\r\n            commandOrScript: string | PowerShellCommand,\r\n            options?: PowerShellStreamOptions): Observable<PowerShellResult> {\r\n        const command = PowerShell.getPowerShellCommand(commandOrScript);\r\n        return this.monitorCreateRequest(nodeName, command, PowerShell.newEndpointOptions(options));\r\n    }\r\n\r\n    /**\r\n     * Cancel active powershell script.\r\n     * Result response comes back to the original query to end.\r\n     *\r\n     * @param nodeName the node name.\r\n     * @param id the id of original request specified as options.queryId.\r\n     */\r\n    public cancel(nodeName: string, id: string, options?: PowerShellStreamOptions): void {\r\n        const target = this.getTarget(nodeName, PowerShell.newEndpointOptions(options));\r\n        const requestState = WebsocketStreamDataRequestState.Cancel;\r\n        const request = <PowerShellStreamRequest>{ id, target, requestState, script: null };\r\n\r\n        // remove from queue if not submitted yet.\r\n        const queue = this.queues.get(target.nodeName);\r\n        if (queue) {\r\n            const pendingRequest = queue.pendingRequests.find(entry => entry.id === id);\r\n            if (pendingRequest) {\r\n                queue.pendingRequests.remove(pendingRequest);\r\n                queue.outstandingCount--;\r\n                const processor = this.processors.get(id);\r\n                this.processors.delete(id);\r\n                processor.complete();\r\n                return;\r\n            }\r\n        }\r\n\r\n        this.websocketStream.sendNext(WebsocketStreamName.PowerShellStreamName, request);\r\n    }\r\n\r\n    /**\r\n     * Reset data for connection cleanup.\r\n     */\r\n    public reset(): void {\r\n        Logging.log({ level: LogLevel.Warning, message: this.strings.ResetError.message, source: PowerShellStream.logSourceName });\r\n        const processors: PowerShellProcessor[] = [];\r\n        this.processors.forEach((value) => processors.push(value));\r\n        this.processors.clear();\r\n        processors.forEach((processor) => {\r\n            processor.error(new Error(this.strings.ResetError.message));\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Process the socket message.\r\n     *\r\n     * @param message the socket message.\r\n     */\r\n    public process(message: PowerShellStreamResponse): void {\r\n        if (!message) {\r\n            throw new Error(this.strings.NoContentError.message);\r\n        }\r\n\r\n        const processor = this.processors.get(message.id);\r\n        if (!processor) {\r\n            Logging.log({ level: LogLevel.Warning, message: this.strings.UnexpectedReceivedError.message, source: PowerShellStream.logSourceName });\r\n            return;\r\n        }\r\n\r\n        switch (message.state) {\r\n            case WebsocketStreamDataState.Data:\r\n                this.operationNext(processor, message.response);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Completed:\r\n                this.operationComplete(processor, message.response);\r\n                this.operationEnd(message.id);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Error:\r\n                this.operationError(processor, { xhr: message });\r\n                this.operationEnd(message.id);\r\n                break;\r\n\r\n            case WebsocketStreamDataState.Noop:\r\n                break;\r\n        }\r\n    }\r\n\r\n    /**\r\n     * Gets the JEA powershell endpoint, if it exists\r\n     *\r\n     * @param nodeName The node name\r\n     */\r\n    public getJeaEndpoint(nodeName: string) {\r\n        return this.authorizationManager.getJeaEndpoint(nodeName);\r\n    }\r\n\r\n    /**\r\n     * Gets websocket stream connection state.\r\n     */\r\n    public getWebsocketStreamConnectionState(): WebsocketStreamConnectionState {\r\n        return this.websocketStream.socketStateRaw;\r\n    }\r\n\r\n    /**\r\n     * Gets WebSocket State.\r\n     */\r\n    public get websocketState(): ReplaySubject<WebsocketStreamConnectionState> {\r\n        return this.websocketStream.socketState;\r\n    }\r\n\r\n    private operationNext(processor: PowerShellProcessor, response: PowerShellResult): boolean {\r\n        const partial = processor.options && processor.options.partial;\r\n\r\n        // buffering result.\r\n        if (!partial) {\r\n            if (!processor.response) {\r\n                processor.response = response;\r\n            } else {\r\n                if (response.errors) {\r\n                    if (!processor.response.errors) {\r\n                        processor.response.errors = response.errors;\r\n                    } else {\r\n                        response.errors.forEach(value => processor.response.errors.push(value));\r\n                    }\r\n                }\r\n\r\n                if (response.progress) {\r\n                    if (!processor.response.progress) {\r\n                        processor.response.progress = response.progress;\r\n                    } else {\r\n                        response.progress.forEach(value => processor.response.progress.push(value));\r\n                    }\r\n                }\r\n\r\n                if (response.results) {\r\n                    if (!processor.response.results) {\r\n                        processor.response.results = response.results;\r\n                    } else {\r\n                        response.results.forEach(value => processor.response.results.push(value));\r\n                    }\r\n                }\r\n            }\r\n        } else {\r\n            processor.next(response);\r\n        }\r\n\r\n        return !partial;\r\n    }\r\n\r\n    private operationComplete(processor: PowerShellProcessor, response: PowerShellResult): void {\r\n        if (this.operationNext(processor, response)) {\r\n            processor.next(processor.response);\r\n        }\r\n\r\n        processor.complete();\r\n    }\r\n\r\n    private operationError(processor: PowerShellProcessor, error: any): void {\r\n        processor.error(error);\r\n    }\r\n\r\n    private operationEnd(id: string): void {\r\n        const processor = this.processors.get(id);\r\n        this.processors.delete(id);\r\n        const queue = this.queues.get(processor.target.nodeName);\r\n        if (--queue.outstandingCount === 0) {\r\n            this.queues.delete(processor.target.nodeName);\r\n        }\r\n\r\n        if (queue.pendingRequests.length > 0) {\r\n            // if there is queued item, then send request.\r\n            const request = queue.pendingRequests.shift();\r\n            this.websocketStream.sendNext(WebsocketStreamName.PowerShellStreamName, request);\r\n        }\r\n    }\r\n\r\n    private createRequest<T>(\r\n        nodeName: string,\r\n        command: PowerShellCommand,\r\n        options?: PowerShellStreamOptions): Observable<T> {\r\n        // publish object is created two ways.\r\n        // 1) socket is connected so submit the request immediately with simple observable.\r\n        //   (if-block and this is the most of cases.)\r\n        // 2) socket is not connected so wait for the socket to ready and submit request with\r\n        //    complex observable. Initial connect and re-connection takes this observable.\r\n        //   (else-block and this is a few cases.)\r\n        let publish: Observable<T>;\r\n        const endpoint = this.authorizationManager.getJeaEndpoint(nodeName);\r\n        const newOptions = {...(options || {})};\r\n        if (endpoint) {\r\n            newOptions.powerShellEndpoint = endpoint;\r\n        }\r\n\r\n        if (this.websocketStream.socketStateRaw === WebsocketStreamConnectionState.Connected) {\r\n            publish = this.createRequestSimple(nodeName, command, newOptions);\r\n        } else {\r\n            publish = this.websocketStream.socketState\r\n                .pipe(\r\n                    filter(state => state === WebsocketStreamConnectionState.Connected\r\n                        || state === WebsocketStreamConnectionState.Failed\r\n                        || state === WebsocketStreamConnectionState.NotConfigured),\r\n                    take(1),\r\n                    mergeMap(state => {\r\n                        if (state === WebsocketStreamConnectionState.Connected) {\r\n                            return this.createRequestSimple<T>(nodeName, command, newOptions);\r\n                        }\r\n\r\n                        return throwError(() => new Error(this.strings.ConnectionError.message));\r\n                    }));\r\n        }\r\n\r\n        return publish\r\n            .pipe(\r\n                catchError((error) => {\r\n                    // retry if reset connection of socket was observed.\r\n                    if (error && error.message === this.strings.ResetError.message) {\r\n                        return <Observable<T>>this.monitorCreateRequest(nodeName, command, newOptions);\r\n                    }\r\n\r\n                    if ((!options || options.noAuth !== true) && !this.authorizationManager.signOnManager.isSignOnTokenEnabled) {\r\n                        if (this.authorizationManager.canHandleStreamFailure(error.xhr)) {\r\n                            return this.authorizationManager.handleStreamFailure(nodeName, options, error.xhr)\r\n                                .pipe(switchMap(updatedOptions =>\r\n                                    <Observable<T>>this.monitorCreateRequest(nodeName, command, updatedOptions)));\r\n                        }\r\n                    }\r\n\r\n                    if (this.authorizationManager.signOnManager.isSignOnTokenEnabled) {\r\n                        if (this.authorizationManager.signOnManager.canHandleStreamUnauthorizedLogin(error.xhr)) {\r\n                            return this.authorizationManager.signOnManager.handleStreamUnauthorizedLogin(options, error.xhr)\r\n                                .pipe(switchMap(updatedOptions =>\r\n                                    <Observable<T>>this.monitorCreateRequest(nodeName, command, updatedOptions)));\r\n                        }\r\n                    }\r\n\r\n                    return throwError(() => error);\r\n                }));\r\n    }\r\n\r\n    private createRequestSimple<T>(\r\n        nodeName: string,\r\n        command: PowerShellCommand,\r\n        options?: PowerShellStreamOptions): Observable<T> {\r\n        return new Observable(observer => {\r\n            const target = this.getTarget(nodeName, options);\r\n            const requestState = WebsocketStreamDataRequestState.Normal;\r\n            const id = this.sendRequest(observer, target, requestState, command, options);\r\n            return () => {\r\n                const processor = this.processors.get(id);\r\n                if (processor) {\r\n                    processor.end = true;\r\n                    if (!processor.closed && !processor.closing) {\r\n                        this.cancel(processor.target.nodeName, id);\r\n                    }\r\n                }\r\n            };\r\n        });\r\n    }\r\n\r\n    private sendRequest(\r\n        observer: Observer<any>,\r\n        target: WebsocketStreamDataTarget,\r\n        requestState: WebsocketStreamDataRequestState,\r\n        command: PowerShellCommand,\r\n        options?: PowerShellStreamOptions): string {\r\n        const id = (options && options.queryId) || MsftSme.getUniqueId();\r\n        const request = <PowerShellStreamRequest>{\r\n            ...{\r\n                id,\r\n                target,\r\n                requestState,\r\n                options\r\n            },\r\n            ...command\r\n        };\r\n        const processor = new PowerShellProcessor(observer, target, options);\r\n        const queue = this.queues.get(target.nodeName);\r\n        this.processors.set(id, processor);\r\n\r\n        // During a send request, if caller provides 'options.close' as true,\r\n        // we shouldn't manage the request via a queue and on Gateway, we should create a\r\n        // new, one time use Runspace, which is disposed after use, instead of using one from the pool.\r\n        // As currently this is not handled on Gateway, just ignore the 'options.close' for now.\r\n        /*\r\n        if (options && options.close) {\r\n            // disposing session.\r\n            this.websocketStream.sendNext(WebsocketStreamName.PowerShellStreamName, request);\r\n            return id;\r\n        }\r\n        */\r\n\r\n        if (++queue.outstandingCount > PowerShellStream.maxRunPerNode) {\r\n            queue.pendingRequests.push(request);\r\n            return id;\r\n        }\r\n\r\n        this.websocketStream.sendNext(WebsocketStreamName.PowerShellStreamName, request);\r\n        return id;\r\n    }\r\n\r\n    private getTarget(nodeName: string, options: PowerShellStreamOptions): WebsocketStreamDataTarget {\r\n        if (!this.queues.has(nodeName)) {\r\n            const queue = <PowerShellStreamQueue>{ outstandingCount: 0, pendingRequests: [] };\r\n            this.queues.set(nodeName, queue);\r\n        }\r\n\r\n        return this.websocketStream.getTarget(this.authorizationManager, nodeName, options.powerShellEndpoint);\r\n    }\r\n\r\n    private monitorCreateRequest<T>(\r\n        nodeName: string,\r\n        command: PowerShellCommand,\r\n        options?: PowerShellStreamOptions): Observable<T> {\r\n        let monitored: (nodeName: string, command: PowerShellCommand, options?: PowerShellStreamOptions) => Observable<T>\r\n            = (nodeName1: string, command1: PowerShellCommand, options1?: PowerShellStreamOptions) =>\r\n                this.createRequest(nodeName1, command1, options1);\r\n        for (const monitorSet of PowerShellStream.monitorSets) {\r\n            monitored = this.monitor<T>(monitored, monitorSet);\r\n        }\r\n\r\n        return monitored(nodeName, command, options);\r\n    }\r\n\r\n    private monitor<T>(\r\n        target: (nodeName: string, command: PowerShellCommand, options?: PowerShellStreamOptions) => Observable<T>,\r\n        monitorSet: PowerShellStreamMonitorSet<T>): (nodeName: string, command: PowerShellCommand, options?: PowerShellStreamOptions) => Observable<T> {\r\n\r\n        return function(nodeName: string, command: PowerShellCommand, options?: PowerShellStreamOptions): Observable<T> {\r\n            let context: PowerShellStreamMonitorSetContext<any>;\r\n            return monitorSet.preMonitor(nodeName, command, options)\r\n                .pipe(\r\n                    switchMap(packet => {\r\n                        context = packet;\r\n                        return target(packet.nodeName, packet.command, packet.options);\r\n                    }),\r\n                    catchError((error: any) => monitorSet.errorMonitor(error, context)),\r\n                    switchMap(response => monitorSet.successMonitor(response, context))\r\n                );\r\n        };\r\n    }\r\n}\r\n"]}