/**
 * rpc connection management, sending rpc messages
 */

import Application from '../application';
import { I_rpcTimeout, I_rpcMsg, ServerInfo } from '../util/interfaceDefine';
import * as path from 'path';
import * as fs from 'fs';
import define = require('../util/define');
import * as appUtil from '../util/appUtil';

/** A frame parked in `msgQueueDic` because no socket to its target server was available. */
type QueuedRpcMsg = {
    /** Pending-request record to register once the frame is really sent; null when no reply is tracked. */
    rpcTimeout: I_rpcTimeout | null;
    /** The complete encoded frame. */
    buf: Buffer;
    /** Timestamp (ms) after which the queued frame is dropped by the timeout check. */
    time: number;
};

let app: Application;

/** Remote handler instances of this server, keyed by remote file basename. */
const msgHandler = new Map<string, any>();

let rpcId = 1; // Must start from 1, not 0

/** Requests that are waiting for a reply, keyed by rpc id. */
const rpcRequest = new Map<number, I_rpcTimeout>();

let rpcTimeMax: number = 10 * 1000; // overtime time
let outTime = 0; // Current time + timeout

/** Frames waiting for a socket, keyed by target server id (oldest first). */
const msgQueueDic = new Map<string, QueuedRpcMsg[]>();

/** Maximum number of frames kept per target server while it has no socket. */
let msgCacheCount = 5000;

/**
 * init
 *
 * Stores the application, applies the optional `rpc` settings and starts the
 * two timers of this module: one refreshes `outTime` every 100 ms, the other
 * runs the timeout check every 2000 ms.
 *
 * Settings read from `app.someconfig.rpc`:
 * - `rpcMsgCacheCount`: used when it parses to a number >= 0.
 * - `timeout`: in seconds, used when it is >= 5.
 *
 * @param _app the application instance
 * @returns the rpc proxy builder created for this application
 */
export function init(_app: Application) {
    app = _app;
    const rpcConfig = app.someconfig.rpc || {};

    const rpcMsgCacheCount = parseInt(rpcConfig.rpcMsgCacheCount as any);
    if (rpcMsgCacheCount >= 0) {
        msgCacheCount = rpcMsgCacheCount;
    }

    const timeout = Number(rpcConfig.timeout) || 0;
    if (timeout >= 5) {
        rpcTimeMax = timeout * 1000;
    }

    // The deadline is cached and refreshed on a timer so that senders do not
    // have to call Date.now() for every request.
    outTime = Date.now() + rpcTimeMax;
    setInterval(() => {
        outTime = Date.now() + rpcTimeMax;
    }, 100);
    setInterval(checkTimeout, 2000);

    return new RpcCreate();
}

/**
 * Flushes every frame that was queued for a server while it had no socket.
 *
 * @param sid id of the server whose socket just became available
 */
export function rpcOnNewSocket(sid: string) {
    const queue = msgQueueDic.get(sid);
    if (!queue) {
        return;
    }
    // Remove the queue first: if a send fails to find the socket again,
    // sendTo starts a fresh queue instead of appending to the one being drained.
    msgQueueDic.delete(sid);
    for (const one of queue) {
        sendTo(sid, one.rpcTimeout, one.buf);
    }
}

/**
 * Process rpc messages
 *
 * [1] [1] [...] [...]
* msgType rpcBufLen rpcBuf msgBuf
 *
 * Decodes one incoming frame. A frame whose header carries a `cmd` is a call
 * to a local remote handler; a frame with only an `id` is the reply to a
 * request this server sent earlier.
 *
 * @param sid id of the server the frame came from; a reply is sent back to it
 * @param bufAll the frame, starting at the msgType byte
 */
