import * as MQTT from 'mqtt'
import axios, { AxiosResponse } from 'axios'

import * as Errors from './errors'
import * as Types from './types'
import {
  Logger,
  MessageProcessor,
  MessageRequestTopic,
  RPCManager,
  RPCRequest,
  Timer
} from './utils'

const {
  CONNECT,
  MESSAGE,
} = Types.Event

/**
 * MQTT-backed device client.
 *
 * Wraps a single `MQTT.MqttClient` connection and exposes the RPC calls
 * (commissioning, state, gateway children, ...) as promise-returning methods.
 * Every RPC method delegates to `_rpc` and re-throws failures wrapped in the
 * method-specific error class from `./errors`.
 */
export class Client {
  /** Set to `true` when a CONNECT event is received, `false` after `disconnect()`. */
  public isConnected: boolean
  /** Seeded from `config.isCommissioned`; updated by `commission()` / `decommission()`. */
  public isCommissioned: boolean
  /** Seeded from `config.deviceId`; updated by `commission()` / `decommission()`. */
  public deviceId: string | null

  private _client: MQTT.MqttClient | null
  private _config: Types.Config
  private _log: any

  /**
   * @param config Connection and identity settings. `debugLevel` defaults to
   *   `Types.DebugLevel.FATAL` when falsy.
   */
  constructor (config: Types.Config) {
    this._config = config
    this._client = null
    this.isConnected = false
    this.isCommissioned = config.isCommissioned || false
    this.deviceId = config.deviceId || null
    this._log = Logger(config.debugLevel || Types.DebugLevel.FATAL)
  }

  /**
   * Opens a new `mqtts` connection, ending any previous one first.
   *
   * @param config Optional replacement for the configuration given to the constructor.
   * @returns The connack value delivered with the first CONNECT event. Its
   *   type comes from the MQTT library and is not visible in this file, hence `any`.
   * @throws Errors.ConnectionError (as a rejection) when creating the client
   *   throws, or when the subscription made on connect fails.
   */
  public async connect (config?: Types.Config): Promise<any> {
    if (config) {
      this._config = config
    }

    const {
      host,
      ca,
      key,
      cert,
      clientId,
    } = this._config

    if (this._client) {
      this._client.end()
      this._client = null
    }

    return new Promise((resolve, reject) => {
      try {
        const client = MQTT.connect({
          host,
          ca,
          key,
          cert,
          clientId,
          protocol: 'mqtts',
          clean: true,
        })
        this._client = client

        this._log.info({
          eventType: 'connect',
        })

        const messageProcessor = new MessageProcessor(this)

        // Registered once per client. The CONNECT handler below runs on every
        // (re)connect, so attaching this listener there would stack a new copy
        // on each reconnect and process every message multiple times.
        client.on(MESSAGE, (topic: string, payload: Buffer) => {
          messageProcessor.process(topic, payload)
        })

        client.on(CONNECT, async connack => {
          if (!connack) {
            return
          }

          this._log.info({
            eventType: 'connect',
          })
          this.isConnected = true

          try {
            // The session is opened with `clean: true`, so the subscription is
            // (re)issued on every CONNECT event.
            if (this._client && this.isCommissioned) {
              await this._subscribe(new MessageRequestTopic(clientId).topic)
            }
            resolve(connack)
          } catch (err) {
            // Without this catch a failed subscribe would surface as an
            // unhandled rejection and leave the connect() promise pending.
            this._log.error({
              eventType: 'connect',
            })
            reject(new Errors.ConnectionError(err))
          }
        })
      } catch (err) {
        this._log.error({
          eventType: 'connect',
        })
        reject(new Errors.ConnectionError(err))
      }
    })
  }

  /**
   * Ends the MQTT connection.
   *
   * @param force Passed straight to `MqttClient.end`. Defaults to `true`.
   * @throws Errors.ClientInitializeError (as a rejection) when `connect()` has
   *   not created a client yet.
   */
  public async disconnect (force: boolean = true): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        if (!this._client) {
          throw new Errors.ClientInitializeError()
        }

