import Application from '../application'; import { I_clientManager, I_clientSocket, SocketProxy, I_connectorConfig } from '../util/interfaceDefine'; import * as define from '../util/define'; import { Session } from '../components/session'; import { EventEmitter } from 'events'; import WebSocket, * as ws from 'ws'; import * as https from 'https'; import * as http from 'http'; import * as crypto from 'crypto'; let maxLen = 0; /** * connector ws */ export class ConnectorWs { public app: Application; public clientManager: I_clientManager = null as any; public handshakeBuf: Buffer; // Handshake buffer public handshakeBufAll: Buffer = null as any; // Handshake buffer all public heartbeatBuf: Buffer; // Heartbeat response buffer public heartbeatTime: number = 0; // Heartbeat time private readonly maxConnectionNum: number = Number.POSITIVE_INFINITY; public nowConnectionNum: number = 0; public sendCache = false; public interval: number = 0; public md5 = ''; // route array md5 constructor(info: { app: Application; clientManager: I_clientManager; config: I_connectorConfig; startCb: () => void }) { this.app = info.app; this.clientManager = info.clientManager; const connectorConfig = info.config || {}; maxLen = connectorConfig.maxLen || define.some_config.SocketBufferMaxLen; this.heartbeatTime = (connectorConfig.heartbeat || 0) * 1000; if (connectorConfig.maxConnectionNum != null) { this.maxConnectionNum = connectorConfig.maxConnectionNum; } const interval = Number(connectorConfig.interval) || 0; if (interval >= 10) { this.sendCache = true; this.interval = interval; } wsServer(info.app.serverInfo.clientPort, connectorConfig, info.startCb, this.newClientCb.bind(this)); // Handshake buffer const cipher = crypto.createHash('md5'); this.md5 = cipher.update(JSON.stringify(this.app.routeConfig)).digest('hex'); const routeBuf = Buffer.from(JSON.stringify({ md5: this.md5, heartbeat: this.heartbeatTime / 1000 })); this.handshakeBuf = Buffer.alloc(routeBuf.length + 5); this.handshakeBuf.writeUInt32BE(routeBuf.length + 1, 0); this.handshakeBuf.writeUInt8(define.Server_To_Client.handshake, 4); routeBuf.copy(this.handshakeBuf, 5); const routeBufAll = Buffer.from(JSON.stringify({ md5: this.md5, route: this.app.routeConfig, heartbeat: this.heartbeatTime / 1000 })); this.handshakeBufAll = Buffer.alloc(routeBufAll.length + 5); this.handshakeBufAll.writeUInt32BE(routeBufAll.length + 1, 0); this.handshakeBufAll.writeUInt8(define.Server_To_Client.handshake, 4); routeBufAll.copy(this.handshakeBufAll, 5); // Heartbeat response buffer this.heartbeatBuf = Buffer.alloc(5); this.heartbeatBuf.writeUInt32BE(1, 0); this.heartbeatBuf.writeUInt8(define.Server_To_Client.heartbeatResponse, 4); } private newClientCb(socket: SocketProxy) { if (this.nowConnectionNum < this.maxConnectionNum) { return new ClientSocket(this, this.clientManager, socket); } else { console.warn('socket num has reached the maxConnectionNum, close it'); socket.close(); } } } class ClientSocket implements I_clientSocket { session: Session = null as any; // Session remoteAddress: string = ''; private readonly connector: ConnectorWs; private readonly clientManager: I_clientManager; private readonly socket: SocketProxy; // socket private readonly registerTimer: NodeJS.Timeout = null as any; // Handshake timeout timer private heartbeatTimer: NodeJS.Timeout = null as any; // Heartbeat timeout timer private readonly sendCache: boolean = false; private readonly interval: number = 0; private sendTimer: NodeJS.Timeout = null as any; private sendArr: Buffer[] = []; constructor(connector: ConnectorWs, clientManager: I_clientManager, socket: SocketProxy) { this.connector = connector; this.connector.nowConnectionNum++; this.sendCache = connector.sendCache; this.interval = connector.interval; this.clientManager = clientManager; this.socket = socket; this.remoteAddress = socket.remoteAddress; this.socket.socket._receiver._maxPayload = 50; // Up to 50 byte of data when not registered socket.once('data', this.onRegister.bind(this)); socket.on('close', this.onClose.bind(this)); this.registerTimer = setTimeout(() => { this.close(); }, 10000); } private onRegister(data: Buffer) { const type = data.readUInt8(0); if (type === define.Client_To_Server.handshake) { // shake hands this.handshake(data); } else { this.close(); } } /** * Received data */ private onData(data: Buffer) { const type = data.readUInt8(0); if (type === define.Client_To_Server.msg) { // Ordinary custom message this.clientManager.handleMsg(this, data); } else if (type === define.Client_To_Server.heartbeat) { // Heartbeat this.heartbeat(); this.heartbeatResponse(); } else { this.close(); } } /** * closed */ private onClose() { this.connector.nowConnectionNum--; clearTimeout(this.registerTimer); clearTimeout(this.heartbeatTimer); this.heartbeatTimer = null as any; clearInterval(this.sendTimer); this.sendArr = []; this.clientManager.removeClient(this); } /** * shake hands */ private handshake(data: Buffer) { let msg: { 'md5': string } = null as any; try { msg = JSON.parse(data.slice(1).toString()); } catch (e) { } if (!msg) { this.close(); return; } if (msg.md5 === this.connector.md5) { this.send(this.connector.handshakeBuf); } else { this.send(this.connector.handshakeBufAll); } clearTimeout(this.registerTimer); this.heartbeat(); this.clientManager.addClient(this); if (this.sendCache) { this.sendTimer = setInterval(this.sendInterval.bind(this), this.interval); } this.socket.socket._receiver._maxPayload = maxLen; this.socket.on('data', this.onData.bind(this)); } /** * Heartbeat */ private heartbeat() { if (this.connector.heartbeatTime === 0) { return; } if (this.heartbeatTimer) { this.heartbeatTimer.refresh(); } else { this.heartbeatTimer = setTimeout(() => { this.close(); }, this.connector.heartbeatTime * 2); } } /** * Heartbeat response */ private heartbeatResponse() { this.send(this.connector.heartbeatBuf); } /** * send data */ send(msg: Buffer) { if (this.sendCache) { this.sendArr.push(msg); } else { this.socket.send(msg); } } private sendInterval() { if (this.sendArr.length > 0) { this.socket.send(Buffer.concat(this.sendArr)); this.sendArr.length = 0; } } /** * close */ close() { this.sendInterval(); this.socket.close(); } } /** * websocket server */ function wsServer(port: number, config: I_connectorConfig, startCb: () => void, newClientCb: (socket: SocketProxy) => void) { const httpServer = config.ssl ? https.createServer({ cert: config.cert, key: config.key }) : http.createServer(); const server = new ws.Server({ server: httpServer }); server.on('connection', function (socket, req) { newClientCb(new WsSocket(socket, req.socket.remoteAddress ?? '')); }); server.on('error', (err) => { console.log(err); process.exit(); }); server.on('close', () => { }); httpServer.listen(port, startCb); } class WsSocket extends EventEmitter implements SocketProxy { die: boolean = false; remoteAddress: string = ''; socket: WebSocket; maxLen: number = 0; len: number = 0; buffer: Buffer = null as any; headLen = 0; headBuf = Buffer.alloc(4); private readonly onDataFunc: (data: Buffer) => void = null as any; constructor(socket: WebSocket, remoteAddress: string) { super(); this.socket = socket; this.remoteAddress = remoteAddress; socket.on('close', () => { this.onClose(); }); socket.on('error', (err: any) => { this.onClose(err); }); this.onDataFunc = this.onData.bind(this); socket.on('message', this.onDataFunc); } private onClose(err?: Error) { if (!this.die) { this.die = true; this.socket.off('message', this.onDataFunc); this.emit('close', err); } } private onData(data: Buffer) { let index = 0; while (index < data.length) { const msgLen = data.readUInt32BE(index); this.emit('data', data.slice(index + 4, index + 4 + msgLen)); index += msgLen + 4; } } send(data: Buffer) { this.socket.send(data); } close() { this.socket.close(); this.socket.emit('close'); } }