/* nodejs-poolController. An application to control pool equipment. Copyright (C) 2016, 2017. Russell Goldin, tagyoureit. russ.goldin@gmail.com This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ import { connect, MqttClient, Client, IClientPublishOptions, CloseCallback } from 'mqtt'; import * as http2 from "http2"; import * as http from "http"; import * as https from "https"; import extend = require("extend"); import { logger } from "../../logger/Logger"; import { PoolSystem, sys } from "../../controller/Equipment"; import { State, state } from "../../controller/State"; import { InterfaceEvent, BaseInterfaceBindings, InterfaceContext, IInterfaceEvent } from "./baseInterface"; import { sys as sysAlias } from "../../controller/Equipment"; import { state as stateAlias } from "../../controller/State"; import { webApp as webAppAlias } from '../Server'; import { Timestamp, Utils, utils } from "../../controller/Constants"; import { ServiceParameterError } from '../../controller/Errors'; export class MqttInterfaceBindings extends BaseInterfaceBindings { constructor(cfg) { super(cfg); this.subscribed = false; } public client: MqttClient; private topics: MqttTopicSubscription[] = []; declare events: MqttInterfaceEvent[]; declare subscriptions: MqttTopicSubscription[]; private subscribed: boolean; // subscribed to events or not private sentInitialMessages = false; private init = () => { (async () => { await this.initAsync(); })(); } public async initAsync() { try { if (this.client) await this.stopAsync(); logger.info(`Initializing MQTT client ${this.cfg.name}`); let baseOpts = extend(true, { headers: {} }, this.cfg.options, this.context.options); if ((typeof baseOpts.hostname === 'undefined' || !baseOpts.hostname) && (typeof baseOpts.host === 'undefined' || !baseOpts.host || baseOpts.host === '*')) { logger.warn(`Interface: ${this.cfg.name} has not resolved to a valid host.`); return; } const url = `${baseOpts.protocol || 'mqtt://'}${baseOpts.host}:${baseOpts.port || 1883}`; let toks = {}; const opts = { clientId: this.tokensReplacer(baseOpts.clientId, undefined, toks, { vars: {} } as any, {}), username: baseOpts.username, password: baseOpts.password, rejectUnauthorized: !baseOpts.selfSignedCertificate, url } this.setWillOptions(opts); this.client = connect(url, opts); this.client.on('connect', async () => { try { logger.info(`MQTT connected to ${url}`); await this.subscribe(); // make sure status is up to date immediately // especially in the case of a re-connect this.bindEvent("controller", state.controllerState); } catch (err) { logger.error(`Error connecting to MQTT Broker ${this.cfg.name} ${err.message}`); } }); this.client.on('reconnect', () => { try { logger.info(`Re-connecting to MQTT broker ${this.cfg.name}`); } catch (err) { logger.error(`Error reconnecting to MQTT Brokder ${this.cfg.name} ${err.message}`); } }); this.client.on('error', (error) => { logger.error(`MQTT error ${error}`) this.clearWillState(); }); } catch (err) { logger.error(`Error initializing MQTT client ${this.cfg.name}: ${err}`); } } public async stopAsync() { try { if (typeof this.client !== 'undefined') { await this.unsubscribe(); await new Promise((resolve, reject) => { this.client.end(true, { reasonCode: 0, reasonString: `Shutting down MQTT Client` }, () => { resolve(true); logger.info(`Successfully shut down MQTT Client`); }); }); if (this.client) this.client.removeAllListeners(); this.client = null; } } catch (err) { logger.error(`Error stopping MQTT Client: ${err.message}`); } } public async reload(data) { try { await this.unsubscribe(); this.context = Object.assign(new InterfaceContext(), data.context); this.events = Object.assign([], data.events); this.subscriptions = Object.assign([], data.subscriptions); await this.subscribe(); } catch (err) { logger.error(`Error reloading MQTT bindings`); } } private async unsubscribe() { try { this.client.off('message', this.messageHandler); while (this.topics.length > 0) { let topic = this.topics.pop(); if (typeof topic !== 'undefined') { await new Promise((resolve, reject) => { this.client.unsubscribe(topic.topicPath, (err, packet) => { if (err) { logger.error(`Error unsubscribing from MQTT topic ${topic.topicPath}: ${err}`); resolve(false); } else { logger.debug(`Unsubscribed from MQTT topic ${topic.topicPath}`); resolve(true); } }); }); } } this.subscribed = false; } catch (err) { logger.error(`Error unsubcribing to MQTT topic: ${err.message}`); } } protected async subscribe() { if (this.topics.length > 0) await this.unsubscribe(); let root = this.rootTopic(); if (typeof this.subscriptions !== 'undefined') { for (let i = 0; i < this.subscriptions.length; i++) { let sub = this.subscriptions[i]; if(sub.enabled !== false) this.topics.push(new MqttTopicSubscription(root, sub)); } } else if (typeof root !== 'undefined') { let arrTopics = [ `state/+/setState`, `state/+/setstate`, `state/+/toggleState`, `state/+/togglestate`, `state/body/setPoint`, `state/body/setpoint`, `state/body/heatSetpoint`, `state/body/coolSetpoint`, `state/body/heatMode`, `state/body/heatmode`, `state/+/setTheme`, `state/+/settheme`, `state/temps`, `config/tempSensors`, `config/chemController`, `state/chemController`, `config/chlorinator`, `state/chlorinator`]; for (let i = 0; i < arrTopics.length; i++) { this.topics.push(new MqttTopicSubscription(root, { topic: arrTopics[i] })); } } for (let i = 0; i < this.topics.length; i++) { let topic = this.topics[i]; this.client.subscribe(topic.topicPath, (err, granted) => { if (!err) logger.verbose(`MQTT subscribed to ${JSON.stringify(granted)}`); else logger.error(`MQTT Subscribe: ${err}`); }); } this.client.on('message', this.messageHandler); this.subscribed = true; } // this will take in the MQTT Formatter options and format each token that is bound // otherwise, it's the same as the base buildTokens fn. // This could be combined into one fn but for now it's specific to MQTT formatting of topics protected buildTokensWithFormatter(input: string, eventName: string, toks: any, e: InterfaceEvent, data, formatter: any): any { toks = toks || []; let s = input; let regx = /(?<=@bind\=\s*).*?(?=\;)/g; let match; let sys = sysAlias; let state = stateAlias; let webApp = webAppAlias; let vars = this.bindVarTokens(e, eventName, data); // Map all the returns to the token list. We are being very basic // here an the object graph is simply based upon the first object occurrence. // We simply want to eval against that object reference. while (match = regx.exec(s)) { let bind = match[0]; if (typeof toks[bind] !== 'undefined') continue; let tok: any = {}; toks[bind] = tok; try { // we may error out if data can't be found (eg during init) tok.reg = new RegExp("@bind=" + this.escapeRegex(bind) + ";", "g"); tok.value = eval(bind); if (typeof formatter !== 'undefined') { formatter.forEach(entry => { if (typeof entry.transform !== 'undefined') { let transform = `('${tok.value}')${entry.transform}`; tok.value = eval(transform); } else if (typeof entry === 'object') { let rexp = new RegExp(entry.regexkey, 'g') tok.value = tok.value.replace(rexp, entry.replace); } }) } } catch (err) { // leave value undefined so it isn't sent to bindings tok[bind] = null; } } return toks; } private setWillOptions = (connectOpts) => { const baseOpts = extend(true, { headers: {} }, this.cfg.options, this.context.options); if (baseOpts.willTopic !== 'undefined') { const rootTopic = this.rootTopic(); const topic = `${rootTopic}/${baseOpts.willTopic}`; const publishOptions = { retain: typeof baseOpts.retain !== 'undefined' ? baseOpts.retain : true, qos: typeof baseOpts.qos !== 'undefined' ? baseOpts.qos : 2 }; connectOpts.will = { topic: topic, payload: baseOpts.willPayload, retain: publishOptions.retain, qos: publishOptions.qos }; } } private clearWillState() { if (typeof this.client.options.will === 'undefined') return; let willTopic = this.client.options.will.topic; let willPayload = this.client.options.will.payload; if (typeof this.events !== 'undefined') this.events.forEach(evt => { if (typeof evt.topics !== 'undefined') evt.topics.forEach(t => { if (typeof t.lastSent !== 'undefined') { let lm = t.lastSent.find(elem => elem.topic === willTopic); if (typeof lm !== 'undefined') { lm.message = willPayload.toString(); } } }); }); } public rootTopic = () => { let toks = {}; let baseOpts = extend(true, { headers: {} }, this.cfg.options, this.context.options); let topic = ''; this.buildTokensWithFormatter(baseOpts.rootTopic, undefined, toks, undefined, undefined, baseOpts.formatter); topic = this.replaceTokens(baseOpts.rootTopic, toks); return topic; } public bindEvent(evt: string, ...data: any) { try { if (!this.sentInitialMessages && evt === 'controller' && data[0].status.val === 1) { // Emitting all the equipment messages state.emitAllEquipmentChanges(); this.sentInitialMessages = true; } // Find the binding by first looking for the specific event name. // If that doesn't exist then look for the "*" (all events). if (typeof this.events !== 'undefined') { if (typeof this.client === 'undefined') this.init(); let evts = this.events.filter(elem => elem.name === evt); // If we don't have an explicitly defined event then see if there is a default. if (evts.length === 0) { let e = this.events.find(elem => elem.name === '*'); evts = e ? [e] : []; } if (evts.length > 0) { let toks = {}; let replacer = ''; for (let i = 0; i < evts.length; i++) { let e = evts[i]; if (typeof e.enabled !== 'undefined' && !e.enabled) continue; let baseOpts = extend(true, { headers: {} }, this.cfg.options, this.context.options); let opts = extend(true, baseOpts, e.options); // Figure out whether we need to check the filter. if (typeof e.filter !== 'undefined') { this.buildTokens(e.filter, evt, toks, e, data[0]); if (eval(this.replaceTokens(e.filter, toks)) === false) continue; } let rootTopic = this.rootTopic(); if (typeof opts.replacer !== 'undefined') replacer = opts.replacer; if (typeof e.topics !== 'undefined') e.topics.forEach(t => { let topicToks = {}; if (typeof t.enabled !== 'undefined' && !t.enabled) return; if (typeof t.filter !== 'undefined') { this.buildTokens(t.filter, evt, topicToks, e, data[0]); if (eval(this.replaceTokens(t.filter, topicToks)) === false) return; } let topicFormatter = t.formatter || opts.formatter; let topic = ''; let message: any; // build tokens for Topic // we need to keep separated topic tokens because otherwise // a value like @bind=data.name; would be eval'd the same // across all topics this.buildTokensWithFormatter(t.topic, evt, topicToks, e, data[0], topicFormatter); topic = this.replaceTokens(t.topic, topicToks); if (t.useRootTopic !== false) topic = `${rootTopic}/${topic}`; // Filter out any topics where there may be undefined in it. We don't want any of this if that is the case. if (topic.endsWith('/undefined') || topic.indexOf('/undefined/') !== -1 || topic.startsWith('null/') || topic.indexOf('/null') !== -1) return; let publishOptions: IClientPublishOptions = { retain: typeof baseOpts.retain !== 'undefined' ? baseOpts.retain : true, qos: typeof baseOpts.qos !== 'undefined' ? baseOpts.qos : 2 }; let changesOnly = typeof baseOpts.changesOnly !== 'undefined' ? baseOpts.changesOnly : true; if (typeof e.options !== 'undefined') { if (typeof e.options.retain !== 'undefined') publishOptions.retain = e.options.retain; if (typeof e.options.qos !== 'undefined') publishOptions.retain = e.options.qos; if (typeof e.options.changesOnly !== 'undefined') changesOnly = e.options.changesOnly; } if (typeof t.options !== 'undefined') { if (typeof t.options.retain !== 'undefined') publishOptions.retain = t.options.retain; if (typeof t.options.qos !== 'undefined') publishOptions.qos = t.options.qos; if (typeof t.options.changeOnly !== 'undefined') changesOnly = t.options.changesOnly; } if (typeof t.processor !== 'undefined') { if (t.ignoreProcessor) message = "err"; else { if (typeof t._fnProcessor !== 'function') { let fnBody = Array.isArray(t.processor) ? t.processor.join('\n') : t.processor; try { // Try to compile it. t._fnProcessor = new Function('ctx', 'pub', 'sys', 'state', 'data', fnBody) as (ctx: any, pub: MQTTPublishTopic, sys: PoolSystem, state: State, data: any) => any; } catch (err) { logger.error(`Error compiling subscription processor: ${err} -- ${fnBody}`); t.ignoreProcessor = true; } } if (typeof t._fnProcessor === 'function') { let vars = this.bindVarTokens(e, evt, data); let ctx = { util: utils, rootTopic: rootTopic, topic: topic, opts: opts, vars: vars } try { message = t._fnProcessor(ctx, t, sys, state, data[0]).toString(); topic = ctx.topic; } catch (err) { logger.error(`Error publishing MQTT data for topic ${t.topic}: ${err.message}`); message = "err"; } } } } else { this.buildTokens(t.message, evt, topicToks, e, data[0]); message = this.tokensReplacer(t.message, evt, topicToks, e, data[0]); } if (changesOnly) { if (typeof t.lastSent === 'undefined') t.lastSent = []; let lm = t.lastSent.find(elem => elem.topic === topic); if (typeof lm === 'undefined' || lm.message !== message) { setImmediate(() => { this.client.publish(topic, message, publishOptions); }); logger.silly(`MQTT send:\ntopic: ${topic}\nmessage: ${message}\nopts:${JSON.stringify(publishOptions)}`); } if (typeof lm === 'undefined') t.lastSent.push({ topic: topic, message: message }); else lm.message = message; } else { logger.silly(`MQTT send:\ntopic: ${topic}\nmessage: ${message}\nopts:${JSON.stringify(publishOptions)}`); setImmediate(() => { this.client.publish(topic, message, publishOptions); }); if (typeof t.lastSent !== 'undefined') t.lastSent = undefined; } }) } } } } catch (err) { logger.error(`Error binding MQTT event ${evt}: ${err.message}`); } } // This needed to be refactored so we could extract it from an anonymous function. We want to be able to unbind // from it private messageHandler = (topic, message) => { (async () => { await this.processMessage(topic, message); })(); } private processMessage = async (topic, message) => { try { if (!state.isInitialized){ logger.info(`MQTT: **TOPIC IGNORED, SYSTEM NOT READY** Inbound ${topic}: ${message.toString()}`); return; } let msg = message.toString(); if (msg[0] === '{') msg = JSON.parse(msg); let sub: MqttTopicSubscription = this.topics.find(elem => topic === elem.topicPath); if (typeof sub !== 'undefined') { logger.debug(`MQTT: Inbound ${topic} ${message.toString()}`); // Alright so now lets process our results. if (typeof sub.fnProcessor === 'function') { sub.executeProcessor(this, msg); return; } } const topics = topic.split('/'); if (topic.startsWith(this.rootTopic() + '/') && typeof msg === 'object') { // RKS: Not sure why there is no processing of state vs config here. Right now the topics are unique // between them so it doesn't matter but it will become an issue. switch (topics[topics.length - 1].toLowerCase()) { case 'setstate': { let id = parseInt(msg.id, 10); if (typeof id !== 'undefined' && isNaN(id)) { logger.error(`Inbound MQTT ${topics} has an invalid id (${id}) in the message (${msg}).`) }; let isOn = typeof msg.isOn !== 'undefined' ? utils.makeBool(msg.isOn) : typeof msg.state !== 'undefined' ? utils.makeBool(msg.state) : undefined; switch (topics[topics.length - 2].toLowerCase()) { case 'circuits': case 'circuit': { try { if(typeof isOn !== 'undefined') await sys.board.circuits.setCircuitStateAsync(id, isOn); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; } case 'features': case 'feature': { try { if (typeof isOn !== 'undefined') await sys.board.features.setFeatureStateAsync(id, isOn); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; } case 'lightgroups': case 'lightgroup': { try { if (typeof isOn !== 'undefined') await sys.board.circuits.setLightGroupStateAsync(id, isOn); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; } case 'circuitgroups': case 'circuitgroup': { try { if (typeof isOn !== 'undefined') await sys.board.circuits.setCircuitGroupStateAsync(id, isOn); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; } default: logger.warn(`MQTT: Inbound topic ${topics[topics.length - 1]} not matched to event ${topics[topics.length - 2].toLowerCase()}. Message ${msg} `) } break; } case 'togglestate': { let id = parseInt(msg.id, 10); if (typeof id !== 'undefined' && isNaN(id)) { logger.error(`Inbound MQTT ${topics} has an invalid id (${id}) in the message (${msg}).`) }; switch (topics[topics.length - 2].toLowerCase()) { case 'circuits': case 'circuit': try { await sys.board.circuits.toggleCircuitStateAsync(id); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'features': case 'feature': try { await sys.board.features.toggleFeatureStateAsync(id); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; default: logger.warn(`MQTT: Inbound topic ${topics[topics.length - 1]} not matched to event ${topics[topics.length - 2].toLowerCase()}. Message ${msg} `) } break; } case 'heatsetpoint': try { let body = sys.bodies.findByObject(msg); if (topics[topics.length - 2].toLowerCase() === 'body') { if (typeof body === 'undefined') { logger.error(new ServiceParameterError(`Cannot set body heatSetpoint. You must supply a valid id, circuit, name, or type for the body`, 'body', 'id', msg.id)); return; } if (typeof msg.setPoint !== 'undefined' || typeof msg.heatSetpoint !== 'undefined') { let setPoint = parseInt(msg.setPoint, 10) || parseInt(msg.heatSetpoint, 10); if (!isNaN(setPoint)) { await sys.board.bodies.setHeatSetpointAsync(body, setPoint); } } } } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'coolsetpoint': try { let body = sys.bodies.findByObject(msg); if (topics[topics.length - 2].toLowerCase() === 'body') { if (typeof body === 'undefined') { logger.error(new ServiceParameterError(`Cannot set body coolSetpoint. You must supply a valid id, circuit, name, or type for the body`, 'body', 'id', msg.id)); return; } if (typeof msg.setPoint !== 'undefined' || typeof msg.coolSetpoint !== 'undefined') { let setPoint = parseInt(msg.coolSetpoint, 10) || parseInt(msg.coolSetpoint, 10); if (!isNaN(setPoint)) { await sys.board.bodies.setCoolSetpointAsync(body, setPoint); } } } } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'setpoint': try { let body = sys.bodies.findByObject(msg); if (topics[topics.length - 2].toLowerCase() === 'body') { if (typeof body === 'undefined') { logger.error(new ServiceParameterError(`Cannot set body setPoint. You must supply a valid id, circuit, name, or type for the body`, 'body', 'id', msg.id)); return; } if (typeof msg.setPoint !== 'undefined' || typeof msg.heatSetpoint !== 'undefined') { let setPoint = parseInt(msg.setPoint, 10) || parseInt(msg.heatSetpoint, 10); if (!isNaN(setPoint)) { await sys.board.bodies.setHeatSetpointAsync(body, setPoint); } } if (typeof msg.coolSetpoint !== 'undefined') { let setPoint = parseInt(msg.coolSetpoint, 10); if (!isNaN(setPoint)) { await sys.board.bodies.setCoolSetpointAsync(body, setPoint); } } } } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'heatmode': try { if (topics[topics.length - 2].toLowerCase() !== 'body') return; // Map the mode that was passed in. This should accept the text based name or the ordinal id value. let mode = parseInt(msg.mode, 10); let val; if (isNaN(mode)) mode = parseInt(msg.heatMode, 10); if (!isNaN(mode)) val = sys.board.valueMaps.heatModes.transform(mode); else val = sys.board.valueMaps.heatModes.transformByName(msg.mode || msg.heatMode); if (typeof val.val === 'undefined') { logger.error(new ServiceParameterError(`Invalid value for heatMode: ${msg.mode}`, 'body', 'heatMode', mode)); return; } mode = val.val; let body = sys.bodies.findByObject(msg); if (typeof body === 'undefined') { logger.error(new ServiceParameterError(`Cannot set body heatMode. You must supply a valid id, circuit, name, or type for the body`, 'body', 'id', msg.id)); return; } let tbody = await sys.board.bodies.setHeatModeAsync(body, mode); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'chlorinator': try { let schlor = await sys.board.chlorinator.setChlorAsync(msg); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'chemcontroller': try { await sys.board.chemControllers.setChemControllerAsync(msg); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'settheme': try { let theme = await state.circuits.setLightThemeAsync(parseInt(msg.id, 10), sys.board.valueMaps.lightThemes.encode(msg.theme)); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'temp': case 'temps': try { await sys.board.system.setTempsAsync(msg); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; case 'tempsensor': case 'tempsensors': try { await sys.board.system.setTempSensorsAsync(msg); } catch (err) { logger.error(`Error processing MQTT topic ${topics[topics.length - 2]}: ${err.message}`); } break; default: logger.silly(`MQTT: Inbound MQTT topic not matched: ${topic}: ${message.toString()}`) break; } } } catch (err) { logger.error(`Error processing MQTT request ${topic}: ${err}. ${message}`) } } } class MqttInterfaceEvent extends InterfaceEvent { public topics: MQTTPublishTopic[] } export class MQTTPublishTopic { topic: string; useRootTopic: boolean; message: string; description: string; formatter: any[]; qos: string; retain: boolean; enabled?: boolean; filter?: string; lastSent: MQTTMessage[]; options: any; processor?: string[]; ignoreProcessor: boolean = false; _fnProcessor: (ctx: any, pub: MQTTPublishTopic, sys: PoolSystem, state: State, data: any) => any } class MQTTMessage { topic: string; message: string; } class MqttSubscriptions { public subscriptions: IMQTTSubscription[] } class MqttTopicSubscription implements IInterfaceEvent { root: string; topic: string; enabled: boolean; fnProcessor: (ctx: any, sub: MqttTopicSubscription, sys: PoolSystem, state: State, value: any) => void; options: any = {}; constructor(root: string, sub: any) { this.root = sub.root || root; this.topic = sub.topic; if (typeof sub.processor !== 'undefined') { let fnBody = Array.isArray(sub.processor) ? sub.processor.join('\n') : sub.processor; try { this.fnProcessor = new Function('ctx', 'sub', 'sys', 'state', 'value', fnBody) as (ctx: any, sub: MqttTopicSubscription, sys: PoolSystem, state: State, value: any) => void; } catch (err) { logger.error(`Error compiling subscription processor: ${err} -- ${fnBody}`); } } } public get topicPath(): string { return `${this.root}/${this.topic}` }; public executeProcessor(bindings: MqttInterfaceBindings, value: any) { let baseOpts = extend(true, { headers: {} }, bindings.cfg.options, bindings.context.options); let opts = extend(true, baseOpts, this.options); let vars = bindings.bindVarTokens(this, this.topic, value); let ctx = { util: utils, client: bindings.client, vars: vars || {}, publish: (topic: string, message: any, options?: any) => { try { let msg: string; if (typeof message === 'undefined') msg = ''; else if (typeof message === 'string') msg = message; else if (typeof message === 'boolean') msg = message ? 'true' : 'false'; else if (message instanceof Timestamp) (message as Timestamp).format(); else if (typeof message.getTime === 'function') msg = Timestamp.toISOLocal(message); else { msg = Utils.stringifyJSON(message); } let baseOpts = extend(true, { headers: {} }, bindings.cfg.options, bindings.context.options); let pubOpts: IClientPublishOptions = { retain: typeof baseOpts.retain !== 'undefined' ? baseOpts.retain : true, qos: typeof baseOpts.qos !== 'undefined' ? baseOpts.qos : 2 }; if (typeof options !== 'undefined') { if (typeof options.retain !== 'undefined') pubOpts.retain = options.retain; if (typeof options.qos !== 'undefined') pubOpts.qos = options.qos; if (typeof options.headers !== 'undefined') pubOpts.properties = extend(true, {}, baseOpts.properties, options.properties); } let top = `${this.root}`; if (!top.endsWith('/') && !topic.startsWith('/')) top += '/'; top += topic; logger.silly(`Publishing ${top}-${msg}`); // Now we should be able to send this to the broker. bindings.client.publish(top, msg, pubOpts, (err) => { if (err) { logger.error(`Error publishing topic ${top}-${msg} : ${err}`); } }); } catch (err) { logger.error(`Error publishing ${topic} to server ${bindings.cfg.name} from ${this.topic}`); } } }; this.fnProcessor(ctx, this, sys, state, value); state.emitEquipmentChanges(); } } export interface IMQTTSubscription { topic: string, description: string, processor?: string, enabled?: boolean }