/************************************************************************
 * Copyright (c) 2023 sunking framework
 * Author    : Shao
 * Mail      : yi-shaoye@163.com
 * Date      : 2023-01-13
 * Use       : WebSocket client object - transport module
 ************************************************************************/
import { EventEmitter } from 'events';
import { WebSocket, Event, ErrorEvent, CloseEvent, MessageEvent } from 'ws';
import myLogger from '@wingyi8/sk-logger';
import { WsClient } from './WsClient';
import { MsgType, enTransportStates } from '../util/define';

const logger = myLogger.getLogger('sunking/client/WebSocketTransport');

/** How long to wait for a heartbeat reply before reporting a timeout. */
const HEARTBEAT_REPLY_TIMEOUT_MS = 4 * 1000;

/** Shape of the JSON body carried by a handshake reply. */
interface HandshakeReply {
    md5: string;
    route: string[];
    heartbeat: number;
}

/**
 * Transport layer of the WebSocket client.
 *
 * Owns the underlying `ws` socket, frames outgoing packets, splits incoming
 * buffers into packets, dispatches them to the owning {@link WsClient} and
 * drives the heartbeat timers negotiated during the handshake.
 *
 * Outgoing frames are built as: 4-byte big-endian length, 1-byte message
 * type, then a type-specific body.
 *
 * NOTE(review): the value written into the length prefix is not counted the
 * same way everywhere in this class. Heartbeat/handshake frames count the
 * type byte plus body, `sendMessage`/`rpcMessage` count the whole frame
 * including the 4-byte prefix, and `onMessageReceived` treats the prefix as
 * excluding the type byte. The wire format is left untouched here; confirm
 * the convention against the server before changing any of them.
 */
export class WebSocketTransport extends EventEmitter {
    /**
     * Current transport state.
     */
    protected State: enTransportStates = enTransportStates.Closed;
    /**
     * Owning client; receives transport callbacks and decoded packets.
     */
    protected Manager: WsClient = null;
    /**
     * Underlying socket; null while the transport is closed.
     */
    protected Implementation: WebSocket = null;
    /**
     * Host used when building the connection URL.
     */
    protected host: string = '127.0.0.1';
    /**
     * Port used when building the connection URL.
     */
    protected port: number = 0;
    /**
     * Used as the scheme part of the connection URL in `open()`.
     */
    protected type: string = 'wsClient';
    /**
     * Route list; a route's index is the numeric command written into
     * outgoing frames.
     */
    protected routeConfig: string[] = [];
    /**
     * Message handler list.
     */
    protected routeHandler: Function[] = [];
    /**
     * md5 of the route list; sent in the handshake request and replaced by
     * the value returned in the handshake reply.
     */
    protected md5: string = '';
    /**
     * Pre-built heartbeat frame.
     */
    public heartbeatBuf: Buffer;
    /**
     * Interval timer that sends heartbeats.
     */
    protected heartbeatTimer: NodeJS.Timeout = null as any;
    /**
     * Timer that fires when a heartbeat reply does not arrive in time.
     */
    protected heartbeatTimeoutTimer: NodeJS.Timeout = null as any;

    /**
     * @param manager Owning client that receives transport callbacks.
     * @param type    Scheme part of the connection URL.
     * @param host    Host part of the connection URL.
     * @param port    Port part of the connection URL.
     */
    constructor(manager: WsClient, type: string, host: string, port: number) {
        super();
        this.Manager = manager;
        this.type = type;
        this.host = host;
        this.port = port;

        // Heartbeat frame: length prefix of 1 followed by the heartbeat type byte.
        this.heartbeatBuf = Buffer.alloc(5);
        this.heartbeatBuf.writeUInt32BE(1, 0);
        this.heartbeatBuf.writeUInt8(MsgType.heartbeat, 4);
    }

    /**
     * Opens the connection. Does nothing unless the transport is closed.
     */
    public open(): void {
        if (this.State !== enTransportStates.Closed) return;

        const socket = new WebSocket(`${this.type}://${this.host}:${this.port}`);
        socket.onopen = this.onOpen;
        socket.onclose = this.onClosed;
        socket.onerror = this.onError;
        socket.onmessage = this.onMessageReceived;

        this.Implementation = socket;
        this.State = enTransportStates.Connecting;
    }

