/************************************************************************
 * Copyright (c) 2023 sunking framework
 * Author : Shao
 * Mail : yi-shaoye@163.com
 * Date : 2023-01-13
 * Use : WebSocket client object
 ************************************************************************/
import myLogger from '@wingyi8/sk-logger';
import { BaseClient } from "../base/BaseClient";
import { WebSocketTransport } from './WebSocketTransport';
import { MsgType, ServerStatus } from '../util/define';

const logger = myLogger.getLogger('sunking/client/WsClient');

/**
 * Render a caught value for logging: the stack trace for Error instances,
 * otherwise the value's string form.
 * @param e value received by a catch clause
 * @returns text suitable for embedding in a log line
 */
function describeError(e: unknown): string {
    return e instanceof Error && e.stack ? e.stack : String(e);
}

/**
 * WebSocket client. Owns a WebSocketTransport, runs a one-second timer that
 * handles connect timeouts, delayed reconnects and RPC timeouts, and
 * dispatches decoded messages to the handlers in `routeHandler`.
 */
export class WsClient extends BaseClient {
    /**
     * Transport object; null while the client is stopped.
     */
    private socket: WebSocketTransport = null;

    /**
     * Timestamp (Date.now()) at which the current connection attempt began.
     */
    private ConnectionStarted: number = 0;

    /**
     * Connection timeout in milliseconds.
     */
    private Timeout: number = 20000;

    /**
     * Handle of the periodic update timer.
     */
    private updateTimer: NodeJS.Timeout = null;

    /**
     * Start the client: reset state, open the connection and start the
     * one-second update loop.
     * @returns a promise that resolves once the connection attempt has been issued
     * @throws Error if the client already holds a socket
     */
    public start(): Promise<void> {
        if (this.socket) {
            throw new Error('Client already started');
        }
        this._status = ServerStatus.Initial;
        // Reset the remembered state
        this.PreviousState = ServerStatus.Initial;
        // Restart the reconnect counter
        this.reconnectionCount = 0;
        return new Promise<void>(rs => {
            // Issue the connection attempt
            this.open();
            // Start the update loop
            this.updateTimer = setInterval(this.onUpdate, 1000);
            rs();
        });
    }

    /**
     * Stop the client: close and drop the socket, emit `closeEvent`, stop the
     * update loop and resolve every pending RPC request with null.
     */
    public stop(): void {
        logger.info('Stopping client...');
        this._status = ServerStatus.Closed;
        if (this.socket != null) {
            this.socket.close();
        }
        this.socket = null;
        this.emit('closeEvent', this);
        clearInterval(this.updateTimer);
        this.updateTimer = null;
        // The timer that expires RPC requests is gone, so settle them now;
        // otherwise their promises would never resolve.
        this.expireRpcRequests(Infinity);
    }

    /**
     * Log a one-line description of this client (type, description, URL).
     */
    public show() {
        logger.info(`创建 ${this.type} 客户端 [${this.des}] ${this.type.replace(/Client/, '')}://${this.host}:${this.port}`);
    }

    /**
     * Open (or re-open) the connection. Only acts when the status is
     * Initial, Closed or Reconnecting.
     */
    private open(): void {
        // Only these states may begin a connection attempt
        if (this._status != ServerStatus.Initial
            && this._status != ServerStatus.Closed
            && this._status != ServerStatus.Reconnecting) {
            return;
        }
        if (this.socket != null) {
            // Reuse the existing transport after closing it
            this.socket.close();
        } else {
            // Create the transport on first use
            this.socket = new WebSocketTransport(this, this.type.replace(/Client/, ''), this.host, this.port);
        }
        this.socket.open();
        this._status = ServerStatus.Opening;
        // Start timing the attempt for the timeout check in onUpdate
        this.ConnectionStarted = Date.now();
    }

