/************************************************************************ * Copyright (c) 2023 sunking framework * Author : Shao * Mail : yi-shaoye@163.com * Date : 20213-01-13 * Use : 客户端基类 ************************************************************************/ import { EventEmitter } from 'events'; import * as Loader from '../util/loader'; import myLogger from '@wingyi8/sk-logger'; import { ServerStatus, ConnectorOptions, MsgType, RpcTimeout } from '../util/define'; const logger = myLogger.getLogger('sunking/base/BaseClinet'); export abstract class BaseClient extends EventEmitter { /** * 客户端名字 */ protected name: string = ''; /** * 客户端描述 */ protected des: string = ''; /** * 客户端IP */ protected host: string = '127.0.0.1'; /** * 连接端口 */ protected port: number = 0; /** * 客户端类型 */ protected type: string = 'wsClient'; /** * 自动重连 */ protected autoConnect: boolean = true; /** * 重连间隔 */ protected reconnectDelay: number = 5000; /** * 重连次数 */ protected reconnectionAttempts: number = -1; /** * 断线重连计数 */ protected reconnectionCount: number = 0; /** * 重连计时器 */ protected ReconnectAt: number = 0; /** * 消息解码 */ protected msgDecode: (msg: Buffer) => any = this.decode; /** * 消息编码 */ protected msgEncode: (protocol: string, msg: any) => Buffer = this.encode; /** * 路由列表 */ protected routeConfig: string[] = []; /** * 路由描述 */ protected routeDes: string[] = []; /** * 消息处理列表 */ protected routeHandler: Function[] = []; /** * 心跳计时器 */ protected heartbeatTimer: NodeJS.Timeout = null as any; /** * 心跳回应超时 */ protected heartbeatTimeoutTimer: NodeJS.Timeout = null as any; /** * 客户端状态 */ protected _status: ServerStatus = ServerStatus.Closed; /** * 客户端上一次状态 */ protected PreviousState: ServerStatus = ServerStatus.Closed; /** * rpc计数器 */ protected rpcId: number = 1; /** * rpc消息队列 */ protected rpcRequestMap: Map = new Map(); /** * 超时 */ protected rpcTimeout: number = 5000; /** * 客户端当前状态 */ get status(): ServerStatus { return this._status; } /** * 消息过滤 */ protected protoFilter: (client: BaseClient, route: string, data: any) => boolean = null as any; /** * 配置客户端相关 * @param {ConnectorOptions} config 配置 */ public configure(config: ConnectorOptions) { this.name = config.name; this.des = config.des; this.type = config.type; this.host = config.host; this.port = config.port; if (typeof config.clientMgr !== 'undefined') { let clientMgr = require(`${process.cwd()}/${config.clientMgr}`); // 连接成功回调 if (typeof clientMgr.openEvent != 'undefined') { this.on('openEvent', clientMgr.openEvent); } // 消息回调 if (typeof clientMgr.messageEvent != 'undefined') { this.on('messageEvent', clientMgr.messageEvent); } // 连接关闭回调 if (typeof clientMgr.closeEvent != 'undefined') { this.on('closeEvent', clientMgr.closeEvent); } // 错误回调 if (typeof clientMgr.errorEvent != 'undefined') { this.on('errorEvent', clientMgr.errorEvent); } // 重连回调 if (typeof clientMgr.reconnectEvent != 'undefined') { this.on('reconnectEvent', clientMgr.reconnectEvent) }; }; // 客户端路由注册 if (typeof config.routeConfig !== 'undefined') { this.routeConfig = []; this.routeDes = []; this.routeHandler = []; Loader.load(`${config.routeConfig}`, `/`, `all`, (handle: Loader.ServerEventHandler, index: number) => { this.registerRoute(handle, index); }); } // 通讯数据预处理 if (typeof config.verifyMsg !== 'undefined') { let m = require(`${process.cwd()}/${config.verifyMsg}`); if (null != m['msgDecode']) { this.msgDecode = m['msgDecode']; } if (null != m['msgDecode']) { this.msgEncode = m['msgEncode']; } } // 消息过滤 if (typeof config.protoFilter != 'undefined') { this.protoFilter = require(`${process.cwd()}/${config.protoFilter}`); } } /** * 路由注册 * @param handle 路由协议 * @param count 编号 */ protected registerRoute(handle: Loader.ServerEventHandler, count: number) { this.routeConfig.push(handle.url); this.routeDes.push(handle.description); this.routeHandler.push(handle.handler); logger.info(`读取协议处理成功! url[${handle.url}] cmd[${count}]`); } /** * 协议解码 * @param data 数据包 * @returns 协议解码数据 */ protected protoDecode(data: Buffer) { return JSON.parse(data.subarray(1).toString() || `{}`); } /** * 协议编码 * @param protocol 协议 * @param packet 数据包 */ public protoEncode(protocol: string, packet: any): Buffer { let msgBuf: Buffer = this.msgEncode(protocol, packet); let buf = Buffer.allocUnsafe(msgBuf.length + 5); buf.writeUInt32BE(msgBuf.length, 0); buf.writeUInt8(MsgType.msg, 4); msgBuf.copy(buf, 5); return buf; } /** * 默认解码器 * @param msg 解码数据包 */ protected decode(msg: Buffer) { return { 'body': JSON.parse(msg.toString() || '{}') }; } /** * 默认编码器 * @param msg 编码数据包 */ protected encode(cmd: string, msg: any) { return Buffer.from(JSON.stringify(msg)); } /** * 获取rpc索引 */ protected getRpcId(): number { const id = this.rpcId++; if (this.rpcId > 99999999) { this.rpcId = 1; } return id; } /** * 启动客户端 */ public abstract start(): Promise; /** * 停止客户端 */ public abstract stop(): void; /** * 显示客户端信息 */ public abstract show(): void; /** * 发送消息 * @param protocol 名字 * @param packet 数据包 */ public abstract sendMessage(url: string, packet: any, protocol?: string): void; /** * 发包 * @param msg 消息 */ public abstract send(msg: Buffer): void; }