    /**
     * Closes the transport and releases its timers. Does nothing when the
     * transport is already closed.
     */
    public close(): void {
        if (this.State === enTransportStates.Closed) return;
        this.State = enTransportStates.Closed;

        // Release timers before touching the socket so they cannot leak if
        // closing the socket throws.
        this.clearHeartbeatTimers();

        const socket = this.Implementation;
        this.Implementation = null;
        if (socket != null) {
            socket.close();
        } else {
            logger.error("WebSocketTransport Close - WebSocket 实例为空!");
        }
    }

    /**
     * Sends an application message.
     *
     * The route is translated to its index in the route list; unknown routes
     * are logged and dropped.
     *
     * @param url    Route name; must be present in the route list.
     * @param msgBuf Encoded message body.
     */
    public sendMessage(url: string, msgBuf: Buffer) {
        const cmd = this.routeConfig.indexOf(url);
        if (cmd < 0) {
            logger.error(`协议编码错误 ${url}`);
            return;
        }

        // Layout: [len:4][type:1][cmd:2][body]
        const frameLen = msgBuf.length + 7;
        const buf = Buffer.allocUnsafe(frameLen);
        buf.writeUInt32BE(frameLen, 0);
        buf.writeUInt8(MsgType.msg, 4);
        buf.writeUInt16BE(cmd, 5);
        msgBuf.copy(buf, 7);
        this.send(buf);
    }

    /**
     * Sends an RPC request.
     *
     * The route is translated to its index in the route list; unknown routes
     * are logged and dropped.
     *
     * @param url    Route name; must be present in the route list.
     * @param msgBuf Encoded request body.
     * @param id     Request id written into the frame as an unsigned 32-bit value.
     */
    public rpcMessage(url: string, msgBuf: Buffer, id: number) {
        const cmd = this.routeConfig.indexOf(url);
        if (cmd < 0) {
            logger.error(`协议编码错误 ${url}`);
            return;
        }

        // Layout: [len:4][type:1][cmd:2][id:4][body]
        const msgLen = msgBuf.length + 11;
        const buf = Buffer.allocUnsafe(msgLen);
        buf.writeUInt32BE(msgLen, 0);
        buf.writeUInt8(MsgType.rpcReq, 4);
        buf.writeUInt16BE(cmd, 5);
        buf.writeUInt32BE(id, 7);
        msgBuf.copy(buf, 11);
        this.send(buf);
    }

    /**
     * Writes a raw frame to the socket. Silently dropped unless the
     * transport is open.
     *
     * @param packet Fully framed packet.
     */
    public send(packet: Buffer): void {
        if (this.State !== enTransportStates.Open) {
            return;
        }
        this.Implementation.send(packet);
    }

    /**
     * Socket opened: mark the transport open and send the handshake request
     * carrying the currently cached route md5.
     */
    private onOpen = (event: Event): void => {
        this.State = enTransportStates.Open;

        const routeBuf = Buffer.from(JSON.stringify({ "md5": this.md5 }));
        const handshakeBuf = Buffer.alloc(routeBuf.length + 5);
        handshakeBuf.writeUInt32BE(routeBuf.length + 1, 0);
        handshakeBuf.writeUInt8(MsgType.handshake, 4);
        routeBuf.copy(handshakeBuf, 5);
        this.send(handshakeBuf);
    }

    /**
     * Socket message: split the received buffer into packets and hand each
     * one to `onPakcet`. Non-Buffer payloads are ignored.
     */
    private onMessageReceived = (event: MessageEvent): void => {
        const data = event.data;
        if (!(data instanceof Buffer)) return;

        let index = 0;
        while (index < data.length) {
            const remaining = data.length - index;
            if (remaining < 4) {
                // Reading the length prefix would throw out of this socket
                // callback; report the truncated tail and stop instead.
                logger.error(`消息解析错误 数据包头不完整 ${remaining}`);
                break;
            }
            const msgLen = data.readUInt32BE(index);
            // subarray clamps to the buffer end, so an oversized length
            // yields a shorter packet rather than throwing.
            this.onPakcet(data.subarray(index + 4, index + 5 + msgLen));
            index += msgLen + 5;
        }
    }

