// @ts-nocheck
/*
 * One-off migration script for `mv-cache` segment entries stored in Redis.
 *
 * It scans every key matching `mv-cache:<org>:*` (or `mv-cache:*` with --all),
 * recomputes each segment's id after adding the `mxfIndexUrl` (and, for audio
 * segments, `layout`) fields, and when the id changes it copies the S3 object
 * to the new path and re-keys the Redis hash and its LRU sorted-set entry.
 *
 * Runs in dry-run mode unless `--no-dry-run` is passed; in dry-run mode the
 * "Migrated ..." lines list what WOULD be migrated and nothing is written.
 */
import * as bluebird from 'bluebird';
import * as redisClient from 'redis';
import * as commander from 'commander';
import * as fs from 'fs';
import * as md5 from 'md5';
import * as _ from 'lodash';
import { Honeycomb } from '../tracing';
import { s3Lib } from '@ownzones/lib';

bluebird.promisifyAll(redisClient.RedisClient.prototype);
bluebird.promisifyAll(redisClient.Multi.prototype);

const files = [];
const migratedFiles = [];
const skippedFiles = [];

(async () => {
  // Options that carry a value need a `<placeholder>`; without it commander
  // parses them as boolean flags and e.g. `commander.host` would be `true`.
  // NOTE(review): '-nd' is a multi-character short flag — confirm the installed
  // commander version accepts it; the long form `--no-dry-run` is the safe one.
  commander
    .option('-h, --host <host>', 'Redis host to work with')
    .option('-p, --port <port>', 'The port on which to connect')
    .option('-d, --db <db>', 'The Redis db used.')
    .option('-o, --org-slug <orgSlug>', 'Org slug')
    .option('--all', 'Delete all orgs all cache')
    .option('-nd, --no-dry-run', 'Don\'t run in dry run mode')
    .option('-f, --files-file <path>', 'A file containing the files that need to be removed one per line')
    .option('-z, --single-file <url>', 'The url of the file you want to remove from cache')
    .option('-s, --chunk-size <size>', 'Size of chunk for scan. (Default: 10000)')
    .option('-j, --deletion-parallelism <count>', 'Deletion parallelism')
    .parse(process.argv);

  if (!commander.orgSlug && !commander.all) {
    console.log('You need to specify the org slug for which you want to evict or specify all');
    return;
  }

  const orgSlug = commander.orgSlug;
  const prefix = orgSlug ? `mv-cache:${orgSlug}:*` : 'mv-cache:*';

  if (!commander.host) {
    console.log('Please specify the redis host');
    return;
  }

  const config = {
    host: commander.host,
    port: commander.port || '6379',
    database: commander.db || 7,
    tls: {},
  };

  const redis: any = redisClient.createClient(config);
  if (config.database) {
    redis.select(config.database);
  }

  // NOTE(review): `files` is collected here but never read by the rest of
  // this script; kept so the CLI options keep their existing behaviour.
  if (commander.filesFile) {
    const newFiles = fs.readFileSync(commander.filesFile)
      .toString()
      .split('\n')
      .map(x => x.trim())
      .filter(x => !!x);
    Array.prototype.push.apply(files, newFiles);
  }

  if (commander.singleFile) {
    files.push(commander.singleFile.trim());
  }

  const chunkSize = commander.chunkSize || '10000';

  let parallelism = 10;
  if (commander.deletionParallelism) {
    parallelism = parseInt(commander.deletionParallelism, 10) || 10;
  }

  if (commander.dryRun) {
    console.log('Running in dry run mode.');
  }

  /**
   * Computes the id of a segment: md5 of the JSON form of its sorted keys.
   *
   * @param obj segment object as read from Redis (not mutated; a shallow clone is hashed)
   * @returns hex md5 digest string
   */
  function hash(obj) {
    const hashingObject = _.clone(obj);

    // subsegment_concat segments had their ids generated before their type was
    // changed to 'subsegment_concat', so hash them with the type they had then.
    if (hashingObject.type === 'subsegment_concat') {
      hashingObject.type = 'image';
    }

    // Presumably the second argument lists keys to leave out of the hashed
    // object — confirm against Honeycomb.sortKeys.
    const sorted = Honeycomb.sortKeys(hashingObject, {
      honeycombTraceId: true,
      honeycombParentId: true,
      totalStartTime: true,
      billed: true,
      id: true,
      s3Path: true,
      size: true,
      urls: true,
    });

    return md5(JSON.stringify(sorted));
  }

  // Hash fields that are converted back to numbers with parseFloat before hashing.
  const numFields = [
    'startTime',
    'totalStartTime',
    'channels',
    'duration',
    'size',
    'entryPoint',
    'editUnits',
    'sampleRate',
    'streamIndex',
    'height',
    'width',
    // 'bitDepth',
    'editRate',
  ];

  let mc = 0; // segments processed
  let mb = 0; // segments whose recomputed id did not match the stored id
  let ac = 0; // segments that needed a migration

  /**
   * Migrates a single segment hash stored at `key` (`mv-cache:<org>:<id>`).
   *
   * @param key Redis key of the segment hash
   * @returns promise resolved once the segment has been examined (and migrated when applicable)
   */
  async function doTheBalaceanca(key) {
    const segment = await redis.hgetallAsync(key);

    // The key may have disappeared between SCAN and HGETALL; nothing to migrate.
    if (!segment) {
      console.log(`Skipping ${key}: no hash data found`);
      return;
    }

    // Redis stores everything as strings; turn the textual markers back into values.
    for (const prop in segment) {
      if (Object.prototype.hasOwnProperty.call(segment, prop)) {
        if (segment[prop] === 'null') {
          segment[prop] = null;
        }
        if (segment[prop] === 'undefined') {
          segment[prop] = undefined;
        }
      }
    }

    for (const prop of numFields) {
      if (Object.prototype.hasOwnProperty.call(segment, prop)) {
        segment[prop] = parseFloat(segment[prop]);
      }
    }

    const prevId = hash(segment);
    // @ts-ignore
    const oldS3Path = segment.s3Path;

    if (prevId !== segment.id) {
      // If this happens then we didn't parse the object correctly.
      // It usually represents some old version of the structure.
      console.log(prevId, segment.id);
      mb += 1;
    } else {
      // In this case we know how to parse the segment from redis.
      segment.mxfIndexUrl = segment.mxfIndexUrl || null;
      if (segment.type === 'audio') {
        segment.layout = segment.layout || null;
      }

      const newId = hash(segment);

      // Testing to see if we didn't encounter an already migrated segment.
      if (newId !== prevId) {
        const p = key.split(':');
        p[2] = newId;
        const newKey = p.join(':');
        segment.id = newId;

        if (await redis.existsAsync(newKey)) {
          // In this case we already have the new segment generated.
          console.log(`${newId} already exists, ${prevId} already migrated`);
        } else {
          // This is the case where we need to do the migration.
          ac += 1;
          let migrated = true;

          if (!commander.dryRun) {
            try {
              const lruKey = `mv-cache:${p[1]}:LRU`;
              const lruValue = await redis.zscoreAsync(lruKey, prevId);
              const newS3Path = oldS3Path.replace(prevId, newId);

              // Copy the object first so the new Redis entry never points at a missing file.
              await s3Lib.copyObject(oldS3Path, newS3Path);
              segment.s3Path = newS3Path;

              await redis.multi()
                .zrem(lruKey, prevId)
                .hmset(newKey, segment)
                .hset(key, 'newId', newId)
                .zadd(lruKey, lruValue, newId)
                .del(key)
                .execAsync();
            } catch (err) {
              console.log(err);
              console.log(`Skipping fileId: ${segment.fileId}`);
              skippedFiles.push(segment.fileId);
              // A failed migration must not also be reported as migrated.
              migrated = false;
            }
          }

          if (migrated) {
            console.log(`Migrated ${prevId} to ${newId} - fileId: ${segment.fileId}; url: ${segment.url}`);
            migratedFiles.push(segment.fileId);
          }
        }
      } else {
        console.log(`${newId} is new version`);
      }
    }

    mc += 1;
    if (mc % 1000 === 0) {
      console.log(mc, mb, ac);
    }
  }

  // Walk the keyspace with SCAN until the cursor wraps back to '0'.
  let cursor = '0';
  do {
    const response = await redis.scanAsync(cursor, 'MATCH', prefix, 'COUNT', chunkSize);
    cursor = response[0];
    const keys = response[1];

    await bluebird.map(keys, async (key: string) => {
      const parts = key.split(':');
      const id = parts[2];

      // Skip the per-org bookkeeping keys and anything that is not `mv-cache:<org>:<id>`.
      if (id === 'maxStorageSize' || id === 'LRU' || id === 'storageSize' || parts.length !== 3) {
        return;
      }

      await doTheBalaceanca(key);
    }, { concurrency: parallelism });
  } while (cursor !== '0');

  console.log('Migrated files:');
  console.log(JSON.stringify(_.uniq(migratedFiles), null, 2));
  console.log('Skipped files:');
  console.log(JSON.stringify(_.uniq(skippedFiles), null, 2));

  process.exit(0);
})().catch((err) => {
  // Surface unexpected failures instead of leaving an unhandled rejection.
  console.error(err);
  process.exit(1);
});