// NOTE(review): the reviewed copy of this file had its line breaks collapsed and its generic
// type arguments stripped (e.g. `Promise {`, `Exclude;`, `Partial)`). Formatting and generics
// were reconstructed from usage; every reconstructed argument that could not be derived with
// certainty is flagged with NOTE(review) below and must be checked against the base
// `CacheManager` declarations before merging.

// npm
import * as _ from 'lodash';
import bluebird from 'bluebird';
import * as AWS from 'aws-sdk';
import { classToPlain, Expose, plainToClass } from 'class-transformer';
import { IsNotEmpty } from 'class-validator';

// app
import {
  IDynamodbCacheManagerOptions,
  CacheManager,
  CacheSegmentFilter,
  CheckCacheError,
  CleanupCacheError,
  ClearCacheError,
  GetMaxStorageError,
  GetStorageError,
  RemoveCacheError,
  SaveCacheError,
  WasBilledError,
  SetMaxStorageError,
  ConfigurationError,
  CacheManagerEvent,
  ICacheManagerCheckSegmentInput,
  CheckCacheBatchError,
} from '.';
import { ISegment } from '../playlist-builder';
import { log } from '../../config';
import { DeepPartial, Nullable } from '../../utils/types';
import { sanitizeJson } from '../../utils/sanitize-json';

/** Error codes returned by DynamoDB that this module reacts to. */
enum DynamoDBExceptionCode {
  CONDITIONAL_CHECK_FAILED_EXCEPTION = 'ConditionalCheckFailedException',
}

/** Kinds of items stored in the table; the value is used as (a prefix of) the partition key. */
enum DynamoDBCacheDataType {
  SEGMENT = 'SEGMENT',
  STORAGE_INFO = 'STORAGE_INFO',
  LOCK = 'LOCK',
}

/** Payload of a STORAGE_INFO item: the storage counter and its configured ceiling. */
interface IDynamoDBCacheDataStorageInfoData {
  current: number;
  max: number;
}

/** Primary key of every item handled by this module. */
interface IDynamoDBCacheDataKey {
  pk: string;
  sk: string;
}

/** Payload of a LOCK item. `createdAt`/`expiresAt` are `Date.now()` values, `ttl` is in seconds. */
interface IDynamoDBCacheDataLockData {
  ttl: number;
  createdAt: number;
  expiresAt: number;
  heartbeats: number;
}

// NOTE(review): the type arguments of this `Exclude` were lost in the reviewed copy.
// `ISegment` minus nullish is the narrowest reconstruction compatible with how the alias
// is used in this file — confirm against the original declaration.
type CachedSegment = Exclude<ISegment, null | undefined>;

/**
 * Base class of every item persisted by the cache manager.
 *
 * An item is addressed by the (`pk`, `sk`) pair and carries an optional `data` payload.
 * Fields decorated with `@Expose()` are the ones kept by `toObject()` and by the
 * `plainToClass(..., { excludeExtraneousValues: true })` calls in the subclasses.
 */
abstract class DynamoDBCacheData<TData> implements IDynamoDBCacheDataKey {
  /** Separator placed between the components of a composite partition key. */
  public static readonly COMPOSITE_KEY_SEPARATOR = '#';

  @Expose()
  @IsNotEmpty()
  public readonly pk: string;

  @Expose()
  @IsNotEmpty()
  public readonly sk: string;

  @Expose()
  public data?: TData;

  protected constructor(pk: string, sk: string) {
    this.pk = pk;
    this.sk = sk;
  }

  /** The `{ pk, sk }` pair, in the shape expected by the DocumentClient `Key` parameter. */
  public get key(): IDynamoDBCacheDataKey {
    return _.pick(this, ['pk', 'sk']);
  }

  /** `true` when both key components are non-empty. */
  public get hasValidKey(): boolean {
    return !_.isEmpty(this.pk) && !_.isEmpty(this.sk);
  }

  /** Plain-object form of the item containing only the `@Expose()`d properties. */
  public toObject(): object {
    return classToPlain(this, { excludeExtraneousValues: true });
  }
}

/**
 * Storage accounting item of one organization.
 * Key layout: `pk = STORAGE_INFO`, `sk = <organization>`.
 */
class DynamoDBCacheDataStorageInfo extends DynamoDBCacheData<IDynamoDBCacheDataStorageInfoData> {
  /**
   * Builds an instance from a `get` response.
   *
   * @returns the parsed item, or `null` when the response carries no `Item`.
   */
  public static fromOutput(
    output: AWS.DynamoDB.DocumentClient.GetItemOutput,
  ): DynamoDBCacheDataStorageInfo | null {
    if (output && output.Item) {
      return plainToClass(DynamoDBCacheDataStorageInfo, output.Item, { excludeExtraneousValues: true });
    }

    return null;
  }

  public data?: IDynamoDBCacheDataStorageInfoData;

  public constructor(organization: string) {
    super(DynamoDBCacheDataType.STORAGE_INFO, organization);
  }