export function handleMsg(sid: string, bufAll: Buffer) {
    const rpcBufLen = bufAll.readUInt8(1);
    const bodyStart = 2 + rpcBufLen;
    const rpcMsg: I_rpcMsg = JSON.parse(bufAll.subarray(2, bodyStart).toString());
    const msg: any[] = JSON.parse(bufAll.subarray(bodyStart).toString());

    if (rpcMsg.cmd) {
        const [file, method] = rpcMsg.cmd.split('.');
        if (rpcMsg.id) {
            // The caller expects an answer: hand the handler a callback as last argument.
            msg.push(getCallBackFunc(sid, rpcMsg.id));
        }
        msgHandler.get(file)[method](...msg);
        return;
    }

    if (!rpcMsg.id) {
        return;
    }
    const timeout = rpcRequest.get(rpcMsg.id);
    if (timeout) {
        rpcRequest.delete(rpcMsg.id);
        timeout.cb(...msg);
    }
}

/**
 * Await-style counterpart of `handleMsg`.
 *
 * For a call frame the handler's result is awaited and, when the header has an
 * `id`, sent back as the reply. For a reply frame the decoded payload is passed
 * to the pending request's callback as a single value.
 *
 * @param sid id of the server the frame came from; a reply is sent back to it
 * @param bufAll the frame, starting at the msgType byte
 */
export async function handleMsgAwait(sid: string, bufAll: Buffer) {
    const rpcBufLen = bufAll.readUInt8(1);
    const bodyStart = 2 + rpcBufLen;
    // subarray instead of the deprecated Buffer#slice, matching handleMsg.
    const rpcMsg: I_rpcMsg = JSON.parse(bufAll.subarray(2, bodyStart).toString());
    const msg = JSON.parse(bufAll.subarray(bodyStart).toString());

    if (rpcMsg.cmd) {
        const [file, method] = rpcMsg.cmd.split('.');
        let data = await msgHandler.get(file)[method](...msg);
        if (!rpcMsg.id) {
            return;
        }
        if (data === undefined) {
            // JSON.stringify(undefined) yields undefined, which cannot be turned into a Buffer.
            data = null;
        }
        const bufEnd = getRpcMsg({ id: rpcMsg.id }, Buffer.from(JSON.stringify(data)), define.Rpc_Msg.rpcMsgAwait);
        sendTo(sid, null, bufEnd);
        return;
    }

    if (!rpcMsg.id) {
        return;
    }
    const timeout = rpcRequest.get(rpcMsg.id);
    if (timeout) {
        rpcRequest.delete(rpcMsg.id);
        timeout.cb(msg);
    }
}

/**
 * rpc structure
 *
 * Builds the proxy tree returned by `app.rpc(serverId)` and instantiates the
 * remote handlers of this server's own type.
 */
class RpcCreate {
    /** Target of the next proxied call; set by `rpcFunc`. */
    private toId: string = '';
    /** Whether the next proxied call is fire-and-forget; set by `rpcFunc`. */
    private notify: boolean = false;
    /** Proxy tree: serverType -> remote file -> method. */
    private readonly rpcObj: Rpc = {};

    constructor() {
        this.loadRemoteMethod();
    }

    /**
     * Installs `app.rpc`, then scans `<servers dir>/<serverType>/remote/*.js`.
     * Every default-exported class contributes proxy methods (unless rpc between
     * the two server types is disabled) and, for this server's own type, a
     * handler instance registered under the file's basename.
     */
    loadRemoteMethod() {
        app.rpc = this.rpcFunc.bind(this);
        const rpcTree = this.rpcObj as any;

        const serversDir = path.join(app.base, define.some_config.File_Dir.Servers);
        if (!fs.existsSync(serversDir)) {
            return;
        }

        const localRemotes: Array<{ filename: string; Con: any }> = [];
        for (const serverName of fs.readdirSync(serversDir)) {
            const needRpc = !app.noRpcMatrix[appUtil.getNoRpcKey(app.serverType, serverName)];
            if (!needRpc && serverName !== app.serverType) {
                continue;
            }
            const remoteDir = path.join(serversDir, serverName, '/remote');
            if (!fs.existsSync(remoteDir)) {
                continue;
            }
            if (needRpc) {
                rpcTree[serverName] = {};
            }
            for (const fileName of fs.readdirSync(remoteDir)) {
                if (!fileName.endsWith('.js')) {
                    continue;
                }
                const fileBasename = path.basename(fileName, '.js');
                const remote = require(path.join(remoteDir, fileName));
                if (!remote.default || typeof remote.default !== 'function') {
                    continue;
                }
                if (needRpc) {
                    rpcTree[serverName][fileBasename] = this.initFunc(
                        serverName,
                        fileBasename,
                        remote.default.prototype,
                        Object.getOwnPropertyNames(remote.default.prototype)
                    );
                }
                if (serverName === app.serverType) {
                    localRemotes.push({ filename: fileBasename, Con: remote.default });
                }
            }
        }

        // Handlers are constructed only after the whole proxy tree has been built.
        for (const one of localRemotes) {
            msgHandler.set(one.filename, new one.Con(app));
        }
    }

