import Application from '../application';
import { SocketProxy, loggerLevel, ServerInfo } from '../util/interfaceDefine';
import { TcpClient } from '../components/tcpClient';
import * as define from '../util/define';
import * as rpcService from './rpcService';
import * as appUtil from '../util/appUtil';
import * as path from 'path';

const meFilename = `[${path.basename(__filename, '.js')}.ts]`;

/**
 * Registry of the rpc client sockets created by this module, keyed by the
 * remote server id. Each `RpcClientSocket` adds itself in its constructor.
 */
const rpcClientSockets = new Map<string, RpcClientSocket>();

/**
 * Decide whether this server should open an rpc socket to `server`, and do so.
 *
 * Only one socket connection is established between two servers: the server
 * whose id compares lower is the one that connects. Pairs of server types
 * flagged in `app.noRpcMatrix` are skipped. Any client previously registered
 * for the same server id is removed before the new one is created.
 *
 * @param app    the local application
 * @param server the remote server to (possibly) connect to
 */
export function ifCreateRpcClient(app: Application, server: ServerInfo): void {
    const noRpc = app.noRpcMatrix[appUtil.getNoRpcKey(app.serverType, server.serverType)];
    if (app.serverId < server.id && !noRpc) {
        removeSocket(server.id);
        // The constructor registers the instance in `rpcClientSockets`.
        // eslint-disable-next-line no-new
        new RpcClientSocket(app, server);
    }
}

/**
 * Remove the rpc client socket registered for `id`, if any.
 *
 * @param id id of the remote server
 */
export function removeSocket(id: string): void {
    const socket = rpcClientSockets.get(id);
    if (socket) {
        socket.remove();
        rpcClientSockets.delete(id);
    }
}

/**
 * Client side of an rpc connection to one remote server.
 *
 * The instance connects on construction, sends a register frame once the
 * connection is up, starts heartbeating after the peer answers the
 * registration, and schedules a reconnect whenever the socket closes,
 * until `remove()` is called.
 */
export class RpcClientSocket {
    private readonly app: Application;
    public id: string;
    private readonly host: string;
    private readonly port: number;
    /** Live connection; null before the first connect and after every close. */
    private socket: SocketProxy = null as any;
    private connectTimer: NodeJS.Timeout = null as any;
    private heartbeatTimer: NodeJS.Timeout = null as any;
    private heartbeatTimeoutTimer: NodeJS.Timeout = null as any;
    /** Whether outgoing messages are batched and flushed on an interval. */
    private readonly sendCache: boolean = false;
    /** Flush interval passed to `setInterval` when `sendCache` is on. */
    private readonly interval: number = 0;
    private sendArr: Buffer[] = [];
    private sendTimer: NodeJS.Timeout = null as any;
    /** Total byte length of the buffers currently held in `sendArr`. */
    private nowLen = 0;
    /** Cached byte count above which `send` flushes immediately. */
    private readonly maxLen = +Infinity;
    /** Set by `remove()`; stops any further (re)connect attempts. */
    private die: boolean = false;
    private readonly serverToken: string = '';

    /**
     * @param app    the local application
     * @param server the remote server to connect to
     */
    constructor(app: Application, server: ServerInfo) {
        this.app = app;
        this.id = server.id;
        this.host = server.host;
        this.port = server.port;
        rpcClientSockets.set(this.id, this);

        const rpcConfig = app.someconfig.rpc || {};

        // `rpc.interval` is either a single number or a per-server-type table
        // with an optional `default` entry.
        let interval = 0;
        if (rpcConfig.interval) {
            if (typeof rpcConfig.interval === 'number') {
                interval = rpcConfig.interval;
            } else {
                interval = rpcConfig.interval[server.serverType] || rpcConfig.interval.default || 0;
            }
        }

        // Batching is only enabled for intervals of at least 10.
        if (interval >= 10) {
            this.sendCache = true;
            this.interval = interval;
            const tmpMaxLen = parseInt(rpcConfig.intervalCacheLen as any) || 0;
            if (tmpMaxLen > 0) {
                this.maxLen = tmpMaxLen;
            }
        }

        const tokenConfig = app.someconfig.recognizeToken || {} as any;
        this.serverToken = tokenConfig.serverToken || define.some_config.Server_Token;

        this.doConnect(0);
    }

