import { MqttClient } from 'mqtt' import { Command, MessageTopicParser, RPCInternalResponseTopic, State, TopicParser, NotificationMessageEvent, } from '.' import { Client } from '../client' import { InvalidMessageError, MessageProcessorError } from '../errors' import { Event, MessageMethod, MessageType, TopicType } from '../types' export class MessageProcessor { private _client: Client private _mqttClient: MqttClient private _log: any constructor (client) { this._client = client this._mqttClient = client['_client'] this._log = client['_log'] } public process (msgTopic: string, msgPayload: Buffer): void { try { let payload let eventPayload let eventTopic const invalidMessageError = new InvalidMessageError(msgTopic, msgPayload.toString()) const topic = new TopicParser(msgTopic) try { payload = this._toJSON(msgPayload) } catch (err) { this._emitError(invalidMessageError) } this._log.info({ eventType: 'message', topic: msgTopic, data: payload }) if (topic.type === TopicType.RPC_RESPONSE) { const { requestId, result, error } = payload if (!requestId || (!result && !error)) { this._emitError(invalidMessageError) } eventTopic = new RPCInternalResponseTopic(topic.type, requestId).topic if (error) { this._mqttClient.emit(eventTopic, undefined, payload) } eventPayload = payload } else if (topic.type === TopicType.MESSAGE) { const { method, params } = payload if (!method || !params) { this._emitError(invalidMessageError) } const messageTopic = new MessageTopicParser(msgTopic) const { messageType, childDeviceId } = messageTopic switch (messageType) { case MessageType.COMMAND: eventTopic = Event.COMMAND eventPayload = new Command(this._client, payload, childDeviceId) break case MessageType.STATE: const statePayload = { ...payload, childDeviceId } eventTopic = Event.STATE eventPayload = new State(this._client, payload, childDeviceId) break case MessageType.PRODUCT: if (method === MessageMethod.FIRMWARE_UPDATE) eventTopic = Event.FIRMWARE_UPDATE else eventTopic = Event.PRODUCT eventPayload = new NotificationMessageEvent(this._client, payload, childDeviceId) break case MessageType.DEVICE: eventTopic = Event.DEVICE_UPDATE eventPayload = new NotificationMessageEvent(this._client, payload, childDeviceId) break } } this._log.info({ eventType: 'emit', listener: eventTopic, data: eventPayload }) this._mqttClient.emit(eventTopic, eventPayload) } catch (err) { this._log.info({ eventType: 'message', err: new MessageProcessorError(err) }) } } private _emitError (error): void { this._log.info({ eventType: 'message', err: new MessageProcessorError(error) }) this._mqttClient.emit(Event.ERROR, error) } private _toJSON (payload: Buffer): object { return JSON.parse(payload.toString()) } }