    /**
     * Implementation of `app.rpc`. Records the target and returns the shared
     * proxy tree; the target is read when a proxy method is invoked, so the
     * returned tree should be used before `app.rpc` is called again.
     *
     * @param serverId target server id, or '*' for every server of the type
     * @param notify true to send without waiting for a result
     */
    rpcFunc(serverId: string, notify = false) {
        this.toId = serverId;
        this.notify = notify;
        return this.rpcObj;
    }

    /**
     * Creates one proxy function per method found on a remote class prototype.
     *
     * @param serverType server type that owns the remote file
     * @param filename basename of the remote file
     * @param func the remote class prototype
     * @param funcFields own property names of that prototype
     */
    initFunc(serverType: string, filename: string, func: any, funcFields: string[]) {
        const res: Record<string, (...args: any[]) => Promise<any>> = {};
        for (const field of funcFields) {
            if (field !== 'constructor' && typeof func[field] === 'function') {
                res[field] = this.proxyCb({ serverType, file_method: filename + '.' + field });
            }
        }
        return res;
    }

    /** Wraps a command descriptor into the function callers invoke on the proxy tree. */
    proxyCb(cmd: { 'serverType': string; 'file_method': string }) {
        return async (...args: any[]) => {
            return await this.send(this.toId, this.notify, cmd, args);
        };
    }

    /**
     * Picks the delivery mode: broadcast for '*', callback style when the last
     * argument is a function, await style otherwise.
     */
    async send(sid: string, notify: boolean, cmd: { 'serverType': string; 'file_method': string }, args: any[]) {
        if (sid === '*') {
            this.sendT(cmd, args);
            return;
        }
        if (typeof args[args.length - 1] === 'function') {
            this.sendCb(sid, cmd, args);
            return;
        }
        return await this.sendAwait(sid, notify, cmd, args);
    }

    /** Send to every server of the command's server type; no reply is tracked. */
    sendT(cmd: { 'serverType': string; 'file_method': string }, args: any[]) {
        const servers = app.getServersByType(cmd.serverType);
        if (servers.length === 0) {
            return;
        }
        // Encode once and reuse the same frame for every remote target.
        const msgBuf = Buffer.from(JSON.stringify(args));
        const bufEnd = getRpcMsg({ cmd: cmd.file_method }, msgBuf, define.Rpc_Msg.rpcMsg);
        for (const one of servers) {
            if (one.id === app.serverId) {
                sendRpcMsgToSelf(cmd, msgBuf);
            } else {
                sendTo(one.id, null, bufEnd);
            }
        }
    }

    /** Callback style: send to a single server; the trailing function in `args` receives the reply. */
    sendCb(sid: string, cmd: { 'serverType': string; 'file_method': string }, args: any[]) {
        const cb: Function = args.pop();
        if (sid === app.serverId) {
            sendRpcMsgToSelf(cmd, Buffer.from(JSON.stringify(args)), cb);
            return;
        }
        const rpcTimeout: I_rpcTimeout = { id: getRpcId(), cb, time: outTime, await: false };
        const rpcMsg: I_rpcMsg = { cmd: cmd.file_method, id: rpcTimeout.id };
        const bufEnd = getRpcMsg(rpcMsg, Buffer.from(JSON.stringify(args)), define.Rpc_Msg.rpcMsg);
        sendTo(sid, rpcTimeout, bufEnd);
    }

