import * as amqp from 'amqplib';
import { CalculatedData, StatusExporter } from 'island-status-exporter';
import * as _ from 'lodash';
import * as uuid from 'uuid';

import { Environments } from '../utils/environments';
import { logger } from '../utils/logger';

export const STATUS_EXPORT: boolean = Environments.isStatusExport();
export const STATUS_EXPORT_TIME_MS: number = Environments.ISLAND_STATUS_EXPORT_TIME_MS; // 10 * 1000
const STATUS_FILE_NAME: string = Environments.getStatusFileName()!;
const STATUS_EXPORT_TYPE: string = Environments.getStatusExportType();
const HOST_NAME: string = Environments.getHostName()!;
const SERVICE_NAME: string = Environments.getServiceName()!;
const CIRCUIT_BREAK_TIME_MS: number = Environments.ISLAND_CIRCUIT_BREAK_TIME_MS;
const MIN_REQUEST_THRESHOLD: number = Environments.ISLAND_CIRCUIT_BREAK_REQUEST_THRESHOLD;
const CIRCUIT_BREAK_THRESHOLD: number = Environments.ISLAND_CIRCUIT_BREAK_FAILRATE_THRESHOLD;

/** Captured once when this module is loaded; reported as `processUptime` in every export. */
const processUptime: Date = new Date();

enum EXPORT_TYPE {
  FILE = 'FILE',
  UDP = 'UDP'
}

/** A successful execution taking at least this long is counted in `slowWarningCount`. */
const SLOW_WARNING_TIME_MS: number = 1000 * 5;
/** A successful execution taking at least this long is counted in `slowCriticalCount`. */
const SLOW_CRITICAL_TIME_MS: number = 1000 * 10;
/** Publish-to-receive delay above which a 'SLOW recv' warning is logged. */
const SLOW_RECV_WARNING_TIME_MS: number = 1000;

if (STATUS_EXPORT) {
  StatusExporter.initialize({
    name: STATUS_FILE_NAME,
    hostname: HOST_NAME,
    servicename: SERVICE_NAME,
    udpOption: STATUS_EXPORT_TYPE === EXPORT_TYPE.UDP && Environments.getUdpOption() || undefined
  });
}

export interface Message {
  content: Buffer;
  properties: amqp.Options.Publish;
}

/** Request/error totals of one finished export period, kept for the circuit-breaker window. */
interface ErrorCountSlot {
  reqCount: number;
  errCount: number;
}

interface RequestStatistics {
  requestCount: number;
  executedCount: number;
  totalReceivedTime: number;
  totalExecutionTime: number;
  totalErrorTime: number;
  /** Sliding window of past periods; slots stay `undefined` until a period has been shifted in. */
  lastErrorCounts: Array<ErrorCountSlot | undefined>;
  minimumExecutionTime: number;
  maximumExecutionTime: number;
  slowWarningCount: number;
  slowCriticalCount: number;
}

export interface CollectOptions {
  requestId?: string;
  msg?: Message;
  shard?: number;
  err?: any;
  ignoreTimestamp?: boolean;
}

/** Bookkeeping for a request between its start and completion. */
interface OnGoingRequest {
  type: string;
  name: string;
  /** Local time (ms since epoch) when the request was registered. */
  reqTime: number;
  /** Delay between the message's publish timestamp and `reqTime`; 0 when unknown. */
  recvTime: number;
}

/** One entry of `CalculatedData.measurements`, keyed by `type@name`. */
interface TypeMeasurement {
  type: string;
  requestPerSeconds: number;
  executedPerSeconds: number;
  avgReceiveMessageTimeByMQ: number;
  avgExecutionTime: number;
  minimumExecutionTime: number;
  maximumExecutionTime: number;
  requestCount: number;
  executedCount: number;
  slowWarningCount: number;
  slowCriticalCount: number;
}