  /** Organization this item accounts for (stored in the sort key). */
  public get organization(): string {
    return this.sk;
  }
}

/**
 * Cached segment item.
 * Key layout: `pk = SEGMENT#<organization>[#<fileId>]`, `sk = <segment id>`.
 * The `fileId` component is only present when the constructor receives a non-empty one.
 */
class DynamoDBCacheDataSegment extends DynamoDBCacheData<CachedSegment> {
  /**
   * Converts one attribute map (or an array of them) into segment instances.
   *
   * @returns `null` when `item` is falsy; otherwise an instance or an array of instances,
   *   mirroring the shape of the input.
   */
  public static fromAttributeMap(
    item: AWS.DynamoDB.DocumentClient.AttributeMap | AWS.DynamoDB.DocumentClient.AttributeMap[] | undefined,
  ): DynamoDBCacheDataSegment | DynamoDBCacheDataSegment[] | null {
    if (item) {
      return plainToClass(DynamoDBCacheDataSegment, item, { excludeExtraneousValues: true });
    }

    return null;
  }

  /**
   * Builds an instance from a single-item response: `Item` for `get`, `Attributes` for
   * `update` / `delete` / `put`.
   *
   * @returns the parsed segment, or `null` when the response is falsy or carries no item.
   */
  public static fromOutput(
    output: AWS.DynamoDB.DocumentClient.GetItemOutput
    | AWS.DynamoDB.DocumentClient.UpdateItemOutput
    | AWS.DynamoDB.DocumentClient.DeleteItemOutput
    | AWS.DynamoDB.DocumentClient.PutItemOutput,
  ): DynamoDBCacheDataSegment | null {
    if (output) {
      const item = (output as AWS.DynamoDB.DocumentClient.GetItemOutput).Item
        || (
          output as AWS.DynamoDB.DocumentClient.UpdateItemOutput
          | AWS.DynamoDB.DocumentClient.DeleteItemOutput
          | AWS.DynamoDB.DocumentClient.PutItemOutput
        ).Attributes as AWS.DynamoDB.DocumentClient.AttributeMap;

      return DynamoDBCacheDataSegment.fromAttributeMap(item) as DynamoDBCacheDataSegment;
    }

    return null;
  }

  /** Timestamp (`Date.now()`) written when the segment is saved or, with GC enabled, checked. */
  @Expose()
  public score?: number;

  public constructor(organization: string, fileId: Nullable<string>, id: string) {
    super(
      _.join(
        // `_.compact` drops a null/empty fileId so the key becomes `SEGMENT#<organization>`.
        _.compact([DynamoDBCacheDataType.SEGMENT, organization, fileId]),
        DynamoDBCacheData.COMPOSITE_KEY_SEPARATOR,
      ),
      id,
    );
  }

  /** Segment id (stored in the sort key). */
  public get id(): string {
    return this.sk;
  }

  /**
   * Second component of the partition key.
   * NOTE(review): an organization id containing the separator would be truncated here.
   */
  public get organization(): string {
    return _.split(this.pk, DynamoDBCacheData.COMPOSITE_KEY_SEPARATOR)[1];
  }

  /** Third component of the partition key, or `undefined` when the key has no file component. */
  public get file(): string | undefined {
    const components = _.split(this.pk, DynamoDBCacheData.COMPOSITE_KEY_SEPARATOR);

    return _.size(components) === 3 ? components[2] : undefined;
  }
}

/**
 * Lock item with an expiry that is pushed forward by heartbeats.
 * Key layout: `pk = LOCK`, `sk = <reason>`.
 */
export class DynamoDBCacheDataLock extends DynamoDBCacheData<IDynamoDBCacheDataLockData> {
  /**
   * Builds an instance from an `update` / `delete` / `put` response.
   *
   * @returns the parsed lock, or `null` when the response carries no `Attributes`.
   */
  public static fromOutput(
    output: AWS.DynamoDB.DocumentClient.UpdateItemOutput
    | AWS.DynamoDB.DocumentClient.DeleteItemOutput
    | AWS.DynamoDB.DocumentClient.PutItemOutput,
  ): DynamoDBCacheDataLock | null {
    if (output && output.Attributes) {
      return plainToClass(DynamoDBCacheDataLock, output.Attributes, { excludeExtraneousValues: true });
    }

    return null;
  }

  @Expose()
  public data: IDynamoDBCacheDataLockData;

  /**
   * @param reason identifies the lock (becomes the sort key).
   * @param ttl lifetime in seconds; the expiry is `now + ttl * 1000` milliseconds.
   */
  public constructor(reason: string, ttl = 60) {
    super(DynamoDBCacheDataType.LOCK, reason);

    this.data = {
      ttl,
      createdAt: Date.now(),
      expiresAt: Date.now() + (1000 * ttl),
      heartbeats: 0,
    };
  }

  public get reason(): string {
    return this.sk;
  }

  /** `true` once the local clock has passed `expiresAt`. */
  public get isExpired(): boolean {
    return Date.now() > this.data.expiresAt;
  }

