/* nodejs-poolController. An application to control pool equipment.
Copyright (C) 2016, 2017, 2018, 2019, 2020, 2021, 2022.
Russell Goldin, tagyoureit. russ.goldin@gmail.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import * as path from 'path';
import * as fs from 'fs';
import { Direction, Inbound, Message, Outbound, Protocol } from '../comms/messages/Messages';
import { logger } from '../../logger/Logger';
import { webApp } from '../../web/Server';
import { VirtualPump } from './pumps/VirtualPump';
import { VirtualPumpVS } from './pumps/VirtualPumpVS';
import { VirtualChlorinator } from './chlorinators/VirtualChlorinator';
import { VirtualIntelliChem } from './intellichem/VirtualIntelliChem';
/**
* VirtualEquipmentManager
*
* Simulates downstream bus-attached equipment (pumps, chlorinators, etc.) that
* an upstream master (real OCP or njsPC/Nixie) believes is physically present.
*
* This is NOT configured in poolConfig.json and does NOT appear anywhere in
* sys.* or state.* equipment collections. It is a wire-level impersonator
* controlled via REST endpoints under /config/virtualEquipment and persisted
* in its own file (data/virtualEquipment.json).
*
* Terminology:
* - "intent" — what the user asked for via REST (enabled=true/false)
* - "autoDisabled" — set true when a collision with a real device is
* detected on the bus. Requires an explicit REST
* re-enable to clear.
* - "effective" — enabled && !autoDisabled
*/
export class VirtualEquipmentManager {
    /** Width (ms) of the sliding window used by {@link observe} to decide that a real device is answering. */
    public static readonly CONFLICT_WINDOW_MS = 1000;
    private _pumps: VirtualPump[] = [];
    private _chlorinators: VirtualChlorinator[] = [];
    private _intellichems: VirtualIntelliChem[] = [];
    private _filePath: string;
    private _loaded = false;
    // Pending debounced save, or null when none is scheduled (see _debounceSave).
    private _saveTimer: NodeJS.Timeout | null = null;

    /**
     * @param dataDir Directory that holds virtualEquipment.json. Defaults to
     *                `<process.cwd()>/data` when omitted or empty.
     */
    constructor(dataDir?: string) {
        this._filePath = path.posix.join(dataDir || path.posix.join(process.cwd(), 'data'), 'virtualEquipment.json');
    }

    public get pumps(): VirtualPump[] { return this._pumps; }
    public get chlorinators(): VirtualChlorinator[] { return this._chlorinators; }
    public get intellichems(): VirtualIntelliChem[] { return this._intellichems; }
    public get filePath(): string { return this._filePath; }

    /**
     * Loads the persisted device definitions from {@link filePath}.
     *
     * - When the file does not exist an empty one is created (a failure to
     *   create it is only logged as a warning).
     * - Individual bad definitions are skipped with a warning.
     * - Any other failure (unreadable file, invalid JSON) is logged as an error
     *   and leaves the in-memory device lists untouched.
     *
     * The device lists are replaced as a whole after a successful parse, so
     * calling this more than once does not duplicate devices.
     * This method never rejects.
     */
    public async loadAsync(): Promise<void> {
        try {
            if (!fs.existsSync(this._filePath)) {
                this._loaded = true;
                try {
                    await this.saveAsync();
                    logger.info(`VirtualEquipment: created empty ${this._filePath} (no virtual devices configured)`);
                } catch (err) {
                    logger.warn(`VirtualEquipment: could not create ${this._filePath}: ${(err as Error).message}`);
                }
                return;
            }
            // An empty or whitespace-only file, or a literal `null` document, is treated as "no devices".
            const raw = (await fs.promises.readFile(this._filePath, 'utf8')).trim() || '{}';
            const parsed = JSON.parse(raw) || {};
            const pumps = this._buildDevices<VirtualPump>(parsed.pumps, 'pump', def => this._constructPump(def));
            const chlorinators = this._buildDevices<VirtualChlorinator>(parsed.chlorinators, 'chlorinator', def => this._constructChlorinator(def));
            const intellichems = this._buildDevices<VirtualIntelliChem>(parsed.intellichems, 'intellichem', def => new VirtualIntelliChem(def));
            // Swap in the freshly built lists only once everything parsed.
            this._pumps = pumps;
            this._chlorinators = chlorinators;
            this._intellichems = intellichems;
            this._loaded = true;
            const effectivePumps = this._pumps.filter(p => p.isEffective).length;
            const effectiveChlors = this._chlorinators.filter(c => c.isEffective).length;
            const effectiveIChems = this._intellichems.filter(ic => ic.isEffective).length;
            logger.info(`VirtualEquipment: loaded ${this._pumps.length} pump(s) (${effectivePumps} effective), ${this._chlorinators.length} chlorinator(s) (${effectiveChlors} effective), ${this._intellichems.length} intellichem(s) (${effectiveIChems} effective)`);
        } catch (err) {
            logger.error(`VirtualEquipment: failed to load ${this._filePath}: ${(err as Error).message}`);
        }
    }

