import * as commander from 'commander';
import * as fs from 'fs';
import * as path from 'path';
import { S3 } from 'aws-sdk';
import { s3Lib } from '@ownzones/lib';
import { config, log } from '../config';
import { IAudioSegment, IImageSegment, PlaylistBuilder, TranscodingSegment } from '../lib/playlist-builder';
import { SegmentLoader } from '../lib/segment-loader';
import { getCompositionDefinition } from '../lib/connect-api';
import { Honeycomb } from '../lib/tracing';
import { CacheManager } from '../lib/cache';
import { sanitizeJson } from '../utils/sanitize-json';

/*
 * Cache-audit script.
 *
 * For every file id listed in the input file, fetch the composition definition,
 * build its playlist and check each segment against CacheManager.checkCache.
 * For video segments (and the sub-segments SegmentLoader derives from them) it
 * also verifies that the cached S3 object exists via headObject.
 * Every file id with at least one missing segment is written, one per line,
 * to the output file.
 */

// Shared work queue; every worker pops ids from it until it is empty.
let fileIds: string[] = [];
// File ids with at least one missing segment. A Set so that a file missing both
// video and audio segments is reported only once in the output.
const missingCacheFileIds = new Set<string>();
// Resolved CLI options; populated by run() before any worker starts.
let options = {} as any;

const s3Client = new S3({
  signatureVersion: 'v4',
});

/** Best-effort human-readable description of a thrown value. */
function describeError(err: any): string {
  return err && err.message ? err.message : String(err);
}

/** True when an aws-sdk error means "object does not exist" (headObject returns NotFound / 404). */
function isS3NotFound(err: any): boolean {
  return !!err && (err.code === 'NotFound' || err.statusCode === 404);
}

/**
 * Checks the image (video) segments of a playlist, plus their generated
 * sub-segments, against the cache and S3.
 *
 * Stops at the first missing segment, so the counts in the summary log line are
 * 0 or 1: they only say whether *any* segment was missing, not how many.
 *
 * @param workerIndex - index of the calling worker, used only in log lines
 * @param fileId - file being checked; recorded in missingCacheFileIds on a miss
 * @param allSegments - every segment of the file's playlist
 */
async function checkVideoCache(workerIndex: number, fileId: string, allSegments: TranscodingSegment[]): Promise<void> {
  const segments: any[] = [];
  const videoSegments = allSegments.filter(s => s.type === 'image') as IImageSegment[];
  for (const videoSegment of videoSegments) {
    segments.push(videoSegment);
    for (const subSegment of SegmentLoader.generateSubsegments(videoSegment, undefined, false)) {
      segments.push(subSegment);
    }
  }

  let missingSegmentsDynamo = 0;
  let missingS3Segments = 0;
  for (const segment of segments) {
    const cacheResult = await CacheManager.getInstance().checkCache(
      options.orgSlug,
      segment.fileId,
      segment.id || SegmentLoader.generateSegmentId(segment),
    );
    if (!cacheResult) {
      missingCacheFileIds.add(fileId);
      log.info(`worker ${workerIndex} - missing video segments for file ${fileId}`);
      log.info(`${sanitizeJson(segment)} `);
      missingSegmentsDynamo += 1;
      break;
    }

    const { bucket, key } = s3Lib.parseUrl(cacheResult.s3Path as string);
    try {
      await s3Client.headObject({ Bucket: bucket, Key: key }).promise();
    } catch (err) {
      // Any failure means the cached object could not be confirmed, so the file
      // is still flagged; but non-404 errors (permissions, network) are surfaced
      // instead of being silently reported as "missing".
      if (isS3NotFound(err)) {
        log.info(`worker ${workerIndex} - missing s3 cache for file ${fileId}`);
      } else {
        log.error(`worker ${workerIndex} - could not verify s3 cache for file ${fileId}: ${describeError(err)}`);
      }
      missingCacheFileIds.add(fileId);
      missingS3Segments += 1;
      break;
    }
  }
  log.info(`PZB: File ${fileId} has ${missingSegmentsDynamo} dynamo and ${missingS3Segments} S3 out of ${segments.length} segments.`);
}

/**
 * Checks the audio segments of a playlist against the cache, stopping at the
 * first miss.
 *
 * @param workerIndex - index of the calling worker, used only in log lines
 * @param fileId - file being checked; recorded in missingCacheFileIds on a miss
 * @param allSegments - every segment of the file's playlist
 */
async function checkAudioCache(workerIndex: number, fileId: string, allSegments: TranscodingSegment[]): Promise<void> {
  const audioSegments = allSegments.filter(s => s.type === 'audio') as IAudioSegment[];
  for (const segment of audioSegments) {
    // NOTE(review): video checks key the cache by options.orgSlug while audio
    // uses segment.orgId — confirm these are meant to be the same value.
    const cacheResult = await CacheManager.getInstance().checkCache(
      segment.orgId,
      segment.fileId as string,
      segment.id || SegmentLoader.generateSegmentId(segment),
    );
    if (!cacheResult) {
      missingCacheFileIds.add(fileId);
      log.info(`worker ${workerIndex} - missing audio segments for file ${fileId}`);
      break;
    }
  }
}

/**
 * Fetches the composition for one file, builds its playlist and runs the
 * enabled video/audio cache checks. A file whose composition definition cannot
 * be fetched is logged and skipped (not reported as missing).
 */
