// @ts-nocheck
import * as commander from 'commander';
import * as AWS from 'aws-sdk';
import * as _ from 'lodash';
import * as bluebird from 'bluebird';
import * as cliProgress from 'cli-progress';
import { retry } from 'ts-retry-promise';
import { config } from '../config';
import { SegmentLoader } from '../lib/segment-loader';
import { SegmentType } from '../lib/playlist-builder';

/** Maximum number of put/delete operations sent in a single batchWrite call. */
const BATCH_WRITE_LIMIT = 25;

/**
 * Builds the pair of write operations that moves one segment row to its new key.
 *
 * The new row is stored under `SEGMENT#<orgId>#<fileId>` with the segment id as
 * sort key; the old row (`item.pk` / `item.sk`) is deleted. Audio segments that
 * have inputs but no `fileId` get a generated `fileId` and a regenerated id first.
 *
 * @param item Row returned by the `SEGMENT#<org>` query (`pk`, `sk`, `data`).
 * @param verbose When true, logs the id rewrite of custom audio segments.
 * @returns `[PutRequest, DeleteRequest]`, put first.
 */
function buildMigrationOperations(item, verbose: boolean) {
  const segment = {
    ...item.data,
    id: item.sk,
    orgId: _.last(_.split(item.pk, '#')),
  };

  const isCustomAudio = segment.type === SegmentType.Audio
    && _.isEmpty(segment.fileId)
    && _.size(segment.inputs) > 0;
  if (isCustomAudio) {
    const oldSegmentId = segment.id;
    segment.fileId = SegmentLoader.generateCustomAudioSegmentFileId(segment);
    // The id is regenerated only after fileId has been assigned.
    segment.id = SegmentLoader.generateSegmentId(segment);
    if (verbose) {
      console.log(`Custom Audio Segment Detected, Generated FileId: ${segment.fileId}, Old Id: ${oldSegmentId}, New Id: ${segment.id}`);
    }
  }

  return [
    {
      PutRequest: {
        Item: {
          pk: `SEGMENT#${segment.orgId}#${segment.fileId}`,
          sk: segment.id,
          data: _.omit(segment, ['id', 'orgId', 'fileId']),
        },
      },
    },
    {
      DeleteRequest: {
        Key: {
          pk: item.pk,
          sk: item.sk,
        },
      },
    },
  ];
}

/**
 * Writes one batch of operations, retrying with exponential backoff until
 * DynamoDB reports nothing left unprocessed (or the retry budget is exhausted).
 *
 * A successful batchWrite response may still carry `UnprocessedItems`; those
 * operations are resubmitted so that a delete can never be applied while its
 * matching put is silently dropped.
 *
 * @param dynamoClient Document client used for the write.
 * @param tableName Target table.
 * @param operations At most `BATCH_WRITE_LIMIT` put/delete requests.
 * @param verbose When true, logs every failed attempt.
 * @throws The last error once `retry` gives up.
 */
async function writeBatch(dynamoClient, tableName: string, operations, verbose: boolean): Promise<void> {
  let outstanding = operations;
  await retry(
    async () => {
      try {
        const response = await dynamoClient.batchWrite({
          RequestItems: {
            [tableName]: outstanding,
          },
        }).promise();
        const unprocessed = _.get(response, ['UnprocessedItems', tableName], []);
        if (_.size(unprocessed) > 0) {
          // Only resend what was not applied, and throw so `retry` backs off.
          outstanding = unprocessed;
          throw new Error(`DynamoDB left ${_.size(unprocessed)} write operations unprocessed`);
        }
      } catch (e) {
        if (verbose) {
          console.error(e);
        }
        throw e;
      }
    },
    { backoff: 'EXPONENTIAL' },
  );
}

/**
 * Migration entry point: lists organizations from the `STORAGE_INFO` partition,
 * then, per organization, drains the `SEGMENT#<org>` partition by re-writing
 * every row under `SEGMENT#<org>#<fileId>` and deleting the original.
 */