    /**
     * Resolve with null, and remove, every pending RPC request whose
     * deadline is earlier than `deadline`.
     * @param deadline timestamp to compare each request's `time` against
     */
    private expireRpcRequests(deadline: number): void {
        if (this.rpcRequestMap.size === 0) {
            return;
        }
        // Snapshot first so the map is not mutated while being iterated
        const expired = Array.from(this.rpcRequestMap.values()).filter(pending => pending.time < deadline);
        for (const pending of expired) {
            // Remove before invoking so a throwing callback cannot be retried forever
            this.rpcRequestMap.delete(pending.id);
            pending.cb(null);
        }
    }

    /**
     * Timer tick: detects connection timeouts, triggers delayed reconnects
     * and expires overdue RPC requests.
     */
    private onUpdate = () => {
        switch (this._status) {
            // Connecting: give up once the attempt exceeds Timeout
            case ServerStatus.Opening:
                if (Date.now() - this.ConnectionStarted >= this.Timeout) {
                    this.emit('errorEvent', this, 'Connection timed out!');
                    this.tryToReconnect();
                }
                break;
            // Waiting to reconnect: re-open once ReconnectAt has passed
            case ServerStatus.Reconnecting:
                if (this.ReconnectAt != 0 && Date.now() >= this.ReconnectAt) {
                    this.open();
                }
                break;
        }
        // RPC timeout check
        this.expireRpcRequests(Date.now());
    }

    /**
     * Transport callback: the connection is open. Emits `openEvent` with a
     * flag telling whether the previous state was already Opened.
     */
    public onTransportOpen(): void {
        this._status = ServerStatus.Opened;
        this.emit('openEvent', this, this.PreviousState == ServerStatus.Opened);
        // Remember that we have been connected
        this.PreviousState = ServerStatus.Opened;
        this.reconnectionCount = 0;
    }

    /**
     * Transport callback: an error occurred. Closes the transport, emits
     * `errorEvent` and attempts to reconnect.
     * @param error error description
     */
    public onTransportError(error: string): void {
        // The socket is null after stop(); a late error must not crash here
        if (this.socket != null) {
            this.socket.close();
        }
        this.emit('errorEvent', this, error);
        this.tryToReconnect();
    }

    /**
     * Schedule a reconnect, or stop the client when auto-connect is off or
     * the attempt limit has been reached. No-op while already Reconnecting
     * or Closed.
     */
    private tryToReconnect(): void {
        if (this._status == ServerStatus.Reconnecting || this._status == ServerStatus.Closed) {
            return;
        }
        // Auto reconnect disabled
        if (!this.autoConnect) {
            this.stop();
            return;
        }
        // Attempt limit reached (the counter only advances when a limit is set)
        if (this.reconnectionAttempts > 0 && ++this.reconnectionCount >= this.reconnectionAttempts) {
            this.stop();
            return;
        }
        // Time at which onUpdate will call open() again
        this.ReconnectAt = Date.now() + this.reconnectDelay;
        this._status = ServerStatus.Reconnecting;
        // NOTE(review): the transport is opened right away here, and onUpdate
        // re-opens it after reconnectDelay — confirm the immediate open is intended.
        this.socket.open();
        this.emit('reconnectEvent', this, this.reconnectionCount, this.reconnectionAttempts);
    }

    /**
     * Send a raw packet. Silently ignored unless the status is Opened.
     * @param packet data packet
     */
    public send(packet: Buffer): void {
        if (this._status !== ServerStatus.Opened) {
            return;
        }
        this.socket.send(packet);
    }

    /**
     * Encode and send a routed message.
     * @param url message route
     * @param packet message payload
     * @param protocol protocol name passed to msgEncode
     */
    public sendMessage(url: string, packet: any, protocol?: string) {
        logger.info(`发送到服务器数据[ ${url} ][ ${protocol} ][ ${JSON.stringify(packet)} ] [ ${this.des} ]`);
        this.socket.sendMessage(url, this.msgEncode(protocol, packet));
    }

