// npm
import * as redisClient from 'redis';
import * as _ from 'lodash';
import bluebird from 'bluebird';

// app
import { ISegment } from '../playlist-builder';
import {
  CacheManager,
  IRedisCacheManagerOptions,
  CacheSegmentFilter,
  CheckCacheError,
  ClearCacheError,
  RemoveCacheError,
  SaveCacheError,
  SetMaxStorageError,
  WasBilledError,
  ConfigurationError,
  GetStorageError,
  GetMaxStorageError,
  CacheManagerEvent,
  ICacheManagerCheckSegmentInput,
  CheckCacheBatchError,
} from '.';
import { log } from '../../config';
import { DeepPartial } from '../../utils/types';
import { sanitizeJson } from '../../utils/sanitize-json';

/**
 * Cache manager that stores segments as Redis hashes.
 *
 * Per organization it maintains, next to the segment hashes:
 * - a sorted set (`lruSet`) scored with the last access/save timestamp,
 * - a counter with the currently used storage (`organizationCurrentStorage`),
 * - a value with the maximum allowed storage (`organizationMaxStorage`).
 *
 * A global set (`organizationSet`) lists the organizations that have a
 * maximum storage configured. All key names derive from `options.keys`.
 *
 * NOTE(review): the base class may take a type argument
 * (e.g. `CacheManager<IRedisCacheManagerOptions>`) - confirm against the
 * declaration of `CacheManager`.
 */
export class RedisCacheManager extends CacheManager {
  /** Key naming defaults, deep-merged under the options given to the constructor. */
  private static readonly DEFAULT_OPTIONS: Partial<IRedisCacheManagerOptions> = {
    keys: {
      prefix: 'mv-cache',
      organizationCurrentStorage: 'storageSize',
      organizationMaxStorage: 'maxStorageSize',
      organizationSet: 'organizations',
      lruSet: 'LRU',
    },
  };

  /** Lua helper: reads a hash with HGETALL and turns the flat reply into a table. */
  private static readonly LUA_GET_ELEMENT: string = `
    local function getElement(elemId)
      local flat_map = redis.call('HGETALL', elemId)
      local result = {}
      for i = 1, #flat_map, 2 do
        result[flat_map[i]] = flat_map[i + 1]
      end
      return result;
    end`;

  /**
   * Lua helper: removes a segment hash and its LRU entry, decrements the
   * storage counter by the hash's `size` field and returns the new counter.
   */
  private static readonly LUA_REMOVE_ELEMENT: string = `
    local function removeElement (elemId, storageKey, LRUKey)
      local size = redis.call('hget', elemId, 'size')
      redis.call('zrem', LRUKey, elemId)
      redis.call('del', elemId)
      local newStorageSize = redis.call('decrby', storageKey, size)
      return newStorageSize
    end`;

  /**
   * Converts a hash read from Redis (all values are strings) into a segment.
   *
   * Every value is run through `JSON.parse`; values that parse (numbers,
   * booleans, ...) are replaced by the parsed result, values that make the
   * parser throw a `SyntaxError` are kept as the original string.
   *
   * @param segmentObj - Field/value pairs as returned by HGETALL.
   * @returns The same fields with parsed values, typed as `ISegment`.
   * @throws Any non-`SyntaxError` error raised while parsing.
   */
  public static deserialize(segmentObj: Record<string, string>): ISegment {
    // Convert properties to their respective primitive datatype. They all come as strings.
    return Object.fromEntries(
      Object.entries(segmentObj).map(([key, value]) => {
        let v: string | number | boolean = value;
        try {
          // This converts strings to number or boolean.
          // When the value is a plain string a SyntaxError is thrown by the JSON parser.
          v = JSON.parse(value) as number | boolean;
        } catch (e) {
          if (!(e instanceof SyntaxError)) {
            throw e;
          }
        }
        return [key, v];
      }),
    ) as unknown as ISegment;
  }

  /**
   * Validates the constructor options.
   *
   * @throws ConfigurationError when `options` is falsy or `options.client` is empty.
   */
  private static assertOptions(options: DeepPartial<IRedisCacheManagerOptions>): void | never {
    if (!options) {
      throw new ConfigurationError('A valid options object must be provided');
    }
    if (_.isEmpty(options.client)) {
      throw new ConfigurationError('A valid Redis client must be provided.');
    }
  }

