/* nodejs-poolController. An application to control pool equipment.
Copyright (C) 2016, 2017, 2018, 2019, 2020, 2021, 2022.
Russell Goldin, tagyoureit. russ.goldin@gmail.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { AutoDetectTypes } from '@serialport/bindings-cpp';
import { EventEmitter } from 'events';
import * as net from 'net';
import { SerialPort, SerialPortMock, SerialPortOpenOptions } from 'serialport';
import { setTimeout } from 'timers';
import { config } from '../../config/Config';
import { logger } from '../../logger/Logger';
import { webApp } from "../../web/Server";
import { utils } from "../Constants";
import { sys } from "../Equipment";
import { InvalidEquipmentDataError, InvalidOperationError, OutboundMessageError } from '../Errors';
import { state } from "../State";
import { Inbound, Message, Outbound, Response } from './messages/Messages';
import { sl } from './ScreenLogic';
const extend = require("extend");
export class Connection {
// Nothing is opened at construction time; ports are added to rs485Ports by initAsync and getPortByCfg.
constructor() { }
// Every RS485 port instance currently registered with this connection manager. findPortById searches this list by portId.
public rs485Ports: RS485Port[] = [];
// Reports whether the primary port (id 0) is registered and flagged as a mock port.
public get mock(): boolean {
    const primary = this.findPortById(0);
    if (typeof primary === 'undefined') return false;
    return primary.mock ? true : false;
}
// A port counts as enabled only when it is registered, enabled in its config, currently open, and not in the middle of closing.
public isPortEnabled(portId: number) {
    const candidate: RS485Port = this.findPortById(portId);
    if (typeof candidate === 'undefined') return false;
    return candidate.enabled && candidate.isOpen && !candidate.closing;
}
// Deletes an auxiliary RS485 port: closes and unregisters the port, removes its `controller.comms<N>` config section and
// clears the connection message that was raised for it.
//   data.portId - id of the port to delete (number or numeric string).
// Resolves with the configuration section as it existed before removal.
// Rejects with InvalidEquipmentDataError when the id is not numeric or refers to the primary port (0), and with the
// underlying error when the removal itself fails.
public async deleteAuxPort(data: any): Promise<any> {
    try {
        let portId = parseInt(data.portId, 10);
        if (isNaN(portId)) return Promise.reject(new InvalidEquipmentDataError(`A valid port id was not provided to be deleted`, 'RS485Port', data.id));
        if (portId === 0) return Promise.reject(new InvalidEquipmentDataError(`You may not delete the primary RS485 Port`, 'RS485Port', data.id));
        // removePortById closes the port asynchronously. Wait for it so the config is not removed while the port is still closing.
        await this.removePortById(portId);
        // portId can never be 0 here so the section always carries the numeric suffix.
        let section = `controller.comms${portId}`;
        let cfg = config.getSection(section, {});
        config.removeSection(section);
        state.equipment.messages.removeItemByCode(`rs485:${portId}:connection`);
        return cfg;
    } catch (err) {
        // Log the detail and surface the failure instead of resolving with undefined as if the delete had worked.
        logger.error(`Error deleting aux port: ${err && err.message ? err.message : err}`);
        return Promise.reject(err);
    }
}
// Validates a Screenlogic settings payload. As written this method only validates; it does not persist anything.
//   data.type    - must be 'local' or 'remote'.
//   data.address - must be a string that starts with 'Pentair:'.
// Rejects with InvalidEquipmentDataError when either value is invalid; otherwise resolves with undefined.
public async setScreenlogicAsync(data: any) {
    const type = data ? data.type : undefined;
    const address = data ? data.address : undefined;
    // Both comparisons must fail for the type to be invalid. Joining them with || rejected every value, including the valid ones.
    if (type !== 'local' && type !== 'remote') return Promise.reject(new InvalidEquipmentDataError(`Invalid Screenlogic type (${type}). Allowed values are 'local' or 'remote'`, 'Screenlogic', 'screenlogic'));
    // Test the leading characters. slice(8) returned everything AFTER the prefix so the comparison could never succeed.
    if (typeof address !== 'string' || !address.startsWith('Pentair:')) return Promise.reject(new InvalidEquipmentDataError(`Invalid address (${address}). Must start with 'Pentair:'`, 'Screenlogic', 'screenlogic'));
}
// Creates or updates an RS485 port definition, persists it to the `controller.comms<N>` config section, then closes and
// re-opens the affected port (or the Screenlogic connection) with the new settings.
//   data.portId       - id of the port to change; -1 or undefined allocates the next free id.
//   data.type         - 'local', 'netConnect'/'network' or 'screenlogic'.
//   data.enabled, data.rs485Port, data.mock, data.inactivityRetry, data.netHost, data.netPort,
//   data.portSettings, data.netSettings, data.screenlogic - optional settings merged over the stored section.
// Resolves with the stored configuration section.
// Rejects with InvalidEquipmentDataError for invalid input, InvalidOperationError when the existing port cannot be
// closed or the new one cannot be opened, and with any unexpected error raised along the way.
public async setPortAsync(data: any): Promise<any> {
    try {
        let ccfg = config.getSection('controller');
        let existingId;
        let portId;
        let maxId = -1;
        for (let sec in ccfg) {
            if (sec.startsWith('comms')) {
                let p = ccfg[sec];
                // A section without a portId is treated as port 0, matching RS485Port.portId. Without this a legacy
                // section turned maxId into NaN and no new port could ever be allocated.
                let id = typeof p.portId !== 'undefined' ? p.portId : 0;
                maxId = Math.max(id, maxId);
                if (id === data.portId) existingId = id;
            }
        }
        if (typeof existingId === 'undefined') {
            // We are adding a new one.
            if (data.portId === -1 || typeof data.portId === 'undefined') portId = maxId + 1;
            else portId = data.portId;
        }
        else portId = existingId;
        if (isNaN(portId) || portId < 0) return Promise.reject(new InvalidEquipmentDataError(`Invalid port id defined ${portId}`, 'RS485Port', data.portId));
        let section = `controller.comms` + (portId === 0 ? '' : portId);
        // Lets set the config data.
        // NOTE(review): the keepAliveInitialDelay defaults in this method differ (1000, 10000 and 5) -- confirm which is intended.
        let pdata = config.getSection(section, {
            portId: portId,
            type: 'local',
            rs485Port: "/dev/ttyUSB0",
            portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false },
            netSettings: { allowHalfOpen: false, keepAlive: false, keepAliveInitialDelay: 1000 },
            mock: false,
            netConnect: false,
            netHost: "raspberrypi",
            netPort: 9801,
            inactivityRetry: 10
        });
        // Only seed the Screenlogic defaults when nothing is stored. Assigning them unconditionally wiped any saved
        // Screenlogic settings every time the primary port was saved with another type.
        if (portId === 0 && typeof pdata.screenlogic === 'undefined') {
            pdata.screenlogic = {
                connectionType: "local",
                systemName: "Pentair: 00-00-00",
                password: 1234
            }
        }
        pdata.enabled = typeof data.enabled !== 'undefined' ? utils.makeBool(data.enabled) : utils.makeBool(pdata.enabled);
        pdata.type = data.type;
        pdata.netConnect = data.type === 'network' || data.type === 'netConnect';
        pdata.rs485Port = typeof data.rs485Port !== 'undefined' ? data.rs485Port : pdata.rs485Port;
        pdata.inactivityRetry = typeof data.inactivityRetry === 'number' ? data.inactivityRetry : pdata.inactivityRetry;
        pdata.mock = data.mock;
        if (pdata.mock) { pdata.rs485Port = 'MOCK_PORT'; }
        // Use the netConnect flag so both accepted type names ('network' and 'netConnect') pick up the host and port.
        if (pdata.netConnect) {
            pdata.netHost = typeof data.netHost !== 'undefined' ? data.netHost : pdata.netHost;
            pdata.netPort = typeof data.netPort === 'number' ? data.netPort : pdata.netPort;
        }
        if (typeof data.portSettings !== 'undefined') {
            pdata.portSettings = extend(true, { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false }, pdata.portSettings, data.portSettings);
        }
        if (typeof data.netSettings !== 'undefined') {
            pdata.netSettings = extend(true, { keepAlive: false, allowHalfOpen: false, keepAliveInitialDelay: 10000 }, pdata.netSettings, data.netSettings);
        }
        if (pdata.type === 'screenlogic') {
            // Reject a missing settings object with a meaningful error rather than a TypeError.
            if (typeof data.screenlogic === 'undefined' || data.screenlogic === null) return Promise.reject(new InvalidEquipmentDataError(`Screenlogic settings were not supplied.`, 'Screenlogic', data));
            let rawPassword = data.screenlogic.password;
            let password = typeof rawPassword === 'undefined' || rawPassword === null ? '' : rawPassword.toString();
            let regx = /Pentair: (?:(?:\d|[A-Z])(?:\d|[A-Z])-){2}(?:\d|[A-Z])(?:\d|[A-Z])/g;
            let type = data.screenlogic.connectionType;
            let systemName = data.screenlogic.systemName;
            if (type !== 'remote' && type !== 'local') return Promise.reject(new InvalidEquipmentDataError(`An invalid type was supplied for Screenlogic ${type}. Must be remote or local.`, 'Screenlogic', data));
            if (typeof systemName !== 'string' || systemName.match(regx) === null) return Promise.reject(new InvalidEquipmentDataError(`An invalid system name was supplied for Screenlogic ${systemName}. Must be in the format 'Pentair: xx-xx-xx'.`, 'Screenlogic', data));
            // NOTE(review): the check enforces exactly 4 characters while the previous message claimed "<= 4". The message
            // now states what is enforced -- confirm the intended rule.
            if (password.length !== 4) return Promise.reject(new InvalidEquipmentDataError(`An invalid password was supplied for Screenlogic ${password}. (Length must be 4)`, 'Screenlogic', data));
            pdata.screenlogic = data.screenlogic;
        }
        let existing = this.findPortById(portId);
        if (typeof existing !== 'undefined') {
            if (existing.type === 'screenlogic' || sl.enabled) {
                await sl.closeAsync();
            }
            else {
                if (!await existing.closeAsync()) {
                    existing.closing = false; // if closing fails, reset flag so user can try again
                    return Promise.reject(new InvalidOperationError(`Unable to close the current RS485 port (Try to save the port again as it usually works the second time).`, 'setPortAsync'));
                }
            }
        }
        config.setSection(section, pdata);
        let cfg = config.getSection(section, {
            type: 'local',
            rs485Port: "/dev/ttyUSB0",
            portSettings: { baudRate: 9600, dataBits: 8, parity: 'none', stopBits: 1, flowControl: false, autoOpen: false, lock: false },
            netSettings: { allowHalfOpen: false, keepAlive: false, keepAliveInitialDelay: 5 },
            mock: false,
            netConnect: false,
            netHost: "raspberrypi",
            netPort: 9801,
            inactivityRetry: 10
        });
        // Same rule as above: never replace the Screenlogic settings that were just saved with the defaults.
        if (portId === 0 && typeof cfg.screenlogic === 'undefined') {
            cfg.screenlogic = {
                connectionType: "local",
                systemName: "Pentair: 00-00-00",
                password: 1234
            }
        }
        existing = this.getPortByCfg(cfg);
        if (typeof existing !== 'undefined') {
            if (pdata.type === 'screenlogic') {
                await sl.openAsync();
            }
            else {
                existing.reconnects = 0;
                if (!await existing.openAsync(cfg)) {
                    if (cfg.netConnect) return Promise.reject(new InvalidOperationError(`Unable to open Socat Connection to ${pdata.netHost}`, 'setPortAsync'));
                    return Promise.reject(new InvalidOperationError(`Unable to open RS485 port ${pdata.rs485Port}`, 'setPortAsync'));
                }
            }
        }
        return cfg;
    } catch (err) { return Promise.reject(err); }
}
// Shuts down communications by closing every registered port, last registered first. A failure is logged, not thrown.
public async stopAsync() {
    try {
        let idx = this.rs485Ports.length;
        while (idx-- > 0) {
            const current = this.rs485Ports[idx];
            await current.closeAsync();
        }
        logger.info(`Closed all serial communications connection.`);
    } catch (err) { logger.error(`Error closing comms connection: ${err.message} `); }
}
// Creates and opens an RS485Port for every `comms*` section found under the `controller` config section.
// Multiple comm ports are supported, so each section is initialized in turn. The un-suffixed `comms` section is kept
// from the original config.json layout and represents the primary port.
// Failures are logged and not thrown.
public async initAsync() {
    try {
        Message.publishPluginAddress();
        let cfg = config.getSection('controller');
        for (let section in cfg) {
            if (!section.startsWith('comms')) continue;
            let c = cfg[section];
            if (typeof c.type === 'undefined') {
                // Legacy section with no explicit type: derive one from the mock / netConnect flags and persist it.
                // The derived value used to be computed and then dropped, so the section was re-saved unchanged.
                // NOTE(review): the discarded value for a net connection was 'network'. 'netConnect' is stored instead
                // because that is what RS485Port.type reports for these sections and what setPortAsync compares
                // against -- confirm against the UI.
                c.type = c.mock ? 'mock' : c.netConnect ? 'netConnect' : 'local';
                config.setSection(`controller.${section}`, c);
                logger.info(`Assigned type '${c.type}' to RS485 port configuration ${section}`);
            }
            let port = new RS485Port(c);
            this.rs485Ports.push(port);
            await port.openAsync();
        }
    } catch (err) { logger.error(`Error initializing RS485 ports ${err.message}`); }
}
// Looks up a registered port by id. A missing (or otherwise falsy) id selects the primary port, id 0.
// Returns undefined when no port with that id is registered.
public findPortById(portId?: number): RS485Port {
    const wantedId = portId || 0;
    return this.rs485Ports.find(candidate => candidate.portId === wantedId);
}
// Closes every registered port with the given id and unregisters it. The primary port (id 0) is closed but
// deliberately left in the list because it cannot be deleted.
public async removePortById(portId: number) {
    let idx = this.rs485Ports.length;
    // Walk backwards so splicing does not disturb the entries still to be visited.
    while (idx-- > 0) {
        const candidate = this.rs485Ports[idx];
        if (candidate.portId !== portId) continue;
        await candidate.closeAsync();
        if (portId !== 0) this.rs485Ports.splice(idx, 1);
    }
}
// Returns the registered port matching cfg.portId (default 0). When none exists a new RS485Port is built from cfg,
// registered, and returned. An existing port is returned as-is; cfg is not applied to it here.
public getPortByCfg(cfg: any) {
    const registered = this.findPortById(cfg.portId || 0);
    if (typeof registered !== 'undefined') return registered;
    const created = new RS485Port(cfg);
    this.rs485Ports.push(created);
    return created;
}
// Lists every configured RS485 port: one entry per `comms*` section under `controller`, decorated with a display
// name ('Primary' for port 0, otherwise 'Aux<N>') and the live isOpen state of the matching registered port.
// Resolves with the array of port descriptions. On failure the error is logged and an empty array is returned so
// callers can always iterate the result.
public async listInstalledPorts(): Promise<any> {
    try {
        let ports = [];
        let cfg = config.getSection('controller');
        for (let section in cfg) {
            if (section.startsWith('comms')) {
                let port = config.getSection(`controller.${section}`);
                if (port.portId === 0) port.name = 'Primary';
                else port.name = `Aux${port.portId}`;
                let p = this.findPortById(port.portId);
                port.isOpen = typeof p !== 'undefined' ? p.isOpen : false;
                ports.push(port);
            }
        }
        return ports;
    } catch (err) {
        logger.error(`Error listing installed RS485 ports ${err.message}`);
        return [];
    }
}
// Builds the list of port ids that an outbound message written on currPort should also be mirrored to.
// If an ANSLQ25 controller is present, outbound writes are broadcast to all other open ports, with the ANSLQ25 port
// listed first where possible.
//   currPort - the port the message is originally being written to.
// Returns an array of port ids; empty when there is no ANSLQ25 port configured or currPort is undefined.
private getBroadcastPorts(currPort: RS485Port) {
    let anslq25port = sys.anslq25.portId;
    let duplicateTo: number[] = [];
    // Without a valid ANSLQ25 port there is nothing to broadcast to. The id used to be unshifted into the list even
    // when it was negative or undefined, which produced an entry that resolves to no port (or to port 0 by accident).
    if (typeof currPort === 'undefined' || !(anslq25port >= 0)) return duplicateTo;
    let ports = this.rs485Ports;
    for (let i = 0; i < ports.length; i++) {
        if (ports[i].portId === currPort.portId) continue;
        if (ports[i].portId === anslq25port) continue; // don't resend
        if (!ports[i].isOpen) continue;
        duplicateTo.push(ports[i].portId);
    }
    // Removes a single id from the list. The previous splice(indexOf(id, 1)) call had its parenthesis misplaced:
    // it searched from index 1 and, lacking a delete count, dropped everything from the match to the end of the list.
    const dropPort = (id: number) => {
        const ndx = duplicateTo.indexOf(id);
        if (ndx !== -1) duplicateTo.splice(ndx, 1);
    };
    // NOTE(review): these conditions only match the current port and the ANSLQ25 port, neither of which is ever added
    // above, so nothing is removed today. If the intent is to skip ports dedicated to a pump or chlorinator the
    // condition needs to change -- confirm.
    let pumps = sys.pumps.get();
    for (let i = 0; i < pumps.length; i++) {
        if (pumps[i].portId === currPort.portId || pumps[i].portId === anslq25port) dropPort(pumps[i].portId);
    }
    let chlors = sys.chlorinators.get();
    for (let i = 0; i < chlors.length; i++) {
        if (chlors[i].portId === currPort.portId || chlors[i].portId === anslq25port) dropPort(chlors[i].portId);
    }
    // send to the anslq25 port first, where possible
    if (currPort.portId !== anslq25port) duplicateTo.unshift(anslq25port);
    return duplicateTo;
}
/* public queueInboundToAnslq25(_msg: Inbound) {
// if we have a valid inbound packet on any port (besides dedicated pump/chlor) then also send to anslq25
if (!sys.anslq25.isActive || sys.anslq25.portId < 0 || !sys.anslq25.broadcastComms) return;
if (typeof _msg.isClone !== 'undefined' && _msg.isClone) return;
let anslq25port = sys.anslq25.portId;
if (anslq25port === _msg.portId) return;
let port = this.findPortById(anslq25port);
let msg = _msg.clone();
msg.portId = port.portId;
msg.isClone = true;
msg.id = Message.nextMessageId;
(msg as Inbound).process();
} */
/* public queueInboundToBroadcast(_msg: Outbound) {
// if we have a valid inbound packet on any port (besides dedicated pump/chlor) then also send to anslq25
if (!sys.anslq25.isActive || sys.anslq25.portId < 0 || !sys.anslq25.broadcastComms) return;
if (typeof _msg.isClone !== 'undefined' && _msg.isClone) return;
let anslq25port = sys.anslq25.portId;
if (anslq25port === _msg.portId) return;
let port = this.findPortById(anslq25port);
let msg = _msg.clone();
msg.portId = port.portId;
msg.isClone = true;
msg.id = Message.nextMessageId;
(msg as Inbound).process();
} */
/* public queueOutboundToAnslq25(_msg: Outbound) {
// if we have a valid inbound packet on any port (besides dedicated pump/chlor) then also send to anslq25
if (!sys.anslq25.isActive || sys.anslq25.portId < 0 || !sys.anslq25.broadcastComms) return;
if (typeof _msg.isClone !== 'undefined' && _msg.isClone) return;
let anslq25port = sys.anslq25.portId;
let _ports = this.getBroadcastPorts(this.findPortById(_msg.portId));
let msgs: Outbound[] = [];
for (let i = 0; i < _ports.length; i++) {
let port = this.findPortById(_ports[i]);
if (port.portId === _msg.portId) continue;
let msg = _msg.clone() as Outbound;
msg.isClone = true;
msg.portId = port.portId;
msg.response = _msg.response;
msgs.push(msg);
}
return msgs;
} */
// Mirrors an outbound message to the other broadcast ports by feeding its packet bytes into each target port's
// inbound path (pushIn) after a 100ms delay. Does nothing unless the ANSLQ25 controller is active, has a valid port
// and has broadcastComms set, and never re-broadcasts a message flagged as a clone.
//   _msg - the outbound message being written on its own port.
public queueOutboundToBroadcast(_msg: Outbound) {
    if (!sys.anslq25.isActive || sys.anslq25.portId < 0 || !sys.anslq25.broadcastComms) return;
    if (typeof _msg.isClone !== 'undefined' && _msg.isClone) return;
    let source = this.findPortById(_msg.portId);
    // A message aimed at an unknown port cannot be mirrored. Bail out instead of dereferencing undefined.
    if (typeof source === 'undefined') return;
    let _ports = this.getBroadcastPorts(source);
    for (let i = 0; i < _ports.length; i++) {
        let port = this.findPortById(_ports[i]);
        // Skip ids that no longer resolve to a registered port as well as the originating port itself.
        if (typeof port === 'undefined' || port.portId === _msg.portId) continue;
        setTimeout(() => { port.pushIn(Buffer.from(_msg.toPacket())) }, 100);
        logger.silly(`mock inbound write bytes port:${_msg.portId} id:${_msg.id} bytes:${_msg.toShortPacket()}`)
    }
}
// Fire-and-forget send: hands the message to its target port via the 'messagewrite' event. An unknown port is logged
// as an error. On a mock port, or when the virtual equipment answers the message, retries are zeroed and any
// expected response is cleared before the write is queued.
public queueSendMessage(msg: Outbound) {
    const target = this.findPortById(msg.portId);
    if (typeof target === 'undefined') {
        logger.error(`queueSendMessage: Message was targeted for undefined port ${msg.portId || 0}`);
        return;
    }
    // Strips retry/response expectations from the message.
    const dropExpectations = () => {
        msg.retries = 0;
        if (msg.requiresResponse) msg.response = undefined;
    };
    if (target.mock) dropExpectations();
    const virtual = sys.virtualEquipment;
    if (virtual && virtual.shouldAnswerOutbound(msg)) {
        virtual.processOutbound(msg);
        dropExpectations();
    }
    target.emitter.emit('messagewrite', msg);
}
// Queues an outbound message on its target port and settles once the message completes.
//   msg - the outbound message; msg.portId selects the port and msg.onComplete is overwritten by this method.
// Resolves true when the message completes without error, or immediately when the virtual equipment answers it.
// Rejects with the error passed to onComplete, with InvalidOperationError when the target port does not exist, and
// with anything thrown while preparing the message.
public async queueSendMessageAsync(msg: Outbound): Promise<boolean> {
    // The executor is deliberately synchronous: anything it throws rejects the promise instead of being lost.
    return new Promise<boolean>((resolve, reject) => {
        let port = this.findPortById(msg.portId);
        if (typeof port === 'undefined') {
            logger.error(`queueSendMessage: Message was targeted for undefined port ${msg.portId || 0}`);
            // Reject so the caller is not left awaiting a promise that never settles.
            reject(new InvalidOperationError(`Message was targeted for undefined port ${msg.portId || 0}`, 'queueSendMessageAsync'));
            return;
        }
        // In mock mode:
        // - never retry the same outbound packet multiple times
        // - never wait for responses (so API callers get "sent" semantics)
        if (port.mock) {
            msg.retries = 0;
            if (msg.requiresResponse) msg.response = undefined;
        }
        const vEquip = sys.virtualEquipment;
        if (vEquip && vEquip.shouldAnswerOutbound(msg)) {
            // The virtual equipment handles the message itself; nothing is written to the port.
            vEquip.processOutbound(msg);
            msg.retries = 0;
            if (msg.requiresResponse) msg.response = undefined;
            resolve(true);
            return;
        }
        // also send to other broadcast ports
        this.queueOutboundToBroadcast(msg);
        msg.onComplete = (err) => {
            if (err) {
                reject(err);
            }
            else resolve(true);
        }
        port.emitter.emit('messagewrite', msg);
    })
}
// public sendMockPacket(msg: Inbound) {
// let port = this.findPortById(msg.portId);
// port.emitter.emit('mockmessagewrite', msg);
// }
// Calls pause() on every registered port, in registration order.
public pauseAll() {
    for (const current of this.rs485Ports) {
        current.pause();
    }
}
// Calls resume() on every registered port, in registration order.
public resumeAll() {
    for (const current of this.rs485Ports) {
        current.resume();
    }
}
// Enumerates the serial ports available on this machine via SerialPort.list().
// Resolves with the list of port descriptions. On failure the error is logged and an empty array is returned so
// callers can always iterate the result.
public async getLocalPortsAsync(): Promise<any> {
    try {
        return await SerialPort.list();
    } catch (err) {
        logger.error(`Error retrieving local ports ${err.message}`);
        return [];
    }
}
}
// Running totals of traffic on a port plus the failure percentages derived from them.
export class Counter {
    constructor() {
        this.bytesReceived = 0;
        this.recSuccess = 0;
        this.recFailed = 0;
        this.recCollisions = 0;
        this.bytesSent = 0;
        this.sndAborted = 0;
        this.sndRetries = 0;
        this.sndSuccess = 0;
        this.recFailureRate = 0;
        this.sndFailureRate = 0;
        this.recRewinds = 0;
    }
    public bytesReceived: number;
    public bytesSent: number;
    public recSuccess: number;
    public recFailed: number;
    public recCollisions: number;
    public recFailureRate: number;
    public sndSuccess: number;
    public sndAborted: number;
    public sndRetries: number;
    public sndFailureRate: number;
    public recRewinds: number;
    // Recomputes both failure rates as percentages (0-100). A rate is 0 while its success + failure total is 0,
    // which avoids dividing by zero.
    public updatefailureRate(): void {
        const received = this.recFailed + this.recSuccess;
        const sent = this.sndAborted + this.sndSuccess;
        this.recFailureRate = received !== 0 ? (this.recFailed / received * 100) : 0;
        this.sndFailureRate = sent !== 0 ? (this.sndAborted / sent * 100) : 0;
    }
    // Renders the receive-side counters as a single log line. Every field is comma separated; the comma after
    // bytesReceived was previously missing, which ran the first two fields together.
    public toLog(): string {
        return `{ "bytesReceived": ${this.bytesReceived}, "success": ${this.recSuccess}, "failed": ${this.recFailed}, "bytesSent": ${this.bytesSent}, "collisions": ${this.recCollisions}, "failureRate": ${this.recFailureRate.toFixed(2)}% }`;
    }
}
// The following class allows njsPC to have multiple RS485 buses. Each port has its own buffer and message processor
// so that devices on the bus can be isolated to a particular port. This way multiple ports can be used to
// accommodate differing port speeds and fixed port addresses.
export class RS485Port {
// Stores the port configuration, resets the buffers and wires up the three write events on the port's emitter:
//   'messagewrite'         - queue the message through pushOut.
//   'messagewritepriority' - write immediately when the port is open and ready, otherwise put the message at the
//                            front of the outbound buffer and process the queue on the next tick.
//   'mockmessagewrite'     - count the packet as sent successfully, publish stats, then process the message.
constructor(cfg: any) {
    this._cfg = cfg;
    this.emitter = new EventEmitter();
    this._inBuffer = [];
    this._outBuffer = [];
    this.procTimer = null;
    const onWrite = (msg) => { this.pushOut(msg); };
    const onPriorityWrite = (msg) => {
        if (!this.isOpen || !this.isRTS) {
            this._outBuffer.unshift(msg);
            setImmediate(() => { this.processPackets(); });
            return;
        }
        this.writeMessage(msg);
    };
    const onMockWrite = (msg) => {
        const packet = msg.toPacket();
        this.counter.bytesSent += packet.length;
        this.counter.sndSuccess++;
        this.emitPortStats();
        msg.process();
    };
    this.emitter.on('messagewrite', onWrite);
    this.emitter.on('messagewritepriority', onPriorityWrite);
    this.emitter.on('mockmessagewrite', onMockWrite);
}
// Display name: 'Primary' for port 0, otherwise 'Aux<portId>'.
public get name(): string { return this.portId === 0 ? 'Primary' : `Aux${this.portId}` }
// Gate for priority writes: the constructor only writes a priority message immediately when the port is open and
// this flag is set. Presumably "ready to send" -- confirm.
public isRTS: boolean = true;
// Reconnect counter; Connection.setPortAsync resets it to 0 before re-opening the port.
public reconnects: number = 0;
// Event bus for this port. The constructor subscribes to 'messagewrite', 'messagewritepriority' and 'mockmessagewrite'.
public emitter: EventEmitter;
// Port id from the configuration; defaults to 0 (the primary port) when the config or its portId is missing.
public get portId() { return typeof this._cfg !== 'undefined' && typeof this._cfg.portId !== 'undefined' ? this._cfg.portId : 0; }
// Explicit config type when present; otherwise derived from the legacy flags: netConnect -> 'netConnect', mock -> 'mock', else 'local'.
public get type() { return typeof this._cfg.type !== 'undefined' ? this._cfg.type : this._cfg.netConnect ? 'netConnect' : this._cfg.mock ? 'mock' : 'local' };
// True while the underlying serial port or socket is open.
public isOpen: boolean = false;
// Set by closeAsync while a close is in progress; also consulted by the socket 'close' handler before scheduling a reconnect.
public closing: boolean = false;
// Raw configuration section for this port.
private _cfg: any;
// Underlying transport: a hardware serial port, a mock serial port, or a TCP socket.
private _port: SerialPort | SerialPortMock | net.Socket;
// Set by openAsync: true when the port was opened against the mock binding.
public mock: boolean = false;
// While true, the socket data handler drops inbound data and the serial data handler does not reset the connection timer.
private isPaused: boolean = false;
// Timer used to schedule reconnect attempts.
private connTimer: NodeJS.Timeout;
//public buffer: SendRecieveBuffer;
// True only when a configuration exists and its enabled flag is truthy.
public get enabled(): boolean { return typeof this._cfg !== 'undefined' && this._cfg.enabled; }
// Traffic statistics for this port.
public counter: Counter = new Counter();
// Initialized to null by the constructor; presumably the packet-processing timer -- confirm against processPackets.
private procTimer: NodeJS.Timeout;
// Cleared by the serial port 'error' handler in openAsync; presumably a pending-write timer -- confirm.
public writeTimer: NodeJS.Timeout
// Reset to false whenever the port (re)opens.
private _processing: boolean = false;
private _lastTx: number = 0;
private _lastRx: number = 0;
private _inBytes: number[] = [];
// Inbound byte buffer; reset to empty by the constructor.
private _inBuffer: number[] = [];
// Outbound message queue; priority messages are placed at the front.
private _outBuffer: Outbound[] = [];
private _waitingPacket: Outbound;
private _msg: Inbound;
// Connection management functions
// Opens (or re-opens) this port.
//   cfg - optional replacement configuration; when supplied it becomes the port's configuration before opening.
// Depending on the configuration this opens a TCP socket (netConnect and not mock), a mock serial port (mock), the
// Screenlogic connection (type 'screenlogic'), or a hardware serial port.
// Resolves true when the port is disabled (nothing is opened), already open, or opened successfully, and false when
// the open attempt failed. The promises created here are never rejected; failure is reported as false.
// NOTE(review): the serial 'data' handler forwards data to pushIn even while paused, whereas the socket handler
// drops it -- confirm which is intended.
public async openAsync(cfg?: any): Promise<boolean> {
    if (this.isOpen) await this.closeAsync();
    if (typeof cfg !== 'undefined') this._cfg = cfg;
    if (!this._cfg.enabled) {
        this.emitPortStats();
        state.equipment.messages.removeItemByCode(`rs485:${this.portId}:connection`);
        return true;
    }
    if (this._cfg.netConnect && !this._cfg.mock) {
        if (typeof this._port !== 'undefined' && this.isOpen) {
            // This used to try to reconnect and recreate events even though the socket was already connected. This resulted in
            // instances where multiple event processors were present. Node doesn't give us any indication that the socket is
            // still viable or if it is closing from either end.
            return true;
        }
        else if (typeof this._port !== 'undefined') {
            // We need to kill the existing connection by ending it.
            let port = this._port as net.Socket;
            await new Promise((resolve, _) => {
                port.end(() => {
                    resolve(true);
                });
            });
            port.destroy();
        }
        let opts = extend(true, { keepAliveInitialDelay: 0 }, this._cfg.netSettings);
        // Convert the initial delay to milliseconds.
        if (typeof this._cfg.netSettings !== 'undefined' && typeof this._cfg.netSettings.keepAliveInitialDelay === 'number') opts.keepAliveInitialDelay = this._cfg.netSettings.keepAliveInitialDelay * 1000;
        let nc: net.Socket = new net.Socket(opts);
        nc.once('connect', () => { logger.info(`Net connect (socat) ${this._cfg.portId} connected to: ${this._cfg.netHost}:${this._cfg.netPort}`); }); // Socket is opened but not yet ready.
        nc.once('ready', () => {
            this.isOpen = true;
            this.isRTS = true;
            logger.info(`Net connect (socat) ${this._cfg.portId} ready and communicating: ${this._cfg.netHost}:${this._cfg.netPort}`);
            nc.on('data', (data) => {
                if (data.length > 0 && !this.isPaused) this.pushIn(data);
            });
            this.emitPortStats();
            this.processPackets(); // if any new packets have been added to queue, process them.
            state.equipment.messages.removeItemByCode(`rs485:${this.portId}:connection`);
        });
        nc.once('close', (p) => {
            this.isOpen = false;
            if (typeof this._port !== 'undefined' && !this._port.destroyed) this._port.destroy();
            this._port = undefined;
            this.clearOutboundBuffer();
            this.emitPortStats();
            if (!this.closing) {
                // If we are closing manually this event should have been cleared already and should never be called. If this is fired out
                // of sequence then we will check the closing flag to ensure we are not forcibly closing the socket.
                if (typeof this.connTimer !== 'undefined' && this.connTimer) {
                    clearTimeout(this.connTimer);
                    this.connTimer = null;
                }
                this.connTimer = setTimeout(async () => {
                    try {
                        // We are already closed so give some inactivity retry and try again.
                        await this.openAsync();
                    } catch (err) { }
                }, this._cfg.inactivityRetry * 1000);
            }
            logger.info(`Net connect (socat) ${this._cfg.portId} closed ${p === true ? 'due to error' : ''}: ${this._cfg.netHost}:${this._cfg.netPort}`);
        });
        nc.on('end', () => { // Happens when the other end of the socket closes.
            this.isOpen = false;
            logger.info(`Net connect (socat) ${this.portId} end event was fired`);
        });
        // The idle timeout below fires when there is no activity on the socket. The connection is closed and re-opened
        // through closeAsync/openAsync so that only one set of event processors is ever attached.
        if (this._cfg.inactivityRetry > 0) {
            nc.setTimeout(Math.max(this._cfg.inactivityRetry, 10) * 1000, async () => {
                logger.warn(`Net connect (socat) connection idle: ${this._cfg.netHost}:${this._cfg.netPort} retrying connection.`);
                try {
                    await this.closeAsync();
                    await this.openAsync();
                } catch (err) { logger.error(`Net connect (socat) ${this.portId} error retrying connection ${err.message}`); }
            });
        }
        return await new Promise<boolean>((resolve, _) => {
            // Tracks whether the connect callback has already resolved the promise so a later error does not settle it again.
            let connected = false;
            // We only connect an error once as we will destroy this connection on error then recreate a new socket on failure.
            nc.once('error', (err) => {
                logger.error(`Net connect (socat) error: ${err.message}`);
                this.isOpen = false;
                this.emitPortStats();
                this.processPackets(); // if any new packets have been added to queue, process them.
                // if the promise has already been fulfilled, but the error happens later, we don't want to call the promise again.
                if (!connected) { resolve(false); }
                if (this._cfg.inactivityRetry > 0) {
                    logger.error(`Net connect (socat) connection ${this.portId} error: ${err}. Retry in ${this._cfg.inactivityRetry} seconds`);
                    if (this.connTimer) clearTimeout(this.connTimer);
                    this.connTimer = setTimeout(async () => { try { await this.openAsync(); } catch (err) { } }, this._cfg.inactivityRetry * 1000);
                }
                else logger.error(`Net connect (socat) connection ${this.portId} error: ${err}. Never retrying -- No retry time set`);
                state.equipment.messages.setMessageByCode(`rs485:${this.portId}:connection`, 'error', `${this.name} RS485 port disconnected`);
            });
            nc.connect(this._cfg.netPort, this._cfg.netHost, () => {
                if (typeof this._port !== 'undefined') logger.warn(`Net connect (socat) ${this.portId} recovered from lost connection.`);
                logger.info(`Net connect (socat) Connection ${this.portId} connected`);
                this._port = nc;
                // if just changing existing port, reset key flags
                this.isOpen = true;
                this.isRTS = true;
                this.closing = false;
                this._processing = false;
                this.emitPortStats();
                resolve(true);
                connected = true;
            });
        });
    }
    else {
        if (typeof this._port !== 'undefined' && this.isOpen) {
            // This used to try to reconnect even though the serial port was already connected. This resulted in
            // instances where an access denied error was emitted. So if the port is open we will simply return.
            this.resetConnTimer();
            return true;
        }
        let sp: SerialPort | SerialPortMock = null;
        if (this._cfg.mock) {
            this.mock = true;
            let portPath = 'MOCK_PORT';
            SerialPortMock.binding.createPort(portPath)
            let opts: SerialPortOpenOptions<AutoDetectTypes> = { path: portPath, autoOpen: false, baudRate: 9600 };
            sp = new SerialPortMock(opts);
        }
        else if (this._cfg.type === 'screenlogic') {
            return await sl.openAsync();
        }
        else {
            this.mock = false;
            let opts: SerialPortOpenOptions<AutoDetectTypes> = extend(true, { path: this._cfg.rs485Port }, this._cfg.portSettings);
            sp = new SerialPort(opts);
        }
        return await new Promise<boolean>((resolve, _) => {
            // The serial port open method calls the callback just once. Unfortunately that is not the case for
            // network serial port connections. There really isn't a way to make it synchronous. The openAsync will truly
            // be open if a hardware interface is used and this method returns.
            sp.open((err) => {
                if (err) {
                    if (!this.mock) this.resetConnTimer();
                    this.isOpen = false;
                    logger.error(`Error opening port ${this.portId}: ${err.message}. ${this._cfg.inactivityRetry > 0 && !this.mock ? `Retry in ${this._cfg.inactivityRetry} seconds` : `Never retrying; (fwiw, inactivityRetry set to ${this._cfg.inactivityRetry})`}`);
                    resolve(false);
                    state.equipment.messages.setMessageByCode(`rs485:${this.portId}:connection`, 'error', `${this.name} RS485 port disconnected`);
                }
                else {
                    state.equipment.messages.removeItemByCode(`rs485:${this.portId}:connection`);
                    resolve(true);
                }
                this.emitPortStats();
            });
            // The event processors below should not resolve or reject the promise. The promise is settled by the open
            // callback above; these handlers fire long after that. The promise is never rejected: the resolution is
            // true for a successful connect and false otherwise.
            sp.on('open', () => {
                if (typeof this._port !== 'undefined') logger.info(`Serial Port ${this.portId}: ${this._cfg.rs485Port} recovered from lost connection.`)
                else logger.info(`Serial port: ${sp.path} request to open successful ${sp.baudRate}b ${sp.port.openOptions.dataBits}-${sp.port.openOptions.parity}-${sp.port.openOptions.stopBits}`);
                this._port = sp;
                this.isOpen = true;
                // if just changing existing port, reset key flags
                this.isRTS = true;
                this.closing = false;
                this._processing = false;
                sp.on('data', (data) => {
                    if (!this.mock && !this.isPaused) this.resetConnTimer();
                    this.pushIn(data);
                });
                if (!this.mock) this.resetConnTimer();
                this.emitPortStats();
            });
            sp.on('close', (err) => {
                this.isOpen = false;
                if (err && err.disconnected) {
                    logger.info(`Serial Port ${this.portId} - ${this._cfg.rs485Port} has been disconnected and closed. ${JSON.stringify(err)}`)
                }
                else {
                    logger.info(`Serial Port ${this.portId} - ${this._cfg.rs485Port} has been closed. ${err ? JSON.stringify(err) : ''}`);
                }
            });
            sp.on('error', (err) => {
                // an underlying streams error from a SP write may call the error event
                // instead/in lieu of the error callback
                if (typeof this.writeTimer !== 'undefined') { clearTimeout(this.writeTimer); this.writeTimer = null; }
                this.isOpen = false;
                if (sp.isOpen) sp.close((err) => { }); // call this with the error callback so that it doesn't emit to the error again.
                if (!this.mock) this.resetConnTimer();
                logger.error(`Serial Port ${this.portId}: An error occurred : ${this._cfg.rs485Port}: ${JSON.stringify(err)}`);
                this.emitPortStats();
            });
        });
    }
}
/**
 * Closes the underlying port (net socket or serial port) for this RS485 port.
 *
 * Returns false immediately when a close is already in progress, true when there is
 * nothing open to close, and otherwise the outcome of the close attempt. On a successful
 * close the inbound/outbound buffers are torn down via closeBuffer(). Port stats are
 * emitted in every case (finally block).
 *
 * Note: this method sets `closing = true` and never sets it back to false itself.
 */
public async closeAsync(): Promise {
try {
// Guard against overlapping close requests.
if (this.closing) return false;
this.closing = true;
// Stop the inactivity timer so it cannot fire while we are closing.
if (this.connTimer) clearTimeout(this.connTimer);
if (typeof this._port !== 'undefined' && this.isOpen) {
let success = await new Promise(async (resolve, reject) => {
if (this._cfg.netConnect) {
// Network (socat) connection: drop the operational listeners and install one-shot
// listeners that resolve this promise when the socket reports error/close.
this._port.removeAllListeners();
this._port.once('error', (err) => {
if (err) {
logger.error(`Error closing ${this.portId} ${this._cfg.netHost}: ${this._cfg.netPort} / ${this._cfg.rs485Port}: ${err}`);
resolve(false);
}
else {
// RSG - per the docs the error event will subsequently
// fire the close event. This block should never be called and
// likely isn't needed; error listener should always have an err passed
this._port.removeAllListeners(); // call again since we added 2x .once below.
this._port = undefined;
this.isOpen = false;
logger.info(`Successfully closed (socat) ${this.portId} port ${this._cfg.netHost}:${this._cfg.netPort} / ${this._cfg.rs485Port}`);
resolve(true);
}
});
this._port.once('end', () => {
logger.info(`Net connect (socat) ${this.portId} closing: ${this._cfg.netHost}:${this._cfg.netPort}`);
});
this._port.once('close', (p) => {
this._port.removeAllListeners(); // call again since we added 2x .once above.
this.isOpen = false;
this._port = undefined;
logger.info(`Net connect (socat) ${this.portId} successfully closed: ${this._cfg.netHost}:${this._cfg.netPort}`);
resolve(true);
});
logger.info(`Net connect (socat) ${this.portId} request close: ${this._cfg.netHost}:${this._cfg.netPort}`);
// Unfortunately the end call does not actually work in node. It will simply not return anything so we are going to
// just call destroy and forcibly close it.
let port = this._port as net.Socket;
// Wait for end() to invoke its callback before forcing the socket down.
// NOTE(review): if end() never invokes its callback this await never settles - confirm.
await new Promise((resfin, _) => {
port.end(() => {
logger.info(`Net connect (socat) ${this.portId} sent FIN packet: ${this._cfg.netHost}:${this._cfg.netPort}`);
resfin(true);
});
});
// The 'close' handler above clears _port; only destroy when that has not happened yet.
if (typeof this._port !== 'undefined') {
logger.info(`Net connect (socat) destroy socket: ${this._cfg.netHost}:${this._cfg.netPort}`);
this._port.destroy();
}
}
else if (!(this._port instanceof net.Socket) && typeof this._port.close === 'function') {
// Serial port: close() reports the outcome through its callback.
this._port.close((err) => {
if (err) {
logger.error(`Error closing ${this.portId} serial port ${this._cfg.rs485Port}: ${err}`);
resolve(false);
}
else {
this._port.removeAllListeners(); // remove any listeners still around
this._port = undefined;
logger.info(`Successfully closed portId ${this.portId} for serial port ${this._cfg.rs485Port}`);
this.isOpen = false;
resolve(true);
}
});
}
else {
// Neither a net connection nor something with close(): just forget the port object.
resolve(true);
this._port = undefined;
}
});
// Only discard buffered data once the port really closed.
if (success) { this.closeBuffer(); }
return success;
}
// Nothing was open.
return true;
} catch (err) { logger.error(`Error closing comms connection ${this.portId}: ${err.message}`); return false; }
finally { this.emitPortStats(); }
}
/**
 * Pauses the port: flags it as paused, discards buffered inbound bytes and queued
 * outbound messages, then requests a drain whose completion is ignored.
 */
public pause() {
    this.isPaused = true;
    this.clearBuffer();
    // The drain result is intentionally ignored.
    this.drain((_err) => { });
}
// RKS: Resume is executed in a closure. This is because we want the current async process to complete
// before we resume. This way the messages are cleared right before we restart.
/**
 * Resumes a paused port. The buffers are cleared and the paused flag is dropped from a
 * zero-delay timer rather than synchronously. Does nothing when the port is not paused.
 */
public resume() {
    if (!this.isPaused) return;
    setTimeout(() => {
        this.clearBuffer();
        this.isPaused = false;
    }, 0);
}
/**
 * Restarts the inactivity timer for this port.
 *
 * Any running timer is cancelled first. A new one is armed only when the port is not a
 * mock, `inactivityRetry` is greater than zero and the port is not closing. When the timer
 * fires it logs a warning, bumps the reconnect counter and re-opens the port.
 */
protected resetConnTimer(...args) {
    if (this.connTimer !== null) clearTimeout(this.connTimer);
    const shouldArm = !this._cfg.mock && this._cfg.inactivityRetry > 0 && !this.closing;
    if (!shouldArm) return;
    const onInactivity = async () => {
        try {
            logger.warn(this._cfg.netConnect
                ? `Inactivity timeout for ${this.portId} serial port ${this._cfg.netHost}:${this._cfg.netPort}/${this._cfg.rs485Port} after ${this._cfg.inactivityRetry} seconds`
                : `Inactivity timeout for ${this.portId} serial port ${this._cfg.rs485Port} after ${this._cfg.inactivityRetry} seconds`);
            this.reconnects++;
            await this.openAsync();
        }
        catch (err) {
            logger.error(`Error resetting RS485 port on inactivity: ${err.message}`);
        }
    };
    // inactivityRetry is configured in seconds.
    this.connTimer = setTimeout(onInactivity, this._cfg.inactivityRetry * 1000);
}
// Data management functions
/**
 * Waits for pending output on the underlying port to be transmitted, then invokes `cb`.
 *
 * The callback is invoked exactly once:
 *  - immediately (no error) when no port object exists,
 *  - by the serial port's own drain() when the port is a SerialPort/SerialPortMock,
 *  - immediately for any other transport, which does not wait to send.
 *
 * @param cb completion callback; receives an error only from the serial port's drain().
 */
public drain(cb: (err?: Error) => void) {
    if (typeof this._port === 'undefined') {
        logger.debug(`Serial Port ${this.portId}: Cannot perform drain function on port that is not open.`);
        cb();
        // Without this return the code below fell through to the else branch and
        // invoked the callback a second time.
        return;
    }
    if ((this._port instanceof SerialPort || this._port instanceof SerialPortMock) && typeof (this._port.drain) === 'function')
        this._port.drain(cb as (err) => void);
    else // Call the method immediately as the port doesn't wait to send.
        cb();
}
/**
 * Writes the packet bytes of an outbound message to the underlying transport.
 *
 * Net (socat) connections write straight to the socket, re-opening it first when the
 * socket object is missing or destroyed. Serial (and mock) ports write through the port's
 * write() guarded by a 3 second failsafe timer, because the stream may emit an 'error'
 * event instead of invoking the write callback.
 *
 * @param msg the outbound message; its toPacket() bytes are what is written.
 * @param cb  invoked with the write error (if any) once the transport accepts or rejects the bytes.
 */
public write(msg: Outbound, cb: (err?: Error) => void) {
    let bytes = Buffer.from(msg.toPacket());
    let _cb = cb;
    if (this._cfg.netConnect) {
        // SOCAT drops the connection and destroys the stream. Could be weeks or as little as a day.
        if (typeof this._port === 'undefined' || this._port.destroyed !== false) {
            this.openAsync().then(() => {
                (this._port as net.Socket).write(bytes, 'binary', cb);
            }).catch((err) => {
                // A failed reopen (or a port that is still undefined afterwards) used to
                // surface as an unhandled rejection and the callback was never invoked,
                // leaving the caller waiting forever. Report the failure instead.
                const error = err instanceof Error ? err : new Error(String(err));
                logger.error(`Serial Port ${this.portId}: Unable to write after reopening ${this._cfg.netHost}:${this._cfg.netPort}: ${error.message}`);
                if (typeof cb === 'function') cb(error);
            });
        }
        else
            (this._port as net.Socket).write(bytes, 'binary', cb);
    }
    else {
        // For mock ports, we still want to exercise the real outbound send pipeline:
        // - log + emit to dashpanel via logger.packet(msg) in writeMessage()
        // - write the exact bytes that would have been sent
        // Do NOT loop outbound messages back into inbound processing here; mock/replay injects inbound separately.
        this.writeTimer = setTimeout(() => {
            // RSG - I ran into a scenario where the underlying stream
            // processor was not returning the CB and comms would
            // completely stop. This timeout is a failsafe.
            // Further, the underlying stream may throw an event error
            // and not call the callback (per node docs) hence the
            // public writeTimer.
            if (typeof cb === 'function') {
                // Clearing cb marks the callback as consumed so the real write
                // callback below cannot fire it a second time.
                cb = undefined;
                _cb(new Error(`Serialport stream has not called the callback in 3s.`));
            }
        }, 3000);
        (this._port as any).write(bytes, (err) => {
            if (typeof this.writeTimer !== 'undefined') {
                clearTimeout(this.writeTimer);
                this.writeTimer = null;
                if (typeof cb === 'function') {
                    cb = undefined;
                    _cb(err);
                }
            }
        });
    }
}
// make public for now; should enable writing directly to mock port at Conn level...
/**
 * Appends received bytes to the inbound buffer, records the receive time and, when the
 * system reports ready, schedules packet processing on the next immediate tick.
 * @param pkt raw bytes received from the port.
 */
public pushIn(pkt: Buffer) {
    const received = pkt.toJSON().data;
    this._inBuffer.push(...received);
    this._lastRx = Date.now();
    if (!sys.isReady) return;
    setImmediate(() => this.processPackets());
}
/**
 * Queues a message on the outbound buffer and schedules packet processing on the next
 * immediate tick.
 * @param msg the message to queue.
 */
private pushOut(msg) {
    this._outBuffer.push(msg);
    setImmediate(() => this.processPackets());
}
/** Empties the inbound byte buffer in place and fails everything queued for sending. */
private clearBuffer() {
    this._inBuffer.splice(0, this._inBuffer.length);
    this.clearOutboundBuffer();
}
/**
 * Tears down buffered state: cancels the processing timer, clears both buffers and
 * forgets the partially assembled inbound message.
 */
private closeBuffer() {
    clearTimeout(this.procTimer);
    this.clearBuffer();
    this._msg = undefined;
}
/**
 * Fails and discards every pending outbound message: the packet currently awaiting a
 * response (if any) followed by everything left in the outbound queue.
 *
 * For each message: it is marked failed, onAbort (or a warning log) fires, onComplete
 * receives an OutboundMessageError, a response callback is scheduled when the message
 * requires a response, and the aborted counter is incremented.
 *
 * Side effects: cancels the processing timer, sets `_processing = true` and
 * `isRTS = false`, and resets `_waitingPacket` to null.
 * NOTE(review): `_processing` and `isRTS` are not restored here (see the commented-out
 * lines at the end). pause()/resume() also reach this method via clearBuffer(), after
 * which processPackets() returns early until something resets `_processing` - confirm
 * this is intended for the pause/resume path.
 */
private clearOutboundBuffer() {
    // let processing = this._processing; // we are closing the port. don't need to reinstate this status afterwards
    clearTimeout(this.procTimer);
    this.procTimer = null;
    this._processing = true;
    this.isRTS = false;
    // `_waitingPacket` is reset to null (not undefined) throughout this class. The former
    // `typeof ... !== 'undefined'` test therefore selected null as the first message, the
    // loop below never ran, and the queue was left untouched. A truthiness test handles
    // both null and undefined.
    let msg: Outbound = this._waitingPacket ? this._waitingPacket : this._outBuffer.shift();
    this._waitingPacket = null;
    while (typeof msg !== 'undefined' && msg) {
        // Fail the message.
        msg.failed = true;
        if (typeof msg.onAbort === 'function') msg.onAbort();
        else logger.warn(`Message cleared from outbound buffer: ${msg.toShortPacket()} `);
        let err = new OutboundMessageError(msg, `Message cleared from outbound buffer: ${msg.toShortPacket()} `);
        if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined);
        if (msg.requiresResponse) {
            // Wait for this current process to complete then bombard all the processes with the callback.
            if (msg.response instanceof Response && typeof (msg.response.callback) === 'function') setImmediate(msg.response.callback, msg);
        }
        this.counter.sndAborted++;
        msg = this._outBuffer.shift();
    }
    //this._processing = false; // processing; - we are closing the port
    //this.isRTS = true; // - we are closing the port
}
/**
 * Runs one processing pass: inbound bytes first, then the outbound queue.
 *
 * The pass is skipped while another pass is in progress or the port is closing. Any
 * pending processing timer is cancelled before the pass begins.
 */
