/************************************************************************
 * Copyright (c) 2023 sunking framework
 * Author : Shao
 * Mail   : yi-shaoye@163.com
 * Date   : 2021-10-28
 * Use    : ws network module - client socket object
 ************************************************************************/
import { IncomingMessage } from 'http';
import { WebSocket } from 'ws';
import myLogger from '@wingyi8/sk-logger';
import { BaseSocket } from '../base/BaseSocket';
import { BaseConnector } from '../base/BaseConnector';
import { SocketStatus, MsgType } from '../util/define';

const logger = myLogger.getLogger('sunking/ws/WsSocket');

/** Size in bytes of the big-endian uint32 length prefix that precedes every frame. */
const FRAME_HEADER_SIZE = 4;

/** Time in milliseconds a client has to complete the handshake before being closed. */
const HANDSHAKE_TIMEOUT_MS = 10000;

/** Period in milliseconds of the housekeeping loop that expires pending RPC requests. */
const UPDATE_INTERVAL_MS = 1000;

/**
 * WebSocket close code for "protocol error" (RFC 6455, section 7.4.1).
 * The `ws` library rejects codes outside 1000-1014 / 3000-4999, so a valid one must be used.
 */
const CLOSE_PROTOCOL_ERROR = 1002;

/**
 * Server-side wrapper around one connected `ws` client.
 *
 * Incoming websocket messages are split into length-prefixed frames and re-emitted
 * as `'data'` events. The first frame must be a handshake; after it succeeds the
 * socket switches to `ST_BODY` and frames are dispatched by their type byte.
 */
export class WsSocket extends BaseSocket {
    /**
     * Underlying `ws` socket.
     */
    readonly socket: WebSocket;

    /**
     * Housekeeping interval started after a successful handshake.
     */
    private updateTimer: NodeJS.Timeout | null = null;

    /**
     * Whether verbose debug logging is enabled for this socket.
     */
    private debugLog: boolean = false;

    /**
     * @param id        session id assigned to this client
     * @param connector owning connector (handshake data, codecs, client registry)
     * @param socket    accepted `ws` socket
     * @param httpReq   HTTP upgrade request, used to read the remote address
     * @param log       enable debug logging
     */
    constructor(id: number, connector: BaseConnector, socket: WebSocket, httpReq: IncomingMessage, log: boolean = false) {
        super();
        this.connector = connector;
        this.sid = id;
        this.socket = socket;
        this.clientip = this.getClientIp(httpReq);
        this.debugLog = log;

        // Forward low-level socket errors as 'error' events of this object.
        socket.on('error', this.emit.bind(this, 'error'));

        this._status = SocketStatus.ST_HEAD;

        // Raw message handling (frame splitting).
        socket.on('message', this.onMessage);
        // The very first frame must be the handshake.
        this.once('data', this.onRegister);
        // Disconnect handling.
        socket.on('close', this.onClose);

        // Handshake timer: drop clients that never complete the handshake.
        this.registerTimer = setTimeout(() => {
            this.closed(1000, '握手超时');
        }, HANDSHAKE_TIMEOUT_MS);

        logger.info(`客户端[${this.clientip}] [${this.sid}] 网络连接成功`);
    }

    /**
     * Read the client IP from the upgrade request.
     * @param httpReq HTTP upgrade request
     * @returns the remote address without the IPv4-mapped `::ffff:` prefix, or `''` if unavailable
     */
    protected getClientIp(httpReq: IncomingMessage): string {
        const ipAddress = httpReq.socket.remoteAddress;
        // Remove prefix ::ffff:
        return ipAddress ? ipAddress.replace(/^::ffff:/, '') : '';
    }

    /**
     * Split one websocket message into length-prefixed frames and emit each
     * frame payload as a `'data'` event.
     *
     * A message whose framing is inconsistent (dangling header bytes or a length
     * prefix that runs past the end of the message) closes the connection instead
     * of throwing inside the event listener or delivering a truncated payload.
     * @param data raw message received from the client
     */
    private onMessage = (data: Buffer) => {
        if (this.debugLog) {
            logger.debug(`收到客户端消息 ${data.length} ${data.toString()}`);
        }

        let index = 0;
        while (index < data.length) {
            // Not enough bytes left for a length prefix: readUInt32BE would throw.
            if (data.length - index < FRAME_HEADER_SIZE) {
                logger.error(`消息包头长度错误 ${data.length - index}`);
                this.closed(CLOSE_PROTOCOL_ERROR, '网络协议错误');
                return;
            }

            const msgLen = data.readUInt32BE(index);
            if (this.debugLog) {
                logger.debug(`客户端消息 ${msgLen}`);
            }

            const bodyStart = index + FRAME_HEADER_SIZE;
            const bodyEnd = bodyStart + msgLen;
            // Declared length exceeds what was actually received.
            if (bodyEnd > data.length) {
                logger.error(`消息包体长度错误 ${msgLen} ${data.length - bodyStart}`);
                this.closed(CLOSE_PROTOCOL_ERROR, '网络协议错误');
                return;
            }

            this.emit('data', data.subarray(bodyStart, bodyEnd));
            index = bodyEnd;
        }
    };

