import { ILogger } from '../../logger/types';
import { Method, MultiConfigs, MultiMethodExceptions, MultiMethodLatencies } from '../../sync/submitters/types';
import { KeyBuilderSS } from '../KeyBuilderSS';
import { ITelemetryCacheAsync } from '../types';
import { findLatencyIndex } from '../findLatencyIndex';
import { getTelemetryConfigStats } from '../../sync/submitters/telemetrySubmitter';
import { CONSUMER_MODE, STORAGE_REDIS } from '../../utils/constants';
import { isNaNNumber, isString } from '../../utils/lang';
import { MAX_LATENCY_BUCKET_COUNT, newBuckets } from '../inMemory/TelemetryCacheInMemory';
import { parseLatencyField, parseExceptionField, parseMetadata } from '../utils';
import type { RedisAdapter } from './RedisAdapter';

export class TelemetryCacheInRedis implements ITelemetryCacheAsync {

  /**
   * Create a Telemetry cache that uses Redis as storage.
   * @param log - Logger instance.
   * @param keys - Key builder.
   * @param redis - Redis client.
   */
  constructor(private readonly log: ILogger, private readonly keys: KeyBuilderSS, private readonly redis: RedisAdapter) { }

  /**
   * Increment by one the hash counter for the given method and the latency
   * bucket that `findLatencyIndex` assigns to `latencyMs`.
   * Rejections of the redis call are swallowed, so the returned promise does not reject.
   * @param method - Method whose latency is being recorded.
   * @param latencyMs - Measured latency, mapped to a bucket index.
   */
  recordLatency(method: Method, latencyMs: number) {
    // The key builder returns `<hash key>::<hash field>` in a single string.
    const [key, field] = this.keys.buildLatencyKey(method, findLatencyIndex(latencyMs)).split('::');
    return this.redis.hincrby(key, field, 1)
      .catch(() => { /* Handle rejections for telemetry */ });
  }

  /**
   * Increment by one the hash counter of exceptions for the given method.
   * Rejections of the redis call are swallowed, so the returned promise does not reject.
   * @param method - Method that raised the exception.
   */
  recordException(method: Method) {
    const [key, field] = this.keys.buildExceptionKey(method).split('::');
    return this.redis.hincrby(key, field, 1)
      .catch(() => { /* Handle rejections for telemetry */ });
  }

  /**
   * Store the JSON-serialized telemetry config stats (consumer mode, Redis storage)
   * in the hash field given by the key builder.
   * Rejections of the redis call are swallowed, so the returned promise does not reject.
   */
  recordConfig() {
    const [key, field] = this.keys.buildInitKey().split('::');
    const value = JSON.stringify(getTelemetryConfigStats(CONSUMER_MODE, STORAGE_REDIS));
    return this.redis.hset(key, field, value)
      .catch(() => { /* Handle rejections for telemetry */ });
  }

  /**
   * Pop telemetry latencies.
   * Entries with an unparsable field, a non-numeric count or an out-of-range
   * bucket are logged and skipped.
   * The returned promise rejects if redis operations fail.
   *
   * NOTE(review): the hash is read and then deleted with two separate calls;
   * counters incremented in between would be dropped unless the adapter
   * serializes them — confirm.
   */
  popLatencies(): Promise<MultiMethodLatencies> {
    return this.redis.hgetall(this.keys.latencyPrefix).then((latencies: Record<string, string>) => {
      const result: MultiMethodLatencies = new Map();

      Object.keys(latencies).forEach(field => {
        // Parsers return a string describing the problem when the field is invalid.
        const parsedField = parseLatencyField(field);
        if (isString(parsedField)) {
          this.log.error(`Ignoring invalid latency field: ${field}: ${parsedField}`);
          return;
        }

        // Explicit radix: counters are decimal integers.
        const count = parseInt(latencies[field], 10);
        if (isNaNNumber(count)) {
          this.log.error(`Ignoring latency with invalid count: ${latencies[field]}`);
          return;
        }

        const [metadata, method, bucket] = parsedField;

        // Reject buckets outside [0, MAX_LATENCY_BUCKET_COUNT) so that the
        // assignment below never writes outside the buckets array.
        if (bucket < 0 || bucket >= MAX_LATENCY_BUCKET_COUNT) {
          this.log.error(`Ignoring latency with invalid bucket: ${bucket}`);
          return;
        }

        const methodLatencies = result.get(metadata) || {};
        methodLatencies[method] = methodLatencies[method] || newBuckets();
        methodLatencies[method]![bucket] = count;
        result.set(metadata, methodLatencies);
      });

      return this.redis.del(this.keys.latencyPrefix).then(() => result);
    });
  }

  /**
   * Pop telemetry exceptions.
   * Entries with an unparsable field or a non-numeric count are logged and skipped.
   * The returned promise rejects if redis operations fail.
   */
  popExceptions(): Promise<MultiMethodExceptions> {
    return this.redis.hgetall(this.keys.exceptionPrefix).then((exceptions: Record<string, string>) => {
      const result: MultiMethodExceptions = new Map();

      Object.keys(exceptions).forEach(field => {
        const parsedField = parseExceptionField(field);
        if (isString(parsedField)) {
          this.log.error(`Ignoring invalid exception field: ${field}: ${parsedField}`);
          return;
        }

        // Explicit radix: counters are decimal integers.
        const count = parseInt(exceptions[field], 10);
        if (isNaNNumber(count)) {
          this.log.error(`Ignoring exception with invalid count: ${exceptions[field]}`);
          return;
        }

        const [metadata, method] = parsedField;

        if (!result.has(metadata)) result.set(metadata, {});
        result.get(metadata)![method] = count;
      });

      return this.redis.del(this.keys.exceptionPrefix).then(() => result);
    });
  }

  /**
   * Pop telemetry configs.
   * Entries with an unparsable field or a value that is not valid JSON are logged and skipped.
   * The returned promise rejects if redis operations fail.
   */
  popConfigs(): Promise<MultiConfigs> {
    return this.redis.hgetall(this.keys.initPrefix).then((configs: Record<string, string>) => {
      const result: MultiConfigs = new Map();

      Object.keys(configs).forEach(field => {
        const parsedField = parseMetadata(field);
        if (isString(parsedField)) {
          this.log.error(`Ignoring invalid config field: ${field}: ${parsedField}`);
          return;
        }

        const [metadata] = parsedField;

        try {
          const config = JSON.parse(configs[field]);
          result.set(metadata, config);
        } catch (e) {
          // A malformed value only discards this entry; the rest are still returned.
          this.log.error(`Ignoring invalid config: ${configs[field]}`);
        }
      });

      return this.redis.del(this.keys.initPrefix).then(() => result);
    });
  }
}