async function checkFile(workerIndex: number, fileId: string, command: commander.Command): Promise<void> {
  const response = await getCompositionDefinition(fileId, options.orgSlug);
  if (!response || !response.compositionDefinition) {
    log.error(`worker ${workerIndex} - Failed to get composition definition for file ${fileId}`);
    return;
  }
  log.info(`worker ${workerIndex} - Fetched definition for file ${fileId}`);

  const playlist = PlaylistBuilder.buildPlaylist(
    response.compositionDefinition,
    fileId,
    7,
    response.org.cacheLocation,
    options.orgSlug,
    undefined,
    undefined,
    undefined,
    true,
  );
  const allSegments = playlist.segments as TranscodingSegment[];

  if (!command.skipVideoChecks) {
    await checkVideoCache(workerIndex, fileId, allSegments);
  }
  if (!command.skipAudioChecks) {
    await checkAudioCache(workerIndex, fileId, allSegments);
  }
}

/**
 * Worker loop: pops file ids off the shared queue until it is empty.
 *
 * Errors for a single file are logged and the worker moves on, so one bad file
 * cannot abort the whole run (and leave the output file unwritten).
 *
 * @param workerIndex - index of this worker, used only in log lines
 * @param command - parsed commander command; reads skipVideoChecks / skipAudioChecks
 */
async function checkCacheForFile(workerIndex: number, command: commander.Command): Promise<void> {
  while (fileIds.length) {
    const fileId = fileIds.pop() as string;
    log.info(`worker ${workerIndex} - Starting scan for file id ${fileId} Queue: ${fileIds.length}`);
    try {
      await checkFile(workerIndex, fileId, command);
    } catch (err) {
      log.error(`worker ${workerIndex} - Failed to check cache for file ${fileId}: ${describeError(err)}`);
    }
  }
}

/**
 * CLI entry point: parses options, validates them, loads the file-id list,
 * runs the workers concurrently and writes the ids of files with missing cache
 * segments to the output file (which is always rewritten, even when empty).
 * Invalid arguments are logged and set a non-zero process exit code.
 */
async function run(): Promise<void> {
  Honeycomb.initialize();

  const command = new commander.Command();
  // Flags need a `<value>` placeholder; without one commander treats them as
  // booleans and the option value becomes `true` instead of the user's input.
  // NOTE(review): commander expects single-character short flags; `-of` may be
  // split into `-o -f`, so prefer `--output-file`.
  command
    .option('-h, --host <host>', 'Redis host to work with.')
    .option('-p, --port <port>', 'The port on which to connect.')
    .option('-d, --db <db>', 'The Redis db used.')
    .option('-o, --org-slug <slug>', 'The organization slug.')
    .option('-f, --files-file <path>', 'A file containing the files that need to be removed one per line.')
    .option('-of, --output-file <path>', 'A file containing the files that have missing cache segments.')
    .option('-w, --workers <count>', 'The number of parallel workers.')
    .option('--skip-audio-checks', 'Skip audio checks', false)
    .option('--skip-video-checks', 'Skip video checks', false)
    .parse(process.argv);

  // Non-numeric, zero or negative values fall back to/clamp at one worker;
  // otherwise no worker would run and the report would falsely say all is well.
  const workers = Math.max(1, parseInt(command.workers, 10) || 1);

  if (!command.orgSlug) {
    log.error('Please specify an organization slug.');
    process.exitCode = 1;
    return;
  }
  if (!command.filesFile) {
    log.error('Please specify a file with the file ids you want to check the cache for.');
    process.exitCode = 1;
    return;
  }
  if (!command.outputFile) {
    log.error('Please specify an output file name.');
    process.exitCode = 1;
    return;
  }

  // Relative paths keep resolving against the script directory, as before;
  // absolute paths are now used as given.
  const filesFilePath = path.isAbsolute(command.filesFile)
    ? command.filesFile
    : path.join(__dirname, command.filesFile);
  if (!fs.existsSync(filesFilePath)) {
    log.error(`The file '${filesFilePath}' does not exist.`);
    process.exitCode = 1;
    return;
  }

  options = {
    host: command.host || config.cacheRedis.host,
    port: command.port || config.cacheRedis.port,
    db: command.db || config.cacheRedis.database,
    orgSlug: command.orgSlug,
    outputFile: command.outputFile,
  } as any;
  log.info(`Running with config: ${JSON.stringify(options)}`);

  fileIds = fs.readFileSync(filesFilePath, 'utf-8')
    .split('\n')
    .map(x => x.trim())
    .filter(x => !!x);
  log.info(`Scanning cache for ${fileIds.length} files and with ${workers} workers.`);

  const promises: Promise<void>[] = [];
  for (let i = 0; i < workers; i += 1) {
    promises.push(checkCacheForFile(i, command));
  }
  await Promise.all(promises);

  // Rewrite the output file in one go: one id per line, empty when nothing is missing.
  const missingIds = Array.from(missingCacheFileIds);
  fs.writeFileSync(options.outputFile, missingIds.map(id => `${id}\n`).join(''), 'utf-8');

  if (missingIds.length) {
    log.info(`Work done. Check file ${options.outputFile} for results.`);
  } else {
    log.info('All is well. Odd...');
  }
}

run().catch((err) => {
  log.error(`Cache check failed: ${describeError(err)}`);
  process.exitCode = 1;
});