  /** Pushes the expiry `ttl` seconds into the future and counts the heartbeat (in memory only). */
  public heartbeat(): void {
    this.data.expiresAt = Date.now() + (1000 * this.data.ttl);
    this.data.heartbeats += 1;
  }
}

/**
 * Cache manager that keeps segment metadata, per-organization storage counters and a
 * cleanup lock in a single DynamoDB table, accessed through `options.client`.
 *
 * NOTE(review): `isGCEnabled()`, `emit()`, `options` and the static `assert*Arguments`
 * helpers are not defined in this file — presumably inherited from `CacheManager`.
 * NOTE(review): the type argument of `CacheManager` and the `Promise<...>` return types of
 * the public methods were reconstructed; align them with the base class declarations.
 */
export class DynamodbCacheManager extends CacheManager<IDynamodbCacheManagerOptions> {
  /** Number of processed segments after which the storage counter is flushed during a reduction. */
  public static readonly COUNTER_SAVE_STEPS = 100;

  /** Upper bound of keys accepted by one DynamoDB BatchGetItem request. */
  private static readonly MAX_KEYS_PER_BATCH_GET = 100;

  private static readonly DEFAULT_OPTIONS: DeepPartial<IDynamodbCacheManagerOptions> = {
    paginationLimit: 10,
    partitionSegmentsPerFiles: false,
    itemsPerGetBatch: 100,
    batchParallelism: 5,
  };

  /**
   * Validates the constructor options.
   *
   * @throws ConfigurationError when `options` is missing or `tableName` is empty.
   */
  private static assertOptions(options: Partial<IDynamodbCacheManagerOptions>): void {
    if (!options) {
      throw new ConfigurationError('A valid options object must be provided');
    }

    if (_.isEmpty(options.tableName)) {
      throw new ConfigurationError('A valid DynamoDB table name must be provided.');
    }

    // if (_.isEmpty(options.scoreIndexName)) {
    //   throw new ConfigurationError('A valid DynamoDB index name must be provided for the segment score lookup.');
    // }
  }

  /**
   * @param options manager options; missing entries are filled from `DEFAULT_OPTIONS`.
   *   NOTE(review): `_.defaultsDeep` writes the defaults into the object passed by the caller.
   * @throws ConfigurationError see `assertOptions`.
   */
  public constructor(options: Partial<IDynamodbCacheManagerOptions>) {
    super(_.defaultsDeep(options, DynamodbCacheManager.DEFAULT_OPTIONS));

    DynamodbCacheManager.assertOptions(options);
  }

  /**
   * Sets the storage ceiling of an organization, creating its storage-info item
   * (with a zero counter) when it does not exist yet.
   *
   * @throws SetMaxStorageError when either DynamoDB call fails.
   */
  public async setMaxStorage(orgId: string, maxStorage: number): Promise<void> {
    log.debug(`${this.constructor.name}: setMaxStorage`, { orgId, maxStorage });

    DynamodbCacheManager.assertSetMaxStorageArguments(orgId, maxStorage);

    try {
      try {
        await this.options.client.update({
          TableName: this.options.tableName,
          Key: new DynamoDBCacheDataStorageInfo(orgId).key,
          UpdateExpression: 'SET #data.#max = :value',
          ConditionExpression: 'attribute_exists(#data.#max)',
          ExpressionAttributeNames: {
            '#data': 'data',
            '#max': 'max',
          },
          ExpressionAttributeValues: { ':value': maxStorage },
        }).promise();
      } catch (updateError) {
        if ((updateError as AWS.AWSError).code !== DynamoDBExceptionCode.CONDITIONAL_CHECK_FAILED_EXCEPTION) {
          throw updateError;
        }

        // The condition fails when `data.max` is absent: create the item from scratch.
        const newStorageInfo = new DynamoDBCacheDataStorageInfo(orgId);
        newStorageInfo.data = {
          current: 0,
          max: maxStorage,
        };

        await this.options.client.put({
          TableName: this.options.tableName,
          Item: newStorageInfo.toObject(),
        }).promise();
      }
    } catch (error) {
      // Single wrapping point so a failure of the fallback `put` is reported like any other.
      throw new SetMaxStorageError((error as Error).message);
    }
  }

  /**
   * @returns the configured ceiling of the organization, `0` when none is stored.
   * @throws GetMaxStorageError when the lookup fails.
   */
  public async getMaxStorage(orgId: string): Promise<number> {
    log.debug(`${this.constructor.name}: getMaxStorage`, { orgId });

    DynamodbCacheManager.assertGetMaxStorageArguments(orgId);

    try {
      const cacheStorageInfo = await this.getCacheStorageInfo(orgId);

      return cacheStorageInfo?.data?.max ?? 0;
    } catch (error) {
      throw new GetMaxStorageError((error as Error).message);
    }
  }