    /**
     * Handle the first frame of the connection, which must be a handshake.
     * @param data frame payload (type byte followed by the body)
     */
    private onRegister = (data: Buffer) => {
        // An empty frame has no type byte; treat it as a wrong first packet.
        if (data.length === 0) {
            this.closed(1000, '首包协议错误');
            return;
        }

        const type = data.readUInt8(0);
        if (this.debugLog) {
            logger.debug(`首包消息处理 ${type}`);
        }

        if (type === MsgType.handshake) {
            this.handshake(data);
        } else {
            this.closed(1000, '首包协议错误');
        }
    };

    /**
     * Process the handshake frame: parse its JSON body, reply, and switch the
     * socket into its working state.
     * @param data handshake frame (type byte followed by a JSON document)
     */
    private handshake(data: Buffer) {
        let msg: { 'md5': string };
        try {
            const parsed: unknown = JSON.parse(data.subarray(1).toString());
            // JSON.parse may legally return null or a primitive; reading `.md5` from
            // null would throw outside of this try block.
            if (typeof parsed !== 'object' || parsed === null) {
                throw new Error('handshake payload is not a JSON object');
            }
            msg = parsed as { 'md5': string };
        } catch (e) {
            logger.error(e instanceof Error ? e.stack : String(e));
            this.closed(1000, '握手消息格式错误');
            return;
        }

        // Switch client state so that send() starts accepting packets.
        this._status = SocketStatus.ST_BODY;
        // Handshake reply.
        this.send(this.connector.getHandshake(msg.md5));
        // Stop the handshake timer and start heartbeat supervision.
        clearTimeout(this.registerTimer);
        this.heartbeat();
        // Regular message dispatch.
        this.on('data', this.onData);
        // Notify listeners that the client is fully connected.
        this.emit('connection');
        // Register with the connector.
        this.connector.addClient(this);
        // Start the housekeeping loop.
        this.updateTimer = setInterval(this.onUpdate, UPDATE_INTERVAL_MS);
    }

    /**
     * Housekeeping loop: resolves RPC requests whose deadline has passed with
     * `null` and removes them from the pending map.
     */
    private onUpdate = () => {
        if (this.rpcRequestMap.size === 0) {
            return;
        }

        const now = Date.now();
        // Deleting the entry currently being visited is well-defined for Map iteration.
        this.rpcRequestMap.forEach((one) => {
            if (one.time < now) {
                this.rpcRequestMap.delete(one.id);
                one.cb(null);
            }
        });
    };

    /**
     * Resolve every pending RPC request with `null` and empty the pending map.
     * Used when the connection goes away, because the timeout loop stops with it.
     */
    private failPendingRpc() {
        this.rpcRequestMap.forEach((one) => {
            one.cb(null);
        });
        this.rpcRequestMap.clear();
    }

    /**
     * Dispatch a frame by its type byte once the handshake is complete.
     * Frames received in any state other than `ST_BODY` are ignored.
     * @param data frame payload (type byte followed by the body)
     */
    private onData = (data: Buffer) => {
        if (this._status !== SocketStatus.ST_BODY) {
            return;
        }

        try {
            const type: MsgType = data.readUInt8(0);
            if (this.debugLog) {
                logger.debug(`消息解析 ${type}`);
            }

            switch (type) {
                // Custom application message.
                case MsgType.msg:
                    this.connector.handleMsg(this, data);
                    break;
                // Heartbeat: re-arm the timeout and answer.
                case MsgType.heartbeat:
                    this.heartbeat();
                    this.heartbeatResponse();
                    break;
                // RPC request from the client.
                case MsgType.rpcReq:
                    this.connector.handleRpcReq(this, data);
                    break;
                // RPC response from the client.
                case MsgType.rpcRes:
                    this.connector.handleRpcRes(this, data);
                    break;
                // Unknown type byte.
                default:
                    logger.error(`通讯协议错误 ${type}`);
                    // 1002 = protocol error; the previous value 1020 is not a close code
                    // `ws` accepts, so close() threw and the connection stayed open.
                    this.closed(CLOSE_PROTOCOL_ERROR, '网络协议错误');
                    break;
            }
        } catch (e) {
            logger.error(`消息解析错误 ${e instanceof Error ? e.stack : String(e)}`);
        }
    };

