// npm import * as _ from 'lodash'; import * as assert from 'assert'; import { v4 as uuid } from 'uuid'; import * as rp from 'request-promise'; import sleep from 'sleep-promise'; import * as sinon from 'sinon'; import bluebird from 'bluebird'; // app import { CacheManager, CacheManagerEvent, CacheType, DynamoDBCacheDataLock, DynamodbCacheManager, } from '../lib/cache'; import { ISegment, SegmentType } from '../lib/playlist-builder'; import { config } from '../config'; import { init } from '../server'; import { ensureTestDynamoDBTable, flushDynamoDBTable } from './utils/dynamodb'; import { SegmentLoader } from '../lib/segment-loader'; import { GenericRequestPromise } from '../utils/types'; import { createTokenVerifierMock, getOrCreateOrganization, IMockOrganization } from './utils/zypline-mock'; describe('DynamoDB Cache Manager', () => { let mediaviewServer: { close: () => Promise }; let cacheManager: DynamodbCacheManager; let originalConfigCacheSelect: typeof config.cacheSelect; let originalConfigCleanupInterval: typeof config.cacheCleanupInterval; let originalConfigDynamoPartitionSegmentsPerFiles: typeof config.cacheDynamo.partitionSegmentsPerFiles; let org: IMockOrganization; before(async () => { originalConfigCacheSelect = config.cacheSelect; originalConfigCleanupInterval = config.cacheCleanupInterval; originalConfigDynamoPartitionSegmentsPerFiles = config.cacheDynamo.partitionSegmentsPerFiles; config.cacheSelect = CacheType.DYNAMO; await ensureTestDynamoDBTable(); cacheManager = CacheManager.getInstance() as DynamodbCacheManager; mediaviewServer = await init(createTokenVerifierMock(), 10 * 60 * 1000, 1); org = getOrCreateOrganization('test'); }); beforeEach(async () => { await flushDynamoDBTable(); config.cacheCleanupInterval = originalConfigCleanupInterval; config.cacheDynamo.partitionSegmentsPerFiles = originalConfigDynamoPartitionSegmentsPerFiles; cacheManager.startCacheGC(); await cacheManager.setMaxStorage('1', 10); await cacheManager.setMaxStorage('2', 10); cacheManager.removeAllListeners(CacheManagerEvent.SAVE_CACHE); cacheManager.removeAllListeners(CacheManagerEvent.REMOVE_CACHE); }); after(async () => { await flushDynamoDBTable(); cacheManager.stopCacheGC(); await mediaviewServer.close(); config.cacheCleanupInterval = originalConfigCleanupInterval; config.cacheDynamo.partitionSegmentsPerFiles = originalConfigDynamoPartitionSegmentsPerFiles; config.cacheSelect = originalConfigCacheSelect; CacheManager.destroyInstance(); }); it('should create storage info on cache save', async () => { const organization = uuid(); const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = SegmentLoader.generateSegmentId(segment); const beforeStorageSize = await cacheManager.getStorage(organization); await cacheManager.saveCache(segment.orgId, segment.id, segment); const afterStorageSize = await cacheManager.getStorage(organization); assert.equal(beforeStorageSize, 0); assert.equal(afterStorageSize, segment.size); }); it('should cleanup cache at max organization storage', async () => { const organization = uuid(); await cacheManager.setMaxStorage(organization, 10); await bluebird.map(_.range(10), async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = uuid(); await cacheManager.saveCache(segment.orgId, segment.id, segment); }); await cacheManager.setMaxStorage(organization, 2); await cacheManager.cleanupCache(); const current = await cacheManager.getStorage(organization); assert.equal(current, 2); }); it('should clear cache on request for specific organization', async () => { const organization = uuid(); await cacheManager.setMaxStorage(organization, 10); await bluebird.map(_.range(10), async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = uuid(); await cacheManager.saveCache(segment.orgId, segment.id, segment); }); await cacheManager.clearCache( organization, undefined, undefined, undefined, undefined, false, ); const current = await cacheManager.getStorage(organization); assert.equal(current, 0); }); it('should not run cleanup or clear cache if it is disabled', async () => { cacheManager.stopCacheGC(); const organization = uuid(); await cacheManager.setMaxStorage(organization, 10); await bluebird.map(_.range(10), async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = uuid(); await cacheManager.saveCache(segment.orgId, segment.id, segment); }); const before = await cacheManager.getStorage(organization); await cacheManager.setMaxStorage(organization, 2); await cacheManager.cleanupCache(); const afterCleanup = await cacheManager.getStorage(organization); await cacheManager.clearCache( organization, undefined, undefined, undefined, undefined, false, ); const afterClear = await cacheManager.getStorage(organization); assert.equal(before, 10); assert.equal(afterCleanup, before); assert.equal(afterClear, before); }); it('should partition segments by file if configured', async () => { config.cacheCleanupInterval = 0; config.cacheDynamo.partitionSegmentsPerFiles = true; const organization = 'testOrgId'; const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = 'testFileId'; await cacheManager.saveCache(segment.orgId, segment.id, segment); const savedSegment = await cacheManager.checkCache(segment.orgId, segment.fileId, segment.id); const wasBilled = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); const wasBilled2 = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); await cacheManager.removeCache(segment.orgId, segment.fileId, segment.id); const removedSegment = await cacheManager.checkCache(segment.orgId, segment.fileId, segment.id); assert.deepEqual(savedSegment, segment); assert.equal(wasBilled, false); assert.equal(wasBilled2, true); assert.equal(removedSegment, undefined); }); it('should save cache and remove it', async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '1', totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(segment.orgId, segment.id, segment); await cacheManager.removeCache(segment.orgId, segment.fileId, segment.id); const resp = await cacheManager.checkCache(segment.orgId, segment.fileId, segment.id); const storage = await cacheManager.getStorage(segment.orgId); assert.equal(resp, null); assert.equal(storage, 0); }); it('should save cache and restore it', async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '1', totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(segment.orgId, segment.id, segment); const resp = await cacheManager.checkCache(segment.orgId, segment.fileId, segment.id); await cacheManager.removeCache(segment.orgId, segment.fileId, segment.id); assert.deepEqual(resp, segment); }); it('should save cache and check it in a single batch', async () => { const organization = uuid(); await cacheManager.setMaxStorage(organization, 10); const segments = _.map( _.range(10), () => ({ id: uuid(), cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organization, totalStartTime: 0, type: SegmentType.Audio, size: 1, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }), ); await bluebird.map(segments, async (segment) => { await cacheManager.saveCache(segment.orgId, segment.id, segment); }); const cachedSegments = await cacheManager.checkCacheBatch( _.map( segments, (segment) => ({ orgId: segment.orgId, fileId: segment.fileId, segmentId: segment.id, }), ), ); const cachedSegmentsPartial = await cacheManager.checkCacheBatch([ { orgId: segments[0].orgId, fileId: segments[0].fileId, segmentId: segments[0].id, }, { orgId: 'non-existing-org', fileId: 'non-existing-file', segmentId: 'non-existing-segmentId', }, ]); const cachedSegmentsEmpty = await cacheManager.checkCacheBatch([ { orgId: 'non-existing-org', fileId: 'non-existing-file', segmentId: 'non-existing-segmentId', }, ]); assert.deepEqual(cachedSegments, segments); assert.deepEqual(cachedSegmentsPartial, [segments[0]]); assert.deepEqual(cachedSegmentsEmpty, []); }); it('should evict cache', async () => { const org1Segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '1', totalStartTime: 0, type: SegmentType.Audio, size: 11, s3Path: 'axa', }; org1Segment.id = SegmentLoader.generateSegmentId(org1Segment); org1Segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; const org2Segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '2', totalStartTime: 0, type: SegmentType.Audio, size: 10, s3Path: 'axa2', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b2', }; org2Segment.id = SegmentLoader.generateSegmentId(org2Segment); org2Segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(org1Segment.orgId, org1Segment.id, org1Segment); await cacheManager.saveCache(org2Segment.orgId, org2Segment.id, org2Segment); await cacheManager.cleanupCache(); const resp = await cacheManager.checkCache(org1Segment.orgId, org1Segment.fileId, org1Segment.id); await cacheManager.removeCache(org1Segment.orgId, org1Segment.fileId, org1Segment.id); await cacheManager.removeCache(org2Segment.orgId, org2Segment.fileId, org2Segment.id); assert.equal(resp, null); }); it('should increment the cache only once for a segment', async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '1', totalStartTime: 0, type: SegmentType.Audio, size: 10, s3Path: 'axa', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(segment.orgId, segment.id, segment); await cacheManager.saveCache(segment.orgId, segment.id, segment); const storage = await cacheManager.getStorage(segment.orgId); await cacheManager.removeCache(segment.orgId, segment.fileId, segment.id); assert.strictEqual(segment.size, storage); }); it('should save segment cache for non-initialized organization', async () => { const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: uuid(), totalStartTime: 0, type: SegmentType.Audio, size: 10, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(segment.orgId, segment.id, segment); const cachedSegment = await cacheManager.checkCache(segment.orgId, segment.fileId, segment.id); assert.deepEqual(cachedSegment, segment); }); it('should gracefully handle non-existing org cache storage', async () => { const organizationId: string = uuid(); const maxOrgStorage: number = await cacheManager.getMaxStorage(organizationId); const orgStorage: number = await cacheManager.getStorage(organizationId); assert.equal(maxOrgStorage, 0); assert.equal(orgStorage, 0); await cacheManager.setMaxStorage(organizationId, 11); const maxOrgStorageAfterSet: number = await cacheManager.getMaxStorage(organizationId); assert.equal(maxOrgStorageAfterSet, 11); }); it('should gracefully handle non-existing org cache storage via the API', async () => { const organizationId: string = uuid(); const requestOptions = { url: `http://localhost:3000/org-cache?orgSlug=${organizationId}`, headers: { 'X-Organization-Slug': org.slug, Authorization: 'Bearer smth smth smth', }, timeout: 10 * 60 * 1000, }; const resp = await (rp.get(requestOptions) as GenericRequestPromise); assert.ok(resp); assert.deepEqual(JSON.parse(resp), { cacheUsed: 0 }); await cacheManager.setMaxStorage(organizationId, 10); const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organizationId, totalStartTime: 0, type: SegmentType.Audio, size: 9, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; segment.id = SegmentLoader.generateSegmentId(segment); await cacheManager.saveCache(organizationId, segment.id, segment); const resp2 = await (rp.get(requestOptions) as GenericRequestPromise); assert.ok(resp2); assert.deepEqual(JSON.parse(resp2), { cacheUsed: segment.size }); }); it('should set and then retrieve wasBilled', async () => { const organizationId: string = uuid(); const segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: organizationId, totalStartTime: 0, type: SegmentType.Audio, size: 10, s3Path: 'axa', }; segment.id = SegmentLoader.generateSegmentId(segment); segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; const wasBilledBeforeCache = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); const wasBilledBeforeCache2 = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); await cacheManager.saveCache(segment.orgId, segment.id, segment); const wasBilledAfterCache = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); const wasBilledAfterCache2 = await cacheManager.wasBilled(segment.orgId, segment.fileId, segment.id); assert.equal(wasBilledBeforeCache, true); assert.equal(wasBilledBeforeCache2, true); assert.equal(wasBilledAfterCache, false); assert.equal(wasBilledAfterCache2, true); }); it('should throw assertion errors for invalid arguments on cache manager methods', async () => { await assert.rejects( async () => cacheManager.getStorage(''), { name: 'GetStorageError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.getMaxStorage(''), { name: 'GetMaxStorageError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.setMaxStorage('', Number.NaN), { name: 'SetMaxStorageError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.setMaxStorage('testOrg', Number.NaN), { name: 'SetMaxStorageError', message: 'Invalid max storage provided: "NaN". Please provide a positive numeric value.', }, ); await assert.rejects( async () => cacheManager.wasBilled('', '', ''), { name: 'WasBilledError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.wasBilled('testOrg', '', ''), { name: 'WasBilledError', message: 'Invalid file id provided.', }, ); await assert.rejects( async () => cacheManager.wasBilled('testOrg', 'testFile', ''), { name: 'WasBilledError', message: 'Invalid segment id provided.', }, ); await assert.rejects( async () => cacheManager.checkCache('', '', ''), { name: 'CheckCacheError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.checkCache('testOrg', '', ''), { name: 'CheckCacheError', message: 'Invalid file id provided.', }, ); await assert.rejects( async () => cacheManager.checkCache('testOrg', 'testFile', ''), { name: 'CheckCacheError', message: 'Invalid segment id provided.', }, ); await assert.rejects( async () => cacheManager.checkCacheBatch([{ orgId: '', fileId: '', segmentId: '' }]), { name: 'CheckCacheBatchError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.checkCacheBatch([{ orgId: 'testOrg', fileId: '', segmentId: '' }]), { name: 'CheckCacheBatchError', message: 'Invalid file id provided.', }, ); await assert.rejects( async () => cacheManager.checkCacheBatch([{ orgId: 'testOrg', fileId: 'testFile', segmentId: '' }]), { name: 'CheckCacheBatchError', message: 'Invalid segment id provided.', }, ); await assert.rejects( async () => cacheManager.removeCache('', '', ''), { name: 'RemoveCacheError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.removeCache('testOrg', '', ''), { name: 'RemoveCacheError', message: 'Invalid file id provided.', }, ); await assert.rejects( async () => cacheManager.removeCache('testOrg', 'testFile', ''), { name: 'RemoveCacheError', message: 'Invalid segment id provided.', }, ); await assert.rejects( async () => cacheManager.saveCache('', '', {} as ISegment), { name: 'SaveCacheError', message: 'Invalid organization id provided.', }, ); await assert.rejects( async () => cacheManager.saveCache('testOrg', '', {} as ISegment), { name: 'SaveCacheError', message: 'Invalid segment id provided.', }, ); await assert.rejects( async () => cacheManager.saveCache('testOrg', 'testSegment', null as unknown as ISegment), { name: 'SaveCacheError', message: 'Invalid segment object provided.', }, ); const segment = { cacheLocation: 'abc', duration: 0, editUnits: 1, totalStartTime: 0, type: SegmentType.Audio, size: 10, s3Path: 'axa', fileId: '6bdce57f-20aa-4059-8aaf-adeb36b117b1', }; await assert.rejects( async () => cacheManager.saveCache('testOrg', 'testSegment', { ...segment, orgId: '' }), { name: 'SaveCacheError', message: 'The provided segment object has a different organization id than the one provided as argument. Argument OrgId: testOrg, Segment OrgId: .', }, ); await assert.rejects( async () => cacheManager.saveCache('testOrg', 'testSegment', { ...segment, orgId: 'testOrg', id: '' }), { name: 'SaveCacheError', message: 'The provided segment object has a different id than the one provided as argument. Argument Segment Id: testSegment, Segment Segment Id: .', }, ); }); it('should acquire a lock, heartbeat it and release it', async () => { const lockName = 'testLock1'; const lockTTL = 2; /* eslint-disable @typescript-eslint/dot-notation */ const lock1 = await cacheManager['acquireLock'](lockName, lockTTL); const lock1Retry1 = await cacheManager['acquireLock'](lockName, lockTTL); await sleep(3000); const lock1Retry2 = await cacheManager['acquireLock'](lockName, lockTTL); const lock1Retry3 = await cacheManager['acquireLock'](lockName, lockTTL); await cacheManager['releaseLock'](lock1Retry2 as DynamoDBCacheDataLock); const lock1Retry4 = await cacheManager['acquireLock'](lockName, lockTTL); await sleep(1500); await cacheManager['heartbeatLock'](lock1Retry4 as DynamoDBCacheDataLock); await sleep(1500); await cacheManager['heartbeatLock'](lock1Retry4 as DynamoDBCacheDataLock); await sleep(1500); const lock1Retry5 = await cacheManager['acquireLock'](lockName, lockTTL); /* eslint-enable @typescript-eslint/dot-notation */ assert.notEqual(lock1, undefined); assert.equal(lock1Retry1, undefined); assert.notEqual(lock1Retry2, undefined); assert.equal(lock1Retry3, undefined); assert.notEqual(lock1Retry4, undefined); assert.equal(lock1Retry5, undefined); }); it('should emit events for specific actions', async () => { const saveCacheEventCallback1 = sinon.spy(); const saveCacheEventCallback2 = sinon.spy(); const saveCacheEventCallback3 = sinon.spy(); cacheManager.on(CacheManagerEvent.SAVE_CACHE, saveCacheEventCallback1); cacheManager.on(CacheManagerEvent.SAVE_CACHE, saveCacheEventCallback2); cacheManager.on(CacheManagerEvent.SAVE_CACHE, saveCacheEventCallback3); const removeCacheEventCallback = sinon.spy(); cacheManager.on(CacheManagerEvent.REMOVE_CACHE, removeCacheEventCallback); const org1Segment: ISegment = { cacheLocation: 'abc', duration: 0, editUnits: 1, orgId: '1', totalStartTime: 0, type: SegmentType.Audio, size: 11, s3Path: 'axa', }; org1Segment.id = SegmentLoader.generateSegmentId(org1Segment); org1Segment.fileId = '6bdce57f-20aa-4059-8aaf-adeb36b117b1'; await cacheManager.saveCache(org1Segment.orgId, org1Segment.id, org1Segment); assert.ok(saveCacheEventCallback1.calledOnceWithExactly(org1Segment)); assert.ok(saveCacheEventCallback2.calledImmediatelyAfter(saveCacheEventCallback1)); assert.ok(saveCacheEventCallback3.calledImmediatelyAfter(saveCacheEventCallback2)); await cacheManager.removeCache(org1Segment.orgId, org1Segment.fileId, org1Segment.id); assert.ok(removeCacheEventCallback.calledOnceWithExactly(org1Segment, false)); removeCacheEventCallback.resetHistory(); await cacheManager.removeCache(org1Segment.orgId, org1Segment.fileId, org1Segment.id); assert.ok(removeCacheEventCallback.notCalled); removeCacheEventCallback.resetHistory(); await cacheManager.saveCache(org1Segment.orgId, org1Segment.id, org1Segment); await cacheManager.cleanupCache(); assert.ok(removeCacheEventCallback.calledOnceWithExactly(org1Segment, true)); removeCacheEventCallback.resetHistory(); await cacheManager.saveCache(org1Segment.orgId, org1Segment.id, org1Segment); await cacheManager.clearCache(org1Segment.orgId); assert.ok(removeCacheEventCallback.calledOnceWithExactly(org1Segment, true)); }); });