    /**
     * Schedule a connection attempt after `delay` milliseconds.
     * Does nothing once the client has been removed.
     */
    private doConnect(delay: number): void {
        if (this.die) {
            return;
        }

        this.connectTimer = setTimeout(() => {
            const connectCb = () => {
                this.app.logger(loggerLevel.debug, `${meFilename} connect to rpc server success: ${this.id}`);

                // Register frame: 4-byte big-endian length (payload + 1 type byte),
                // 1-byte message type, then the JSON payload.
                const registerBuf = Buffer.from(JSON.stringify({
                    id: this.app.serverId,
                    serverType: this.app.serverType,
                    serverToken: this.serverToken
                }));
                const buf = Buffer.allocUnsafe(registerBuf.length + 5);
                buf.writeUInt32BE(registerBuf.length + 1, 0);
                buf.writeUInt8(define.Rpc_Msg.register, 4);
                registerBuf.copy(buf, 5);
                this.socket.send(buf);

                if (this.sendCache) {
                    this.sendTimer = setInterval(this.sendInterval.bind(this), this.interval);
                }
            };

            this.connectTimer = null as any;

            const rpcConfig = this.app.someconfig.rpc || {};
            const noDelay = !!rpcConfig.noDelay;
            this.socket = new TcpClient(
                this.port,
                this.host,
                rpcConfig.maxLen || define.some_config.SocketBufferMaxLen,
                noDelay,
                connectCb
            );
            this.socket.on('data', this.onData.bind(this));
            this.socket.on('close', this.onClose.bind(this));
            this.app.logger(loggerLevel.debug, `${meFilename} try to connect to rpc server: ${this.id}`);
        }, delay);
    }

    /**
     * Socket 'close' handler: unregister from the rpc pool, stop all timers,
     * discard pending cached data and, unless the client was removed,
     * schedule a reconnect.
     */
    private onClose(): void {
        this.app.rpcPool.removeSocket(this.id);
        clearTimeout(this.heartbeatTimer);
        clearTimeout(this.heartbeatTimeoutTimer);
        clearInterval(this.sendTimer);
        this.sendArr = [];
        this.nowLen = 0;
        this.heartbeatTimeoutTimer = null as any;
        this.socket = null as any;

        if (this.die) {
            // Closed on purpose by remove(): no reconnect will happen, so do
            // not report one.
            this.app.logger(loggerLevel.debug, `${meFilename} socket closed and removed, no reconnect: ${this.id}`);
            return;
        }

        this.app.logger(loggerLevel.error, `${meFilename} socket closed, reconnect the rpc server later: ${this.id}`);
        const rpcConfig = this.app.someconfig.rpc || {};
        const delay = rpcConfig.reconnectDelay || define.some_config.Time.Rpc_Reconnect_Time;
        this.doConnect(delay * 1000);
    }

    /**
     * Send heartbeat at regular intervals.
     *
     * The period is the configured heartbeat time (seconds) minus a random
     * 0-5 s jitter, never less than 5 s. It is computed once per call; the
     * timer then re-arms itself with the same period via `refresh()`.
     */
    private heartbeatSend(): void {
        const rpcConfig = this.app.someconfig.rpc || {};
        const heartbeat = rpcConfig.heartbeat || define.some_config.Time.Rpc_Heart_Beat_Time;
        let timeDelay = heartbeat * 1000 - 5000 + Math.floor(5000 * Math.random());
        if (timeDelay < 5000) {
            timeDelay = 5000;
        }

        this.heartbeatTimer = setTimeout(() => {
            // Heartbeat frame: length prefix of 1 (the type byte only), then the type.
            const buf = Buffer.allocUnsafe(5);
            buf.writeUInt32BE(1, 0);
            buf.writeUInt8(define.Rpc_Msg.heartbeat, 4);
            this.socket.send(buf);
            this.heartbeatTimeoutStart();
            this.heartbeatTimer.refresh();
        }, timeDelay);
    }