  /**
   * @returns the storage counter of the organization, `0` when none is stored.
   * @throws GetStorageError when the lookup fails.
   */
  public async getStorage(orgId: string): Promise<number> {
    log.debug(`${this.constructor.name}: getStorage`, { orgId });

    DynamodbCacheManager.assertGetStorageArguments(orgId);

    try {
      const cacheStorageInfo = await this.getCacheStorageInfo(orgId);

      return cacheStorageInfo?.data?.current ?? 0;
    } catch (error) {
      throw new GetStorageError((error as Error).message);
    }
  }

  /**
   * Looks up one cached segment.
   *
   * With GC enabled the lookup is a conditional update that also refreshes the segment's
   * `score` and returns the previous item; otherwise it is a plain `get`.
   *
   * @returns the segment (payload plus `id`, `orgId` and, when partitioning per file,
   *   `fileId`), or `null` when it is not cached.
   * @throws CheckCacheError on any failure other than the existence condition.
   */
  public async checkCache(orgId: string, fileId: string, segmentId: string): Promise<ISegment | null> {
    log.debug(`${this.constructor.name}: checkCache`, { orgId, fileId, segmentId });

    DynamodbCacheManager.assertCheckCacheArguments(orgId, fileId, segmentId);

    try {
      const segmentKey = new DynamoDBCacheDataSegment(orgId, this.filterFileId(fileId), segmentId).key;
      let output: AWS.DynamoDB.DocumentClient.UpdateItemOutput | AWS.DynamoDB.DocumentClient.GetItemOutput;

      if (this.isGCEnabled()) {
        output = await this.options.client.update({
          TableName: this.options.tableName,
          Key: segmentKey,
          ConditionExpression: 'attribute_exists(pk) AND attribute_exists(sk)',
          UpdateExpression: 'SET score = :sc',
          ExpressionAttributeValues: { ':sc': Date.now() },
          ReturnValues: 'ALL_OLD',
        }).promise();
      } else {
        output = await this.options.client.get({
          TableName: this.options.tableName,
          Key: segmentKey,
        }).promise();
      }

      const dataSegment = DynamoDBCacheDataSegment.fromOutput(output);

      if (dataSegment) {
        return this.toSegment(dataSegment);
      }
    } catch (err) {
      const error = err as AWS.AWSError;

      // A failed existence condition simply means "not cached".
      if (error.code !== DynamoDBExceptionCode.CONDITIONAL_CHECK_FAILED_EXCEPTION) {
        throw new CheckCacheError(error.message);
      }
    }

    return null;
  }

  /**
   * Looks up several segments with `batchGet`.
   *
   * Keys are de-duplicated, split into requests of at most `itemsPerGetBatch` keys
   * (capped at the BatchGetItem limit) and sent `batchParallelism` at a time; keys reported
   * back as unprocessed are queued again.
   *
   * NOTE(review): results are indexed by segment id only, so two inputs that share an id
   * across organizations/files resolve to the same entry.
   * NOTE(review): unprocessed keys are retried immediately, without back-off.
   *
   * @returns the cached segments, in the order of `segments`, skipping the ones not found.
   * @throws CheckCacheBatchError when a request fails.
   */
  public async checkCacheBatch(segments: ICacheManagerCheckSegmentInput[]): Promise<ISegment[]> {
    log.info(`${this.constructor.name}: checkCacheBatch`);

    DynamodbCacheManager.assertCheckCacheBatchArguments(segments);

    try {
      const { tableName } = this.options;
      const keysPerRequest = _.clamp(
        this.options.itemsPerGetBatch || 1,
        1,
        DynamodbCacheManager.MAX_KEYS_PER_BATCH_GET,
      );
      const parallelism = Math.max(this.options.batchParallelism || 1, 1);
      const results: Record<string, ISegment> = {};

      // BatchGetItem rejects a request that lists the same key twice.
      const pendingKeys: IDynamoDBCacheDataKey[] = _.uniqBy(
        _.map(
          segments,
          ({ orgId, fileId, segmentId }) => new DynamoDBCacheDataSegment(orgId, this.filterFileId(fileId), segmentId).key,
        ),
        ({ pk, sk }) => JSON.stringify([pk, sk]),
      );

      while (!_.isEmpty(pendingKeys)) {
        // One round = up to `parallelism` requests of up to `keysPerRequest` keys each.
        const batches = _.chunk(pendingKeys.splice(0, keysPerRequest * parallelism), keysPerRequest);

        // eslint-disable-next-line no-await-in-loop
        await bluebird.map(
          batches,
          async (keys) => {
            const response: AWS.DynamoDB.DocumentClient.BatchGetItemOutput = await this.options.client.batchGet({
              RequestItems: {
                [tableName]: { Keys: keys },
              },
            }).promise();

            const unprocessedKeys = response.UnprocessedKeys?.[tableName]?.Keys;

            if (unprocessedKeys && !_.isEmpty(unprocessedKeys)) {
              pendingKeys.push(...unprocessedKeys as IDynamoDBCacheDataKey[]);
            }

            // `fromAttributeMap` yields null when the table has no entry in `Responses`.
            const dataSegments = (
              DynamoDBCacheDataSegment.fromAttributeMap(response.Responses?.[tableName]) ?? []
            ) as DynamoDBCacheDataSegment[];

            for (const dataSegment of dataSegments) {
              results[dataSegment.id] = this.toSegment(dataSegment);
            }
          },
          { concurrency: parallelism },
        );
      }

      return _.compact(_.map(segments, ({ segmentId }) => results[segmentId]));
    } catch (error) {
      throw new CheckCacheBatchError((error as Error).message);
    }
  }