(async () => {
  const command = new commander.Command();
  // Value-taking options need a `<placeholder>`; without it commander treats
  // the flag as a boolean and the supplied value is never captured.
  command
    .option('-k --key-prefix <prefix>', 'Key prefix for the redis cache', config.cachePrefix)
    .option('-r --dynamo-region <region>', 'Region for the Dynamo DB Table', config.cacheDynamo.region)
    .option('-t --dynamo-table <table>', 'Name for the Dynamo DB Table', config.cacheDynamo.tableName)
    .option('-l --dynamo-pagination-limit <limit>', 'The page size for a Dynamo DB query', String(config.cacheDynamo.paginationLimit))
    .option('-p, --parallelism <count>', 'Organization parallelism', '4')
    .option('-o --organizations <organizations>', 'The organizations you want to migrate, if you want all do not specify this option')
    .option('-v --verbose', 'Verbose logging', false)
    .parse(process.argv);

  const dynamoClient = new AWS.DynamoDB.DocumentClient({
    region: command.dynamoRegion,
    maxRetries: 25,
  });
  const dynamoPaginationLimit = Number.parseInt(command.dynamoPaginationLimit, 10);

  const multiBar = new cliProgress.MultiBar(
    { format: '[{bar}] {percentage}% | ETA: {eta}s | {org} | Migrated: {migrated}, Total: {total} |' },
    cliProgress.Presets.shades_classic,
  );

  try {
    if (command.verbose) {
      console.log('Started');
    }

    // Collect the organizations to migrate; an empty filter means "all".
    const orgs: string[] = [];
    const orgsFilter = _.compact(_.split(command.organizations, ','));
    const storageInfoQueryInput: AWS.DynamoDB.DocumentClient.QueryInput = {
      TableName: command.dynamoTable,
      Limit: dynamoPaginationLimit,
      KeyConditionExpression: 'pk = :pk',
      ExpressionAttributeValues: { ':pk': 'STORAGE_INFO' },
    };
    do {
      const result = await dynamoClient.query(storageInfoQueryInput).promise();
      storageInfoQueryInput.ExclusiveStartKey = result.LastEvaluatedKey;
      const pageOrgs = _.map(result.Items, (item) => item.sk);
      orgs.push(
        ..._.filter(pageOrgs, (org) => _.isEmpty(orgsFilter) || _.includes(orgsFilter, org)),
      );
    } while (storageInfoQueryInput.ExclusiveStartKey != null);

    await bluebird.map(orgs, async (org) => {
      if (command.verbose) {
        console.log(`- Migrating Organization ${org}`);
      }

      let totalSegments = 0;
      let migratedSegments = 0;

      // Progress bars are only shown when verbose logging is off.
      let bar;
      if (!command.verbose) {
        bar = multiBar.create(totalSegments, 0, { org, migrated: migratedSegments });
      }

      const segmentsQueryInput: AWS.DynamoDB.DocumentClient.QueryInput = {
        TableName: command.dynamoTable,
        Limit: dynamoPaginationLimit,
        KeyConditionExpression: 'pk = :pk',
        ExpressionAttributeValues: { ':pk': `SEGMENT#${org}` },
        // This loop never sets ExclusiveStartKey: each page is deleted after it
        // is migrated and the query starts over. A strongly consistent read
        // keeps just-deleted rows from reappearing in the next page.
        ConsistentRead: true,
      };

      let hasEnded = false;
      do {
        const result = await dynamoClient.query(segmentsQueryInput).promise();
        if (result.Count === 0) {
          hasEnded = true;
        }
        totalSegments += result.Count;
        if (!command.verbose) {
          bar.setTotal(totalSegments);
          bar.update({ org, migrated: migratedSegments });
        }

        const pending = _.flatMap(result.Items, (item) => {
          if (command.verbose) {
            console.log(`Segment ${item.sk}`);
          }
          return buildMigrationOperations(item, command.verbose);
        });

        // Drain the page in batches before querying again.
        while (_.size(pending) >= 1) {
          const nrPending = _.size(pending);
          const chunk = pending.splice(0, BATCH_WRITE_LIMIT);
          if (command.verbose) {
            console.log(`processing ${_.size(chunk)} operations of ${nrPending}`);
          }
          // Only puts count as migrated segments; deletes are bookkeeping.
          const size = _.size(_.filter(chunk, (op) => _.has(op, 'PutRequest')));

          await writeBatch(dynamoClient, command.dynamoTable, chunk, command.verbose);

          migratedSegments += size;
          if (!command.verbose) {
            bar.increment(size, { org, migrated: migratedSegments });
          }
        }
      } while (!hasEnded);

      if (command.verbose) {
        console.log(`- Finished Organization ${org}, Total Items: ${totalSegments}`);
      }
    }, { concurrency: Number.parseInt(command.parallelism, 10) });
  } finally {
    // Always release the terminal, even when the migration fails midway.
    if (!command.verbose) {
      multiBar.stop();
    }
  }
})().catch((error) => {
  // Surface the failure and make the process exit with a non-zero status.
  console.error(error);
  process.exitCode = 1;
});