private processPackets() {
    if (this._processing || this.closing) return;
    if (this.procTimer) {
        clearTimeout(this.procTimer);
        this.procTimer = null;
    }
    this._processing = true;
    try {
        this.processInboundPackets();
        this.processOutboundPackets();
    }
    finally {
        // Always release the re-entrancy latch. Previously an exception thrown while
        // processing left `_processing` stuck at true, so every later pass returned
        // early and the port stopped processing traffic.
        this._processing = false;
    }
}
/**
 * Services the packet that is currently awaiting a response.
 *
 * When that packet's timeout (its own `timeout`, or 1000 ms) has elapsed it is handed
 * back to writeMessage(): as a retry when tries remain, otherwise so writeMessage() can
 * fail it.
 *
 * @returns true when a packet is waiting (whether or not it timed out), false otherwise.
 */
private processWaitPacket(): boolean {
    const pending = this._waitingPacket;
    if (!pending) return false;
    const timeoutMs = pending.timeout || 1000;
    const now = new Date();
    const sentAt = pending.timestamp.getTime();
    if (sentAt + timeoutMs < now.getTime()) {
        const elapsedSecs = (now.getTime() - sentAt) / 1000;
        if (pending.remainingTries > 0) {
            logger.silly(`Retrying outbound message after ${elapsedSecs} secs with ${pending.remainingTries} attempt(s) left. - ${pending.toShortPacket()} `);
            this.counter.sndRetries++;
        }
        else {
            // No retries remaining; fail the message (writeMessage will abort without writing).
            logger.silly(`Outbound message timed out after ${elapsedSecs} secs with no retries remaining. - ${pending.toShortPacket()} `);
        }
        this.writeMessage(pending);
    }
    return true;
}
/**
 * Sends (or rejects) at most one queued outbound message and schedules the next pass.
 *
 * - When a packet is awaiting a response, processWaitPacket() handles it and nothing is dequeued.
 * - Otherwise, with the port open (or closing) and ready-to-send, the next queued message is
 *   passed to writeMessage().
 * - With the port closed, the next queued message is failed: onAbort/onComplete fire with an
 *   OutboundMessageError and the abort counters/stats are updated.
 *
 * Finally a processPackets() pass is scheduled after the configured inter-frame delay
 * (controller.txDelays.interFrameDelayMs, default 30 ms).
 */