  /**
   * Garbage-collects every organization whose ceiling is `0` or whose counter exceeds its
   * ceiling, reducing it to 90% of the ceiling. Does nothing when GC is disabled or when
   * the `cleanup` lock cannot be acquired. The lock is released in all cases.
   *
   * @throws CleanupCacheError when listing or reducing fails.
   */
  public async cleanupCache(): Promise<void> {
    if (!this.isGCEnabled()) {
      return;
    }

    log.debug(`${this.constructor.name}: cleanupCache`);

    const lock = await this.acquireLock('cleanup');

    if (!lock) {
      log.debug(`${this.constructor.name}: cleanupCache: cannot acquire lock, another instance already running`);

      return;
    }

    try {
      const allStorageInfo = await this.getAllCacheStorageInfo();

      await bluebird.map(allStorageInfo, async (storageInfo) => {
        const { current, max } = storageInfo.data;

        if (max === 0 || current > max) {
          await this.reduceOrganizationStorage(storageInfo, Math.round(max * 0.9), undefined, false, lock);
        }
      });
    } catch (error) {
      throw new CleanupCacheError((error as Error).message);
    } finally {
      await this.releaseLock(lock);
    }
  }

  /**
   * Removes the cached segments of one organization (or of all of them when `orgId` is
   * omitted) that match `filter`. Does nothing when GC is disabled.
   *
   * `files`, `chunkSize` and `deletionParallelism` are only logged by this implementation.
   *
   * @param dryRun when truthy, matching segments are logged but not deleted.
   * @throws ClearCacheError when listing or reducing fails.
   */
  public async clearCache(
    orgId?: string,
    files?: string[],
    chunkSize?: number,
    deletionParallelism?: number,
    filter?: CacheSegmentFilter,
    dryRun?: boolean,
  ): Promise<void> {
    if (!this.isGCEnabled()) {
      return;
    }

    log.debug(`${this.constructor.name}: clearCache`, {
      orgId,
      files,
      chunkSize,
      deletionParallelism,
      filter,
    });

    try {
      const organizations = orgId
        ? _.compact([await this.getCacheStorageInfo(orgId)])
        : await this.getAllCacheStorageInfo();

      await bluebird.map(organizations, async (storageInfo) => {
        await this.reduceOrganizationStorage(storageInfo, undefined, filter, dryRun);
      });
    } catch (error) {
      throw new ClearCacheError((error as Error).message);
    }
  }

  /**
   * Deletes one segment; when it existed, emits `REMOVE_CACHE` and subtracts its size from
   * the organization's storage counter.
   *
   * @throws RemoveCacheError when a DynamoDB call fails.
   */
  public async removeCache(orgId: string, fileId: string, segmentId: string): Promise<void> {
    log.debug(`${this.constructor.name}: removeCache`, { orgId, fileId, segmentId });

    DynamodbCacheManager.assertRemoveCacheArguments(orgId, fileId, segmentId);

    try {
      const deletedSegment = DynamoDBCacheDataSegment.fromOutput(
        await this.options.client.delete({
          TableName: this.options.tableName,
          Key: new DynamoDBCacheDataSegment(orgId, this.filterFileId(fileId), segmentId).key,
          ReturnValues: 'ALL_OLD',
        }).promise(),
      );

      if (deletedSegment) {
        this.emit(CacheManagerEvent.REMOVE_CACHE, this.toSegment(deletedSegment), false);

        // A segment stored without a size frees nothing; skip the write rather than send NaN.
        const freedSize = ((deletedSegment.data as CachedSegment | undefined)?.size as number | undefined) ?? 0;

        if (freedSize !== 0) {
          await this.options.client.update({
            TableName: this.options.tableName,
            Key: new DynamoDBCacheDataStorageInfo(orgId).key,
            UpdateExpression: 'ADD #data.#current :value',
            ExpressionAttributeNames: {
              '#data': 'data',
              '#current': 'current',
            },
            ExpressionAttributeValues: { ':value': -freedSize },
          }).promise();
        }
      }
    } catch (error) {
      throw new RemoveCacheError((error as Error).message);
    }
  }

