/**
 * @author xiangshouding
 * @date 2017-07-06 14:34:17
 */
import * as thrift from 'thrift';
import * as debug_ from 'debug';
import {resolve} from 'path';
import * as compose from 'koa-compose';
import {Map} from './types/lang';
import {Config} from './config';
import {Address, Protocol as ProtocolInterface} from './protocol/protocol';
import {Http as HttpClient} from './protocol/http';
import {Thrift as ThriftClient} from './protocol/thrift';
import {random as BalanceRandom} from './balance/random';
import {Logger} from '@byted/logger';
import * as util from './util';

const debug = debug_('client');

/**
 * Service client: picks a host for the configured service, instantiates the
 * handler registered for the configured protocol and hands the resulting
 * service/connection to a Node-style callback, retrying on failure.
 */
export class Client {
    transport: any;
    service: any;
    config: Config;
    connection: any;
    /** Protocol name -> handler constructor, filled through `register`. */
    provider: Map = {};
    logger: any;

    /**
     * Builds the configuration, registers the built-in `HTTP` and `THRIFT`
     * handlers and creates the logger from the configured file and level.
     *
     * @param options forwarded to `Config`
     * @param idc forwarded to `Config`
     * @param ctx forwarded to `Config`
     */
    constructor(options: Map, idc?: string, ctx?: any) {
        this.config = new Config(options, idc, ctx);
        this.register('HTTP', HttpClient);
        this.register('THRIFT', ThriftClient);
        this.logger = new Logger({
            logFile: this.config.getLogFile(),
            level: this.config.getLogLevel()
        });
    }

    /**
     * Requests the service, making up to `config.retry` attempts (at least one).
     *
     * May be called as `request(options, cb)` or `request(cb)`. The caller's
     * `options` object is never modified: its entries are copied, then
     * `timeout`, `service` and `transport` are overridden from this client and
     * falsy entries are dropped before the handler receives them.
     *
     * @param options per-request options; anything that is not a plain object is ignored
     * @param cb called with `(err, null)` on failure or `(err, service, conn)` on success
     */
    public request(options ?: Map, cb ?: (err: any, service: any, conn?:any) => void) {
        // Only a function in first position is taken as the callback, so
        // `request(undefined, cb)` keeps the callback that was actually passed.
        const firstArg: any = options;
        let supplied: Map = {};
        if (typeof firstArg === 'function') {
            cb = firstArg;
        } else if (Object.prototype.toString.call(firstArg) === '[object Object]') {
            supplied = firstArg;
        }
        const done = <(err: any, service: any, conn?: any) => void> cb;

        const handle = this.provider[this.config.getProtocol()];
        if (!handle) {
            return done(new Error(`protocol ${this.config.getProtocol()} not support`), null);
        }

        // Work on a copy so the caller's object is left untouched.
        const merged: Map = {};
        Object.keys(supplied).forEach((key) => {
            merged[key] = supplied[key];
        });
        merged['timeout'] = this.config.options['timeout'];
        merged['service'] = this.service;
        merged['transport'] = this.config.options['transport'];
        const requestOptions = this.filterInvalidValue(merged);

        // Normalise the retry count to an integer >= 1. A missing, zero,
        // string or fractional value would otherwise never match the
        // "last attempt" check, and the final error would be lost.
        const configuredRetry = Number(this.config.retry);
        const attempts = isFinite(configuredRetry) && configuredRetry >= 1
            ? Math.ceil(configuredRetry)
            : 1;

        // Builds the koa-compose task for attempt number `order`.
        const createAttempt = (order: number) => {
            const record = {
                cmd: '',
                addr: '',
                retry: order + '/' + attempts,
                error: '',
                logId: this.config.log.logId
            };

            return (ctx: any, go: any) => {
                ctx.idx++;

                // Report the error once the attempts are used up; otherwise
                // move on to the next task. Calling `go()` past the last task
                // would resolve the chain without ever invoking the callback.
                const retryOrFail = (err: any) => {
                    if (ctx.idx >= attempts) {
                        return done(err, null);
                    }
                    return go();
                };

                this.config.getHosts((err, hosts) => {
                    if (err) {
                        record.cmd = 'HOST_FEATCH_FAILED';
                        record.error = err.message;
                        this.logger.fatal(this.logToString(record));
                        return retryOrFail(err);
                    }

                    const addr = this.getAddress(hosts);
                    if (!addr) {
                        // No host to try: fail immediately, without retrying.
                        return done(new Error('service hosts is emtpy'), null, null);
                    }

                    record.cmd = 'REQUEST_SERVICE_START';
                    record.addr = addr.getAddress();
                    this.logger.notice(this.logToString(record));

                    const client = new handle(addr, requestOptions);
                    client.request((err, service, conn) => {
                        if (err) {
                            record.cmd = 'REQUEST_SERVICE_FAILED';
                            record.error = err.message;
                            this.logger.fatal(this.logToString(record));
                            return retryOrFail(err);
                        }

                        record.cmd = 'REQUEST_SERVICE_SUCCESS';
                        record.error = '';
                        this.logger.notice(this.logToString(record));
                        done(err, service, conn);
                    });
                });
            };
        };

        const tasks = [];
        for (let order = 1; order <= attempts; order++) {
            tasks.push(createAttempt(order));
        }

        // Anything thrown synchronously inside a task is routed to the callback.
        return compose(tasks)({idx: 0}).catch(done);
    }