    /** Builds one device per definition; a definition whose builder throws is skipped with a warning. */
    private _buildDevices<T>(defs: unknown, kind: string, build: (def: any) => T): T[] {
        const built: T[] = [];
        if (!Array.isArray(defs)) return built;
        for (const def of defs) {
            try {
                built.push(build(def));
            } catch (err) {
                logger.warn(`VirtualEquipment: skipping bad ${kind} definition ${JSON.stringify(def)}: ${(err as Error).message}`);
            }
        }
        return built;
    }

    /** Normalizes the requested pump type to lower case; a missing type defaults to 'vs'. */
    private static _pumpTypeOf(def: any): string {
        return String(def.type || 'vs').toLowerCase();
    }

    /**
     * Creates a virtual pump from a persisted or user-supplied definition.
     * @throws Error when `address` is not a number or the type is unsupported.
     */
    private _constructPump(def: any): VirtualPump {
        if (!def || typeof def.address !== 'number') throw new Error('address is required');
        const type = VirtualEquipmentManager._pumpTypeOf(def);
        switch (type) {
            case 'vs':
                return new VirtualPumpVS({
                    address: def.address,
                    portId: typeof def.portId === 'number' ? def.portId : 0,
                    enabled: def.enabled === true,
                    autoDisabled: def.autoDisabled === true,
                    autoDisabledAt: def.autoDisabledAt || null,
                    autoDisabledReason: def.autoDisabledReason || null,
                    wattModel: def.wattModel || 'cheap'
                });
            default:
                throw new Error(`unsupported virtual pump type "${type}"`);
        }
    }

    /**
     * Creates a virtual chlorinator from a persisted or user-supplied definition.
     * @throws Error when `address` is not a number.
     */
    private _constructChlorinator(def: any): VirtualChlorinator {
        if (!def || typeof def.address !== 'number') throw new Error('address is required');
        return new VirtualChlorinator({
            address: def.address,
            portId: typeof def.portId === 'number' ? def.portId : 0,
            enabled: def.enabled === true,
            autoDisabled: def.autoDisabled === true,
            autoDisabledAt: def.autoDisabledAt || null,
            autoDisabledReason: def.autoDisabledReason || null,
            saltLevel: typeof def.saltLevel === 'number' ? def.saltLevel : 3400,
            modelName: def.modelName || 'Intellichlor--40'
        });
    }

    /**
     * True when the message is routed to the IntelliChem handlers: either the
     * IntelliChem protocol, or a Broadcast-protocol message whose destination
     * lies in 144..158.
     */
    private _targetsIntelliChem(msg: Inbound): boolean {
        if (msg.protocol === Protocol.IntelliChem) return true;
        return msg.protocol === Protocol.Broadcast && msg.dest >= 144 && msg.dest <= 158;
    }

    /**
     * Decides whether an effective virtual device exists that should reply to `msg`.
     * @returns true only when a matching effective device reports that it supports the action.
     */
    public shouldAnswer(msg: Inbound): boolean {
        if (msg.protocol === Protocol.Pump) {
            const pump = this.findEffectivePumpByAddress(msg.dest);
            if (!pump) return false;
            // Only answer requests from address 16 (presumably the controller — TODO confirm) or the plugin address.
            if (msg.source !== 16 && msg.source !== Message.pluginAddress) return false;
            return pump.supportsAction(msg.action);
        }
        if (msg.protocol === Protocol.Chlorinator) {
            // Chlorinator requests are only considered for destinations 80..83.
            if (msg.dest < 80 || msg.dest > 83) return false;
            const chlor = this.findEffectiveChlorinatorByAddress(msg.dest);
            if (!chlor) return false;
            return chlor.supportsAction(msg.action);
        }
        if (this._targetsIntelliChem(msg)) {
            const ic = this.findEffectiveIntelliChemByAddress(msg.dest);
            if (!ic) return false;
            return ic.supportsAction(msg.action);
        }
        return false;
    }