  /**
   * @param options - Redis client and optional key naming overrides. Missing
   *   key names are filled in from `DEFAULT_OPTIONS` via `_.defaultsDeep`,
   *   which (per the lodash documentation) mutates the object passed in.
   * @throws ConfigurationError when the options or the client are missing.
   */
  public constructor(options: DeepPartial<IRedisCacheManagerOptions>) {
    super(_.defaultsDeep(options, RedisCacheManager.DEFAULT_OPTIONS) as IRedisCacheManagerOptions);
    RedisCacheManager.assertOptions(options);
  }

  /** Closes the Redis connection by calling `quit()` on the configured client. */
  public close(): void {
    this.options.client.quit();
  }

  /**
   * Looks up a single segment.
   *
   * On a hit the segment's score in the organization's LRU set is set to
   * `Date.now()` before the deserialized segment is returned.
   *
   * @returns The cached segment, or `null` when the hash does not exist.
   * @throws CheckCacheError wrapping any error raised by the Redis calls.
   */
  public async checkCache(orgId: string, fileId: string, segmentId: string): Promise<ISegment | null> {
    log.debug(`${this.constructor.name}: checkCache`, { orgId, fileId, segmentId });
    CacheManager.assertCheckCacheArguments(orgId, fileId, segmentId);
    try {
      const cachedResult = await this.options.client.hgetallAsync(this.getSegmentKey(segmentId, orgId));
      if (cachedResult) {
        await this.options.client.zaddAsync(this.getLRUSetKey(orgId), Date.now(), this.getSegmentKey(segmentId, orgId));
        return RedisCacheManager.deserialize(cachedResult);
      }
    } catch (error) {
      throw new CheckCacheError((error as Error).message);
    }
    return null;
  }

  /**
   * Looks up several segments with a single script call.
   *
   * Only segments whose hash exists are returned, so the result can be
   * shorter than `segments`.
   *
   * NOTE(review): unlike `checkCache`, the returned objects are not passed
   * through `deserialize` (all fields stay strings) and the LRU scores are not
   * refreshed; the script reads ARGV[1..3] but no arguments are supplied.
   * Confirm whether this is intended.
   *
   * @throws CheckCacheBatchError wrapping any error raised by the script or the JSON parsing.
   */
  public async checkCacheBatch(segments: ICacheManagerCheckSegmentInput[]): Promise<ISegment[]> {
    log.info(`${this.constructor.name}: checkCacheBatch`);
    CacheManager.assertCheckCacheBatchArguments(segments);
    const fetchMultipleSegments = `
      local hgetall = function (key)
        local raw_data = redis.call('hgetall', key)
        local data = {}
        for idx = 1, #raw_data, 2 do
          data[raw_data[idx]] = raw_data[idx + 1]
        end
        return data;
      end
      local keyPrefix = ARGV[1]
      local keyLru = ARGV[2]
      local score = ARGV[3]
      local res = {}
      for i, key in ipairs(KEYS) do
        if redis.call('exists', key) == 1 then
          res[#res+1] = hgetall(key)
        end
      end
      if next(res) == nil then
        return "[]"
      end
      return cjson.encode(res)
    `;
    try {
      const response = await this.options.client.evalAsync(
        fetchMultipleSegments,
        _.size(segments),
        ..._.map(
          segments,
          ({ orgId, segmentId }) => this.getSegmentKey(segmentId, orgId),
        ),
      );
      return JSON.parse(response) as ISegment[];
    } catch (error) {
      throw new CheckCacheBatchError((error as Error).message);
    }
  }

