/* nodejs-poolController. An application to control pool equipment. Copyright (C) 2016, 2017, 2018, 2019, 2020, 2021, 2022. Russell Goldin, tagyoureit. russ.goldin@gmail.com This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ import { Direction, Inbound, Outbound, Protocol } from '../../comms/messages/Messages'; import { PumpStateMessage } from '../../comms/messages/status/PumpStateMessage'; import { conn } from '../../comms/Comms'; import { sys } from '../../Equipment'; import { logger } from '../../../logger/Logger'; // Actions whose response carries live rpm/watts/driveState that // PumpStateMessage.process consumes for state.pumps[]. Loopback of pure ACKs // (4 remote/local, 6 power) would be noise — state.pumps doesn't read them. const LOOPBACK_ACTIONS = new Set([7]); export interface VirtualPumpOptions { address: number; portId?: number; enabled?: boolean; autoDisabled?: boolean; autoDisabledAt?: string | null; autoDisabledReason?: string | null; wattModel?: string; } /** * Base class for virtual pumps. Handles shared actions 4 (remote/local), * 6 (power), 7 (status request) and the pump-type-agnostic pieces of the * 15-byte status payload. Subclasses implement: * - processSpeedCommand() for set-speed commands (action 1 / 9 / 10) * - supportedActions list * - computeWatts() * - pump type string ('vs' / 'vf' / 'vsf' / …) */ export abstract class VirtualPump { public readonly address: number; public portId: number; public wattModel: string; private _enabled: boolean; private _autoDisabled: boolean; private _autoDisabledAt: string | null; private _autoDisabledReason: string | null; // Runtime state — not persisted. protected _running = false; protected _remote = false; protected _targetRpm = 0; protected _targetFlow = 0; protected _feature = 0; protected _enabledAt: number = Date.now(); protected _lastPacketAt: number | null = null; protected _packetCount = 0; // Sliding window of inbound-packet timestamps whose source matched our // address. Populated by the manager's observe(); used to detect // collisions with a real pump. Kept small (last 8). private _recentEchoes: number[] = []; constructor(opts: VirtualPumpOptions) { this.address = opts.address; this.portId = opts.portId ?? 0; this.wattModel = opts.wattModel || 'cheap'; // Fail-off default: a missing `enabled` field must NOT light up a // virtual pump that could collide with real hardware. Only an // explicit `enabled: true` enables; anything else is disabled. this._enabled = opts.enabled === true; this._autoDisabled = opts.autoDisabled === true; this._autoDisabledAt = opts.autoDisabledAt ?? null; this._autoDisabledReason = opts.autoDisabledReason ?? null; } public abstract get type(): string; protected abstract supportedActions: Set; protected abstract processSpeedCommand(msg: Inbound, response: Outbound): boolean; protected abstract computeWatts(): number; public get enabled(): boolean { return this._enabled; } public get autoDisabled(): boolean { return this._autoDisabled; } public get autoDisabledAt(): string | null { return this._autoDisabledAt; } public get autoDisabledReason(): string | null { return this._autoDisabledReason; } public get isEffective(): boolean { return this._enabled && !this._autoDisabled; } public get recentEchoes(): number[] { return this._recentEchoes; } public supportsAction(action: number): boolean { return this.supportedActions.has(action); } public pushRecentInboundEcho(ts: number): void { this._recentEchoes.push(ts); if (this._recentEchoes.length > 8) this._recentEchoes.splice(0, this._recentEchoes.length - 8); } public setAutoDisabled(v: boolean, reason?: string): void { const wasEffective = this.isEffective; this._autoDisabled = v; this._autoDisabledAt = v ? new Date().toISOString() : null; this._autoDisabledReason = v ? (reason || 'Collision detected on bus') : null; // When effectiveness drops we are no longer answering on the bus — // any leftover "running / 2540 rpm / 798 W" runtime is stale and // misleading in the dP readout. Clear it. if (wasEffective && !this.isEffective) this._resetRuntime(); } public clearAutoDisabled(): void { this._autoDisabled = false; this._autoDisabledAt = null; this._autoDisabledReason = null; } public applyUserConfig(opts: Partial): void { const wasEffective = this.isEffective; if (typeof opts.enabled === 'boolean') this._enabled = opts.enabled; if (typeof opts.portId === 'number') this.portId = opts.portId; if (typeof opts.wattModel === 'string') this.wattModel = opts.wattModel; // Re-enabling resets the runtime clock so the status time counter // doesn't look frozen after a long auto-disable. if (opts.enabled === true) this._enabledAt = Date.now(); // Same cleanup on user-disable as on auto-disable. if (wasEffective && !this.isEffective) this._resetRuntime(); } /** * Wipe transient runtime state (running, remote, speeds, watts). Called * when the pump transitions from effective → not-effective so the REST / * socket snapshot doesn't display stale OCP-commanded values next to * "Effective: no". _packetCount / _lastPacketAt are preserved as a * historical record of bus activity. */ private _resetRuntime(): void { this._running = false; this._remote = false; this._targetRpm = 0; this._targetFlow = 0; this._feature = 0; } /** * Process an inbound master→pump packet and enqueue the response on the * same portId with src=ourAddress, dst=msg.source. * * Responses mirror what a real IntelliFlo pump emits — small payloads * for the ack cases, and a 15-byte payload for the status (action 7). */ public process(msg: Inbound): void { this._lastPacketAt = Date.now(); this._packetCount++; const response = Outbound.create({ portId: msg.portId, protocol: Protocol.Pump, source: this.address, dest: msg.source, action: msg.action, payload: [], retries: 0, response: false }); switch (msg.action) { case 4: { // Remote/local control. [255] = remote, [0] = local. const val = msg.extractPayloadByte(0); this._remote = val === 255; response.appendPayloadByte(val); break; } case 6: { // Power. [10] = on, [4] = off. const val = msg.extractPayloadByte(0); if (val === 10) this._running = true; else if (val === 4) this._running = false; response.appendPayloadByte(val); break; } case 7: { this._appendStatusPayload(response); break; } default: { const handled = this.processSpeedCommand(msg, response); if (!handled) { logger.verbose(`VirtualPump ${this.address}: ignoring unsupported action ${msg.action}`); return; } break; } } try { let port = conn.findPortById(response.portId); if (port) port.emitter.emit('messagewritepriority', response); else conn.queueSendMessage(response); logger.verbose(`VirtualPump ${this.address}: answered action ${msg.action} with ${response.toShortPacket()}`); } catch (err) { logger.error(`VirtualPump ${this.address}: failed to queue response for action ${msg.action}: ${(err as Error).message}`); } // njsPC only populates state.pumps[].rpm/watts/driveState from inbound // pump replies (PumpStateMessage.process, gated on msg.source>=96). A // real pump on the bus satisfies that naturally; our virtual pump // writes are outbound-only and never come back. Feed a synthetic copy // of the reply directly to the parser so ICP/OCP and dashPanel agree. // We call PumpStateMessage.process() directly (not Messages.process / // Inbound.process) so the VirtualEquipment dispatch hook isn't // re-entered and we can't loop. if (LOOPBACK_ACTIONS.has(msg.action)) { this._loopbackAsInbound(response); } } private _loopbackAsInbound(out: Outbound): void { // Only loop back if the OCP has an active pump configured at our // address — PumpStateMessage.process() looks up by address and would // otherwise synthesize a phantom pump entry, violating our // "virtual pump is invisible to sys.pumps" invariant. const cfg = sys.pumps.find(p => p.address === this.address && p.isActive === true); if (!cfg) return; try { const inbound = new Inbound(); inbound.protocol = Protocol.Pump; inbound.direction = Direction.In; inbound.portId = out.portId; inbound.preamble = [255, 0, 255]; inbound.header = out.header.slice(); inbound.payload = out.payload.slice(); inbound.term = out.term.slice(); inbound.isValid = true; inbound.timestamp = new Date(); PumpStateMessage.process(inbound); } catch (err) { logger.error(`VirtualPump ${this.address}: loopback to PumpStateMessage failed for action ${out.action}: ${(err as Error).message}`); } } /** * Append the 15-byte status payload (action 7). Byte layout mirrors * what PumpStateMessage.process decodes: * [0] command 0 idle / 10 running * [1] mode 0 * [2] driveState 0 idle / 2 running * [3] watts hi * [4] watts lo * [5] rpm hi * [6] rpm lo * [7] flow (0 for VS) * [8] ppc * [9] (unused) * [10] (unused) * [11] status hi 0 * [12] status lo 0 * [13] hours component of run time * [14] minutes component of run time */ private _appendStatusPayload(response: Outbound): void { response.appendPayloadBytes(0, 15); response.setPayloadByte(0, this._running ? 10 : 0); response.setPayloadByte(1, 0); response.setPayloadByte(2, this._running ? 2 : 0); const watts = this._running ? Math.max(0, Math.round(this.computeWatts())) : 0; response.setPayloadByte(3, Math.floor(watts / 256) & 0xff); response.setPayloadByte(4, watts & 0xff); const rpm = this._running ? Math.max(0, this._targetRpm) : 0; response.setPayloadByte(5, Math.floor(rpm / 256) & 0xff); response.setPayloadByte(6, rpm & 0xff); const flow = this._running ? Math.max(0, this._targetFlow) & 0xff : 0; response.setPayloadByte(7, flow); response.setPayloadByte(8, 0); response.setPayloadByte(11, 0); response.setPayloadByte(12, 0); const runMinutes = Math.max(0, Math.floor((Date.now() - this._enabledAt) / 60000)); response.setPayloadByte(13, Math.floor(runMinutes / 60) & 0xff); response.setPayloadByte(14, (runMinutes % 60) & 0xff); } /** Serialize user-intent + auto-disable state for data/virtualEquipment.json. */ public toPersisted(): any { return { address: this.address, type: this.type, portId: this.portId, enabled: this._enabled, autoDisabled: this._autoDisabled, autoDisabledAt: this._autoDisabledAt, autoDisabledReason: this._autoDisabledReason, wattModel: this.wattModel }; } /** Serialize intent + runtime state for REST/socket consumers. */ public toSnapshot(): any { return { ...this.toPersisted(), isEffective: this.isEffective, runtime: { running: this._running, remote: this._remote, targetRpm: this._targetRpm, targetFlow: this._targetFlow, feature: this._feature, watts: this._running ? Math.round(this.computeWatts()) : 0, enabledAt: new Date(this._enabledAt).toISOString(), lastPacketAt: this._lastPacketAt ? new Date(this._lastPacketAt).toISOString() : null, packetCount: this._packetCount } }; } }