    /**
     * Hands `msg` to the effective virtual device addressed by `msg.dest` and
     * pushes a fresh snapshot to clients. Device errors are logged, never thrown.
     * An IntelliChem that marks itself dirty while processing triggers a debounced save.
     */
    public process(msg: Inbound): void {
        if (msg.protocol === Protocol.Pump) {
            const pump = this.findEffectivePumpByAddress(msg.dest);
            if (pump) this._processDevice('pump', pump, msg);
        } else if (msg.protocol === Protocol.Chlorinator) {
            const chlor = this.findEffectiveChlorinatorByAddress(msg.dest);
            if (chlor) this._processDevice('chlorinator', chlor, msg);
        } else if (this._targetsIntelliChem(msg)) {
            const ic = this.findEffectiveIntelliChemByAddress(msg.dest);
            if (ic) {
                this._processDevice('intellichem', ic, msg, () => {
                    if (ic._dirty) { ic._dirty = false; this._debounceSave(); }
                });
            }
        }
    }

    /** Runs one device's `process`, then the optional follow-up, then emits; any error is logged. */
    private _processDevice(kind: string, device: { address: number; process(msg: Inbound): void }, msg: Inbound, afterProcess?: () => void): void {
        try {
            device.process(msg);
            if (afterProcess) afterProcess();
            this.emit();
        } catch (err) {
            logger.error(`VirtualEquipment: ${kind} at address ${device.address} failed to process action ${msg.action}: ${(err as Error).message}`);
        }
    }

    /**
     * Watches inbound traffic for signs that a real device shares an address
     * with an effective virtual one. Two qualifying packets inside
     * {@link CONFLICT_WINDOW_MS} auto-disable the virtual device, persist the
     * change (fire-and-forget) and notify clients.
     *
     * - Pump protocol: packets whose *source* equals a virtual pump's address.
     * - Chlorinator protocol: packets NOT addressed to 80..83; these are
     *   attributed to the virtual chlorinator at address 80.
     *   NOTE(review): virtual chlorinators at 81..83 are never collision-checked
     *   here — confirm whether that is intended.
     */
    public observe(msg: Inbound): void {
        if (msg.protocol === Protocol.Pump) {
            const pump = this.findPumpByAddress(msg.source);
            if (!pump || !pump.isEffective) return;
            this._recordEchoAndCheckCollision('pump', pump, count =>
                `Collision: ${count} inbound packets with source=${pump.address} within ${VirtualEquipmentManager.CONFLICT_WINDOW_MS}ms — a real pump is likely on the bus.`);
        } else if (msg.protocol === Protocol.Chlorinator) {
            if (msg.dest >= 80 && msg.dest <= 83) return;
            const chlor = this.findChlorinatorByAddress(80);
            if (!chlor || !chlor.isEffective) return;
            this._recordEchoAndCheckCollision('chlorinator', chlor, count =>
                `Collision: ${count} chlorinator response packets within ${VirtualEquipmentManager.CONFLICT_WINDOW_MS}ms — a real chlorinator is likely on the bus.`);
        }
    }

    /** Records an echo timestamp on the device and auto-disables it once two fall inside the conflict window. */
    private _recordEchoAndCheckCollision(kind: string, device: VirtualPump | VirtualChlorinator, describe: (count: number) => string): void {
        const now = Date.now();
        device.pushRecentInboundEcho(now);
        const windowStart = now - VirtualEquipmentManager.CONFLICT_WINDOW_MS;
        const echoes = device.recentEchoes.filter((t: number) => t >= windowStart);
        if (echoes.length < 2) return;
        const reason = describe(echoes.length);
        device.setAutoDisabled(true, reason);
        logger.warn(`VirtualEquipment: auto-disabling ${kind} at address ${device.address}. ${reason}`);
        // Persist in the background; a failed save must not break packet handling.
        this.saveAsync().catch(e => logger.error(`VirtualEquipment: save after auto-disable failed: ${e.message}`));
        this.emit();
    }

    /** Same decision as {@link shouldAnswer}, for a message this process is sending. */
    public shouldAnswerOutbound(msg: Outbound): boolean {
        const synth = this._outboundToInbound(msg);
        return this.shouldAnswer(synth);
    }

    /** Same handling as {@link process}, for a message this process is sending. */
    public processOutbound(msg: Outbound): void {
        const synth = this._outboundToInbound(msg);
        this.process(synth);
    }

    /** Copies an outbound message into a new, valid Inbound stamped with the current time. */
    private _outboundToInbound(msg: Outbound): Inbound {
        const inbound = new Inbound();
        inbound.protocol = msg.protocol;
        inbound.direction = Direction.In;
        inbound.portId = msg.portId;
        // slice() so the synthesized message does not share byte arrays with the original.
        inbound.preamble = msg.preamble.slice();
        inbound.header = msg.header.slice();
        inbound.payload = msg.payload.slice();
        inbound.term = msg.term.slice();
        inbound.isValid = true;
        inbound.timestamp = new Date();
        return inbound;
    }

