/************************************************************************ * Copyright (c) 2023 sunking framework * Author : Shao * Mail : yi-shaoye@163.com * Date : 2021-10-28 * Use : 网络模块基类 ************************************************************************/ import { EventEmitter } from 'events'; import * as crypto from 'crypto'; import { BaseSocket } from './BaseSocket'; import * as Loader from '../util/loader'; import myLogger from '@wingyi8/sk-logger'; import { ServerStatus, ConnectorOptions, MsgType } from '../util/define'; const logger = myLogger.getLogger('sunking/base/BaseConnector'); /** * 服务器的抽象基类. * 通过扩展来实现传输协议(如HTTP WebSocket). * @typeParam ServiceType - `ServiceType` from generated `proto.ts` */ export abstract class BaseConnector extends EventEmitter { /** * 服务器名字 */ protected name: string = ''; /** * 服务器描述 */ protected des: string = ''; /** * 服务器IP */ protected host: string = '127.0.0.1'; /** * 监听端口 */ protected port: number = 0; /** * 服务器类型 */ protected type: string = 'ws'; /** * 服务器密钥路径 */ protected key: string = ''; /** * 服务器证书路径 */ protected cert: string = ''; /** * 连接上限 */ protected maxConnectionNum: number = Number.POSITIVE_INFINITY; /** * 客户端列表 */ protected ClientAry: Set = new Set(); /** * 消息解码 */ protected msgDecode: (msg: Buffer) => any = this.decode; /** * 消息编码 */ protected msgEncode: (protocol: string, msg: any) => Buffer = this.encode; /** * 路由列表 */ protected routeConfig: string[] = []; /** * 路由描述 */ protected routeDes: string[] = []; /** * 消息处理列表 */ protected routeHandler: Function[] = []; /** * 路由列表md5 */ protected md5: string = ''; /** * 握手重连数据 */ protected handshakeBuf: Buffer = null as any; /** * 握手全部数据 */ protected handshakeBufAll: Buffer = null as any; /** * 心跳数据 */ public heartbeatBuf: Buffer; /** * 心跳时间 */ public heartbeatTime: number = 60; /** * 服务器状态 */ protected _status: ServerStatus = ServerStatus.Closed; /** * 调试日志 */ protected debugLog: boolean = false; /** * 服务器当前状态 */ get status(): ServerStatus { return this._status; } /** * 消息过滤 */ protected protoFilter: (client: BaseSocket, route: string, data: any) => boolean = null as any; /** * 路由中间件 */ protected middleware: (data: any, client: BaseSocket) => boolean = null as any; /** * 后处理 */ public postProcessing: (data: any, client: BaseSocket, packet: any, protocol: string) => void = null as any; /** * 配置服务器相关 * @param {ConnectorOptions} config 配置 */ public configure(config: ConnectorOptions) { this.name = config.name; this.des = config.des; this.type = config.type; this.host = config.host; this.port = config.port; this.key = config.key || ''; this.cert = config.cert || ''; this.maxConnectionNum = config.maxConnectionNum || Number.POSITIVE_INFINITY; this.heartbeatTime = (config.heartbeat || 0) * 1000; if (typeof config.debugLog != 'undefined') { this.debugLog = config.debugLog; } if (typeof config.clientMgr !== 'undefined') { let clientMgr = require(`${process.cwd()}/${config.clientMgr}`); // 客户端上线 if (typeof clientMgr.connection != 'undefined') { this.on('connection', clientMgr.connection); } // 客户端下线 if (typeof clientMgr.disconnect !== 'undefined') { this.on('disconnect', clientMgr.disconnect); }; }; // 服务器路由注册 config.routeConfig = config.routeConfig || 'handle'; this.routeConfig = []; this.routeHandler = []; this.routeDes = []; Loader.load(`${config.routeConfig}`, config.prefix || '/', config.method || 'all', (handle: Loader.ServerEventHandler, index: number) => { this.registerRoute(handle, index); }); // 通讯数据预处理 if (typeof config.verifyMsg !== 'undefined') { let m = require(`${process.cwd()}/${config.verifyMsg}`); if (null != m['msgDecode']) { this.msgDecode = m['msgDecode']; } if (null != m['msgDecode']) { this.msgEncode = m['msgEncode']; } } // 消息过滤 if (typeof config.protoFilter != 'undefined') { let m = require(`${process.cwd()}/${config.protoFilter}`); if (null != m['protoFilter']) { this.protoFilter = m['protoFilter']; } else if (null != m['default']) { this.protoFilter = m['default']; } } // 路由中间件 if (typeof config.middleware != 'undefined') { let m = require(`${process.cwd()}/${config.middleware}`); if (null != m['middleware']) { this.middleware = m['middleware']; } else if (null != m['default']) { this.middleware = m['default']; } } // 后处理 if (typeof config.postProcessing != 'undefined') { let m = require(`${process.cwd()}/${config.postProcessing}`); if (null != m['postProcessing']) { this.postProcessing = m['postProcessing']; } else if (null != m['default']) { this.postProcessing = m['default']; } } }; /** * 路由注册 * @param handle 路由协议 * @param count 编号 */ protected registerRoute(handle: Loader.ServerEventHandler, count: number) { this.routeConfig.push(handle.url); this.routeDes.push(handle.description); this.routeHandler.push(handle.handler); logger.info(`读取协议处理成功! [${handle.description}] url[${handle.url}] cmd[${count}]`); } /** * 启动服务器 */ public abstract start(): Promise; /** * 停止服务器 */ public abstract stop(): void; /** * 显示服务器信息 */ public abstract show(): void; /** * 初始化缓存 */ protected onInitBuffer(): void { // 计算md5 let cipher = crypto.createHash('md5') this.md5 = cipher.update(JSON.stringify(this.routeConfig)).digest('hex'); // 握手重连数据 let routeBuf = Buffer.from(JSON.stringify({ 'md5': this.md5, 'heartbeat': this.heartbeatTime / 1000 })); this.handshakeBuf = Buffer.alloc(routeBuf.length + 5); this.handshakeBuf.writeUInt32BE(routeBuf.length, 0); this.handshakeBuf.writeUInt8(MsgType.handshake, 4); routeBuf.copy(this.handshakeBuf, 5); // 握手全部数据 let routeBufAll = Buffer.from(JSON.stringify({ 'md5': this.md5, 'route': this.routeConfig, 'heartbeat': this.heartbeatTime / 1000 })); this.handshakeBufAll = Buffer.alloc(routeBufAll.length + 5); this.handshakeBufAll.writeUInt32BE(routeBufAll.length, 0); this.handshakeBufAll.writeUInt8(MsgType.handshake, 4); routeBufAll.copy(this.handshakeBufAll, 5); // 心跳数据 this.heartbeatBuf = Buffer.alloc(5); this.heartbeatBuf.writeUInt32BE(1, 0); this.heartbeatBuf.writeUInt8(MsgType.heartbeat, 4); } /** * 获取握手数据 * @param md5 验证码 */ public getHandshake(md5: string): Buffer { if (md5 === this.md5) { return this.handshakeBuf; } return this.handshakeBufAll; } /** * 协议解码 * @param data 数据包 * @returns 协议解码数据 */ protected protoDecode(client: BaseSocket, data: Buffer) { return { 'cmd': data.readUInt16BE(1), 'msg': data.subarray(3) } } /** * rpc协议解码 * @param data 数据包 * @returns 协议解码数据 */ protected rpcDecode(client: BaseSocket, data: Buffer) { return { 'cmd': data.readUInt16BE(1), 'id': data.readUInt32BE(3), 'msg': data.subarray(7) } } /** * 协议编码 * @param protocol 协议 * @param packet 数据包 */ public protoEncode(protocol: string, packet: any): Buffer { let msgBuf: Buffer = this.msgEncode(protocol, packet); let buf = Buffer.allocUnsafe(msgBuf.length + 5); buf.writeUInt32BE(msgBuf.length, 0); buf.writeUInt8(MsgType.msg, 4); msgBuf.copy(buf, 5); return buf; } /** * rpc协议编码 * @param protocol 协议 * @param packet 数据包 */ public rpcEncode(protocol: string, packet: any, id: number, msgType: MsgType): Buffer { const msgBuf: Buffer = this.msgEncode(protocol, packet); const buf = Buffer.allocUnsafe(msgBuf.length + 9); buf.writeUInt32BE(msgBuf.length + 4, 0); buf.writeUInt8(msgType, 4); buf.writeUInt32BE(id, 5); msgBuf.copy(buf, 9); return buf; } /** * 默认解码器 * @param msg 解码数据包 */ protected decode(msg: Buffer) { return { 'body': JSON.parse(msg.toString()) } } /** * 默认编码器 * @param msg 编码数据包 */ protected encode(cmd: string, msg: any) { return Buffer.from(JSON.stringify({ cmd, msg })); } /** * 处理消息 * @param client 客户端 * @param msgBuf 数据包 * @returns */ public async handleMsg(client: BaseSocket, msgBuf: Buffer) { try { // 协议解析 let data = this.protoDecode(client, msgBuf); if (data.cmd >= 0 && this.routeHandler.length > data.cmd) { // 前置消息过滤 if (this.protoFilter && await this.protoFilter(client, this.routeConfig[data.cmd], data)) { return; } // 数据处理 let msg = this.msgDecode(data.msg); msg.url = this.routeConfig[data.cmd]; logger.info(`[ ${this.name}-Server ]数据协议解析 urlpath[ ${this.routeConfig[data.cmd]} ] [ ${this.routeDes[data.cmd]} ] data[ ${JSON.stringify(msg.body)} ] [ ${client.uid} ]`); // 路由中间件 if (this.middleware && await this.middleware(msg, client)) { return; } // 后处理 if (this.postProcessing) { const sendMessage = client.sendMessage; client.sendMessage = (packet: any, protocol: string) => { sendMessage.call(client, packet, protocol); this.postProcessing(msg, client, packet, protocol) } } // 路由逻辑处理 this.routeHandler[data.cmd](msg, client); } else { logger.error(`[ ${this.name}-Server ]收到未知请求 cmd[ ${data.cmd} ]`); } } catch (e: any) { logger.error(`[ ${this.name}-Server ] 数据验证未通过! ${e.stack}`); } } /** * 处理消息 * @param client 客户端 * @param msgBuf 数据包 */ public async handleRpcReq(client: BaseSocket, msgBuf: Buffer) { try { // 协议解析 let data = this.rpcDecode(client, msgBuf); if (data.cmd >= 0 && this.routeHandler.length > data.cmd) { // 前置消息过滤 if (this.protoFilter && await this.protoFilter(client, this.routeConfig[data.cmd], data)) { return; } // 数据处理 let msg = this.msgDecode(data.msg); msg.url = this.routeConfig[data.cmd]; logger.info(`[ ${this.name}-Server ]数据协议解析 urlpath[ ${this.routeConfig[data.cmd]} ] [ ${this.routeDes[data.cmd]} ] data[ ${JSON.stringify(msg.body)} ] [ ${client.uid} ]`); // 路由中间件 if (this.middleware && await this.middleware(msg, client)) { return; } // 路由逻辑处理 const packet = await this.routeHandler[data.cmd](msg, client); logger.info(`发送到客户端rpc数据[ ${data.id} ][ ${JSON.stringify(packet)} ] [ ${client.uid} ]`); client.send(this.rpcEncode('', packet, data.id, MsgType.rpcRes)); } else { logger.error(`[ ${this.name}-Server ]收到未知请求 cmd[ ${data.cmd} ]`); } } catch (e: any) { logger.error(`[ ${this.name}-Server ] 数据验证未通过! ${e.stack}`); } } /** * 处理消息 * @param client 客户端 * @param msgBuf 数据包 */ public async handleRpcRes(client: BaseSocket, msgBuf: Buffer) { try { // 协议解析 let data = this.rpcDecode(client, msgBuf); if (data.id < 0) { return; } if (client.rpcRequestMap.has(data.id)) { // 前置消息过滤 if (this.protoFilter && await this.protoFilter(client, this.routeConfig[data.cmd], data)) { return; } // 数据处理 let msg = this.msgDecode(data.msg); // 路由中间件 if (this.middleware && await this.middleware(msg, client)) { return; } const rpcInfo = client.rpcRequestMap.get(data.id); logger.info(`[ ${this.name}-Server ]数据协议解析 urlpath[ ${rpcInfo.url} ] data[ ${JSON.stringify(msg.body)} ]`); rpcInfo.cb(msg.body); client.rpcRequestMap.delete(data.id); } else { logger.error(`[ ${this.name}-Server ]收到未知请求 cmd[ ${data.cmd} ]`); } } catch (e: any) { logger.error(`[ ${this.name}-Server ] 数据验证未通过! ${e.stack}`); } } /** * 添加客户端 * @param client 客户端对象 */ public addClient(client: BaseSocket): void { this.ClientAry.add(client); this.emit('connection', client); } /** * 移除客户端 * @param client 客户端对象 */ public removeClient(client: BaseSocket): void { this.ClientAry.delete(client); this.emit('disconnect', client); } /** * 广播消息 * @param protocol 协议 * @param packet 数据包 */ public sendToAll(packet: any, protocol: string): void { logger.info('发送到全部客户端数据[ %s ][ %s ]', protocol, JSON.stringify(packet)); let buff = this.protoEncode(protocol, packet); this.ClientAry.forEach((client: BaseSocket) => { client.send(buff); }); } }