    /**
     * Socket closed: tear the transport down and report it to the manager.
     */
    private onClosed = (event: CloseEvent): void => {
        logger.error(`连接关闭 ${event.code}`);

        // A late close event from a previous socket must not tear down a
        // connection that has been re-opened in the meantime.
        const current = this.Implementation;
        const isStale = current != null && event.target != null && event.target !== current;
        if (!isStale) {
            this.close();
        }

        this.Manager.onTransportError(`连接关闭 ${event.code}`);
    }

    /**
     * Socket error: log it and report it to the manager.
     */
    private onError = (event: ErrorEvent): void => {
        logger.error(`连接错误 ${event.message}`);
        this.Manager.onTransportError(event.message);
    }

    /**
     * Dispatches one packet according to its leading type byte.
     *
     * Unknown types are logged and close the transport. Exceptions raised
     * while handling a packet are logged and swallowed.
     *
     * @param data Packet starting at the type byte.
     */
    private onPakcet(data: Buffer) {
        try {
            const type: MsgType = data.readUInt8(0);
            switch (type) {
                // Handshake reply
                case MsgType.handshake: {
                    this.handshake(data);
                } break;
                // Application message
                case MsgType.msg: {
                    this.Manager.handleMsg(data);
                } break;
                // Heartbeat reply: cancel the pending reply timeout
                case MsgType.heartbeat: {
                    if (this.heartbeatTimeoutTimer) {
                        clearTimeout(this.heartbeatTimeoutTimer);
                        this.heartbeatTimeoutTimer = null;
                    }
                } break;
                // RPC request
                case MsgType.rpcReq: {
                    this.Manager.handleRpcReq(data);
                } break;
                // RPC response
                case MsgType.rpcRes: {
                    this.Manager.handleRpcRes(data);
                } break;
                // Protocol error
                default: {
                    logger.error(`通讯协议错误 ${type}`);
                    this.close();
                } break;
            }
        } catch (e) {
            logger.error(`消息解析错误 ${e instanceof Error ? e.stack : String(e)}`);
        }
    }

    /**
     * Handles the handshake reply: starts the heartbeat when the reply asks
     * for one, caches the route md5 and route list, then notifies the
     * manager that the transport is ready.
     *
     * A reply that is not valid JSON, or does not decode to an object, is
     * reported to the manager as a handshake error.
     *
     * @param data Handshake packet; the JSON body follows the type byte.
     */
    private handshake(data: Buffer) {
        let msg: HandshakeReply = null as any;
        try {
            msg = JSON.parse(data.subarray(1).toString());
        } catch (e) {
            logger.error(e instanceof Error ? e.stack : String(e));
            this.Manager.onTransportError('握手消息错误!');
            return;
        }
        if (msg == null || typeof msg !== 'object') {
            this.Manager.onTransportError('握手消息错误!');
            return;
        }

        if (msg.heartbeat > 0) {
            // Restart the heartbeat interval with the negotiated period (seconds).
            if (this.heartbeatTimer) {
                clearInterval(this.heartbeatTimer);
                this.heartbeatTimer = null;
            }
            this.heartbeatTimer = setInterval(() => {
                this.send(this.heartbeatBuf);

                // Arm the reply timeout only when none is pending. Re-arming
                // it on every tick would keep pushing the deadline back, so
                // a missing reply would never be detected whenever the
                // heartbeat period is not longer than the timeout.
                if (this.heartbeatTimeoutTimer) return;
                this.heartbeatTimeoutTimer = setTimeout(() => {
                    // Clear the fired handle so the next tick can arm a new one.
                    this.heartbeatTimeoutTimer = null;
                    this.Manager.onTransportError('心跳回调超时!');
                }, HEARTBEAT_REPLY_TIMEOUT_MS);
            }, msg.heartbeat * 1000);
        }

        this.md5 = msg.md5;

        // Cache the route list when the reply carries one.
        if (msg.route != null) {
            this.routeConfig = Array.from(msg.route);
        }

        // Tell the manager the connection is ready.
        this.Manager.onTransportOpen();
    }

    /**
     * Stops the heartbeat interval and any pending reply timeout.
     */
    private clearHeartbeatTimers(): void {
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
        if (this.heartbeatTimeoutTimer) {
            clearTimeout(this.heartbeatTimeoutTimer);
            this.heartbeatTimeoutTimer = null;
        }
    }
}