{"version":3,"sources":["../../../packages/core/data/extension-broker/worker-set.ts"],"names":[],"mappings":"AAAA,OAAO,EAA0B,UAAU,EAA0B,MAAM,MAAM,CAAC;AAIlF,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAC;AACrD,OAAO,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAC;AAErE,oBAAY,2BAA2B,GAAG,CAAC,MAAM,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,EAAE,IAAI,EAAE,GAAG,KAAK,GAAG,EAAE,CAAC;AAEhG,MAAM,WAAW,YAAY,CAAC,CAAC;IAC3B,WAAW,CAAC,EAAE,CAAC,CAAC;IAChB,KAAK,CAAC,EAAE,GAAG,CAAC;IACZ,MAAM,EAAE,uBAAuB,CAAC;CACnC;AAED;;GAEG;AACH,qBAAa,SAAS;IAgCN,OAAO,CAAC,eAAe;IAC/B,OAAO,CAAC,eAAe;IACvB,OAAO,CAAC,YAAY,CAAC;IAhCzB;;OAEG;IACH,OAAO,KAAK,aAAa,GAExB;IAED;;OAEG;IACH,OAAO,CAAC,OAAO,CAA2C;IAE1D;;OAEG;IACH,OAAO,CAAC,yBAAyB,CAA8B;IAE/D,SAAgB,kBAAkB,EAAE,UAAU,CAAC,OAAO,CAAC,CAAiD;IAExG,IAAW,oBAAoB,IAAI,OAAO,CAEzC;IAED;;;;;;OAMG;gBACiB,eAAe,EAAE,eAAe,EACxC,eAAe,EAAE,MAAM,EACvB,YAAY,CAAC,EAAE,2BAA2B;IAKtD;;OAEG;IACI,iBAAiB,IAAI,IAAI;IAkChC;;OAEG;IACI,cAAc,IAAI,IAAI;IAO7B;;OAEG;IACI,GAAG,CAAC,CAAC,EAAE,MAAM,EAAE,MAAM,EAAE,GAAG,IAAI,EAAE,GAAG,EAAE,GAAG,UAAU,CAAC,YAAY,CAAC,CAAC,CAAC,EAAE,CAAC;IAM5E;;OAEG;IACI,IAAI,CAAC,CAAC,EAAE,MAAM,EAAE,MAAM,EAAE,GAAG,IAAI,EAAE,GAAG,EAAE,GAAG,UAAU,CAAC,YAAY,CAAC,CAAC,CAAC,CAAC;IAM3E;;OAEG;IACH,OAAO,CAAC,YAAY;CAiDvB","file":"worker-set.d.ts","sourcesContent":["import { BehaviorSubject, merge, Observable, of, ReplaySubject, zip } from 'rxjs';\r\nimport { catchError, mergeMap, map, take } from 'rxjs/operators';\r\nimport { LogLevel } from '../../diagnostics/log-level';\r\nimport { Logging } from '../../diagnostics/logging';\r\nimport { ExtensionBroker } from './extension-broker';\r\nimport { WorkerExtensionInstance } from './model/extension-instance';\r\n\r\nexport type WorkerVersionArgumentMapper = (method: string, version: number, args: any) => any[];\r\n\r\nexport interface WorkerResult<T> {\r\n    returnValue?: T;\r\n    error?: any;\r\n    worker: WorkerExtensionInstance;\r\n}\r\n\r\n/**\r\n * Defines an abstraction for working with a set of workers given a designated extension target\r\n */\r\nexport class WorkerSet {\r\n\r\n    /**\r\n     * The source name to use when logging about this service\r\n     */\r\n    private get logSourceName() {\r\n        return 'WorkerSet';\r\n    }\r\n\r\n    /**\r\n     * Observable to track the creation of our workers\r\n     */\r\n    private workers: ReplaySubject<WorkerExtensionInstance[]>;\r\n\r\n    /**\r\n     * Determine if workers were initialized successfully\r\n     */\r\n    private workersInitializedSubject = new BehaviorSubject(false);\r\n\r\n    public readonly workersInitialized: Observable<boolean> = this.workersInitializedSubject.asObservable();\r\n\r\n    public get isWorkersInitialized(): boolean {\r\n        return this.workersInitializedSubject.value;\r\n    }\r\n\r\n    /**\r\n     * instantiates a new instance of the AzureCenterService service\r\n     * @param extensionBroker the extension broker instance\r\n     * @param extensionTarget the extension target id we want to get workers for\r\n     * @param mapArguments a function that maps different versions of worker arguments\r\n     * based on the the versions implemented by a worker @optional @deprecated\r\n     */\r\n    constructor(private extensionBroker: ExtensionBroker,\r\n        private extensionTarget: string,\r\n        private mapArguments?: WorkerVersionArgumentMapper\r\n    ) {\r\n        this.workers = new ReplaySubject<WorkerExtensionInstance[]>();\r\n    }\r\n\r\n    /**\r\n     * Initializes the workers\r\n     */\r\n    public initializeWorkers(): void {\r\n        this.extensionBroker.getTargetExtensions(this.extensionTarget).pipe(\r\n            mergeMap(fulfillments => {\r\n                const workersObservables = Object.keys(fulfillments).map(entryPointId =>\r\n                    this.extensionBroker.createWorker(entryPointId, this.extensionTarget)\r\n                );\r\n\r\n                if (workersObservables.length === 0) {\r\n                    // This is a valid case were the workers should be considered to be initialized,\r\n                    // but no workers exist that target this extension.\r\n                    return of([]);\r\n                }\r\n                return zip(...workersObservables);\r\n            }),\r\n            take(1)\r\n        ).subscribe({\r\n            next: workers => {\r\n                this.workers.next(workers);\r\n                this.workersInitializedSubject.next(true);\r\n            },\r\n            error: error => {\r\n                Logging.log({\r\n                    level: LogLevel.Critical,\r\n                    message: 'Failed to initialize workers',\r\n                    source: this.logSourceName,\r\n                    params: {\r\n                        error: error\r\n                    }\r\n                });\r\n                this.workers.next([]);\r\n            }\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Destroys the workers\r\n     */\r\n    public destroyWorkers(): void {\r\n        this.workers.pipe(\r\n            mergeMap(workers => zip(...(workers.map(w => w.destroy())))),\r\n            take(1)\r\n        ).subscribe();\r\n    }\r\n\r\n    /**\r\n     * Calls a method on each worker in and emits when all workers to respond\r\n     */\r\n    public all<T>(method: string, ...args: any[]): Observable<WorkerResult<T>[]> {\r\n        return this.prepareCalls<T>(method, args).pipe(\r\n            mergeMap(calls => zip(...calls))\r\n        );\r\n    }\r\n\r\n    /**\r\n     * Calls a method on each worker in and emits when each worker responds\r\n     */\r\n    public each<T>(method: string, ...args: any[]): Observable<WorkerResult<T>> {\r\n        return this.prepareCalls<T>(method, args).pipe(\r\n            mergeMap(calls => merge(...calls))\r\n        );\r\n    }\r\n\r\n    /**\r\n     * Calls a function on the applicable workers\r\n     */\r\n    private prepareCalls<T>(method: string, ...args: any[]): Observable<Observable<WorkerResult<T>>[]> {\r\n        return this.workers.pipe(\r\n            map(workers => {\r\n                // find workers that support the version of the method we want\r\n                const workerCalls: Observable<WorkerResult<T>>[] = workers\r\n                    // map all our worker instances into a set of supported function calls\r\n                    .map(w => {\r\n                        const methodVersion = w.extenderDefinition.methodVersions[method];\r\n\r\n                        if (MsftSme.isNullOrUndefined(methodVersion)) {\r\n                            // this worker does not support this method at all\r\n                            return null;\r\n                        }\r\n\r\n                        // if this worker implements the mapArguments method then treat it like the old version of this method\r\n                        let mappedArguments = args;\r\n                        if (this.mapArguments) {\r\n                            mappedArguments = this.mapArguments(method, methodVersion, args[0]);\r\n\r\n                            if (MsftSme.isNullOrUndefined(mappedArguments)) {\r\n                                // this worker does not support a version this implementation can deal with\r\n                                return null;\r\n                            }\r\n                        }\r\n\r\n                        return w.call(method, methodVersion, ...mappedArguments).pipe(\r\n                            map(methodReturn => {\r\n                                const workerResult: WorkerResult<T> = {\r\n                                    worker: w,\r\n                                    returnValue: methodReturn\r\n                                };\r\n                                return workerResult;\r\n                            }),\r\n                            catchError((error) => {\r\n                                const workerResult: WorkerResult<T> = {\r\n                                    worker: w,\r\n                                    error: error\r\n                                };\r\n                                return of(workerResult);\r\n                            })\r\n                        );\r\n                    })\r\n                    // ignore workers that dont support this method\r\n                    .filter(w => !!w);\r\n\r\n                return workerCalls;\r\n            })\r\n        );\r\n    }\r\n}\r\n"]}