class RequestStatisticsHelper {
  /** Creates an empty statistics record with a circuit-breaker window of at least one slot. */
  public static create(): RequestStatistics {
    const windowSize = Math.max(1, Math.ceil(CIRCUIT_BREAK_TIME_MS / STATUS_EXPORT_TIME_MS));
    return {
      requestCount: 0,
      executedCount: 0,
      totalReceivedTime: 0,
      totalExecutionTime: 0,
      totalErrorTime: 0,
      lastErrorCounts: new Array<ErrorCountSlot | undefined>(windowSize),
      minimumExecutionTime: 0,
      maximumExecutionTime: 0,
      slowWarningCount: 0,
      slowCriticalCount: 0
    };
  }

  /**
   * Pushes the current period's request/error counts into the sliding window (dropping the oldest
   * slot) and resets the per-period counters and time totals.
   *
   * NOTE(review): min/max execution time and the slow-warning/critical counters are NOT reset here,
   * so in FILE mode they accumulate across periods — confirm this is intended.
   */
  public static clearAndShift(stat: RequestStatistics): void {
    stat.lastErrorCounts.shift();
    stat.lastErrorCounts.push({
      reqCount: stat.requestCount,
      errCount: stat.requestCount - stat.executedCount
    });
    stat.requestCount = 0;
    stat.executedCount = 0;
    stat.totalReceivedTime = 0;
    stat.totalExecutionTime = 0;
    stat.totalErrorTime = 0;
  }

  /** Sums request and error counts over the filled slots of the circuit-breaker window. */
  public static sumErrorWindow(stat: RequestStatistics): ErrorCountSlot {
    let reqCount = 0;
    let errCount = 0;
    for (const slot of stat.lastErrorCounts) {
      if (!slot) continue; // slot not filled yet
      reqCount += slot.reqCount;
      errCount += slot.errCount;
    }
    return { reqCount, errCount };
  }

  /**
   * True when the window holds at least MIN_REQUEST_THRESHOLD requests and their failure rate is
   * at or above CIRCUIT_BREAK_THRESHOLD. Only finished (shifted-in) periods are considered.
   */
  public static needCircuitBreak(stat: RequestStatistics): boolean {
    const { reqCount, errCount } = RequestStatisticsHelper.sumErrorWindow(stat);
    if (reqCount < MIN_REQUEST_THRESHOLD) return false;
    return errCount / reqCount >= CIRCUIT_BREAK_THRESHOLD;
  }
}

/** Rounds to two decimal places (returns NaN for NaN input). */
function setDecimalPoint(value: number): number {
  return Number(value.toFixed(2));
}

export type CollectedStatusExporter = (collected: CalculatedData) => Promise<any>;

export class StatusCollector {
  /** Statistics keyed by `type@name`. */
  private collectedData: { [type: string]: RequestStatistics } = {};
  /** Requests registered by collectRequestAndReceivedTime() and not yet completed, keyed by request id. */
  private onGoingMap: Map<string, OnGoingRequest> = new Map<string, OnGoingRequest>();
  /** Start of the current measuring period (ms since epoch). */
  private startedAt: number = +new Date();
  private exporters: { [key: string]: CollectedStatusExporter } = {
    FILE: o => StatusExporter.saveStatusJsonFile(o),
    UDP: o => StatusExporter.sendStatusJsonByUdp(SERVICE_NAME, o)
  };

  /**
   * Computes the measurements for the period that just ended, starts a new period, and hands the
   * result to the exporter registered for STATUS_EXPORT_TYPE (no-op when none is registered).
   */
  public async saveStatus() {
    const calculated: CalculatedData = this.calculateMeasurementsByType();
    this.clearAndShiftData();
    const exporter = this.exporters[STATUS_EXPORT_TYPE];
    if (!exporter) return;
    return exporter(calculated);
  }

  /** Registers (or replaces) the exporter used when STATUS_EXPORT_TYPE equals `type`. */
  public async registerExporter(type: string, exporter: CollectedStatusExporter) {
    this.exporters[type] = exporter;
  }