  /**
   * Evicts least-recently-used segments for every registered organization
   * until its current storage no longer exceeds its maximum storage.
   *
   * For each eviction a `REMOVE_CACHE` event is emitted with the removed
   * segment. Errors during eviction are logged and stop the loop for that
   * organization; they are not rethrown.
   */
  public async cleanupCache(): Promise<void> {
    log.debug(`${this.constructor.name}: cleanupCache`);
    const popLRUCache = `
      ${RedisCacheManager.LUA_GET_ELEMENT}
      ${RedisCacheManager.LUA_REMOVE_ELEMENT}
      local LRUKey = KEYS[1]
      local storageKey = KEYS[2]
      local orgPrefix = ARGV[1]
      local elemId = redis.call('zrange', LRUKey, 0, 0)[1]
      local segment = getElement(elemId)
      local newStorageSize = redis.call('get', storageKey)
      if redis.call('exists', elemId) == 1 then
        newStorageSize = removeElement(elemId, storageKey, LRUKey)
      else
        -- if it doesn't exist we still need to remove it from the LRU tree
        -- otherwise we will read it all the time
        redis.call('zremrangebyrank', LRUKey, 0, 0)
      end
      return { newStorageSize, elemId, cjson.encode(segment) }
    `;
    const orgs = this.options.client
      ? await this.options.client.smembersAsync(this.getOrganizationSetKey())
      : [];
    await bluebird.map(orgs, async (org) => {
      const maxStorage = Number.parseInt(await this.options.client.getAsync(this.getOrganizationMaxStorageKey(org)), 10);
      let currentStorage = Number.parseInt(await this.options.client.getAsync(this.getOrganizationCurrentStorageKey(org)), 10);
      // If either value is missing it parses to NaN and the comparison is false, so nothing is evicted.
      while (currentStorage > maxStorage) {
        try {
          // Note: this will never work in redis cluster because the set can be in another location
          // compared to the redis key
          // eslint-disable-next-line no-await-in-loop
          const [newStorageSize, elemId, segmentString] = await this.options.client.evalAsync(
            popLRUCache,
            2,
            this.getLRUSetKey(org),
            this.getOrganizationCurrentStorageKey(org),
          );
          currentStorage = Number.parseInt(newStorageSize, 10);
          const segment = RedisCacheManager.deserialize(JSON.parse(segmentString));
          log.debug(`Removed cache key ${elemId} new storage size ${currentStorage}`);
          this.emit(CacheManagerEvent.REMOVE_CACHE, segment, true);
        } catch (err) {
          if (err instanceof redisClient.ReplyError) {
            log.error('Possible mediaview cache inconsistency');
          }
          log.error(err);
          // Stop evicting for this organization instead of looping forever on a failing script.
          break;
        }
      }
    });
  }

  /**
   * Deletes cached segments by scanning the key space.
   *
   * @param orgId - Restrict the scan to one organization; all organizations when omitted.
   * @param files - When non-empty, only segments whose `url` is listed are deleted.
   * @param chunkSize - COUNT hint passed to SCAN (defaults to 10000).
   * @param deletionParallelism - Concurrency used per scanned chunk (defaults to 10).
   * @param filter - Optional predicate; only segments it accepts are deleted.
   * @param dryRun - When truthy, matching segments are logged but nothing is deleted or emitted.
   * @throws ClearCacheError wrapping any error raised while scanning or reading segments.
   */
  public async clearCache(
    orgId?: string,
    files?: string[],
    chunkSize?: number,
    deletionParallelism?: number,
    filter?: CacheSegmentFilter,
    dryRun?: boolean,
  ): Promise<void> {
    log.debug(`${this.constructor.name}: clearCache`, {
      orgId, files, chunkSize, deletionParallelism, filter,
    });
    try {
      const deletedFiles: string[] = [];
      const skippedFiles: string[] = [];
      const {
        prefix,
        organizationCurrentStorage,
        organizationMaxStorage,
        lruSet,
      } = this.options.keys;
      const matchPattern = orgId
        ? `${prefix}:${orgId}:*`
        : `${prefix}:*`;
      // Per-organization bookkeeping keys share the segment key shape and must never be treated as segments.
      const reservedIds = [organizationMaxStorage, lruSet, organizationCurrentStorage];

      const deleteSegment = async (key: string, segment: ISegment, currentOrgSlug: string) => {
        log.debug(`Deleting segment ${key}, duration: ${segment.duration}, type: ${segment.type}, fileId: ${segment.fileId as string}, ${segment.url as string}`);
        if (!dryRun) {
          try {
            this.emit(CacheManagerEvent.REMOVE_CACHE, segment, true);
            // Use the configured key names so a custom prefix updates the right counter and LRU set.
            await this.options.client.multi()
              .del(key)
              .decrby(this.getOrganizationCurrentStorageKey(currentOrgSlug), segment.size as number)
              .zrem(this.getLRUSetKey(currentOrgSlug), key)
              .execAsync();
            deletedFiles.push(segment.fileId as string);
          } catch (err) {
            log.error(`could not delete segment: ${segment.id as string}`, err);
            skippedFiles.push(segment.fileId as string);
          }
        }
      };

      let cursor = '0';
      /* eslint-disable no-await-in-loop */
      do {
        const response = await this.options.client.scanAsync(cursor, 'MATCH', matchPattern, 'COUNT', chunkSize?.toString() || '10000');
        cursor = response[0];
        const keys = response[1];
        await bluebird.map(keys, async (key: string) => {
          if (!key.startsWith(`${prefix}:`)) {
            return;
          }
          // Segment keys look like `<prefix>:<org>:<segmentId>`; anything else is skipped.
          const parts = key.slice(prefix.length + 1).split(':');
          if (parts.length !== 2) {
            return;
          }
          const [currentOrgSlug, id] = parts;
          if (reservedIds.includes(id)) {
            return;
          }
          const rawSegment = await this.options.client.hgetallAsync(key);
          if (!rawSegment) {
            // The key vanished between SCAN and HGETALL; there is nothing left to delete.
            return;
          }
          const segment = RedisCacheManager.deserialize(rawSegment);
          const fileMatches = !files || files.length === 0 || files.includes(segment.url as string);
          if (fileMatches && (!filter || filter(segment))) {
            await deleteSegment(key, segment, currentOrgSlug);
          }
        }, { concurrency: deletionParallelism || 10 });
      } while (cursor !== '0');
      /* eslint-enable no-await-in-loop */

      log.debug(`Deleted files: ${_.uniq(deletedFiles).join(', ')}`);
      log.debug(`Skipped files: ${_.uniq(skippedFiles).join(', ')}`);
    } catch (error) {
      throw new ClearCacheError((error as Error).message);
    }
  }

