/** * The master central server, accepts the monitor connection, is responsible for the mutual understanding between the servers, and accepts cli commands */ import Application from '../application'; import { MasterCli } from './cliUtil'; import { SocketProxy, monitor_get_new_server, monitor_remove_server, monitor_reg_master, loggerLevel, ServerInfo } from '../util/interfaceDefine'; import tcpServer from './tcpServer'; import { runServers } from '../util/starter'; import define = require('../util/define'); import * as msgCoder from './msgCoder'; import * as path from 'path'; const meFilename = `[${path.basename(__filename, '.js')}.ts]`; const servers = new Map(); const serversDataTmp: monitor_get_new_server = { T: define.Master_To_Monitor.addServer, servers: new Map() }; let masterCli: MasterCli; let app: Application; let serverToken = ''; let cliToken = ''; export function start(_app: Application, cb?: Function) { app = _app; masterCli = new MasterCli(_app, servers); startServer(cb); } function startServer(cb?: Function) { const tokenConfig = app.someconfig.recognizeToken || {} as any; serverToken = tokenConfig.serverToken || define.some_config.Server_Token; cliToken = tokenConfig.cliToken || define.some_config.Cli_Token; tcpServer(app.serverInfo.port, false, startCb, newClientCb); function startCb() { const str = `listening at [${app.serverInfo.host}:${app.serverInfo.port}] ${app.serverId}`; console.log(str); if (cb) cb(); if (app.startMode === 'all') { runServers(app); } } function newClientCb(socket: SocketProxy) { return new UnregSocket_proxy(socket); } } /** * Unregistered socket proxy */ class UnregSocket_proxy { private socket: SocketProxy; private registerTimer: NodeJS.Timeout = null as any; private readonly onDataFunc: (data: Buffer) => void; private readonly onCloseFunc: () => void; constructor(socket: SocketProxy) { this.socket = socket; this.onDataFunc = this.onData.bind(this); this.onCloseFunc = this.onClose.bind(this); socket.on('data', this.onDataFunc); socket.on('close', this.onCloseFunc); this.registerTimeout(); } private registerTimeout() { const self = this; this.registerTimer = setTimeout(function () { app.logger(loggerLevel.error, `${meFilename} unregistered socket, register timeout, close it, ${self.socket.remoteAddress}`); self.socket.close(); }, 5000); } private onData(_data: Buffer) { const socket = this.socket; let data: monitor_reg_master; try { data = JSON.parse(_data.toString()); } catch (err) { app.logger(loggerLevel.error, `${meFilename} unregistered socket, JSON parse error, close it, ${socket.remoteAddress}`); socket.close(); return; } // The first packet must be registered if (!data || data.T !== define.Monitor_To_Master.register) { app.logger(loggerLevel.error, `${meFilename} unregistered socket, illegal data, close it, ${socket.remoteAddress}`); socket.close(); return; } // Is it a server? if (data.serverToken) { if (data.serverToken !== serverToken) { app.logger(loggerLevel.error, `${meFilename} unregistered socket, illegal serverToken, close it, ${socket.remoteAddress}`); socket.close(); return; } // eslint-disable-next-line @typescript-eslint/prefer-optional-chain if (!data.serverInfo || !data.serverInfo.id || !data.serverInfo.host || !data.serverInfo.port || !data.serverInfo.serverType) { app.logger(loggerLevel.error, `${meFilename} unregistered socket, illegal serverInfo, close it, ${socket.remoteAddress}`); socket.close(); return; } this.registerOk(); // eslint-disable-next-line no-new new Master_ServerProxy(data, socket); return; } // Is it a cli? if (data.cliToken) { if (data.cliToken !== cliToken) { app.logger(loggerLevel.error, `${meFilename} unregistered socket, illegal cliToken, close it, ${socket.remoteAddress}`); socket.close(); return; } this.registerOk(); // eslint-disable-next-line no-new new Master_ClientProxy(socket); return; } app.logger(loggerLevel.error, `${meFilename} unregistered socket, illegal socket, close it, ${socket.remoteAddress}`); socket.close(); } private onClose() { clearTimeout(this.registerTimer); app.logger(loggerLevel.error, `${meFilename} unregistered socket closed, ${this.socket.remoteAddress}`); } private registerOk() { clearTimeout(this.registerTimer); this.socket.removeListener('data', this.onDataFunc); this.socket.removeListener('close', this.onCloseFunc); this.socket = null as any; } } /** * master processing server agent */ export class Master_ServerProxy { private readonly socket: SocketProxy; public sid: string = ''; public serverType: string = ''; private heartbeatTimeoutTimer: NodeJS.Timeout = null as any; constructor(data: monitor_reg_master, socket: SocketProxy) { this.socket = socket; this.init(data); } private init(data: monitor_reg_master) { const socket = this.socket; if (servers.has(data.serverInfo.id)) { app.logger(loggerLevel.error, `${meFilename} already has a monitor named: ${data.serverInfo.id}, close it, ${socket.remoteAddress}`); socket.close(); return; } socket.maxLen = define.some_config.SocketBufferMaxLen; this.heartbeatTimeout(); socket.on('data', this.onData.bind(this)); socket.on('close', this.onClose.bind(this)); this.sid = data.serverInfo.id; this.serverType = data.serverInfo.serverType; // Construct a new server message const socketInfo: monitor_get_new_server = { T: define.Master_To_Monitor.addServer, servers: new Map() }; socketInfo.servers.set(this.sid, data.serverInfo); const socketInfoBuf: Buffer = msgCoder.encodeInnerData(socketInfo); // Notify other servers that there are new servers for (const ms of servers.values()) { ms.socket.send(socketInfoBuf); } // Notify the newly added server, which servers are currently available const result = msgCoder.encodeInnerData(serversDataTmp); this.socket.send(result); servers.set(this.sid, this); serversDataTmp.servers.set(this.sid, data.serverInfo); app.logger(loggerLevel.debug, `${meFilename} get a new monitor named: ${this.sid}, ${this.socket.remoteAddress}`); } private heartbeatTimeout() { this.heartbeatTimeoutTimer = setTimeout(() => { app.logger(loggerLevel.error, `${meFilename} heartbeat timeout, close the monitor named: ${this.sid}, ${this.socket.remoteAddress}`); this.socket.close(); }, define.some_config.Time.Monitor_Heart_Beat_Time * 1000 * 2); } send(msg: any) { this.socket.send(msgCoder.encodeInnerData(msg)); } private heartbeatResponse() { const msg = { T: define.Master_To_Monitor.heartbeatResponse }; const buf = msgCoder.encodeInnerData(msg); this.socket.send(buf); } private onData(_data: Buffer) { let data: any; try { data = JSON.parse(_data.toString()); } catch (err) { app.logger(loggerLevel.error, `${meFilename} JSON parse error,close the monitor named: ${this.sid}, ${this.socket.remoteAddress}`); this.socket.close(); return; } try { if (data.T === define.Monitor_To_Master.heartbeat) { this.heartbeatTimeoutTimer.refresh(); this.heartbeatResponse(); } else if (data.T === define.Monitor_To_Master.cliMsg) { masterCli.deal_monitor_msg(data); } else { app.logger(loggerLevel.error, `${meFilename} the monitor illegal data type close it: ${this.sid} ${this.socket.remoteAddress}`); this.socket.close(); } } catch (e: any) { app.logger(loggerLevel.error, e); this.socket.close(); } } private onClose() { clearTimeout(this.heartbeatTimeoutTimer); servers.delete(this.sid); serversDataTmp.servers.delete(this.sid); const serverInfo: monitor_remove_server = { T: define.Master_To_Monitor.removeServer, id: this.sid, serverType: this.serverType }; const serverInfoBuf: Buffer = msgCoder.encodeInnerData(serverInfo); for (const ms of servers.values()) { ms.socket.send(serverInfoBuf); } app.logger(loggerLevel.error, `${meFilename} a monitor disconnected: ${this.sid}, ${this.socket.remoteAddress}`); } } /** * master handles cli agent */ export class Master_ClientProxy { private readonly socket: SocketProxy; private heartbeatTimeoutTimer: NodeJS.Timeout = null as any; constructor(socket: SocketProxy) { this.socket = socket; this.init(); } private init() { const socket = this.socket; socket.maxLen = define.some_config.SocketBufferMaxLen; this.heartbeatTimeOut(); socket.on('data', this.onData.bind(this)); socket.on('close', this.onClose.bind(this)); app.logger(loggerLevel.info, `${meFilename} get a new cli: ${socket.remoteAddress}`); } private heartbeatTimeOut() { this.heartbeatTimeoutTimer = setTimeout(() => { app.logger(loggerLevel.error, `${meFilename} heartbeat timeout, close the cli: ${this.socket.remoteAddress}`); this.socket.close(); }, define.some_config.Time.Monitor_Heart_Beat_Time * 1000 * 2); } private onData(_data: Buffer) { let data: any; try { data = JSON.parse(_data.toString()); } catch (err) { app.logger(loggerLevel.error, `${meFilename} JSON parse error,close the cli: ${this.socket.remoteAddress}`); this.socket.close(); return; } try { if (data.T === define.Cli_To_Master.heartbeat) { this.heartbeatTimeoutTimer.refresh(); } else if (data.T === define.Cli_To_Master.cliMsg) { app.logger(loggerLevel.info, `${meFilename} get command from the cli: ${this.socket.remoteAddress} ==> ${_data.toString()}`); masterCli.deal_cli_msg(this, data); } else { app.logger(loggerLevel.error, `${meFilename} the cli illegal data type close it: ${this.socket.remoteAddress}`); this.socket.close(); } } catch (e: any) { app.logger(loggerLevel.error, `${meFilename} cli handle msg err, close it: ${this.socket.remoteAddress}\n ${e.stack}`); this.socket.close(); } } send(msg: any) { this.socket.send(msgCoder.encodeInnerData(msg)); } private onClose() { clearTimeout(this.heartbeatTimeoutTimer); app.logger(loggerLevel.info, `${meFilename} a cli disconnected: ${this.socket.remoteAddress}`); } }