  /**
   * Registers the start of a request and returns its id (options.requestId, or a new uuid v1).
   *
   * Note: requests that did not arrive through island.js (e.g. restify in the gateway, sockets in
   * push), or whose sender did not set a publish timestamp, have no timing field, so recvTime stays 0.
   */
  public collectRequestAndReceivedTime(type: string, name: string, options?: CollectOptions): string {
    const requestId = options && options.requestId ? options.requestId : uuid.v1();
    const req: OnGoingRequest = { type, name, reqTime: +new Date(), recvTime: 0 };
    if (options && !options.ignoreTimestamp && options.msg && options.msg.properties &&
        options.msg.properties.timestamp) {
      const elapsedFromPublished = req.reqTime - options.msg.properties.timestamp;
      if (elapsedFromPublished > SLOW_RECV_WARNING_TIME_MS) {
        logger.warning('SLOW recv', name, elapsedFromPublished, options.shard);
      }
      req.recvTime = elapsedFromPublished;
    }
    this.onGoingMap.set(requestId, req);
    return requestId;
  }

  /**
   * Records the completion of a request.
   *
   * Notes:
   * 1. Requests that ended in error are excluded from the execution-time measurements
   *    (their duration goes to totalErrorTime instead).
   * 2. ErrorCount === RequestCount - ExecutedCount
   *
   * Without options.requestId only the counters are updated. With an id that is not (or no longer)
   * registered, nothing is recorded.
   */
  // tslint:disable-next-line cyclomatic-complexity
  public collectExecutedCountAndExecutedTime(type: string, name: string, options: CollectOptions): void {
    if (!options.requestId) {
      const untimedStat: RequestStatistics = this.getStat(type, name);
      ++untimedStat.requestCount;
      if (!options.err) ++untimedStat.executedCount;
      return;
    }

    const reqCache = this.onGoingMap.get(options.requestId);
    if (!reqCache) return;
    this.onGoingMap.delete(options.requestId);

    const stat: RequestStatistics = this.getStat(type, name);
    ++stat.requestCount;
    if (!options.err) ++stat.executedCount;

    const resTime = +new Date();
    const reqTime = reqCache.reqTime || resTime;
    if (reqCache.recvTime) {
      stat.totalReceivedTime += reqCache.recvTime;
    }
    const executedTime = resTime - reqTime;

    if (options.err) {
      stat.totalErrorTime += executedTime;
      if (RequestStatisticsHelper.needCircuitBreak(stat)) {
        // needCircuitBreak() guarantees a non-empty window here, so the division is well-defined.
        const { reqCount, errCount } = RequestStatisticsHelper.sumErrorWindow(stat);
        const failRate = setDecimalPoint(errCount / reqCount);
        logger.warning(`Too Many Failure on ${type}@${name} with fail-rate ${failRate}`);
      }
      return;
    }

    stat.totalExecutionTime += executedTime;
    // min && max (0 is used as the "no successful execution yet" sentinel for the minimum)
    stat.maximumExecutionTime = Math.max(stat.maximumExecutionTime, executedTime);
    stat.minimumExecutionTime = stat.minimumExecutionTime === 0
      ? executedTime
      : Math.min(stat.minimumExecutionTime, executedTime);
    // warning && critical
    if (executedTime >= SLOW_CRITICAL_TIME_MS) {
      stat.slowCriticalCount++;
    } else if (executedTime >= SLOW_WARNING_TIME_MS) {
      stat.slowWarningCount++;
    }
  }