  /**
   * Reads the maximum storage configured for an organization.
   *
   * @returns The stored value parsed as an integer, or 0 when it is missing or not numeric.
   * @throws GetMaxStorageError wrapping any error raised by the Redis call.
   */
  public async getMaxStorage(orgId: string): Promise<number> {
    log.debug(`${this.constructor.name}: getMaxStorage`, { orgId });
    CacheManager.assertGetMaxStorageArguments(orgId);
    try {
      const value = Number.parseInt(await this.options.client.getAsync(this.getOrganizationMaxStorageKey(orgId)), 10);
      return !_.isNaN(value) ? value : 0;
    } catch (error) {
      throw new GetMaxStorageError((error as Error).message);
    }
  }

  /**
   * Reads the storage currently used by an organization.
   *
   * @returns The stored counter parsed as an integer, or 0 when it is missing or not numeric.
   * @throws GetStorageError when `orgId` is empty or the Redis call fails.
   */
  public async getStorage(orgId: string): Promise<number> {
    log.debug(`${this.constructor.name}: getStorage`, { orgId });
    CacheManager.assertGetStorageArguments(orgId);
    if (_.isEmpty(orgId)) {
      throw new GetStorageError('Invalid arguments provided, please provide a valid organization id.');
    }
    try {
      const value = Number.parseInt(await this.options.client.getAsync(this.getOrganizationCurrentStorageKey(orgId)), 10);
      return !_.isNaN(value) ? value : 0;
    } catch (error) {
      throw new GetStorageError((error as Error).message);
    }
  }

  /**
   * Removes a single segment, its LRU entry and its share of the storage
   * counter in one script call.
   *
   * A `REMOVE_CACHE` event is emitted with the removed segment only when the
   * segment existed.
   *
   * @throws RemoveCacheError wrapping any error raised by the script or the parsing.
   */
  public async removeCache(orgId: string, fileId: string, segmentId: string): Promise<void> {
    log.debug(`${this.constructor.name}: removeCache`, { orgId, fileId, segmentId });
    CacheManager.assertRemoveCacheArguments(orgId, fileId, segmentId);
    try {
      const segmentString = await this.options.client.evalAsync(`
        ${RedisCacheManager.LUA_GET_ELEMENT}
        ${RedisCacheManager.LUA_REMOVE_ELEMENT}
        local key = KEYS[1]
        local storageKey = KEYS[2]
        local LRUKey = KEYS[3]
        if redis.call('exists', key) == 1 then
          local segment = getElement(key)
          removeElement(key, storageKey, LRUKey)
          return cjson.encode(segment)
        end
      `, 3, this.getSegmentKey(segmentId, orgId), this.getOrganizationCurrentStorageKey(orgId), this.getLRUSetKey(orgId));
      // The script returns nothing when the segment did not exist.
      if (segmentString) {
        this.emit(CacheManagerEvent.REMOVE_CACHE, RedisCacheManager.deserialize(JSON.parse(segmentString)), false);
      }
    } catch (error) {
      throw new RemoveCacheError((error as Error).message);
    }
  }