    /**
     * Send an RPC request and wait for its response.
     * @param url message route
     * @param packet message payload
     * @param protocol protocol name passed to msgEncode
     * @returns a promise resolved with the response body, or with null when
     *          the request times out or the client is stopped; rejected if
     *          sending throws
     */
    public rpcMessage(url: string, packet: any, protocol?: string): Promise<any> {
        logger.info(`发送到服务器RPC数据[ ${url} ][ ${protocol} ][ ${JSON.stringify(packet)} ] [ ${this.des} ]`);
        return new Promise<any>((resolve) => {
            // Request index
            const id = this.getRpcId();
            // Register the pending request with its deadline
            this.rpcRequestMap.set(id, {
                id,
                url,
                cb: resolve,
                time: Date.now() + this.rpcTimeout
            });
            try {
                this.socket.rpcMessage(url, this.msgEncode(protocol, packet), id);
            } catch (e) {
                // Nothing was sent: drop the entry, then let the promise reject
                this.rpcRequestMap.delete(id);
                throw e;
            }
        });
    }

    /**
     * Decode a message and replace its route with the route's index in
     * `routeConfig` (-1, with an error log, when the route is unknown).
     * @param data data packet
     * @returns the decoded message with a numeric `cmd`
     */
    protected protoDecode(data: Buffer) {
        const msg = super.protoDecode(data);
        const cmd = this.routeConfig.indexOf(msg.cmd);
        if (cmd < 0) {
            logger.error(`[ ${this.name}-wsClient ]收到未知消息 cmd[ ${msg.cmd} ]`);
        }
        msg.cmd = cmd;
        return msg;
    }

    /**
     * Decode an RPC packet: a big-endian uint32 id at offset 1 followed by a
     * JSON document starting at offset 5.
     * @param data data packet
     * @returns the parsed JSON object with `id` set and, unless `cmd` is an
     *          empty string, `cmd` replaced by its index in `routeConfig`
     */
    protected rpcDecode(data: Buffer) {
        // Unsigned read, so the id can never be negative
        const id = data.readUInt32BE(1);
        const msg = JSON.parse(data.subarray(5).toString() || `{}`);
        if (msg.cmd != '') {
            const cmd = this.routeConfig.indexOf(msg.cmd);
            if (cmd < 0) {
                logger.error(`[ ${this.name}-wsClient ]收到未知消息 cmd[ ${msg.cmd} ]`);
            }
            msg.cmd = cmd;
        }
        msg.id = id;
        return msg;
    }

    /**
     * Encode an RPC packet. Layout: uint32 total length, uint8 message type,
     * uint16 zero, uint32 id, then the encoded payload (11 header bytes).
     * @param packet payload, encoded with msgEncode(null, packet)
     * @param id RPC request id
     * @param msgType message type written into the header
     * @returns the complete packet
     */
    public rpcEncode(packet: any, id: number, msgType: MsgType): Buffer {
        const headerLen = 11;
        const msgBuf: Buffer = this.msgEncode(null, packet);
        const msgLen = msgBuf.length + headerLen;
        // Every byte is written below, so allocUnsafe is fine
        const buf = Buffer.allocUnsafe(msgLen);
        buf.writeUInt32BE(msgLen, 0);
        buf.writeUInt8(msgType, 4);
        buf.writeUInt16BE(0, 5);
        buf.writeUInt32BE(id, 7);
        msgBuf.copy(buf, headerLen);
        return buf;
    }

    /**
     * Default decoder: wraps the raw buffer as the message body.
     * @param msg packet to decode
     * @returns an object whose `body` is the buffer itself
     */
    protected decode(msg: Buffer) {
        return { 'body': msg };
    }