    /**
     * Await style: send to a single server. Resolves with the reply, or with
     * undefined immediately when `notify` is set (no id is put in the frame).
     */
    async sendAwait(sid: string, notify: boolean, cmd: { 'serverType': string; 'file_method': string }, args: any[]) {
        if (sid === app.serverId) {
            return await sendRpcMsgToSelfAwait(cmd, Buffer.from(JSON.stringify(args)), notify);
        }

        const rpcMsg: I_rpcMsg = { cmd: cmd.file_method };
        let promise: Promise<any> | undefined;
        let rpcTimeout: I_rpcTimeout | null = null;
        if (!notify) {
            let cb: Function = null as any;
            promise = new Promise<any>((resolve) => {
                cb = resolve;
            });
            rpcTimeout = { id: getRpcId(), cb, time: outTime, await: true };
            rpcMsg.id = rpcTimeout.id;
        }
        const bufEnd = getRpcMsg(rpcMsg, Buffer.from(JSON.stringify(args)), define.Rpc_Msg.rpcMsgAwait);
        sendTo(sid, rpcTimeout, bufEnd);
        return await promise;
    }
}

/**
 * Sends a frame to a server, or queues it when there is no socket to that server.
 *
 * @param sid target server id
 * @param rpcTimeout pending-request record to register when the frame is sent, or null
 * @param buf the encoded frame
 */
function sendTo(sid: string, rpcTimeout: I_rpcTimeout | null, buf: Buffer) {
    const socket = app.rpcPool.getSocket(sid);
    if (socket) {
        if (rpcTimeout) {
            rpcRequest.set(rpcTimeout.id, rpcTimeout);
        }
        socket.send(buf);
        return;
    }

    let queue = msgQueueDic.get(sid);
    if (!queue) {
        queue = [];
        msgQueueDic.set(sid, queue);
    }
    // Queued frames expire 3000 ms earlier than a frame sent right now would.
    queue.push({ rpcTimeout, buf, time: outTime - 3000 });

    if (queue.length > msgCacheCount) {
        // Over capacity: drop the oldest frames (up to 20) and fail their callers.
        for (const one of queue.splice(0, 20)) {
            if (one.rpcTimeout) {
                timeoutCall(one.rpcTimeout);
            }
        }
    }
}

/**
 * Get rpcId
 *
 * Ids count up from 1 and wrap back to 1 after 99999999.
 */
function getRpcId() {
    const id = rpcId++;
    if (rpcId > 99999999) {
        rpcId = 1;
    }
    return id;
}

/**
 * rpc timeout detection
 *
 * Fails queued frames and in-flight requests whose deadline has passed.
 */
function checkTimeout() {
    const now = Date.now();

    for (const [sid, queue] of msgQueueDic) {
        // Expired frames form a prefix of the queue; stop at the first live one.
        let expiredCount = 0;
        for (const one of queue) {
            if (one.time < now) {
                expiredCount++;
            } else {
                break;
            }
        }
        if (expiredCount > 0) {
            for (const one of queue.splice(0, expiredCount)) {
                if (one.rpcTimeout) {
                    timeoutCall(one.rpcTimeout);
                }
            }
        }
        if (queue.length === 0) {
            // Do not keep empty queues around; sendTo recreates one on demand.
            msgQueueDic.delete(sid);
        }
    }

    for (const [id, one] of rpcRequest) {
        if (one.time < now) {
            rpcRequest.delete(id);
            timeoutCall(one);
        }
    }
}

/**
 * Reports a timeout to the caller on the next tick: await-style requests are
 * resolved with undefined, callback-style requests are called with `true`.
 */
function timeoutCall(one: I_rpcTimeout) {
    process.nextTick(() => {
        one.cb(one.await ? undefined : true);
    });
}

/**
 * Send rpc message
 *
 * [4] [1] [1] [...] [...]
* allMsgLen msgType rpcBufLen rpcBuf msgBuf
 *
 * Assembles one frame: a 4-byte big-endian length of everything after it,
 * the message type, the byte length of the header JSON, the header JSON and
 * finally the payload.
 *
 * @param rpcMsg header object, serialized as JSON
 * @param msgBuf already encoded payload
 * @param t message type written into the frame
 */