  /**
   * Stores a segment as a hash, registers it in the LRU set with the current
   * timestamp and adds its size to the organization's storage counter.
   *
   * When a hash already exists under the same key, its previous `size` is
   * subtracted from the counter first. Emits `SAVE_CACHE` on success.
   *
   * @throws SaveCacheError wrapping any error raised by the script.
   */
  public async saveCache(orgId: string, segmentId: string, segment: ISegment): Promise<void> {
    log.debug(`${this.constructor.name}: saveCache`, sanitizeJson({ orgId, segmentId, segment }));
    CacheManager.assertSaveCacheArguments(orgId, segmentId, segment);
    try {
      const size = segment.size as number;
      const timestamp = Date.now();
      // [[k1, v1], [k2, v2]] -> [k1, v1, k2, v2], the argument layout HMSET expects.
      const flattenedArgs = _.flatten(_.toPairs(segment));
      // ARGV[1] = segment key (LRU member), ARGV[2] = score, ARGV[3] = size, ARGV[4..] = hash fields.
      const saveScript = `
        if redis.call('exists', KEYS[1]) == 1 then
          local size = redis.call('hget', KEYS[1], 'size')
          redis.call('decrby', KEYS[3], size)
        end
        redis.call('hmset', KEYS[1], unpack(ARGV, 4))
        redis.call('zadd', KEYS[2], ARGV[2], ARGV[1])
        redis.call('incrby', KEYS[3], ARGV[3])
      `;
      await this.options.client.evalAsync(
        saveScript,
        3,
        this.getSegmentKey(segmentId, orgId),
        this.getLRUSetKey(orgId),
        this.getOrganizationCurrentStorageKey(orgId),
        this.getSegmentKey(segmentId, orgId),
        timestamp,
        size,
        ...flattenedArgs,
      );
      this.emit(CacheManagerEvent.SAVE_CACHE, segment);
    } catch (error) {
      throw new SaveCacheError((error as Error).message);
    }
  }

  /**
   * Sets the maximum storage for an organization and registers the
   * organization in the global organization set.
   *
   * @throws SetMaxStorageError wrapping any error raised by the script.
   */
  public async setMaxStorage(orgId: string, maxStorage: number): Promise<void> {
    log.debug(`${this.constructor.name}: setMaxStorage`, { orgId, maxStorage });
    CacheManager.assertSetMaxStorageArguments(orgId, maxStorage);
    try {
      await this.options.client.evalAsync(`
        redis.call('set', KEYS[1], ARGV[1]);
        redis.call('sadd', KEYS[2], ARGV[2]);
      `, 2, this.getOrganizationMaxStorageKey(orgId), this.getOrganizationSetKey(), maxStorage, orgId);
    } catch (error) {
      throw new SetMaxStorageError((error as Error).message);
    }
  }

  /**
   * Marks a segment as billed and reports whether it had been marked before.
   *
   * The script sets the `billed` field to `'true'` when the segment hash
   * exists and returns HSET's reply, otherwise it returns 0.
   *
   * @returns `true` when the script returned 0, i.e. the `billed` field was
   *   already present or the segment does not exist; `false` when the field
   *   was newly created.
   * @throws WasBilledError wrapping any error raised by the script.
   */
  public async wasBilled(orgId: string, fileId: string, segmentId: string): Promise<boolean> {
    log.debug(`${this.constructor.name}: wasBilled`, { orgId, fileId, segmentId });
    CacheManager.assertWasBilledArguments(orgId, fileId, segmentId);
    try {
      const response = await this.options.client.evalAsync(`
        if redis.call('exists', KEYS[1]) == 1 then
          return redis.call('hset', KEYS[1], 'billed', 'true')
        end
        return 0
      `, 1, this.getSegmentKey(segmentId, orgId));
      return Number.parseInt(response, 10) === 0;
    } catch (error) {
      throw new WasBilledError((error as Error).message);
    }
  }

  /** Builds `<prefix>:<orgId>:<segmentId>`, the key of a segment hash. */
  private getSegmentKey(segmentId: string, orgId: string): string {
    const { prefix } = this.options.keys;
    return `${prefix}:${orgId}:${segmentId}`;
  }

  /** Builds the key of the organization's current storage counter. */
  private getOrganizationCurrentStorageKey(orgId: string): string {
    const { prefix, organizationCurrentStorage } = this.options.keys;
    return `${prefix}:${orgId}:${organizationCurrentStorage}`;
  }

  /** Builds the key of the organization's maximum storage value. */
  private getOrganizationMaxStorageKey(orgId: string): string {
    const { prefix, organizationMaxStorage } = this.options.keys;
    return `${prefix}:${orgId}:${organizationMaxStorage}`;
  }

  /** Builds the key of the global set of registered organizations. */
  private getOrganizationSetKey(): string {
    const { prefix, organizationSet } = this.options.keys;
    return `${prefix}:${organizationSet}`;
  }

  /** Builds the key of the organization's LRU sorted set. */
  private getLRUSetKey(orgId: string): string {
    const { prefix, lruSet } = this.options.keys;
    return `${prefix}:${orgId}:${lruSet}`;
  }
}