    /** @returns the virtual pump configured at `address`, effective or not. */
    public findPumpByAddress(address: number): VirtualPump | undefined {
        return this._pumps.find(p => p.address === address);
    }

    /** @returns the virtual pump at `address` only when it is effective. */
    public findEffectivePumpByAddress(address: number): VirtualPump | undefined {
        const p = this.findPumpByAddress(address);
        return p && p.isEffective ? p : undefined;
    }

    /** @returns the virtual chlorinator configured at `address`, effective or not. */
    public findChlorinatorByAddress(address: number): VirtualChlorinator | undefined {
        return this._chlorinators.find(c => c.address === address);
    }

    /** @returns the virtual chlorinator at `address` only when it is effective. */
    public findEffectiveChlorinatorByAddress(address: number): VirtualChlorinator | undefined {
        const c = this.findChlorinatorByAddress(address);
        return c && c.isEffective ? c : undefined;
    }

    /** @returns the virtual IntelliChem configured at `address`, effective or not. */
    public findIntelliChemByAddress(address: number): VirtualIntelliChem | undefined {
        return this._intellichems.find(ic => ic.address === address);
    }

    /** @returns the virtual IntelliChem at `address` only when it is effective. */
    public findEffectiveIntelliChemByAddress(address: number): VirtualIntelliChem | undefined {
        const ic = this.findIntelliChemByAddress(address);
        return ic && ic.isEffective ? ic : undefined;
    }

    /**
     * Creates or updates the virtual pump at `def.address`, clears any
     * auto-disable, persists and notifies clients.
     *
     * A pump whose type differs from the requested one is replaced. The
     * replacement is built BEFORE the old pump is dropped, so a rejected
     * definition (e.g. unsupported type) leaves the existing pump in place.
     *
     * @throws Error when `address` is not a number or the type is unsupported;
     *         rethrows save failures.
     */
    public async upsertPumpAsync(def: any): Promise<VirtualPump> {
        if (!def || typeof def.address !== 'number') throw new Error('address is required');
        const type = VirtualEquipmentManager._pumpTypeOf(def);
        const existing = this.findPumpByAddress(def.address);
        let pump: VirtualPump;
        if (existing && existing.type === type) {
            existing.applyUserConfig({
                enabled: def.enabled === true,
                portId: typeof def.portId === 'number' ? def.portId : existing.portId,
                wattModel: def.wattModel || existing.wattModel
            });
            existing.clearAutoDisabled();
            pump = existing;
        } else {
            pump = this._constructPump({ ...def, autoDisabled: false });
            if (existing) this._pumps = this._pumps.filter(p => p !== existing);
            this._pumps.push(pump);
        }
        await this.saveAsync();
        this.emit();
        return pump;
    }

    /** Removes the virtual pump at `address`; saves and emits only when something was removed. */
    public async deletePumpAsync(address: number): Promise<void> {
        const before = this._pumps.length;
        this._pumps = this._pumps.filter(p => p.address !== address);
        if (this._pumps.length !== before) {
            await this.saveAsync();
            this.emit();
        }
    }

    /** Clears the auto-disable on the pump at `address`. @returns the pump, or undefined when none exists. */
    public async reenablePumpAsync(address: number): Promise<VirtualPump | undefined> {
        const pump = this.findPumpByAddress(address);
        if (!pump) return undefined;
        pump.clearAutoDisabled();
        await this.saveAsync();
        this.emit();
        return pump;
    }

    /**
     * Creates or updates the virtual chlorinator at `def.address`, clears any
     * auto-disable, persists and notifies clients.
     * @throws Error when `address` is not a number; rethrows save failures.
     */
    public async upsertChlorinatorAsync(def: any): Promise<VirtualChlorinator> {
        if (!def || typeof def.address !== 'number') throw new Error('address is required');
        let chlor = this.findChlorinatorByAddress(def.address);
        if (chlor) {
            chlor.applyUserConfig({
                enabled: def.enabled === true,
                portId: typeof def.portId === 'number' ? def.portId : chlor.portId,
                saltLevel: typeof def.saltLevel === 'number' ? def.saltLevel : chlor.saltLevel,
                modelName: def.modelName || chlor.modelName
            });
            chlor.clearAutoDisabled();
        } else {
            chlor = this._constructChlorinator({ ...def, autoDisabled: false });
            this._chlorinators.push(chlor);
        }
        await this.saveAsync();
        this.emit();
        return chlor;
    }

