import { EventEmitter } from 'events'; import { Logger } from 'winston'; import WebSocket from 'ws'; import { type ParserOptions, Parser } from 'xml2js'; import { parseBooleans, parseNumbers } from 'xml2js/lib/processors.js'; import { Message } from './ISY.js'; import { EventType } from './Events/EventType.js'; import { ISYInitializationError } from './ISY.js'; import { emphasize } from './Utils.js'; import * as Utils from './Utils.js'; import type { ClientRequestArgs } from 'http'; const defaultParserOptions: ParserOptions = { explicitArray: false, mergeAttrs: true, attrValueProcessors: [parseNumbers, parseBooleans], valueProcessors: [parseNumbers, parseBooleans], tagNameProcessors: [tagName => (tagName === 'st' || tagName === 'cmd' || tagName === 'nodeDef' ? '' : tagName)], }; const parser = new Parser(defaultParserOptions); /** * WebSocket endpoint configuration options */ export interface ISYWebSocketConfig { /** Host address */ host: string; /** Port number */ port: number; /** Protocol (ws or wss) */ wsprotocol: 'ws' | 'wss'; /** Socket path for Unix socket connections */ socketPath?: string; /** Authentication credentials */ credentials: { username: string; password: string; }; /** WebSocket client options */ webSocketOptions: WebSocket.ClientOptions & ClientRequestArgs & { guardianInterval?: number }; /** Guardian timer interval in milliseconds */ guardianInterval: number; /** Whether WebSocket is enabled */ enableWebSocket: boolean; } /** * WebSocket message handler interface */ export interface IWebSocketMessageHandler { handleWebSocketMessage(event: { data: any }): void; getNode(address: string): any; getVariable(type: any, id: number): any; elkAlarmPanel?: any; zoneMap?: Map; nodeChangedHandler?(node: any): void; } /** * ISY WebSocket endpoint management class * Handles WebSocket connection, message processing, and connection monitoring */ export class ISYWebSocketEndpoint extends EventEmitter { // #region Properties private webSocket: WebSocket; private webSocketOptions: WebSocket.ClientOptions & ClientRequestArgs; private guardianTimer: NodeJS.Timeout; private lastActivity: number; private readonly config: ISYWebSocketConfig; private readonly logger: Logger; private readonly messageHandler: IWebSocketMessageHandler; // #endregion Properties // #region Constructor /** * Creates a new ISY WebSocket endpoint * @param config WebSocket configuration * @param logger Logger instance * @param messageHandler Message handler for processing WebSocket events */ constructor(config: ISYWebSocketConfig, logger: Logger, messageHandler: IWebSocketMessageHandler) { super(); this.config = config; this.logger = logger; this.messageHandler = messageHandler; this.guardianTimer = null; this.lastActivity = Date.now(); this.webSocketOptions = { origin: 'com.universal-devices.websockets.isy' }; if (this.config.socketPath) { this.webSocketOptions.socketPath = this.config.socketPath; } else { this.webSocketOptions.auth = `${this.config.credentials.username}:${this.config.credentials.password}`; } } // #endregion Constructor // #region Public Methods /** * Initializes the WebSocket connection */ public async initialize(): Promise { if (!this.config.enableWebSocket) { this.logger.warn('WebSocket is disabled. Skipping initialization.'); return; } try { let address = `${this.config.wsprotocol}://${this.config.host}:${this.config.port}/rest/subscribe`; if (this.config.socketPath) { address = `ws+unix:${this.config.socketPath}:/rest/subscribe`; } this.logger.info(`Opening webSocket: ${address}`); this.logger.info('Using the following websocket options: ' + JSON.stringify(this.webSocketOptions)); if (this.webSocket) { try { if (this.webSocket.readyState === WebSocket.OPEN) this.webSocket.close(); this.webSocket = null; } catch (e) { this.logger.warn(`Error closing existing websocket: ${e.message}`); } } const webSocketPromise = new Promise((resolve, reject) => { const webSocket = new WebSocket(address, ['ISYSUB'], this.webSocketOptions); webSocket .on('open', () => { this.logger.info(emphasize('Websocket subscription open')); this.guardianTimer = setTimeout(async () => { this.logger.warn('WebSocket Guardian Timer expired. Closing WebSocket.'); webSocket.terminate(); this.guardianTimer = null; await this.initialize(); }, this.config.guardianInterval ?? 60000); resolve(webSocket); }) .on('message', (data, b) => { this.logger.silly(`Received message: ${Utils.logStringify(data, 1)}`); this.guardianTimer.refresh(); this.handleWebSocketMessage({ data: data }); }) .on('error', (err: any, response: any) => { this.logger.warn(`Websocket subscription error: ${err}`); reject(new ISYInitializationError('Websocket subscription error', 'websocket')); }) .on('fail', (data: string, response: any) => { this.logger.warn(`Websocket subscription failure: ${data}`); reject(new Error('Websocket subscription failure')); }) .on('abort', () => { this.logger.warn('Websocket subscription aborted.'); throw new Error('Websocket subscription aborted.'); }) .on('timeout', (ms: string) => { this.logger.warn(`Websocket subscription timed out after ${ms} milliseconds.`); reject(new Error('Timeout contacting ISY')); }) .on('close', (code: number, reason: string) => { this.logger.info(emphasize(`Websocket subscription closed: ${code} - ${reason}`)); if (this.guardianTimer) { clearTimeout(this.guardianTimer); } this.guardianTimer = null; if (code !== 1005) { this.logger.warn(`WebSocket closed with code ${code}: ${reason}. Forcing to terminate.`); if (webSocket) { webSocket.terminate(); } } if (webSocket) { webSocket.removeAllListeners(); } }); }); this.webSocket = await webSocketPromise; } catch (e) { throw new ISYInitializationError(e, 'websocket'); } } /** * Handles incoming WebSocket messages * @param event WebSocket message event */ public handleWebSocketMessage(event: { data: any }): void { this.lastActivity = Date.now(); parser.parseString(event.data, (err: any, res: { Event: Message }) => { if (err) { this.logger.error(`Error parsing ISY WebSocket message: ${err} - ${event.data}`); return; } try { const evt = res?.Event; if (evt === undefined || evt.control === undefined) { return; } let actionValue = 0; if (evt.action instanceof Object) { actionValue = Number(evt.action._); } else if (typeof evt.action === 'string') { actionValue = Number(evt.action); } const stringControl = Number((evt.control as string)?.replace('_', '')); switch (stringControl) { case EventType.Elk: if (actionValue === 2) { this.messageHandler.elkAlarmPanel?.handleEvent(event); } else if (actionValue === 3) { const zeElement = evt.eventInfo.ze; const zoneId = zeElement.zone; const zoneDevice = this.messageHandler.zoneMap?.[zoneId]; if (zoneDevice !== null) { if (zoneDevice.handleEvent(event)) { this.messageHandler.nodeChangedHandler?.(zoneDevice); } } } break; case EventType.Trigger: if (actionValue === 6) { const varNode = evt.eventInfo.var; const id = varNode.id; const type = varNode.type; this.messageHandler.getVariable(type, id)?.handleEvent(evt); } break; case EventType.Heartbeat: this.logger.debug(`Received ${EventType[EventType.Heartbeat]} Signal from ISY: ${JSON.stringify(evt)}`); break; case EventType.SystemStatusChanged: case EventType.ProgressReport: this.logger.silly(`${EventType[stringControl]} Message: ${JSON.stringify(evt)}`); break; default: if (evt.node !== '' && evt.node !== undefined && evt.node !== null) { const impactedNode = this.messageHandler.getNode(evt.node); if (impactedNode !== undefined && impactedNode !== null) { try { impactedNode.handleEvent(evt); } catch (e) { this.logger.error(`Error handling message for ${impactedNode.name}: ${e.message}`); } } else { this.logger.silly(`${stringControl} Message for Unidentified Device: ${JSON.stringify(evt)}`); } } else { this.logger.debug(`${EventType[stringControl]} Message: ${JSON.stringify(evt)}`); } break; } } catch (e) { this.logger.error(`Error handling WebSocket message: ${e.message} - ${JSON.stringify(event)}`); } }); } /** * Closes the WebSocket connection */ public async close(): Promise { try { const closePromise = new Promise(resolve => this.webSocket?.once('close', (x, y) => { resolve(); }) ); if (this.guardianTimer) { clearInterval(this.guardianTimer); this.guardianTimer = null; } this.webSocket?.close(); await closePromise; } catch (e) { this.logger.error(`Error closing websocket: ${e.message}`); } } /** * Gets the current WebSocket connection state */ public get isConnected(): boolean { return this.webSocket?.readyState === WebSocket.OPEN; } /** * Gets the last activity timestamp */ public get getLastActivity(): number { return this.lastActivity; } // #endregion Public Methods // #region Private Methods /** * Guardian function to monitor WebSocket activity */ private async guardian(): Promise { const timeNow = Date.now(); if (timeNow - this.lastActivity > this.config.guardianInterval) { this.logger.warn( `Guardian: Detected no activity in more then ${this.config.guardianInterval} seconds. Reinitializing web sockets` ); // Emit event to notify ISY instance to refresh statuses this.emit('guardianTimeout'); await this.initialize(); this.guardianTimer.refresh(); this.logger.warn('Guardian: WebSocket reinitialized'); this.lastActivity = Date.now(); } } // #endregion Private Methods }