protected processOutboundPackets() {
let msg: Outbound;
if (!this.processWaitPacket() && this._outBuffer.length > 0) {
if (this.isOpen || this.closing) {
// Only dequeue when the previous write has completed (ready-to-send).
if (this.isRTS) {
msg = this._outBuffer.shift();
if (typeof msg === 'undefined' || !msg) return;
// If the serial port is busy we don't want to process any outbound. However, this used to
// not process the outbound even when the incoming bytes didn't mean anything. Now we only delay
// the outbound when we actually have a message signatures to process.
this.writeMessage(msg);
}
}
else {
// port is closed, reject message
msg = this._outBuffer.shift();
msg.failed = true;
logger.warn(`Comms port ${msg.portId} is not open. Message aborted: ${msg.toShortPacket()} `);
// This is a hard fail. We don't have any more tries left and the message didn't
// make it onto the wire.
if (typeof msg.onAbort === 'function') msg.onAbort();
else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `);
let error = new OutboundMessageError(msg, `Comms port ${msg.portId} is not open. Message aborted: ${msg.toShortPacket()} `);
if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined);
this._waitingPacket = null;
this.counter.sndAborted++;
this.counter.updatefailureRate();
this.emitPortStats();
// return; // if port isn't open, do not continue and setTimeout
}
}
// RG: added the last `|| typeof msg !== 'undef'` because virtual chem controller only sends a single packet
// but this condition would be eval'd before the callback of port.write was calls and the outbound packet
// would be sitting idle for eternity.
// NOTE(review): `typeof null` is 'object', so the second test below is true whenever
// _waitingPacket is null (the value this class assigns when clearing it). In that state the
// next pass is rescheduled even with an empty queue - confirm this polling is intended.
if (this._outBuffer.length > 0 || typeof this._waitingPacket !== 'undefined' || this._waitingPacket || typeof msg !== 'undefined') {
// Configurable inter-frame delay (default 30ms) overrides fixed 100ms.
const dCfg = (config.getSection('controller').txDelays || {});
const interFrame = Math.max(0, Number(dCfg.interFrameDelayMs || 30));
let self = this;
this.procTimer = setTimeout(() => self.processPackets(), interFrame);
}
}
/**
 * Puts one outbound message on the wire and tracks it for retry/response handling.
 *
 * - Returns without doing anything when the port is not ready-to-send or is closing.
 * - When the port is not open the body is skipped; no callbacks are fired from here.
 * - A message with no remaining tries is failed (onAbort/onComplete/response callback) without writing.
 * - Otherwise the bytes are written, optionally after waiting for bus idle time
 *   (txDelays.idleBeforeTxMs) and optionally one byte at a time (txDelays.interByteDelayMs,
 *   serial/mock ports only). completeWrite() then updates tries, counters and _waitingPacket.
 *
 * @param msg the outbound message to send.
 */
private writeMessage(msg: Outbound) {
// Make sure we are not re-entrant while the port.write is going on.
// This ends in goofiness as it can send more than one message at a time while it
// waits for the command buffer to be flushed. NOTE: There is no success message and the callback to
// write only verifies that the buffer got ahold of it.
let self = this;
try {
if (!this.isRTS || this.closing) return;
var bytes = msg.toPacket();
if (this.isOpen) {
this.isRTS = false; // only set if port is open, otherwise it won't be set back to true
// ISSUE-121: Close the race window between _outBuffer.shift() (in processOutboundPackets)
// and completeWrite setting _waitingPacket. The mock port (and fast hardware) can deliver
// the inbound response BEFORE the async port.write callback fires, leaving the message
// untracked in clearResponses(). Set _waitingPacket here, before port.write, so any
// inbound matcher always finds it.
if (msg.requiresResponse && msg.remainingTries > 0) {
this._waitingPacket = msg;
}
if (msg.remainingTries <= 0) {
// It will almost never fall into here. The rare case where
// we have an RTS semaphore and a waiting response might make it go here.
msg.failed = true;
this._waitingPacket = null;
if (typeof msg.onAbort === 'function') msg.onAbort();
else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `);
let err = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()} `);
if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined);
if (msg.requiresResponse) {
if (msg.response instanceof Response && typeof (msg.response.callback) === 'function') {
setTimeout(msg.response.callback, 100, msg);
}
}
this.counter.sndAborted++;
// Release the ready-to-send flag taken above; nothing was written.
this.isRTS = true;
return;
}
// Optional transmit pacing settings; each defaults to 0 (disabled).
const dCfg = (config.getSection('controller').txDelays || {});
const idleBeforeTx = Math.max(0, Number(dCfg.idleBeforeTxMs || 0));
const interByte = Math.max(0, Number(dCfg.interByteDelayMs || 0));
const now = Date.now();
// Time since the most recent transmit or receive on this port.
const idleElapsed = now - Math.max(this._lastTx, this._lastRx);
// Performs the actual write; may run immediately or from the idle-before-TX timer below.
const doWrite = () => {
this.counter.bytesSent += bytes.length;
msg.timestamp = new Date();
logger.packet(msg);
if (interByte > 0 && bytes.length > 1 && this._port && (this._port instanceof SerialPort || this._port instanceof SerialPortMock)) {
// Manual inter-byte pacing
let idx = 0;
// Writes one byte per call and re-schedules itself until all bytes are sent or a write fails.
const writeNext = () => {
if (idx >= bytes.length) {
this._lastTx = Date.now();
completeWrite(undefined);
return;
}
const b = Buffer.from([bytes[idx++]]);
(this._port as any).write(b, (err) => {
if (err) {
this._lastTx = Date.now();
completeWrite(err);
return;
}
if (interByte > 0) setTimeout(writeNext, interByte);
else setImmediate(writeNext);
});
};
writeNext();
} else {
// Whole-packet write through write(), which applies its own failsafe timer for serial ports.
this.write(msg, (err) => {
this._lastTx = Date.now();
completeWrite(err);
});
}
};
// Bookkeeping once the transport has accepted or rejected the bytes.
const completeWrite = (err?: Error) => {
clearTimeout(this.writeTimer);
this.writeTimer = null;
msg.tries++;
this.isRTS = true;
if (err) {
// Failed write: keep the message as the waiting packet so it is retried, unless tries are exhausted.
if (msg.remainingTries > 0) self._waitingPacket = msg;
else {
msg.failed = true;
logger.warn(`Message aborted after ${msg.tries} attempt(s): ${bytes}: ${err} `);
// this is a hard fail. We don't have any more tries left and the message didn't
// make it onto the wire.
let error = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${err} `);
if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined);
// ISSUE-121: Only clear _waitingPacket if it actually points to this msg.
// A non-response write must not wipe an unrelated config request that's awaiting reply.
if (self._waitingPacket === msg) self._waitingPacket = null;
self.counter.sndAborted++;
}
}
else {
logger.verbose(`Wrote packet [Port ${this.portId} id: ${msg.id}] [${bytes}].Retries remaining: ${msg.remainingTries} `);
// We have all the success we are going to get so if the call succeeded then
// don't set the waiting packet when we aren't actually waiting for a response.
if (!msg.requiresResponse) {
// ISSUE-121: As far as we know the message made it to OCP.
// Only clear _waitingPacket if it refers to THIS msg — pump answers and other
// fire-and-forget writes must not clobber a different msg that is genuinely waiting
// for a response (e.g. config Action 222 that we set as _waitingPacket pre-write).
if (self._waitingPacket === msg) self._waitingPacket = null;
self.counter.sndSuccess++;
if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined);
}
else if (msg.remainingTries >= 0) self._waitingPacket = msg;
}
self.counter.updatefailureRate();
self.emitPortStats();
};
// Honor idle-before-TX if not enough bus quiet time has elapsed
// NOTE(review): when doWrite runs from this timer it executes outside the try/catch below,
// so a synchronous throw inside it is not handled by that catch - confirm acceptable.
if (idleBeforeTx > 0 && idleElapsed < idleBeforeTx) {
const wait = idleBeforeTx - idleElapsed;
setTimeout(doWrite, wait);
} else doWrite();
}
}
catch (err) {
logger.error(`Error sending message: ${err.message}
for message: ${msg.toShortPacket()}`)
// the show, err, messages, must go on!
if (this.isOpen) {
clearTimeout(this.writeTimer);
this.writeTimer = null;
msg.tries++;
this.isRTS = true;
msg.failed = true;
// this is a hard fail. We don't have any more tries left and the message didn't
// make it onto the wire.
let error = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${err} `);
if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined);
this._waitingPacket = null;
this.counter.sndAborted++;
}
}
}
/**
 * Matches a valid inbound message against pending outbound messages and completes the
 * ones it answers.
 *
 * First the packet awaiting a response is checked; on a match it is cleared, its
 * onComplete receives the inbound message, the success counter is bumped and any
 * configured ack is queued. Then the outbound queue is scanned from the end and every
 * queued message answered by the same inbound message is removed and completed.
 * A response callback (the last one selected while matching) is invoked 100 ms later.
 *
 * @param msgIn the inbound message that may be a response.
 */
private clearResponses(msgIn: Inbound) {
// ISSUE-121: Original guard `_outBuffer.length === 0 && _waitingPacket` was buggy
// (always false unless waitingPacket existed AND buffer was empty).
// Correct early-exit: nothing to match against.
if (this._outBuffer.length === 0 && (typeof this._waitingPacket === 'undefined' || this._waitingPacket === null)) return;
var callback;
// Captured before any clearing below; this is the value later passed to the callback.
let msgOut = this._waitingPacket;
if (typeof (this._waitingPacket) !== 'undefined' && this._waitingPacket) {
var resp = msgOut.response;
if (msgOut.requiresResponse) {
if (resp instanceof Response && resp.isResponse(msgIn, msgOut)) {
this._waitingPacket = null;
if (typeof msgOut.onComplete === 'function') msgOut.onComplete(undefined, msgIn);
callback = resp.callback;
resp.message = msgIn;
this.counter.sndSuccess++;
if (resp.ack) this.pushOut(resp.ack);
}
}
}
// Go through and remove all the packets that need to be removed from the queue.
// RG - when would there be additional packets besides the first in the outbuffer that needs to be removed from a single incoming packet?
// RKS: This occurs when two of the same message signature is thrown onto the queue. Most often when there is a queue full of configuration requests. The
// triggers that cause the outbound message may come at the same time that another controller makes a call.
// Iterate backwards so splice() does not shift entries that have not been visited yet.
var i = this._outBuffer.length - 1;
while (i >= 0) {
const out = this._outBuffer[i];
if (typeof out === 'undefined') {
i--;
continue;
}
let resp = out.response;
// RG - added check for msgOut because the *Touch chlor packet 153 adds an status packet 217
// but if it is the only packet on the queue the outbound will have been cleared out already.
// ISSUE-121: scope check now uses out.scope only — do NOT require scope match against msgOut
// (which is _waitingPacket, often unrelated to the buffered out being matched).
if (out.requiresResponse) {
if (resp instanceof Response && resp.isResponse(msgIn, out)) {
resp.message = msgIn;
if (typeof (resp.callback) === 'function' && resp.callback) callback = resp.callback;
this._outBuffer.splice(i, 1);
// Resolve any async sender whose queued duplicate was satisfied by this inbound response.
if (typeof out.onComplete === 'function') out.onComplete(undefined, msgIn);
}
}
i--;
}
// RKS: This callback is important because we are managing queues. The position of this callback
// occurs after all things related to the message have been processed including removal of subsequent
// messages from the queue. This is because another panel on the bus may throw additional messages
// that we also need. This occurs when more than one panel on the bus requests a reconfig at the same time.
// NOTE(review): the callback always receives msgOut (the waiting packet captured above, which may
// be null/undefined) even when the callback came from a queued message - confirm intended.
if (typeof (callback) === 'function') { setTimeout(callback, 100, msgOut); }
}
/**
 * Snapshot of this port's statistics: port id, status ('open', 'closed' when enabled but
 * not open, otherwise 'disabled'), reconnect count, plus the counter values merged in
 * through extend(true, ...).
 */
public get stats() {
    let status: string;
    if (this.isOpen) status = 'open';
    else if (this._cfg.enabled) status = 'closed';
    else status = 'disabled';
    const summary = { portId: this.portId, status: status, reconnects: this.reconnects };
    return extend(true, summary, this.counter);
}
/** Publishes the current port statistics as 'rs485Stats' on the 'rs485PortStats' channel. */
public emitPortStats() {
    const snapshot = this.stats;
    webApp.emitToChannel('rs485PortStats', 'rs485Stats', snapshot);
}
/**
 * Finalizes a fully received inbound message.
 *
 * The message is stamped (time, port id, message id), its collision/rewind counts are
 * added to the port counters and stats are emitted. A valid message is processed and then
 * matched against pending outbound messages; an invalid one is counted as failed and
 * rewound so that an embedded message can be recovered. The packet is logged last.
 *
 * @param msg the completed inbound message.
 * @param ndx index into the inbound bytes reported by the reader for this message.
 * @returns the index to continue from; changed only when a rewind took place.
 */
private processCompletedMessage(msg: Inbound, ndx): number {
    msg.timestamp = new Date();
    msg.portId = this.portId;
    msg.id = Message.nextMessageId;
    //console.log(`msg id ${msg.id} assigned to port${msg.portId} action:${msg.action} ${msg.toShortPacket()}`)
    this.counter.recCollisions += msg.collisions;
    this.counter.recRewinds += msg.rewinds;
    this.emitPortStats();
    if (!msg.isValid) {
        this.counter.recFailed++;
        this.counter.updatefailureRate();
        console.log('RS485 Stats:' + this.counter.toLog());
        ndx = this.rewindFailedMessage(msg, ndx);
    }
    else {
        this.counter.recSuccess++;
        this.counter.updatefailureRate();
        msg.process();
        //conn.queueInboundToAnslq25(msg);
        this.clearResponses(msg);
    }
    // RSG - Logged after responses are cleared so the emit will include responseFor data.
    logger.packet(msg);
    return ndx;
}
/**
 * Attempts to recover a message embedded inside one that failed validation.
 *
 * The bytes of the failed message, minus its first header byte, are put back in front of
 * the remaining inbound bytes and a fresh Inbound is read from the result. Dropping the
 * first header byte keeps the reader from locking onto the same failed header again.
 *
 * @param msg the inbound message that failed validation.
 * @param ndx number of leading inbound bytes the failed message consumed.
 * @returns the index reported for the re-read (and, if complete, processed) message.
 */
private rewindFailedMessage(msg: Inbound, ndx: number): number {
    this.counter.recRewinds++;
    // Everything after the failed message stays at the tail.
    const remainder = this._inBytes.slice(ndx);
    const term = msg.term;
    const payload = msg.payload;
    const headerRest = msg.header.slice(1);
    this._inBytes = [...headerRest, ...payload, ...term, ...remainder];
    this._msg = msg = new Inbound();
    ndx = msg.readPacket(this._inBytes);
    if (msg.isComplete) ndx = this.processCompletedMessage(msg, ndx);
    return ndx;
}
/**
 * Moves newly received bytes into the working byte list and extracts as many messages
 * from it as possible.
 *
 * A message still in progress keeps merging bytes; otherwise a new Inbound is started.
 * Each completed message is passed to processCompletedMessage(). Consumed bytes are
 * dropped after every step and the loop stops when nothing was consumed or no bytes remain.
 */
protected processInboundPackets() {
    this.counter.bytesReceived += this._inBuffer.length;
    const fresh = this._inBuffer.splice(0, this._inBuffer.length);
    this._inBytes.push(...fresh);
    // Wait until we have something to process.
    if (this._inBytes.length < 1) return;
    let msg: Inbound = this._msg;
    let ndx: number = 0;
    for (; ;) {
        const startNew = typeof (msg) === 'undefined' || msg === null || msg.isComplete || !msg.isValid;
        if (startNew) {
            this._msg = msg = new Inbound();
            ndx = msg.readPacket(this._inBytes);
        }
        else ndx = msg.mergeBytes(this._inBytes);
        if (msg.isComplete) ndx = this.processCompletedMessage(msg, ndx);
        // Nothing consumed: wait for more bytes.
        if (!(ndx > 0)) break;
        this._inBytes = this._inBytes.slice(ndx);
        ndx = 0;
        if (!(ndx < this._inBytes.length)) break;
    }
}
/**
 * Reports whether any pump or chlorinator is assigned to this port.
 * @returns true when at least one pump or chlorinator has a portId equal to this port's id.
 */
public hasAssignedEquipment() {
    const usesThisPort = (items): boolean => {
        for (let n = 0; n < items.length; n++) {
            if (items[n].portId === this.portId) return true;
        }
        return false;
    };
    // Chlorinators are only consulted when no pump matched.
    if (usesThisPort(sys.pumps.get())) return true;
    return usesThisPort(sys.chlorinators.get());
}
}
// Module-level Connection instance exported from this file.
export var conn: Connection = new Connection();