function getRpcMsg(rpcMsg: I_rpcMsg, msgBuf: Buffer, t: define.Rpc_Msg) {
    const headerBuf = Buffer.from(JSON.stringify(rpcMsg));
    const payloadOffset = 6 + headerBuf.length;

    const frame = Buffer.allocUnsafe(payloadOffset + msgBuf.length);
    frame.writeUInt32BE(frame.length - 4, 0);
    frame.writeUInt8(t, 4);
    frame.writeUInt8(headerBuf.length, 5);
    headerBuf.copy(frame, 6);
    msgBuf.copy(frame, payloadOffset);
    return frame;
}

/**
 * Looks up the handler named by a "file.method" route and calls the method
 * on it with the given arguments, returning whatever the method returns.
 */
function callLocalRemote(fileMethod: string, callArgs: any[]) {
    const route = fileMethod.split('.');
    const handler = msgHandler.get(route[0]);
    return handler[route[1]](...callArgs);
}

/**
 * Send rpc message to this server
 *
 * The arguments are decoded from `msgBuf`; when `cb` is given it is registered
 * as a pending request and a reply callback is appended to the arguments.
 * The handler itself runs on the next tick.
 */
function sendRpcMsgToSelf(cmd: { 'serverType': string; 'file_method': string }, msgBuf: Buffer, cb?: Function) {
    const callArgs = JSON.parse(msgBuf.toString());

    if (cb) {
        const id = getRpcId();
        rpcRequest.set(id, { id, cb, time: outTime, await: false });
        callArgs.push(getCallBackFuncSelf(id));
    }

    process.nextTick(() => {
        callLocalRemote(cmd.file_method, callArgs);
    });
}

/**
 * Send rpc message to this server await
 *
 * With `notify` the handler is called on the next tick and nothing is awaited.
 * Otherwise a pending request is registered and the returned promise resolves
 * with a JSON round-tripped copy of the handler's result.
 */
async function sendRpcMsgToSelfAwait(cmd: { 'serverType': string; 'file_method': string }, msgBuf: Buffer, notify: boolean) {
    const callArgs = JSON.parse(msgBuf.toString());

    if (notify) {
        process.nextTick(() => {
            callLocalRemote(cmd.file_method, callArgs);
        });
        return;
    }

    let resolveFn: Function = null as any;
    const pending = new Promise((resolve) => {
        resolveFn = resolve;
    });
    const id = getRpcId();
    rpcRequest.set(id, { id, cb: resolveFn, time: outTime, await: true });

    process.nextTick(async () => {
        let result = await callLocalRemote(cmd.file_method, callArgs);

        // The request may already have been removed by the timeout check.
        const entry = rpcRequest.get(id);
        if (!entry) {
            return;
        }
        rpcRequest.delete(id);

        if (result === undefined) {
            result = null;
        }
        entry.cb(JSON.parse(JSON.stringify(result)));
    });

    return await pending;
}

/**
 * rpc callback
 *
 * Returns the function handed to a handler so it can answer a remote caller:
 * its arguments are encoded and sent back to `sid` under the request's id.
 */
function getCallBackFunc(sid: string, id: number) {
    return (...args: any[]) => {
        const payload = Buffer.from(JSON.stringify(args));
        sendTo(sid, null, getRpcMsg({ id }, payload, define.Rpc_Msg.rpcMsg));
    };
}

/**
 * rpc callback self server
 *
 * Same role as the remote callback, for calls that stay on this server: the
 * arguments are JSON round-tripped at call time and delivered to the pending
 * request's callback on the next tick, if the request still exists.
 */
function getCallBackFuncSelf(id: number) {
    return (...args: any[]) => {
        const copied = JSON.parse(JSON.stringify(args));
        process.nextTick(() => {
            const entry = rpcRequest.get(id);
            if (!entry) {
                return;
            }
            rpcRequest.delete(id);
            entry.cb(...copied);
        });
    };
}