    /** Removes the virtual chlorinator at `address`; saves and emits only when something was removed. */
    public async deleteChlorinatorAsync(address: number): Promise<void> {
        const before = this._chlorinators.length;
        this._chlorinators = this._chlorinators.filter(c => c.address !== address);
        if (this._chlorinators.length !== before) {
            await this.saveAsync();
            this.emit();
        }
    }

    /** Clears the auto-disable on the chlorinator at `address`. @returns the chlorinator, or undefined when none exists. */
    public async reenableChlorinatorAsync(address: number): Promise<VirtualChlorinator | undefined> {
        const chlor = this.findChlorinatorByAddress(address);
        if (!chlor) return undefined;
        chlor.clearAutoDisabled();
        await this.saveAsync();
        this.emit();
        return chlor;
    }

    /**
     * Creates or updates the virtual IntelliChem at `def.address`, clears any
     * auto-disable, persists and notifies clients. The definition is passed to
     * the device as-is.
     * @throws Error when `address` is not a number; rethrows save failures.
     */
    public async upsertIntelliChemAsync(def: any): Promise<VirtualIntelliChem> {
        if (!def || typeof def.address !== 'number') throw new Error('address is required');
        let ic = this.findIntelliChemByAddress(def.address);
        if (ic) {
            ic.applyUserConfig(def);
            ic.clearAutoDisabled();
        } else {
            ic = new VirtualIntelliChem({ ...def, autoDisabled: false });
            this._intellichems.push(ic);
        }
        await this.saveAsync();
        this.emit();
        return ic;
    }

    /** Removes the virtual IntelliChem at `address`; saves and emits only when something was removed. */
    public async deleteIntelliChemAsync(address: number): Promise<void> {
        const before = this._intellichems.length;
        this._intellichems = this._intellichems.filter(ic => ic.address !== address);
        if (this._intellichems.length !== before) {
            await this.saveAsync();
            this.emit();
        }
    }

    /** Clears the auto-disable on the IntelliChem at `address`. @returns the device, or undefined when none exists. */
    public async reenableIntelliChemAsync(address: number): Promise<VirtualIntelliChem | undefined> {
        const ic = this.findIntelliChemByAddress(address);
        if (!ic) return undefined;
        ic.clearAutoDisabled();
        await this.saveAsync();
        this.emit();
        return ic;
    }

    /** @returns the storage path plus each device's `toSnapshot()` — the payload sent by {@link emit}. */
    public getSnapshot(): any {
        return {
            filePath: this._filePath,
            pumps: this._pumps.map(p => p.toSnapshot()),
            chlorinators: this._chlorinators.map(c => c.toSnapshot()),
            intellichems: this._intellichems.map(ic => ic.toSnapshot())
        };
    }

    /** Schedules one save 2 s from now; requests made while a save is already pending are coalesced into it. */
    private _debounceSave(): void {
        if (this._saveTimer) return;
        this._saveTimer = setTimeout(() => {
            this._saveTimer = null;
            this.saveAsync().catch(e => logger.error(`VirtualEquipment: debounced save failed: ${e.message}`));
        }, 2000);
    }

    /**
     * Writes every device's `toPersisted()` form to {@link filePath} as
     * pretty-printed JSON, creating the directory when needed.
     * @throws the underlying filesystem error, after logging it.
     */
    public async saveAsync(): Promise<void> {
        const data = {
            pumps: this._pumps.map(p => p.toPersisted()),
            chlorinators: this._chlorinators.map(c => c.toPersisted()),
            intellichems: this._intellichems.map(ic => ic.toPersisted())
        };
        const dir = path.dirname(this._filePath);
        try {
            await fs.promises.mkdir(dir, { recursive: true });
            await fs.promises.writeFile(this._filePath, JSON.stringify(data, null, 2), 'utf8');
        } catch (err) {
            logger.error(`VirtualEquipment: failed to write ${this._filePath}: ${(err as Error).message}`);
            throw err;
        }
    }

    /** Pushes the current snapshot to web clients on the 'virtualEquipment' channel (best effort). */
    public emit(): void {
        try {
            webApp.emitToClients('virtualEquipment', this.getSnapshot());
        } catch { /* webApp may not be initialized during unit tests */ }
    }
}
/** Shared module-level instance; constructed without a data directory, so it persists to `<process.cwd()>/data/virtualEquipment.json`. */
export const virtualEquipmentManager: VirtualEquipmentManager = new VirtualEquipmentManager();