  /**
   * Stores (or overwrites) a segment, emits `SAVE_CACHE`, and applies the size difference
   * with the previous version to the organization's storage counter. When the organization
   * has no storage-info item yet, one is created with `max: 0`.
   *
   * @throws SaveCacheError when a DynamoDB call fails.
   */
  public async saveCache(orgId: string, segmentId: string, segment: ISegment): Promise<void> {
    log.debug(`${this.constructor.name}: saveCache`, sanitizeJson({ orgId, segmentId, segment }));

    DynamodbCacheManager.assertSaveCacheArguments(orgId, segmentId, segment);

    const cacheSegment = new DynamoDBCacheDataSegment(orgId, this.filterFileId(segment.fileId as string), segmentId);

    cacheSegment.score = Date.now();
    // Whatever is already encoded in the key is not repeated in the payload.
    cacheSegment.data = _.omit(
      segment,
      _.compact([
        'id',
        'orgId',
        this.options.partitionSegmentsPerFiles ? 'fileId' : null,
      ]),
    ) as CachedSegment;

    try {
      const oldSegment = DynamoDBCacheDataSegment.fromOutput(
        await this.options.client.put({
          TableName: this.options.tableName,
          Item: cacheSegment.toObject(),
          ReturnValues: 'ALL_OLD',
        }).promise(),
      );

      this.emit(CacheManagerEvent.SAVE_CACHE, segment);

      const sizeDiff: number = (segment.size as number) - (oldSegment?.data?.size ?? 0);

      if (sizeDiff !== 0) {
        try {
          await this.options.client.update({
            TableName: this.options.tableName,
            Key: new DynamoDBCacheDataStorageInfo(orgId).key,
            UpdateExpression: 'ADD #data.#current :value',
            ConditionExpression: 'attribute_exists(#data.#current)',
            ExpressionAttributeNames: {
              '#data': 'data',
              '#current': 'current',
            },
            ExpressionAttributeValues: { ':value': sizeDiff },
          }).promise();
        } catch (error) {
          if ((error as AWS.AWSError).code !== DynamoDBExceptionCode.CONDITIONAL_CHECK_FAILED_EXCEPTION) {
            throw error;
          }

          // No counter yet for this organization: start it at the size just written.
          const newStorageInfo = new DynamoDBCacheDataStorageInfo(orgId);
          newStorageInfo.data = {
            current: sizeDiff,
            max: 0,
          };

          await this.options.client.put({
            TableName: this.options.tableName,
            Item: newStorageInfo.toObject(),
          }).promise();
        }
      }
    } catch (error) {
      throw new SaveCacheError((error as Error).message);
    }
  }

  /**
   * Marks a segment as billed and reports whether it already was.
   *
   * @returns the previous value of `data.billed`; `true` when the segment is not cached
   *   (existence condition failed) or the update returned no previous item.
   * @throws WasBilledError on any other failure.
   */
  public async wasBilled(orgId: string, fileId: string, segmentId: string): Promise<boolean> {
    log.debug(`${this.constructor.name}: wasBilled`, { orgId, fileId, segmentId });

    DynamodbCacheManager.assertWasBilledArguments(orgId, fileId, segmentId);

    try {
      const oldDataSegment = DynamoDBCacheDataSegment.fromOutput(
        await this.options.client.update({
          TableName: this.options.tableName,
          Key: new DynamoDBCacheDataSegment(orgId, this.filterFileId(fileId), segmentId).key,
          ConditionExpression: 'attribute_exists(pk) AND attribute_exists(sk)',
          UpdateExpression: 'SET #data.#billed = :value',
          ExpressionAttributeNames: {
            '#data': 'data',
            '#billed': 'billed',
          },
          ExpressionAttributeValues: { ':value': true },
          ReturnValues: 'ALL_OLD',
        }).promise(),
      );

      if (oldDataSegment) {
        return !!(oldDataSegment.data as CachedSegment).billed;
      }

      return true;
    } catch (err) {
      const error = err as AWS.AWSError;

      if (error.code === DynamoDBExceptionCode.CONDITIONAL_CHECK_FAILED_EXCEPTION) {
        return true;
      }

      throw new WasBilledError(error.message);
    }
  }

  /** Shapes a stored segment item into the object handed to callers and event listeners. */
  private toSegment(dataSegment: DynamoDBCacheDataSegment): ISegment {
    return {
      ...dataSegment.data as CachedSegment,
      id: dataSegment.id,
      orgId: dataSegment.organization,
      ...(this.options.partitionSegmentsPerFiles ? { fileId: dataSegment.file } : {}),
    };
  }

  /** Reads the storage-info item of one organization; `null` when it does not exist. */
  private async getCacheStorageInfo(
    orgId: string,
  ): Promise<(DynamoDBCacheDataStorageInfo & { data: IDynamoDBCacheDataStorageInfoData }) | null> {
    log.debug(`${this.constructor.name}: getCacheStorageInfo`, { orgId });

    return DynamoDBCacheDataStorageInfo.fromOutput(
      await this.options.client.get({
        TableName: this.options.tableName,
        Key: new DynamoDBCacheDataStorageInfo(orgId).key,
      }).promise(),
    ) as DynamoDBCacheDataStorageInfo & { data: IDynamoDBCacheDataStorageInfoData } | null;
  }

