// @ts-nocheck import * as commander from 'commander'; import * as redis from 'redis'; import * as AWS from 'aws-sdk'; import * as _ from 'lodash'; import * as bluebird from 'bluebird'; import { retry } from 'ts-retry-promise'; import * as cliProgress from 'cli-progress'; import { config } from '../config'; // import { flushDynamoDBTable } from '../test/utils/dynamodb'; bluebird.promisifyAll(redis.RedisClient.prototype); bluebird.promisifyAll(redis.Multi.prototype); (async () => { const command = new commander.Command(); command .option('-k --key-prefix ', 'Key prefix for the redis cache', config.cachePrefix) .option('-r --dynamo-region ', 'Region for the Dynamo DB Table', config.cacheDynamo.region) .option('-t --dynamo-table ', 'Name for the Dynamo DB Table', config.cacheDynamo.tableName) .option('-h --redis-host ', 'Redis host', config.cacheRedis.host) .option('-p --redis-port ', 'Redis port', String(config.cacheRedis.port)) .option('-d --redis-db ', 'Redis db', config.cacheRedis.database.toString()) .option('-p, --parallelism ', 'Organization parallelism', '4') .option('-o --organizations ', 'The organizations you want to migrate, if you want all do not specify this option') .option('-f --flush', 'Flush DynamoDB Table before running the migration', false) .option('-v --verbose', 'Verbose logging', false) .parse(process.argv); const redisConfig = { host: command.redisHost, port: command.redisPort, db: Number.parseInt(command.redisDb, 10), }; if (process.env.FORCE_REDIS_SSL && process.env.FORCE_REDIS_SSL.toLowerCase() === 'true') { redisConfig.tls = {}; } const redisClient = redis.createClient(redisConfig) as any; const dynamoClient = new AWS.DynamoDB.DocumentClient({ region: command.dynamoRegion, maxRetries: 25 }); const orgSet = `${command.keyPrefix}:organizations`; const multiBar = new cliProgress.MultiBar( { format: '[{bar}] {percentage}% | ETA: {eta}s | {org} | Pending: {pending}, Migrated: {migrated}, Skipped: {skipped}, Remaining: {remaining}, Total: {total} |' }, cliProgress.Presets.shades_classic, ); if (command.flush) { if (command.verbose) { console.log(`Flushing DynamoDB Table: ${command.dynamoTable}`); } const input: AWS.DynamoDB.DocumentClient.ScanInput = { TableName: command.dynamoTable, Limit: 25, }; do { const output: AWS.DynamoDB.DocumentClient.ScanOutput = await dynamoClient.scan(input).promise(); input.ExclusiveStartKey = output.LastEvaluatedKey; await dynamoClient.batchWrite({ RequestItems: { [command.dynamoTable]: _.map( output.Items, (item) => ({ DeleteRequest: { Key: _.pick(item, ['pk', 'sk']), }, }), ), }, }); } while (input.ExclusiveStartKey != null); } const orgs = await redisClient.smembersAsync(orgSet); const orgsFilter = _.compact(_.split(command.organizations, ',')); await bluebird.map(orgs, async (org) => { if (!_.isEmpty(orgsFilter) && !_.includes(orgsFilter, org)) { return; } if (command.verbose) { console.log(`### Copying from org ${org} ###`); } const maxStorage = Number.parseInt(await redisClient.getAsync(`${command.keyPrefix}:${org}:maxStorageSize`), 10); const currentStorage = Number.parseInt(await redisClient.getAsync(`${command.keyPrefix}:${org}:storageSize`), 10); if (maxStorage && currentStorage) { if (command.verbose) { console.log(`### Creating storageInfo for ${org} ###`); } const newStorageInfo = { current: currentStorage, max: maxStorage, }; await dynamoClient.put({ TableName: command.dynamoTable, Item: { pk: 'STORAGE_INFO', sk: org, data: newStorageInfo, }, }).promise(); } if (command.flush) { await redisClient.delAsync(`${command.keyPrefix}:${org}:migration`); } const pendingSegments = []; const totalSegments = await redisClient.evalAsync(` local allKey = KEYS[1] local migrationsKey = KEYS[2] local LRUKey = KEYS[3] local storageSizeKey = KEYS[4] local maxStorageSizeKy = KEYS[5] local nrAll = #redis.call('keys', allKey); local hasMigrations = redis.call('exists', migrationsKey); local hasLRU = redis.call('exists', LRUKey); local hasStorageSize = redis.call('exists', storageSizeKey); local hasMaxStorageSize = redis.call('exists', maxStorageSizeKy); return nrAll - hasMigrations - hasLRU - hasStorageSize - hasMaxStorageSize; `, 5, `${command.keyPrefix}:${org}:*`, `${command.keyPrefix}:${org}:migration`, `${command.keyPrefix}:${org}:LRU`, `${command.keyPrefix}:${org}:storageSize`, `${command.keyPrefix}:${org}:maxStorageSize`); let skippedSegments = 0; let migratedSegments = 0; let bar; if (!command.verbose) { bar = multiBar.create( totalSegments, 0, { org, pending: _.size(pendingSegments), skipped: skippedSegments, migrated: migratedSegments, remaining: totalSegments, }, ); } let cursor = '0'; do { const response = await redisClient.scanAsync(cursor, 'MATCH', `${command.keyPrefix}:${org}:*`, 'COUNT', '1000'); cursor = response[0]; const keys = response[1]; for (const key of keys) { const id = key.split(':')[2]; if (id === 'maxStorageSize' || id === 'LRU' || id === 'storageSize' || id === 'migration' || key.split(':').length !== 3) { continue; } if (command.verbose) { console.log(`Checking segment ${id}`); } const lua = ` local LRUKey = KEYS[1] local migrationKey = KEYS[2] local segmentKey = KEYS[3] local segmentId = KEYS[4] local orgPrefix = ARGV[1] local migrated = redis.call('hexists', migrationKey, segmentId) if migrated == 1 then return { migrated } else local score = redis.call('zscore', LRUKey, segmentKey) local segmentData = redis.call('hgetall', segmentKey) return { migrated, score, segmentData } end `; const [migrated, score, rawSegmentData] = await redisClient.evalAsync( lua, 4, `${command.keyPrefix}:${org}:LRU`, `${command.keyPrefix}:${org}:migration`, key, id, ); if (migrated) { skippedSegments += 1; if (command.verbose) { console.log(`Skipping segment ${id}, already migrated`); } else { bar.increment( 1, { org, pending: _.size(pendingSegments), skipped: skippedSegments, migrated: migratedSegments, remaining: totalSegments - skippedSegments - migratedSegments, }, ); } continue; } const segmentData = _.mapValues( _.fromPairs(_.chunk(rawSegmentData, 2)), (value, segKey) => { if (value === 'null') { return null; } if (value === 'true') { return true; } if (value === 'false') { return false; } if (_.includes(['duration', 'editRate', 'editUnits', 'entryPoint', 'height', 'width', 'size', 'startTime', 'streamIndex', 'totalStartTime'], segKey)) { return Number(value); } return value; }, ); pendingSegments.push({ PutRequest: { Item: { pk: `SEGMENT#${org}`, sk: id, score: Number(score), data: segmentData, }, }, }); if (!command.verbose) { bar.update( bar.value, { org, pending: _.size(pendingSegments), skipped: skippedSegments, migrated: migratedSegments, remaining: totalSegments - skippedSegments - migratedSegments, }, ); } } while (_.size(pendingSegments) >= (cursor === '0' ? 1 : 25)) { const nrPendingSegments = _.size(pendingSegments); const chunk = pendingSegments.splice(0, 25); if (command.verbose) { console.log(`processing ${_.size(chunk)} segments of ${nrPendingSegments}`); } await retry( async () => { try { await dynamoClient.batchWrite({ RequestItems: { [command.dynamoTable]: chunk, }, }).promise(); await Promise.all(_.map( chunk, ({ PutRequest: { Item: { sk } } }) => redisClient.hsetAsync(`${command.keyPrefix}:${org}:migration`, sk, 'true'), )); migratedSegments += _.size(chunk); if (!command.verbose) { bar.increment( _.size(chunk), { org, pending: _.size(pendingSegments), skipped: skippedSegments, migrated: migratedSegments, remaining: totalSegments - skippedSegments - migratedSegments, }, ); } } catch (e) { if (command.verbose) { console.error(e); } throw e; } }, { backoff: 'EXPONENTIAL' }, ); } if (command.verbose) { console.log(`remaining ${_.size(pendingSegments)} segments`); } } while (cursor !== '0'); }, { concurrency: Number.parseInt(command.parallelism, 10) }); if (!command.verbose) { multiBar.stop(); } await redisClient.quitAsync(); })();