  /**
   * Builds the export payload for the current period: one measurement per `type@name` plus
   * island-wide totals. Does not modify the collected data.
   */
  public calculateMeasurementsByType(): CalculatedData {
    // Clamp to 1 ms so a zero-length period cannot produce Infinity/NaN rates.
    const measuringTime = Math.max(1, +new Date() - this.startedAt);
    const cd = _.clone(this.collectedData);
    const result: CalculatedData = {
      processUptime,
      measurements: [],
      totalRequestCount: 0,
      totalExecutedCount: 0,
      minimumExecutionTime: 0,
      maximumExecutionTime: 0,
      avgExecutionTime: 0
    };

    let totalAvgTime = 0;
    let typeCount = 0;
    _.forEach(cd, (stat: RequestStatistics, key: string) => {
      const measurement: TypeMeasurement = {
        type: key,
        requestPerSeconds: setDecimalPoint(stat.requestCount * 1000 / measuringTime),
        executedPerSeconds: setDecimalPoint(stat.executedCount * 1000 / measuringTime),
        // `|| 0` turns the NaN of an empty period (0 / 0) into 0.
        avgReceiveMessageTimeByMQ: setDecimalPoint(stat.totalReceivedTime / stat.requestCount) || 0,
        avgExecutionTime: setDecimalPoint(stat.totalExecutionTime / stat.executedCount) || 0,
        minimumExecutionTime: stat.minimumExecutionTime,
        maximumExecutionTime: stat.maximumExecutionTime,
        requestCount: stat.requestCount,
        executedCount: stat.executedCount,
        slowWarningCount: stat.slowWarningCount,
        slowCriticalCount: stat.slowCriticalCount
      };
      result.measurements.push(measurement);

      // Island-wide totals.
      result.totalRequestCount += stat.requestCount;
      result.totalExecutedCount += stat.executedCount;

      // Island-wide min/max execution time, regardless of type (event, rpc, endpoint).
      // A minimum of 0 means "no successful execution", so it must not take part in the minimum.
      if (measurement.minimumExecutionTime > 0) {
        result.minimumExecutionTime = result.minimumExecutionTime === 0
          ? measurement.minimumExecutionTime
          : Math.min(result.minimumExecutionTime, measurement.minimumExecutionTime);
      }
      result.maximumExecutionTime = Math.max(result.maximumExecutionTime, measurement.maximumExecutionTime);

      totalAvgTime += measurement.avgExecutionTime;
      typeCount++;
    });
    result.avgExecutionTime = typeCount !== 0 && setDecimalPoint(totalAvgTime / typeCount) || 0;
    return result;
  }

  /** Logs the in-flight request count and every collected statistic whose key starts with `type`. */
  public async sigInfo(type: string) {
    logger.info(`${type} Service onGoingRequestCount : ${this.getOnGoingRequestCount(type)}`);
    _.forEach(this.collectedData, (v: RequestStatistics, k: string) => {
      if (!k.startsWith(type)) return;
      logger.info(`${type} Service ${k} : ${JSON.stringify(v)}`);
    });
  }

  /** Number of in-flight requests, optionally only those whose type starts with `type`. */
  public getOnGoingRequestCount(type?: string): number {
    if (!type) return this.onGoingMap.size;
    let onGoingRequestCount = 0;
    this.onGoingMap.forEach((value: OnGoingRequest) => {
      if (value.type && value.type.startsWith(type)) {
        onGoingRequestCount += 1;
      }
    });
    return onGoingRequestCount;
  }

  public hasOngoingRequests(type?: string): boolean {
    return this.getOnGoingRequestCount(type) > 0;
  }

  /** Circuit-breaker check for `type@name` (creates an empty statistics entry if none exists). */
  public needCircuitBreak(type: string, name: string): boolean {
    const stat = this.getStat(type, name);
    return RequestStatisticsHelper.needCircuitBreak(stat);
  }

  /** Returns the statistics for `type@name`, creating an empty record on first use. */
  private getStat(type: string, name: string): RequestStatistics {
    const typeName = [type, name].join('@');
    this.collectedData[typeName] = this.collectedData[typeName] || RequestStatisticsHelper.create();
    return this.collectedData[typeName];
  }

  /** Starts a new measuring period according to the configured export type. */
  private clearAndShiftData(): void {
    switch (STATUS_EXPORT_TYPE) {
      case EXPORT_TYPE.UDP:
        // NOTE(review): this also discards lastErrorCounts, so the circuit-breaker window never
        // fills in UDP mode — confirm this is intended.
        this.collectedData = {};
        break;
      case EXPORT_TYPE.FILE:
        _.forEach(this.collectedData, (stat: RequestStatistics) => {
          RequestStatisticsHelper.clearAndShift(stat);
        });
        break;
      default:
        // NOTE(review): other (registered) export types never clear their counters, while
        // startedAt is still reset below — per-second rates will grow over time.
        break;
    }
    this.startedAt = +new Date();
  }
}

export const collector = new StatusCollector();