    /**
     * After sending a heartbeat, a response was received: cancel the pending timeout.
     */
    private heartbeatResponse(): void {
        clearTimeout(this.heartbeatTimeoutTimer);
        this.heartbeatTimeoutTimer = null as any;
    }

    /**
     * After sending the heartbeat, a response must be received within a certain
     * period of time, otherwise the connection is closed. If a timeout is
     * already pending, it is left running rather than restarted.
     */
    private heartbeatTimeoutStart(): void {
        if (this.heartbeatTimeoutTimer !== null) {
            return;
        }

        this.heartbeatTimeoutTimer = setTimeout(() => {
            this.app.logger(loggerLevel.error, `${meFilename} heartbeat timeout, close the rpc socket: ${this.id}`);
            this.socket.close();
        }, define.some_config.Time.Rpc_Heart_Beat_Timeout_Time * 1000);
    }

    /**
     * Socket 'data' handler: dispatch on the message type stored in the first
     * byte of `data`. Errors thrown while dispatching are logged, not rethrown.
     *
     * NOTE(review): the type is read at offset 0, so the 4-byte length prefix
     * used when sending is presumably stripped by the socket layer — confirm.
     */
    private onData(data: Buffer): void {
        try {
            const type = data.readUInt8(0);
            if (type === define.Rpc_Msg.clientMsgIn) {
                this.app.backendServer.handleMsg(this.id, data);
            } else if (type === define.Rpc_Msg.clientMsgOut) {
                this.app.frontendServer.sendMsgByUids(data);
            } else if (type === define.Rpc_Msg.rpcMsgAwait) {
                // Rejections are deliberately ignored here.
                rpcService.handleMsgAwait(this.id, data).catch(() => { });
            } else if (type === define.Rpc_Msg.rpcMsg) {
                rpcService.handleMsg(this.id, data);
            } else if (type === define.Rpc_Msg.applySession) {
                this.app.frontendServer.applySession(data);
            } else if (type === define.Rpc_Msg.register) {
                this.registerHandle();
            } else if (type === define.Rpc_Msg.heartbeat) {
                this.heartbeatResponse();
            }
        } catch (e: any) {
            this.app.logger(loggerLevel.error, e);
        }
    }

    /**
     * Registration success: start heartbeating and publish this socket in the rpc pool.
     */
    private registerHandle(): void {
        this.heartbeatSend();
        this.app.rpcPool.addSocket(this.id, this);
    }

    /**
     * Remove the socket: mark it dead so it never reconnects, then close the
     * live connection, or cancel the pending connect attempt if there is none.
     */
    remove(): void {
        this.die = true;
        if (this.socket) {
            this.socket.close();
        } else if (this.connectTimer !== null) {
            clearTimeout(this.connectTimer);
        }
    }

    /**
     * Send `data` to the remote server.
     *
     * With batching enabled the buffer is queued and flushed by the interval
     * timer, or immediately once the queued byte count exceeds `maxLen`.
     * While there is no live socket the message is dropped, consistent with
     * `onClose` discarding pending cached data.
     *
     * @param data an already framed message
     */
    send(data: Buffer): void {
        if (!this.socket) {
            // No live connection (not connected yet, or closed and waiting to
            // reconnect): nothing can be delivered.
            this.app.logger(loggerLevel.debug, `${meFilename} rpc socket not connected, message dropped: ${this.id}`);
            return;
        }

        if (!this.sendCache) {
            this.socket.send(data);
            return;
        }

        this.sendArr.push(data);
        this.nowLen += data.length;
        if (this.nowLen > this.maxLen) {
            this.sendInterval();
        }
    }

    /**
     * Flush all queued buffers as one concatenated write.
     * Does nothing when the queue is empty or there is no live socket.
     */
    private sendInterval(): void {
        if (!this.socket || this.sendArr.length === 0) {
            return;
        }
        this.socket.send(Buffer.concat(this.sendArr));
        this.sendArr.length = 0;
        this.nowLen = 0;
    }
}