import { isFiniteNumber, isNaNNumber } from '../../utils/lang';
import { KeyBuilderSS } from '../KeyBuilderSS';
import { ILogger } from '../../logger/types';
import { LOG_PREFIX } from './constants';
import { ISplit, ISplitFiltersValidation } from '../../dtos/types';
import { AbstractSplitsCacheAsync } from '../AbstractSplitsCacheAsync';
import { returnDifference } from '../../utils/lang/sets';
import type { RedisAdapter } from './RedisAdapter';

/**
 * Discard errors for an answer of multiple operations.
 *
 * @param results - list of `[error, value]` pairs as resolved by a pipeline `exec()`, or null.
 * @returns the values of the pairs whose error is null, in the same order. An empty list if `results` is null.
 */
function processPipelineAnswer(results: Array<[Error | null, unknown]> | null): string[] {
  if (!results) return [];

  return results.reduce((accum: string[], errValuePair: [Error | null, unknown]) => {
    if (errValuePair[0] === null) accum.push(errValuePair[1] as string);
    return accum;
  }, []);
}

/**
 * ISplitsCacheAsync implementation that stores split definitions in Redis.
 * Supported by Node.js
 */
export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {

  private readonly log: ILogger;
  private readonly redis: RedisAdapter;
  private readonly keys: KeyBuilderSS;
  // Last error emitted by the redis client. It is reset when the client emits 'connect'.
  private redisError?: Error;
  // Flag sets configured in the SDK filters. If empty, feature flags are added to any flag set.
  private readonly flagSetsFilter: string[];

  constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter, splitFiltersValidation?: ISplitFiltersValidation) {
    super();
    this.log = log;
    this.redis = redis;
    this.keys = keys;
    this.flagSetsFilter = splitFiltersValidation ? splitFiltersValidation.groupedFilters.bySet : [];

    // There is no need to listen for redis 'error' event, because in that case ioredis calls will be rejected and handled by redis storage adapters.
    // But it is done just to avoid getting the ioredis message `Unhandled error event`.
    this.redis.on('error', (e: Error) => {
      this.redisError = e;
    });

    this.redis.on('connect', () => {
      this.redisError = undefined;
    });
  }

  /**
   * Decrement the counter of the traffic type of the given split, and delete the counter key if it reaches 0.
   */
  private _decrementCounts(split: ISplit) {
    const ttKey = this.keys.buildTrafficTypeKey(split.trafficTypeName);
    return this.redis.decr(ttKey).then((count: number) => {
      if (count === 0) return this.redis.del(ttKey);
    });
  }

  /**
   * Increment the counter of the traffic type of the given split.
   */
  private _incrementCounts(split: ISplit) {
    const ttKey = this.keys.buildTrafficTypeKey(split.trafficTypeName);
    return this.redis.incr(ttKey);
  }

  /**
   * Update the flag set keys for a feature flag: it is removed from the flag sets it no longer belongs to,
   * and added to the new ones. If a flag sets filter is configured, it is only added to flag sets in that filter.
   *
   * @param featureFlagName - name of the feature flag being added, updated or removed.
   * @param flagSetsOfRemovedFlag - flag sets of the previous definition, if any.
   * @param flagSetsOfAddedFlag - flag sets of the new definition, if any.
   */
  private _updateFlagSets(featureFlagName: string, flagSetsOfRemovedFlag?: string[], flagSetsOfAddedFlag?: string[]) {
    const removeFromFlagSets = returnDifference(flagSetsOfRemovedFlag, flagSetsOfAddedFlag);

    let addToFlagSets = returnDifference(flagSetsOfAddedFlag, flagSetsOfRemovedFlag);
    if (this.flagSetsFilter.length > 0) {
      addToFlagSets = addToFlagSets.filter(flagSet => {
        return this.flagSetsFilter.some(filterFlagSet => filterFlagSet === flagSet);
      });
    }

    const items = [featureFlagName];

    return Promise.all([
      ...removeFromFlagSets.map(flagSetName => this.redis.srem(this.keys.buildFlagSetKey(flagSetName), items)),
      ...addToFlagSets.map(flagSetName => this.redis.sadd(this.keys.buildFlagSetKey(flagSetName), items))
    ]);
  }

  /**
   * Add a given split.
   * The returned promise is resolved when the operation success
   * or rejected if it fails (e.g., redis operation fails)
   */
  addSplit(split: ISplit): Promise<boolean> {
    const name = split.name;
    const splitKey = this.keys.buildSplitKey(name);

    return this.redis.get(splitKey).then((splitFromStorage: string | null) => {

      // handling parsing error
      let parsedPreviousSplit: ISplit | undefined;
      let stringifiedNewSplit: string;
      try {
        parsedPreviousSplit = splitFromStorage ? JSON.parse(splitFromStorage) : undefined;
        stringifiedNewSplit = JSON.stringify(split);
      } catch (e) {
        throw new Error('Error parsing feature flag definition: ' + e);
      }

      return this.redis.set(splitKey, stringifiedNewSplit).then(() => {
        // avoid unnecessary increment/decrement operations
        if (parsedPreviousSplit && parsedPreviousSplit.trafficTypeName === split.trafficTypeName) return;

        // update traffic type counts
        return this._incrementCounts(split).then(() => {
          if (parsedPreviousSplit) return this._decrementCounts(parsedPreviousSplit);
        });
      }).then(() => this._updateFlagSets(name, parsedPreviousSplit && parsedPreviousSplit.sets, split.sets));
    }).then(() => true);
  }

  /**
   * Remove a given split.
   * The returned promise is resolved when the operation success, with true or false indicating if the split existed (and was removed) or not.
   * or rejected if it fails (e.g., redis operation fails).
   */
  removeSplit(name: string) {
    return this.getSplit(name).then((split) => {
      if (split) {
        return this._decrementCounts(split).then(() => this._updateFlagSets(name, split.sets));
      }
    }).then(() => {
      return this.redis.del(this.keys.buildSplitKey(name)).then((status: number) => status === 1);
    });
  }

  /**
   * Get split definition or null if it's not defined.
   * Returned promise is rejected if redis operation fails.
   */
  getSplit(name: string): Promise<ISplit | null> {
    if (this.redisError) {
      this.log.error(LOG_PREFIX + this.redisError);

      return Promise.reject(this.redisError);
    }

    return this.redis.get(this.keys.buildSplitKey(name))
      .then((maybeSplit: string | null) => maybeSplit && JSON.parse(maybeSplit));
  }

  /**
   * Set till number.
   * The returned promise is resolved when the operation success,
   * or rejected if it fails.
   */
  setChangeNumber(changeNumber: number): Promise<boolean> {
    return this.redis.set(this.keys.buildSplitsTillKey(), changeNumber + '').then(
      (status: string | null) => status === 'OK'
    );
  }

  /**
   * Get till number or -1 if it's not defined.
   * The returned promise is resolved with the changeNumber or -1 if it doesn't exist or a redis operation fails.
   * The promise will never be rejected.
   */
  getChangeNumber(): Promise<number> {
    return this.redis.get(this.keys.buildSplitsTillKey()).then((value: string | null) => {
      const i = parseInt(value as string, 10);

      return isNaNNumber(i) ? -1 : i;
    }).catch((e: unknown) => {
      this.log.error(LOG_PREFIX + 'Could not retrieve changeNumber from storage. Error: ' + e);

      return -1;
    });
  }

  /**
   * Get list of all split definitions.
   * The returned promise is resolved with the list of split definitions,
   * or rejected if redis operation fails.
   */
  // @TODO we need to benchmark which is the maximum number of commands we could pipeline without killing redis performance.
  getAll(): Promise<ISplit[]> {
    return this.redis.keys(this.keys.searchPatternForSplitKeys())
      .then((listOfKeys: string[]) => this.redis.pipeline(listOfKeys.map((k: string) => ['get', k])).exec())
      .then(processPipelineAnswer)
      .then((splitDefinitions: string[]) => splitDefinitions
        // A key might be deleted between the KEYS call and the pipelined GETs, in which case GET replies with null.
        // Those entries are skipped to avoid returning null items in the list.
        .filter((splitDefinition: unknown) => typeof splitDefinition === 'string')
        .map((splitDefinition: string) => JSON.parse(splitDefinition))
      );
  }

  /**
   * Get list of split names.
   * The returned promise is resolved with the list of split names,
   * or rejected if redis operation fails.
   */
  getSplitNames(): Promise<string[]> {
    return this.redis.keys(this.keys.searchPatternForSplitKeys()).then(
      (listOfKeys: string[]) => listOfKeys.map(this.keys.extractKey)
    );
  }

  /**
   * Get list of feature flag names related to a given list of flag set names.
   * The returned promise is resolved with the list of feature flag names per flag set,
   * or rejected if the pipelined redis operation fails (e.g., timeout).
   * If a single SMEMBERS command fails, the error is logged and an empty set is returned for that flag set.
   */
  getNamesByFlagSets(flagSets: string[]): Promise<Set<string>[]> {
    return this.redis.pipeline(flagSets.map(flagSet => ['smembers', this.keys.buildFlagSetKey(flagSet)])).exec()
      .then((results: [Error | null, unknown][] | null) => results ?
        results.map(([e, value]: [Error | null, unknown], index: number) => {
          // SMEMBERS replies with the list of members of the set
          if (e === null) return value as string[];

          this.log.error(LOG_PREFIX + `Could not read result from get members of flag set ${flagSets[index]} due to an error: ${e}`);
          return undefined;
        }) :
        []
      )
      .then((namesByFlagSets: (string[] | undefined)[]) => namesByFlagSets.map((namesByFlagSet: string[] | undefined) => new Set(namesByFlagSet)));
  }

  /**
   * Check traffic type existence.
   * The returned promise is resolved with a boolean indicating whether the TT exist or not.
   * In case of redis operation failure, the promise resolves with a true value, assuming that the TT might exist.
   * It will never be rejected.
   */
  trafficTypeExists(trafficType: string): Promise<boolean> {
    // If there is a number there should be > 0, otherwise the TT is considered as not existent.
    return this.redis.get(this.keys.buildTrafficTypeKey(trafficType))
      .then((storedCount: string | null) => {
        if (storedCount === null) return false; // if entry doesn't exist, means that TT doesn't exist

        const ttCount = parseInt(storedCount, 10);
        if (!isFiniteNumber(ttCount) || ttCount < 0) {
          this.log.info(LOG_PREFIX + `Could not validate traffic type existence of ${trafficType} due to data corruption of some sorts.`);
          return false;
        }

        return ttCount > 0;
      })
      .catch((e: unknown) => {
        this.log.error(LOG_PREFIX + `Could not validate traffic type existence of ${trafficType} due to an error: ${e}.`);
        // If there is an error, bypass the validation so the event can get tracked.
        return true;
      });
  }

  // @TODO remove or implement. It is not being used.
  clear() {
    return Promise.resolve();
  }

  /**
   * Fetches multiple splits definitions.
   * The returned promise is resolved with a map of names to split definitions (or null for names that are not defined),
   * or rejected if redis operation fails.
   */
  getSplits(names: string[]): Promise<Record<string, ISplit | null>> {
    if (this.redisError) {
      this.log.error(LOG_PREFIX + this.redisError);

      return Promise.reject(this.redisError);
    }

    // MGET requires at least one key: calling it with an empty list makes Redis reply with an arity error.
    if (names.length === 0) return Promise.resolve({});

    const keys = names.map(name => this.keys.buildSplitKey(name));
    return this.redis.mget(...keys)
      .then((splitDefinitions: (string | null)[]) => {
        const splits: Record<string, ISplit | null> = {};
        names.forEach((name, idx) => {
          const split = splitDefinitions[idx];
          splits[name] = split && JSON.parse(split);
        });
        return splits;
      })
      .catch((e: unknown) => {
        this.log.error(LOG_PREFIX + `Could not grab feature flags due to an error: ${e}.`);
        throw e;
      });
  }

}