        this._client.end(force, () => {
          this._log.info({
            eventType: 'disconnect',
          })
          this.isConnected = false
          resolve()
        })
      } catch (err) {
        this._log.error({
          eventType: 'disconnect',
        })
        reject(err)
      }
    })
  }

  /**
   * Issues the GET_STATUS RPC on the IOT service.
   *
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.GetStatusError wrapping any failure.
   */
  public async getStatus (timeout?: number): Promise<Types.GetStatusResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.IOT,
        Types.RPCRequestMethod.GET_STATUS,
        {},
        timeout,
      )
      this._log.info({
        eventType: 'getStatus',
        payload,
      })
      return payload.result as Types.GetStatusResult
    } catch (err) {
      throw new Errors.GetStatusError(err)
    }
  }

  /**
   * Commissions the device via the COMMISSION RPC on the IOT service.
   *
   * Short-circuits with the cached `deviceId` / `productId` when the client is
   * already commissioned and `params.force` is not set. The configured
   * `productId` takes precedence; `params.productId` is only used (and stored
   * on the config) when none is configured.
   *
   * @param params Optional `productId` and `force` flag.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.NoDeviceTypeError when no product ID is available.
   * @throws Errors.CommissionError wrapping any RPC or subscribe failure;
   *   `isCommissioned` is reset to `false` in that case.
   */
  public async commission (
    params?: Types.CommissionParams,
    timeout?: number,
  ): Promise<Types.CommissionResult> {
    const forceCommission = params && params.force
    if (!forceCommission && this.isCommissioned && this.deviceId && this._config.productId) {
      return {
        deviceId: this.deviceId,
        productId: this._config.productId,
      }
    }

    if (!this._config.productId) {
      if (!(params && params.productId)) {
        throw new Errors.NoDeviceTypeError()
      }
      this._config.productId = params.productId
    }
    const productId = this._config.productId

    try {
      const payload = await this._rpc(
        Types.TopicServiceName.IOT,
        Types.RPCRequestMethod.COMMISSION,
        {
          productId,
        },
        timeout,
      )
      this._log.info({
        eventType: 'commission',
        payload,
      })
      const result = payload.result as Types.CommissionResult
      this.deviceId = result.deviceId
      this.isCommissioned = true
      await this._subscribe(new MessageRequestTopic(this._config.clientId).topic)
      return result
    } catch (err) {
      this.isCommissioned = false
      throw new Errors.CommissionError(err)
    }
  }

  /**
   * Issues the DECOMMISSION RPC and clears `isCommissioned` / `deviceId` on success.
   *
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.DecommissionError wrapping any failure.
   */
  public async decommission (timeout?: number): Promise<{}> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.IOT,
        Types.RPCRequestMethod.DECOMMISSION,
        {},
        timeout,
      )
      this._log.info({
        eventType: 'decommission',
        payload,
      })
      this.isCommissioned = false
      this.deviceId = null
      return payload.result as {}
    } catch (err) {
      throw new Errors.DecommissionError(err)
    }
  }

  /**
   * Issues the GET_DEVICE_INFO RPC on the DEVICES service.
   *
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.GetDeviceInfoError wrapping any failure.
   */
  public async getDeviceInfo (
    timeout?: number,
  ): Promise<Types.GetDeviceInfoResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.DEVICES,
        Types.RPCRequestMethod.GET_DEVICE_INFO,
        {},
        timeout,
      )
      this._log.info({
        eventType: 'getDeviceInfo',
        payload,
      })
      return payload.result as Types.GetDeviceInfoResult
    } catch (err) {
      throw new Errors.GetDeviceInfoError(err)
    }
  }

  /**
   * Issues the GET_PRODUCT_INFO RPC on the PRODUCT service.
   *
   * @param productId Product to look up; falls back to the configured product ID.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.GetProductInfoError wrapping any failure, including the
   *   case where neither argument nor config supplies a product ID.
   */
  public async getProductInfo (
    productId?: string,
    timeout?: number,
  ): Promise<Types.GetProductInfoResult> {
    try {
      if (!this._config.productId && !productId) {
        throw new Error('No product ID provided')
      }

      const payload = await this._rpc(
        Types.TopicServiceName.PRODUCT,
        Types.RPCRequestMethod.GET_PRODUCT_INFO,
        {},
        timeout,
        // LM (10/28/19): This will be used to construct the product RPC request topic correctly
        productId || this._config.productId,
      )
      this._log.info({
        eventType: 'getProductInfo',
        payload,
      })
      return payload.result as Types.GetProductInfoResult
    } catch (err) {
      throw new Errors.GetProductInfoError(err)
    }
  }

  /**
   * Issues the ASSOCIATE_USER RPC on the DEVICES service.
   *
   * @param params RPC parameters, sent as-is.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.AssociateUserError wrapping any failure.
   */
  public async associateUser (
    params: Types.AssociateUserParams,
    timeout?: number,
  ): Promise<Types.AssociateUserResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.DEVICES,
        Types.RPCRequestMethod.ASSOCIATE_USER,
        params,
        timeout,
      )
      this._log.info({
        eventType: 'associateUser',
        payload,
      })
      return payload.result as Types.AssociateUserResult
    } catch (err) {
      throw new Errors.AssociateUserError(err)
    }
  }

  /**
   * Issues the GET_INVITATION_CODE RPC on the USER service.
   *
   * @param params RPC parameters, sent as-is.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.GetInvitationCodeError wrapping any failure.
   */
  public async getInvitationCode (
    params: Types.GetInvitationCodeParams,
    timeout?: number,
  ): Promise<Types.GetInvitationCodeResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.USER,
        Types.RPCRequestMethod.GET_INVITATION_CODE,
        params,
        timeout,
      )
      this._log.info({
        eventType: 'getInvitationCode',
        payload,
      })
      return payload.result as Types.GetInvitationCodeResult
    } catch (err) {
      throw new Errors.GetInvitationCodeError(err)
    }
  }

  /**
   * Issues the GET RPC on the STATE service.
   *
   * @param deviceId Optional device ID forwarded to the RPC request.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.GetStateError wrapping any failure.
   */
  public async getState (
    deviceId?: string,
    timeout?: number,
  ): Promise<Types.GetStateResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.STATE,
        Types.RPCRequestMethod.GET,
        {},
        timeout,
        deviceId,
      )
      this._log.info({
        eventType: 'getState',
        payload,
      })
      return payload.result as Types.GetStateResult
    } catch (err) {
      throw new Errors.GetStateError(err)
    }
  }

  /**
   * Issues the SET RPC on the STATE service. Requires `isConnected`.
   *
   * @param params State to set, sent as-is.
   * @param deviceId Optional device ID forwarded to the RPC request.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.SetStateError wrapping any failure, including the
   *   `ClientNotConnectedError` raised when the client is not connected.
   */
  public async setState (
    params: Types.SetStateParams,
    deviceId?: string,
    timeout?: number,
  ): Promise<Types.SetStateResult> {
    try {
      if (!this.isConnected) {
        throw new Errors.ClientNotConnectedError()
      }

      const payload = await this._rpc(
        Types.TopicServiceName.STATE,
        Types.RPCRequestMethod.SET,
        params,
        timeout,
        deviceId,
      )
      this._log.info({
        eventType: 'setState',
        payload,
      })
      return payload.result as Types.SetStateResult
    } catch (err) {
      throw new Errors.SetStateError(err)
    }
  }

  /**
   * Issues the CREATE_AND_PAIR RPC on the GATEWAY_CHILDREN service.
   *
   * @param params RPC parameters, sent as-is.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.CreateAndPairError wrapping any failure.
   */
  public async createAndPair (
    params: Types.CreateAndPairParams,
    timeout?: number,
  ): Promise<Types.CreateAndPairResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.GATEWAY_CHILDREN,
        Types.RPCRequestMethod.CREATE_AND_PAIR,
        params,
        timeout,
      )
      this._log.info({
        eventType: 'createAndPair',
        payload,
      })
      return payload.result as Types.CreateAndPairResult
    } catch (err) {
      throw new Errors.CreateAndPairError(err)
    }
  }

  /**
   * Issues the PAIR_CHILD RPC on the GATEWAY_CHILDREN service.
   *
   * @param params RPC parameters, sent as-is; `params.deviceId` is attached to the error.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.PairChildError wrapping any failure.
   */
  public async pairChild (
    params: Types.PairChildParams,
    timeout?: number,
  ): Promise<Types.PairChildResult> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.GATEWAY_CHILDREN,
        Types.RPCRequestMethod.PAIR_CHILD,
        params,
        timeout,
      )
      this._log.info({
        eventType: 'pairChild',
        payload,
      })
      return payload.result as Types.PairChildResult
    } catch (err) {
      throw new Errors.PairChildError(params.deviceId, err)
    }
  }

  /**
   * Issues the LIST_CHILDREN RPC on the GATEWAY_CHILDREN service.
   *
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.ListChildrenError wrapping any failure.
   */
  public async listChildren (timeout?: number): Promise<Types.ListChildrenResult[]> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.GATEWAY_CHILDREN,
        Types.RPCRequestMethod.LIST_CHILDREN,
        {},
        timeout,
      )
      this._log.info({
        eventType: 'listChildren',
        payload,
      })
      return payload.result as Types.ListChildrenResult[]
    } catch (err) {
      throw new Errors.ListChildrenError(err)
    }
  }

  /**
   * Issues the REMOVE_CHILD RPC on the GATEWAY_CHILDREN service. The response
   * payload is logged but not returned.
   *
   * @param params RPC parameters, sent as-is.
   * @param timeout Optional timeout forwarded to the RPC manager.
   * @throws Errors.RemoveChildError wrapping any failure.
   */
  public async removeChild (
    params: Types.RemoveChildParams,
    timeout?: number,
  ): Promise<void> {
    try {
      const payload = await this._rpc(
        Types.TopicServiceName.GATEWAY_CHILDREN,
        Types.RPCRequestMethod.REMOVE_CHILD,
        params,
        timeout,
      )
      this._log.info({
        eventType: 'removeChild',
        payload,
      })
      return
    } catch (err) {
      throw new Errors.RemoveChildError(err)
    }
  }

  /**
   * Adds a listener on the underlying MQTT client.
   *
   * @throws Errors.ClientInitializeError when `connect()` has not created a client yet.
   */
  public on (event: Types.Event, cb: (...args: any[]) => void): void {
    if (!this._client) {
      throw new Errors.ClientInitializeError()
    }

    this._log.info({
      eventType: 'eventListener:on',
      eventName: event,
    })
    this._client.on(event, cb)
  }

  /**
   * Removes a listener previously added to the underlying MQTT client.
   *
   * @throws Errors.ClientInitializeError when `connect()` has not created a client yet.
   */
  public off (event: Types.Event, cb: (...args: any[]) => void): void {
    if (!this._client) {
      throw new Errors.ClientInitializeError()
    }

    this._log.info({
      eventType: 'eventListener:off',
      eventName: event,
    })
    this._client.removeListener(event, cb)
  }

  /**
   * Downloads firmware over HTTP from `params.url`.
   *
   * The GET request is raced against a `Timer` of `timeout` ms (4500 when
   * omitted or 0). NOTE(review): presumably `Timer.run()` rejects on expiry;
   * confirm in `./utils`.
   *
   * @returns The `data` field of the axios response.
   * @throws Errors.GetFirmwareError wrapping any failure.
   */
  public async getFirmware (params: Types.GetFirmwareParams, timeout?: number): Promise<any> {
    try {
      const timer = new Timer(timeout || 4500)
      const { data } = await Promise.race([
        timer.run(),
        axios.get(params.url),
      ]) as AxiosResponse
      return data
    } catch (err) {
      this._log.error({
        eventType: 'getFirmware',
        url: params.url,
      })
      throw new Errors.GetFirmwareError(err)
    }
  }

  /**
   * Builds an `RPCRequest` for this client's `clientId` and runs it through a
   * new `RPCManager`.
   *
   * @returns Whatever `RPCManager.rpc()` resolves with. Callers read
   *   `.result` from it; the precise type is declared outside this file,
   *   hence `any`.
   */
  private async _rpc (
    serviceName: Types.TopicServiceName,
    method: Types.RPCRequestMethod,
    params: Types.RPCParams,
    timeout?: number,
    deviceId?: string,
  ): Promise<any> {
    const rpcRequest = new RPCRequest(
      this._config.clientId,
      serviceName,
      method,
      params,
      deviceId,
    )

    return new RPCManager(
      this,
      rpcRequest,
      timeout,
    ).rpc()
  }

  /**
   * Publishes a JSON-serialised payload to `topic`.
   *
   * @returns `{ id }` carrying the payload's `requestId` once the publish is acknowledged.
   * @throws Errors.ClientInitializeError (as a rejection) when no client exists.
   * @throws Errors.PublishError (as a rejection) when the MQTT client reports an error.
   */
  private async _publish (
    topic: string,
    payload: Types.PublishParams | Types.StatePayload,
    opts?: MQTT.IClientPublishOptions,
  ): Promise<{ id: string }> {
    return new Promise((resolve, reject) => {
      try {
        if (!this._client) {
          throw new Errors.ClientInitializeError()
        }

        const stringifiedPayload = JSON.stringify(payload)
        const callback = (err?: Error): void => {
          if (err) {
            this._log.error({
              eventType: 'publish',
              topic,
              opts,
            })
            // This callback runs after the surrounding try/catch has exited,
            // so a `throw` here would be uncaught and leave the promise
            // pending. Reject explicitly instead.
            reject(new Errors.PublishError(
              topic,
              stringifiedPayload,
              err,
            ))
            return
          }

          this._log.info({
            eventType: 'publish',
            topic,
            opts,
          })
          resolve({
            id: payload.requestId,
          })
        }

        if (opts) {
          this._client.publish(topic, stringifiedPayload, opts, callback)
        } else {
          this._client.publish(topic, stringifiedPayload, callback)
        }
      } catch (err) {
        this._log.error({
          eventType: 'publish',
          topic,
          opts,
        })
        reject(err)
      }
    })
  }

  /**
   * Subscribes the MQTT client to `topic`.
   *
   * @returns `{ topic }` once the subscription completes without error.
   * @throws Errors.ClientInitializeError (as a rejection) when no client exists.
   * @throws Errors.SubscribeError (as a rejection) when the MQTT client reports an error.
   */
  private _subscribe (
    topic: string,
    opts?: MQTT.IClientSubscribeOptions,
  ): Promise<{ topic: string }> {
    return new Promise((resolve, reject) => {
      try {
        if (!this._client) {
          throw new Errors.ClientInitializeError()
        }

        const callback = (
          err: Error | null | undefined,
          _granted: { topic: string, qos: number }[],
        ): void => {
          if (err) {
            this._log.error({
              eventType: 'subscribe',
              topic,
              opts,
            })
            // Reject rather than throw: the enclosing try/catch no longer
            // covers this asynchronous callback.
            reject(new Errors.SubscribeError(topic, err))
            return
          }

          // Resolve on any error-free completion so the promise can never be
          // left pending when the library omits the grant list.
          this._log.info({
            eventType: 'subscribe',
            topic,
            opts,
          })
          resolve({ topic })
        }

        if (opts) {
          this._client.subscribe(topic, opts, callback)
        } else {
          this._client.subscribe(topic, callback)
        }
      } catch (err) {
        this._log.error({
          eventType: 'subscribe',
          topic,
          opts,
        })
        reject(err)
      }
    })
  }

  /**
   * Unsubscribes the MQTT client from `topic`.
   *
   * @returns `{ topic }` once the unsubscribe completes without error.
   * @throws Errors.ClientInitializeError (as a rejection) when no client exists.
   * @throws Errors.UnsubscribeError (as a rejection) when the MQTT client reports an error.
   */
  // tslint:disable
  private _unsubscribe (topic: string): Promise<{ topic: string }> {
    return new Promise((resolve, reject) => {
      try {
        if (!this._client) {
          throw new Errors.ClientInitializeError()
        }

        this._client.unsubscribe(topic, (err?: Error) => {
          if (err) {
            this._log.error({
              eventType: 'unsubscribe',
              topic,
            })
            // Reject rather than throw: the enclosing try/catch no longer
            // covers this asynchronous callback.
            reject(new Errors.UnsubscribeError(topic, err))
            return
          }

          this._log.info({
            eventType: 'unsubscribe',
            topic,
          })
          resolve({ topic })
        })
      } catch (err) {
        this._log.error({
          eventType: 'unsubscribe',
          topic,
        })
        reject(err)
      }
    })
  }
}