  /** Reads every storage-info item, following the query pagination until exhausted. */
  private async getAllCacheStorageInfo(): Promise<(DynamoDBCacheDataStorageInfo & { data: IDynamoDBCacheDataStorageInfoData })[]> {
    log.debug(`${this.constructor.name}: getAllOrganizationStorageInfo`);

    const organizations: DynamoDBCacheDataStorageInfo[] = [];
    const storageInfoQueryInput: AWS.DynamoDB.DocumentClient.QueryInput = {
      TableName: this.options.tableName,
      Limit: this.options.paginationLimit,
      KeyConditionExpression: 'pk = :pk',
      ExpressionAttributeValues: { ':pk': DynamoDBCacheDataType.STORAGE_INFO },
    };

    do {
      // eslint-disable-next-line no-await-in-loop
      const result = await this.options.client.query(storageInfoQueryInput).promise();

      storageInfoQueryInput.ExclusiveStartKey = result.LastEvaluatedKey;
      // `Items` is optional in the response type; treat a missing list as an empty page.
      organizations.push(...plainToClass(DynamoDBCacheDataStorageInfo, result.Items ?? []));
    } while (storageInfoQueryInput.ExclusiveStartKey != null);

    return organizations as (DynamoDBCacheDataStorageInfo & { data: IDynamoDBCacheDataStorageInfoData })[];
  }

  /**
   * Deletes segments of one organization, page by page through the score index in ascending
   * order, until the counter drops to `targetSize` (when given) or every page was visited.
   *
   * @param storageInfo storage item of the organization; `data.current` is updated in place.
   * @param targetSize counter value at which the reduction stops; omit to visit everything.
   * @param filter when given, only segments for which it returns truthy are deleted.
   * @param dryRun when truthy, candidates are logged but nothing is deleted.
   * @param lock when given, receives a heartbeat after every processed segment.
   */
  private async reduceOrganizationStorage(
    storageInfo: DynamoDBCacheDataStorageInfo & { data: IDynamoDBCacheDataStorageInfoData },
    targetSize?: number,
    filter?: CacheSegmentFilter,
    dryRun?: boolean,
    lock?: DynamoDBCacheDataLock,
  ): Promise<void> {
    if (!this.isGCEnabled()) {
      return;
    }

    log.debug(`${this.constructor.name}: reduceOrganizationStorageInfo`, {
      storageInfo,
      targetSize,
      filter,
      dryRun,
    });

    let finished = false;

    // TODO: this needs to be disabled as it will never find the required data because of PK change.
    // (The key queried here has no file component, unlike the keys written when
    // `partitionSegmentsPerFiles` is on.)
    const segmentQueryParams: AWS.DynamoDB.DocumentClient.QueryInput = {
      TableName: this.options.tableName,
      IndexName: this.options.scoreIndexName,
      Limit: this.options.paginationLimit,
      KeyConditionExpression: 'pk = :pk',
      ExpressionAttributeValues: {
        ':pk': _.join(
          [DynamoDBCacheDataType.SEGMENT, storageInfo.organization],
          DynamoDBCacheData.COMPOSITE_KEY_SEPARATOR,
        ),
      },
      ScanIndexForward: true,
    };

    const done = () => (
      !this.isGCEnabled()
      || (targetSize != null ? storageInfo.data.current <= targetSize : finished)
    );

    // This is reset at each counter update.
    let dataErasedSize = 0;
    // This is not reset and is logged at the end.
    let totalDataErasedSize = 0;

    // Atomically subtracts the pending amount from the counter stored in DynamoDB and
    // replaces the in-memory counter with the value returned by the update.
    const updateDataCounter = async () => {
      if (dataErasedSize > 0) {
        const result = await this.options.client.update({
          TableName: this.options.tableName,
          Key: storageInfo.key,
          UpdateExpression: 'ADD #data.#current :value',
          ExpressionAttributeNames: {
            '#data': 'data',
            '#current': 'current',
          },
          ExpressionAttributeValues: { ':value': -dataErasedSize },
          ReturnValues: 'UPDATED_NEW',
        }).promise();

        dataErasedSize = 0;
        storageInfo.data.current = (result.Attributes as typeof storageInfo).data.current;
      }
    };

    // Hold the loop step in a variable and update the data counter once in a while, then reset the step.
    let step = 0;

    /* eslint-disable no-await-in-loop */
    while (!done()) {
      const segmentQueryOutput = await this.options.client.query(segmentQueryParams).promise();

      if (segmentQueryOutput.LastEvaluatedKey) {
        segmentQueryParams.ExclusiveStartKey = segmentQueryOutput.LastEvaluatedKey;
      } else {
        finished = true;
      }

      if (_.isEmpty(segmentQueryOutput.Items)) {
        storageInfo.data.current = 0;
        break;
      }

      for (const segment of plainToClass(DynamoDBCacheDataSegment, segmentQueryOutput.Items as AWS.DynamoDB.DocumentClient.ItemList)) {
        const data = segment.data as ISegment;

        if (!filter || filter(data)) {
          log.debug(`${this.constructor.name}: reduceOrganizationStorageInfo: deleting segment ${segment.id}, duration: ${data.duration}, type: ${data.type}, fileId: ${data.fileId as string}, ${data.url as string}`);

          if (!dryRun) {
            try {
              await this.options.client.delete({
                TableName: this.options.tableName,
                Key: segment.key,
              }).promise();

              // A missing size must not turn the counters into NaN (which would defeat `done()`).
              const erasedSize = (data.size as number | undefined) ?? 0;

              dataErasedSize += erasedSize;
              totalDataErasedSize += erasedSize;
              storageInfo.data.current -= erasedSize;

              this.emit(CacheManagerEvent.REMOVE_CACHE, this.toSegment(segment), true);
            } catch (error) {
              log.error(`${this.constructor.name}: reduceOrganizationStorageInfo: could not delete segment: ${segment.id}`, error);
            }
          }
        }

        if (lock) {
          await this.heartbeatLock(lock);
        }

        if (targetSize != null && done()) {
          break;
        }

        step += 1;

        if (step === DynamodbCacheManager.COUNTER_SAVE_STEPS) {
          await updateDataCounter();
          step = 0;
        }
      }

      // The last page has been processed: querying again would only replay it, endlessly
      // so when nothing can be deleted (dry run, failing deletes, filtered-out segments).
      if (finished) {
        break;
      }
    }
    /* eslint-enable no-await-in-loop */

    await updateDataCounter();

    if (totalDataErasedSize > 0) {
      log.debug(`${this.constructor.name}: reduceOrganizationStorageInfo: removed ${totalDataErasedSize}`);
    }
  }

