import { MqttClient } from 'mqtt'
import { Client } from '../client'
import { ListenerCleanUpError, RPCResponseError } from '../errors'
import { Event, RPCFailurePayload, RPCSuccessPayload } from '../types'
import { Listener } from './listener'
import { RPCRequest } from './rpcRequest'
import { Timer } from './timer'

/**
 * Drives a single request/response exchange over MQTT.
 *
 * The manager attaches listeners for the client's error event and for the
 * request's response event, subscribes to the response topic, publishes the
 * request, and races all of that against a `Timer`.
 */
export class RPCManager {
  private _client: Client
  private rpcRequest: RPCRequest
  private listeners: Listener[]
  private _timer: Timer
  private _mqttClient: MqttClient
  private _log: any

  /**
   * @param client Owning client; its `_client`, `_log` and `_config` members
   *   are read here, and `_subscribe` / `_publish` / `_unsubscribe` are used
   *   later.
   * @param rpcRequest Request description (publish topic, subscribe topic,
   *   event topic and payload).
   * @param timeout Optional timeout handed to `Timer`. A falsy value falls
   *   back to `client._config.rpcRequestTimeout`, then to 4500
   *   (presumably milliseconds; confirm against `Timer`).
   */
  constructor (client, rpcRequest, timeout) {
    this._client = client
    this._mqttClient = client['_client']
    this.rpcRequest = rpcRequest
    this.listeners = []
    this._log = client['_log']

    const requestTimeout = timeout || client['_config'].rpcRequestTimeout || 4500
    this._timer = new Timer(requestTimeout)
  }

  /**
   * Runs the request and settles with whichever finishes first: the timer or
   * the RPC exchange.
   *
   * @returns The success payload delivered to the response listener.
   * @throws Whatever the timer or the RPC exchange rejected with. The error is
   *   logged before it is rethrown.
   */
  public async rpc (): Promise<RPCSuccessPayload> {
    try {
      // NOTE(review): this assumes `Timer.run()` never resolves with a value
      // and only rejects on timeout; confirm against `Timer`.
      const result = await Promise.race([
        this._timer.run(),
        this._runRPCProcesses(),
      ]) as RPCSuccessPayload

      return result
    } catch (err) {
      this._log.error({ eventType: 'request', rpcRequestTopic: this.rpcRequest.publishTopic, err })
      this._releaseAfterFailure()
      throw err
    }
  }

  /**
   * Registers the listeners, subscribes to the response topic and publishes
   * the request.
   *
   * @returns A promise resolved with the success payload, or rejected with
   *   - `Error` when the client emits `Event.ERROR`,
   *   - `RPCResponseError` when the response carries a failure payload,
   *   - `ListenerCleanUpError` when cleaning up after an event fails,
   *   - whatever `_subscribe` / `_publish` rejected with.
   */
  private _runRPCProcesses (): Promise<RPCSuccessPayload> {
    return new Promise<RPCSuccessPayload>((resolve, reject) => {
      const start = async (): Promise<void> => {
        this._addListener(
          new Listener(
            Event.ERROR,
            async err => {
              try {
                await this._cleanUp()
                reject(new Error(err))
              } catch (cleanUpErr) {
                reject(cleanUpErr)
              }
            },
            this._mqttClient,
          ),
        )

        this._addListener(
          new Listener(
            this.rpcRequest.eventTopic,
            async (payload: RPCSuccessPayload, err: RPCFailurePayload) => {
              try {
                await this._cleanUp()

                if (err) {
                  reject(new RPCResponseError(err))
                  // Stop here so a failed response never falls through to resolve().
                  return
                }

                resolve(payload)
              } catch (cleanUpErr) {
                reject(cleanUpErr)
              }
            },
            this._mqttClient,
          ),
        )

        this._attachListeners()

        await this._client['_subscribe'](this.rpcRequest.subscribeTopic)
        await this._client['_publish'](
          this.rpcRequest.publishTopic,
          this.rpcRequest.payload,
        )
      }

      // Synchronous throws and async rejections from the setup both end up here.
      start().catch(reject)
    })
  }

  /** Remembers a listener so it can be attached and later detached. */
  private _addListener (listener: Listener): void {
    this.listeners.push(listener)
  }

  /** Attaches every registered listener. */
  private _attachListeners (): void {
    this.listeners.forEach(listener => listener.attach())
  }

  /** Detaches every registered listener and forgets them. */
  private _detachListeners (): void {
    this.listeners.forEach(listener => listener.detach())
    this.listeners = []
  }

  /**
   * Best-effort cleanup for failures where no listener ran its own cleanup,
   * such as the timer winning the race or subscribe/publish rejecting.
   *
   * It is deliberately not awaited, so an unresponsive unsubscribe cannot
   * delay the rejection returned by `rpc()`. A cleanup failure is logged
   * rather than replacing the original error.
   */
  private _releaseAfterFailure (): void {
    // `_detachListeners()` empties the list, so an empty list means cleanup
    // has already started (or nothing was ever registered).
    if (this.listeners.length === 0) {
      return
    }

    this._cleanUp().catch(cleanUpErr => {
      this._log.error({
        eventType: 'request',
        rpcRequestTopic: this.rpcRequest.publishTopic,
        err: cleanUpErr,
      })
    })
  }

  /**
   * Detaches the listeners, destroys the timer and unsubscribes from the
   * response topic.
   *
   * Local teardown runs synchronously before the awaited unsubscribe, so a
   * slow or failing unsubscribe cannot leave handlers attached or let a
   * duplicate event start a second cleanup.
   *
   * @throws ListenerCleanUpError wrapping any error raised during cleanup.
   */
  private async _cleanUp (): Promise<void> {
    try {
      this._detachListeners()
      this._timer.destroy()
      await this._client['_unsubscribe'](this.rpcRequest.subscribeTopic)
    } catch (err) {
      throw new ListenerCleanUpError(err)
    }
  }
}