    /**
     * Handle an ordinary message: decode, filter, and dispatch to the route
     * handler. Errors are logged, never thrown.
     * @param msgBuf data packet
     */
    public handleMsg(msgBuf: Buffer) {
        try {
            // Protocol parsing
            const data = this.protoDecode(msgBuf);
            if (data.cmd < 0) {
                // Unknown route; protoDecode already logged it
                return;
            }
            if (this.routeHandler.length > data.cmd) {
                // Pre-dispatch filter: a truthy result drops the message
                if (this.protoFilter && this.protoFilter(this, this.routeConfig[data.cmd], data)) {
                    return;
                }
                const msg = this.msgDecode(data.msg);
                logger.info(`[ ${this.name}-wsClient ]数据协议解析 urlpath[ ${this.routeConfig[data.cmd]} ] data[ ${JSON.stringify(msg.body)} ]`);
                this.routeHandler[data.cmd](msg, this);
            } else {
                logger.error(`[ ${this.name}-wsClient ]收到未知消息 cmd[ ${data.cmd} ]`);
            }
        } catch (e: unknown) {
            logger.error(`[ ${this.name}-wsClient ] 数据验证未通过! ${describeError(e)}`);
        }
    }

    /**
     * Handle an incoming RPC request: decode, filter, run the route handler
     * and send its result back as an rpcRes packet. Errors are logged, never
     * thrown.
     * @param msgBuf data packet
     */
    public async handleRpcReq(msgBuf: Buffer) {
        try {
            // Protocol parsing
            const data = this.rpcDecode(msgBuf);
            if (data.id < 0 || data.cmd < 0) {
                // Unknown route (already logged by rpcDecode): there is no
                // handler at a negative index, so do not try to call one
                return;
            }
            if (typeof data.cmd === 'number' && this.routeHandler.length > data.cmd) {
                // Pre-dispatch filter: a truthy result drops the message
                if (this.protoFilter && this.protoFilter(this, this.routeConfig[data.cmd], data)) {
                    return;
                }
                const msg = this.msgDecode(data.msg);
                logger.info(`[ ${this.name}-wsClient ]数据协议解析 urlpath[ ${this.routeConfig[data.cmd]} ] data[ ${JSON.stringify(msg.body)} ]`);
                // Route handler produces the response payload
                const packet = await this.routeHandler[data.cmd](msg, this);
                logger.info(`发送到服务器rpc数据[ ${data.id} ][ ${JSON.stringify(packet)} ] [ ${this.des} ]`);
                this.send(this.rpcEncode(packet, data.id, MsgType.rpcRes));
            } else {
                logger.error(`[ ${this.name}-wsClient ]收到未知消息 cmd[ ${data.cmd} ]`);
            }
        } catch (e: unknown) {
            logger.error(`[ ${this.name}-wsClient ] 数据验证未通过! ${describeError(e)}`);
        }
    }

    /**
     * Handle an RPC response: decode, filter, and resolve the matching
     * pending request with the response body. Errors are logged, never
     * thrown.
     * @param msgBuf data packet
     */
    public handleRpcRes(msgBuf: Buffer) {
        try {
            // Protocol parsing
            const data = this.rpcDecode(msgBuf);
            if (data.id < 0) {
                return;
            }
            if (this.rpcRequestMap.has(data.id)) {
                // Pre-dispatch filter (no route for responses); a truthy result
                // drops the message and leaves the request to time out
                if (this.protoFilter && this.protoFilter(this, '', data)) {
                    return;
                }
                const msg = this.msgDecode(data.msg);
                const rpcInfo = this.rpcRequestMap.get(data.id);
                logger.info(`[ ${this.name}-wsClient ]数据协议解析 urlpath[ ${rpcInfo.url} ] data[ ${JSON.stringify(msg.body)} ]`);
                rpcInfo.cb(msg.body);
                this.rpcRequestMap.delete(data.id);
            } else {
                logger.error(`[ ${this.name}-wsClient ]收到未知消息 id[ ${data.id} ]`);
            }
        } catch (e: unknown) {
            logger.error(`[ ${this.name}-wsClient ] 数据验证未通过! ${describeError(e)}`);
        }
    }
}