    /**
     * Arm or re-arm the heartbeat timeout. Does nothing when the connector's
     * `heartbeatTime` is 0. The connection is closed if no heartbeat arrives
     * within twice `heartbeatTime`.
     */
    private heartbeat() {
        if (this.connector.heartbeatTime === 0) {
            return;
        }

        if (this.heartbeatTimer) {
            this.heartbeatTimer.refresh();
            return;
        }

        this.heartbeatTimer = setTimeout(() => {
            this.closed(1000, '心跳超时');
        }, this.connector.heartbeatTime * 2);
    }

    /**
     * Send the connector's pre-built heartbeat packet back to the client.
     */
    private heartbeatResponse() {
        this.send(this.connector.heartbeatBuf);
    }

    /**
     * Handle the underlying socket's `'close'` event: mark the socket closed,
     * stop all timers, release pending RPC callers and unregister from the connector.
     */
    private onClose = () => {
        this._status = SocketStatus.ST_CLOSED;
        logger.info(`客户端[ ${this.clientip} ] [ ${this.sid} ] 网络连接断开`);

        clearTimeout(this.registerTimer);
        clearTimeout(this.heartbeatTimer);
        if (this.updateTimer) {
            clearInterval(this.updateTimer);
        }
        this.registerTimer = null as any;
        this.heartbeatTimer = null as any;
        this.updateTimer = null;

        // The timeout loop is gone, so pending RPC promises would otherwise never settle.
        this.failPendingRpc();

        this.connector.removeClient(this);
    };

    /**
     * Send an already encoded packet. Packets are silently dropped unless the
     * socket is in `ST_BODY`.
     * @param packet encoded packet
     */
    public send(packet: Buffer): void {
        if (this._status !== SocketStatus.ST_BODY) {
            return;
        }
        this.socket.send(packet);
    }

    /**
     * Encode and send an application message.
     * @param packet   message body
     * @param protocol protocol name
     */
    public sendMessage(packet: any, protocol?: string) {
        logger.info(`发送到客户端数据[ ${protocol} ][ ${JSON.stringify(packet)} ] [ ${this.uid} ]`);
        this.send(this.connector.protoEncode(protocol, packet));
    }

    /**
     * Send an RPC request to the client.
     * @param packet   request body
     * @param protocol protocol name
     * @returns a promise resolved with whatever the pending entry's callback is
     *          invoked with; it resolves with `null` on timeout, when the connection
     *          closes, or immediately when the socket is not in `ST_BODY`
     */
    public rpcMessage(packet: any, protocol?: string) {
        logger.info(`发送到客户端RPC数据[ ${protocol} ][ ${JSON.stringify(packet)} ] [ ${this.uid} ]`);
        return new Promise((resolve) => {
            // Outside ST_BODY no timeout loop is running, so a registered request
            // could never be expired; fail fast instead.
            if (this._status !== SocketStatus.ST_BODY) {
                resolve(null);
                return;
            }

            // Request id.
            const id = this.getRpcId();
            // Register the pending request with its deadline.
            this.rpcRequestMap.set(id, {
                id,
                url: protocol,
                cb: resolve,
                time: Date.now() + this.rpcTimeout
            });
            // Send the request.
            this.socket.send(this.connector.rpcEncode(protocol, packet, id, MsgType.rpcReq));
        });
    }

    /**
     * Broadcast a message through the connector.
     * @param packet   message body
     * @param protocol protocol name
     */
    public sendToAll(packet: any, protocol: string) {
        this.connector.sendToAll(packet, protocol);
    }

    /**
     * Close the connection.
     *
     * If the `ws` library rejects the close request (for example an invalid close
     * code or an over-long reason), the socket is terminated so it cannot linger.
     * @param code    websocket close code
     * @param message close reason sent to the client
     */
    public closed(code: number = 1000, message: string = '连接超时') {
        if (this.debugLog) {
            logger.debug(`断开连接 ${code} ${message}`);
        }

        this._status = SocketStatus.ST_CLOSED;
        try {
            this.socket.close(code, message);
        } catch (e) {
            logger.error(e instanceof Error ? e.stack : String(e));
            this.socket.terminate();
        }
    }
}