    /**
     * Loads the generated service module and stores it on `this.service`.
     *
     * NOTE(review): `path.resolve` returns a non-empty string for any string
     * input, so the guard below does not check that the file exists, and
     * `require` receives `genPath` unchanged rather than the resolved path.
     * Kept as-is to preserve how existing callers' paths are looked up.
     *
     * @param genPath module path handed to `require`
     * @returns this client, for chaining
     */
    public loadService(genPath: string) {
        if (resolve(genPath)) {
            this.service = require(genPath);
        }
        return this;
    }

    /**
     * Registers (or replaces) the handler constructor for a protocol name.
     *
     * @param protocol key later matched against `config.getProtocol()`
     * @param handle constructor invoked as `new handle(addr, options)`
     */
    public register(protocol: string, handle: any) {
        this.provider[protocol] = handle;
    }

    /**
     * Returns a new map holding only the entries of `mapA` whose value is truthy.
     *
     * @param mapA source map; not modified
     */
    private filterInvalidValue(mapA: Map) {
        const kept: Map = {};
        Object.keys(mapA).forEach((key) => {
            if (mapA[key]) {
                kept[key] = mapA[key];
            }
        });
        return kept;
    }

    /**
     * Chooses a host from the hosts list according to `config.balance`
     * (`'random'` when unset).
     *
     * NOTE(review): the element type of `hosts` is not visible here; it is
     * presumably `Address` - tighten the annotation once confirmed.
     *
     * @param hosts candidate hosts (used by the `'random'` strategy only)
     */
    private getAddress(hosts: Array<any>) : Address {
        const strategy = this.config.balance || 'random';
        switch (strategy) {
            case 'random':
                return BalanceRandom(hosts);
            case 'rate_with_idc':
                // Loaded on demand; this strategy reads its hosts from the config.
                return require('./balance/rateWithIDC')(
                    this.config.getHostsWithIDC()
                );
            default:
                return new Address();
        }
    }

    /**
     * Renders a log record as space-separated `key=[value] ` pairs, preceded by
     * the common fields `protocol`, `service` (`-` when not a string) and `time`.
     *
     * @param obj record fields merged over the common fields
     */
    private logToString(obj: Map) {
        let log = '';
        const common = {
            protocol: this.config.getProtocol(),
            service: typeof this.config.service === 'string' ? this.config.service : '-',
            time: Date.now()
        };
        util.forEachObject(util.assign(common, obj), (value, key) => {
            log += `${key}=[${value}] `;
        });
        return log;
    }
}