  /** Returns `fileId` when segments are partitioned per file, `null` otherwise. */
  private filterFileId(fileId: string): string | null {
    return this.options.partitionSegmentsPerFiles ? fileId : null;
  }

  /**
   * Tries to take the lock named `reason` by writing it on the condition that no unexpired
   * lock is stored.
   *
   * @param ttl lock lifetime in seconds.
   * @returns the lock, or `null` when the condition failed (lock held elsewhere).
   * @throws the DynamoDB error for any other failure.
   */
  private async acquireLock(reason: string, ttl = 60): Promise<DynamoDBCacheDataLock | null> {
    log.debug(`${this.constructor.name}: acquireLock`, { reason });

    try {
      const lock = new DynamoDBCacheDataLock(reason, ttl);

      await this.options.client.put({
        TableName: this.options.tableName,
        Item: lock.toObject(),
        ConditionExpression: 'attribute_not_exists(#data.#expiresAt) OR #data.#expiresAt < :currentTime',
        ExpressionAttributeNames: {
          '#data': 'data',
          '#expiresAt': 'expiresAt',
        },
        ExpressionAttributeValues: {
          ':currentTime': Date.now(),
        },
      }).promise();

      return lock;
    } catch (error) {
      if ((error as AWS.AWSError).code !== DynamoDBExceptionCode.CONDITIONAL_CHECK_FAILED_EXCEPTION) {
        throw error;
      }
    }

    return null;
  }

  /**
   * Extends a lock that has not expired yet and persists the new expiry and heartbeat count.
   * Persistence failures are logged, not thrown. An expired lock is left untouched.
   */
  private async heartbeatLock(lock: DynamoDBCacheDataLock): Promise<void> {
    log.debug(`${this.constructor.name}: heartbeatLock`, { lock });

    if (lock && !lock.isExpired) {
      lock.heartbeat();

      try {
        await this.options.client.update({
          TableName: this.options.tableName,
          Key: lock.key,
          UpdateExpression: 'SET #data.#expiresAt = :value, #data.#heartbeats = :heartbeats',
          ConditionExpression: 'attribute_exists(pk) AND attribute_exists(sk)',
          ExpressionAttributeNames: {
            '#data': 'data',
            '#expiresAt': 'expiresAt',
            '#heartbeats': 'heartbeats',
          },
          ExpressionAttributeValues: {
            ':value': lock.data.expiresAt,
            ':heartbeats': lock.data.heartbeats,
          },
        }).promise();
      } catch (error) {
        log.error(`${this.constructor.name}: heartbeatLock: failed`, error);
      }
    }
  }

  /**
   * Deletes the lock item. Failures are logged, not thrown.
   *
   * NOTE(review): the delete is unconditional, so a lock that expired and was re-acquired
   * by another instance would be removed as well — consider conditioning on `createdAt`.
   */
  private async releaseLock(lock: DynamoDBCacheDataLock): Promise<void> {
    log.debug(`${this.constructor.name}: releaseLock`, { lock });

    if (lock) {
      try {
        await this.options.client.delete({
          TableName: this.options.tableName,
          Key: lock.key,
        }).promise();
      } catch (error) {
        log.error(`${this.constructor.name}: releaseLock: failed`, error);
      }
    }
  }
}