import md5 from 'md5';
import * as path from 'path';
import * as _ from 'lodash';
import bluebird from 'bluebird';
import * as AWS from 'aws-sdk';

// @ownzones
import { Queue, Task } from '@ownzones/rrtq';
import { s3Lib as ownS3 } from '@ownzones/lib';

// app
import * as express from 'express';
import { CacheManager } from './cache';
import { Honeycomb } from './tracing';
import { ConcatSegment, SegmentBuilderHelper } from './segments';
import { config, log } from '../config';
import {
  IFile,
  IUser,
} from './connect-api';
import {
  IAudioSegment,
  IImageSegment,
  ISegment,
  PlaylistBuilder,
  SegmentType,
  ITranscodingTask,
  TranscodingSegment,
  ITrackPlaylists,
  ICustomAudioSegment,
  isCustomAudioSegment,
  isImageSegment,
  isAudioSegment,
} from './playlist-builder';
import { AudioDemux } from './workers/audio-demux';
import { wrappedFs as fs } from '../utils/async-wrappers';
import { Nullable, TaskResponse, TaskResponses } from '../utils/types';
import { sanitizeJson } from '../utils/sanitize-json';

const { s3 } = ownS3;

/** Everything `SegmentLoader.load` needs to serve one segment of a playlist. */
export interface ISegmentLoad {
  segments: TranscodingSegment[];
  // Per-track playlists; for custom-audio requests, `segments[segmentIndex]` of every entry is
  // probed in the cache. NOTE(review): type argument inferred from how `load` reads it — confirm.
  noMappingSegments: Nullable<ITrackPlaylists>;
  segmentIndex: number;
  res: express.Response;
  req: express.Request;
  next: express.NextFunction;
  trace: Record<string, string>;
  timeout: number;
  origin: string;
  fileId: string;
  sourceFile: IFile;
  orgSlug: string;
  accessToken: string;
  cacheBuster: boolean;
}

/** Raised when a segment is registered in the cache but its TS file is missing from S3. */
class InvalidCacheError extends Error {
  // Segments which are cached in redis, but whose TS files are missing from S3.
  public segments: ISegment[] = [];

  public constructor(message?: string) {
    super(message);
    this.name = 'InvalidCacheError';
    // Keeps `instanceof InvalidCacheError` working when the build down-levels classes to ES5.
    Object.setPrototypeOf(this, InvalidCacheError.prototype);
  }
}

export class SegmentLoader {
  public static SUBSEGMENTS_COUNT: number = config.subsegmentsCount;

  /**
   * Builds the "subsegment concat" descriptor of an image segment once every one of its
   * subsegments has been transcoded and cached.
   *
   * @param segment - the full image segment (its `cacheBuster` is overwritten when `cacheBuster` is true)
   * @param responses - task responses keyed by subsegment id
   * @param cacheBuster - stamp a fresh random `cacheBuster` on the segment before splitting it
   * @param user - user attached to every subsegment
   * @returns a `SubsegmentConcat` segment whose `urls` are the cached subsegment S3 paths in
   *   playback order, or `null` when a subsegment failed or is not cached yet
   */
  // ToDo Change method name. Refactor the instance method scheduleImageSubSegments() to use this.
  public static async scheduleImageSegmentsCache(
    segment: IImageSegment,
    responses: TaskResponses,
    cacheBuster: boolean,
    user?: IUser,
  ): Promise<Nullable<ISegment>> {
    const subsegments = SegmentLoader.generateSubsegments(segment, user, cacheBuster);
    const cachedS3Paths = await SegmentLoader.fetchCachedSubsegmentPaths(segment, subsegments);

    const segmentComplete = !!responses && subsegments.every((subsegment) => {
      const subsegmentId = subsegment.id as string;
      const response = responses[subsegmentId];
      return !(response && response.error) && _.has(cachedS3Paths, subsegmentId);
    });

    if (segmentComplete) {
      const concatSegment = {
        ...segment,
        id: SegmentLoader.generateSegmentId(segment),
        // Listed in subsegment order so the concatenation plays back in the right order.
        urls: subsegments.map((subsegment) => cachedS3Paths[subsegment.id as string]),
        type: SegmentType.SubsegmentConcat,
      };
      return concatSegment;
    }

    log.warn(`Subsegments not successfully cached, skipping final segment cache: fileId:${segment.fileId as string} entryPoint:${segment.entryPoint as number} duration:${segment.duration} `);
    return null;
  }

  /**
   * Splits an image segment into at most `SUBSEGMENTS_COUNT` consecutive subsegments of
   * `ceil(editUnits / SUBSEGMENTS_COUNT)` edit units each. A piece shorter than
   * `PlaylistBuilder.MERGE_SMALL_SEGMENTS_THRESHOLD` (in `duration` units) is folded into the
   * previous subsegment. Every returned subsegment carries an `id` hashed from its final content.
   *
   * Note: when `cacheBuster` is true this writes a new random `cacheBuster` onto `segment` itself.
   */
  public static generateSubsegments(segment: IImageSegment, user?: IUser, cacheBuster?: boolean): IImageSegment[] {
    const subsegments: IImageSegment[] = [];
    const editUnitsPerSubsegment = Math.ceil(segment.editUnits / SegmentLoader.SUBSEGMENTS_COUNT);
    let usedEditUnits = 0;

    if (cacheBuster) {
      segment.cacheBuster = Math.random();
    }

    for (let index = 0; index < SegmentLoader.SUBSEGMENTS_COUNT && usedEditUnits < segment.editUnits; index += 1) {
      const editUnits = Math.min(segment.editUnits - usedEditUnits, editUnitsPerSubsegment);
      const subsegment = {
        ...segment,
        editUnits,
        user,
        duration: editUnits / segment.editRate,
        entryPoint: (segment.entryPoint as number) + usedEditUnits,
        startTime: (segment.startTime as number) + usedEditUnits / segment.editRate,
      };

      if (index > 0 && subsegment.duration < PlaylistBuilder.MERGE_SMALL_SEGMENTS_THRESHOLD) {
        const previous = subsegments[subsegments.length - 1];
        previous.duration += subsegment.duration;
        previous.editUnits += subsegment.editUnits;
        // The previous id was hashed before the merge; re-hash it so the cache key / task id
        // describes the merged content and cannot collide with an unmerged subsegment.
        previous.id = SegmentLoader.generateSegmentId(previous);
      } else {
        // add the unique segment id
        subsegment.id = SegmentLoader.generateSegmentId(subsegment);
        subsegments.push(subsegment);
      }
      usedEditUnits += editUnits;
    }
    return subsegments;
  }

  /**
   * Generates a unique id for a given segment: the md5 of its key-sorted JSON serialization,
   * ignoring fields that do not affect the content (honeycomb ids, total start time, id, user).
   */
  public static generateSegmentId(segment: ISegment): string {
    const removedKeys = Honeycomb.sortKeys(segment, {
      honeycombTraceId: true,
      honeycombParentId: true,
      totalStartTime: true,
      id: true,
      user: true,
    });
    const serialization = JSON.stringify(removedKeys);
    return md5(serialization);
  }

  /** Returns the first distinct `fileId` referenced by any stream of any input, if there is one. */
  public static generateCustomAudioSegmentFileId(segment: ICustomAudioSegment): string | undefined {
    return _.first(
      _.uniq(
        _.flatMap(
          segment.inputs,
          (input) => _.map(
            _.values(input.streams),
            (stream) => stream.fileId,
          ),
        ),
      ),
    );
  }

  /**
   * Batch-checks the cache for `subsegments` of `segment`.
   * @returns map of subsegment id -> cached S3 path; only cached subsegments are present
   */
  private static async fetchCachedSubsegmentPaths(
    segment: IImageSegment,
    subsegments: IImageSegment[],
  ): Promise<Record<string, string>> {
    const cachedSegments = await CacheManager.getInstance().checkCacheBatch(
      subsegments.map((subsegment) => ({
        orgId: segment.orgId,
        fileId: segment.fileId as string,
        segmentId: subsegment.id as string,
      })),
    );
    return _.reduce(
      cachedSegments,
      (result: Record<string, string>, cachedSegment) => {
        result[cachedSegment.id as string] = cachedSegment.s3Path as string;
        return result;
      },
      {},
    );
  }

  /** True when `err` looks like a missing S3 object ("404 Not Found" message or `NoSuchKey` name). */
  private static isNotFoundError(err: unknown): boolean {
    if (err === null || typeof err !== 'object') {
      return false;
    }
    const { message, name } = err as { message?: unknown; name?: unknown };
    return (typeof message === 'string' && message.includes('404 Not Found')) || name === 'NoSuchKey';
  }

  /** Human-readable message for any thrown value (Error or not). */
  private static errorMessage(err: unknown): string {
    if (err !== null && typeof err === 'object' && typeof (err as { message?: unknown }).message === 'string') {
      return (err as { message: string }).message;
    }
    return String(err);
  }

  /** Best-effort removal of a temporary file; a failure is logged, never thrown. */
  private static async removeTemporaryFile(filePath: string): Promise<void> {
    try {
      await fs.unlinkAsync(filePath);
    } catch (err) {
      log.warn(`Failed to remove temporary segment file ${filePath}: ${SegmentLoader.errorMessage(err)}`);
    }
  }

  /**
   * True when every (trackIndex, channel) pair of `channelDefinitions` is covered by a cached
   * segment: a cached segment with `streamIndex` s and `channels` n covers channels 0..n-1 of track s.
   */
  private static hasAllCachedChannels(
    channelDefinitions: { trackIndex: number; channel: number }[],
    cacheResults: { cache: IAudioSegment }[],
  ): boolean {
    const cachedChannels = new Map<number, Set<number>>();
    for (const { cache } of cacheResults) {
      const channels = cachedChannels.get(cache.streamIndex) || new Set<number>();
      for (let channel = 0; channel < cache.channels; channel += 1) {
        channels.add(channel);
      }
      cachedChannels.set(cache.streamIndex, channels);
    }
    return channelDefinitions.every(({ trackIndex, channel }) => {
      const channels = cachedChannels.get(trackIndex);
      return channels !== undefined && channels.has(channel);
    });
  }

  /**
   * Drops the cache entries of `segments` and redirects the client to the same URL so the
   * segments get rebuilt.
   */
  private static async handleInvalidCacheError(req: express.Request, res: express.Response, segments: ISegment[]): Promise<void> {
    await bluebird.all(segments.map(
      (segment) => CacheManager.getInstance().removeCache(segment.orgId, segment.fileId as string, segment.id as string),
    ));
    // TODO: limit the number of retries
    return res.redirect(req.url);
  }

  public constructor(public videoQueue: Queue, public audioQueue: Queue) {}

  /**
   * Schedules a new task for computing a segment and sends its content on `res`.
   * Image segments with a `firstFrameIndex` (image sequences) use a 600 second timeout instead
   * of `timeout`. Errors other than invalid-cache ones are thrown, except while reading a built
   * audio segment, where they are forwarded to `next`.
   */
  // eslint-disable-next-line consistent-return
  public async load(
    {
      segments, segmentIndex, res, req, next, trace, timeout = 21000, fileId, sourceFile, orgSlug, cacheBuster, noMappingSegments,
    }: ISegmentLoad,
  ): Promise<express.Response | void> {
    const segment = segments[segmentIndex];
    const segmentLoadSpan = Honeycomb.startSpan(
      path.basename(__filename),
      'segmentLoad',
      { fileId, orgSlug },
    );

    try {
      // --> ToDo Move this to a separate method
      segment.honeycombTraceId = segmentLoadSpan['trace.trace_id'];
      segment.honeycombParentId = segmentLoadSpan['trace.span_id'];
      if (cacheBuster) {
        segment.cacheBuster = Math.random();
      }
      if (isCustomAudioSegment(segment) && _.isEmpty(segment.fileId)) {
        segment.fileId = SegmentLoader.generateCustomAudioSegmentFileId(segment);
      }
      segment.id = SegmentLoader.generateSegmentId(segment);
      segment.user = _.pick(res.locals.user, 'email');
      // <--

      if (isImageSegment(segment)) {
        const usedTimeout = segment.firstFrameIndex != null ? 600 * 1000 : timeout;
        let segmentContent: Buffer;
        try {
          segmentContent = await this.loadImageSegment(segment, sourceFile, usedTimeout, res.locals.user, cacheBuster);
        } catch (error) {
          if (error instanceof InvalidCacheError) {
            return await SegmentLoader.handleInvalidCacheError(req, res, error.segments);
          }
          throw error;
        }
        res.set('X-Honeycomb-Id', trace['trace.trace_id']);
        res.set('X-Type', 'video');
        return res.send(segmentContent);
      }

      if (isCustomAudioSegment(segment)) {
        // One entry per requested output channel, describing which source stream/channel feeds it.
        const def = [];
        for (const input of segment.inputs) {
          const streamsIndexes = Object.keys(input.streams).map((strIdx) => Number.parseInt(strIdx, 10));
          for (const streamIndex of streamsIndexes) {
            const stream = input.streams[streamIndex];
            for (let i = 0; i < stream.channelIndices.length; i += 1) {
              def.push({
                channel: stream.channelIndices[i],
                label: stream.channelLabels[i],
                fileId: stream.fileId,
                trackIndex: streamIndex,
                duration: segment.duration,
                totalStartTime: segment.totalStartTime,
              });
            }
          }
        }

        if (def.length) {
          // NOTE(review): set only for the final segment, and then it is the *preceding* segment;
          // confirm this is what AudioDemux.muxChannelsToAac expects.
          const lastSegment = (segmentIndex === segments.length - 1) ? segments[segmentIndex - 1] : undefined;
          const seg = await AudioDemux.muxChannelsToAac(segment.cacheLocation, def, segmentIndex, lastSegment);
          if (seg) {
            log.debug(`Audio Segment: ${segmentIndex} loaded from PCM file.`);
            res.set('X-Type', 'audio');
            return res.send(seg);
          }

          // ToDo When would noMappingSegments not be set?
          const mappingSegments = noMappingSegments
            ? _.compact(_.map(noMappingSegments, (track) => track.segments[segmentIndex] as IAudioSegment))
            : [];

          if (mappingSegments.length) {
            const cachedResponses = await bluebird.all(mappingSegments.map(
              (mapSegment) => CacheManager.getInstance().checkCache(
                mapSegment.orgId,
                mapSegment.fileId as string,
                SegmentLoader.generateSegmentId(mapSegment),
              ),
            ));
            // Kept in mapping order (not completion order) so the mux input is deterministic.
            const cacheResults: { cache: IAudioSegment }[] = [];
            cachedResponses.forEach((cachedResponse) => {
              if (cachedResponse) {
                cacheResults.push({ cache: cachedResponse as IAudioSegment });
              }
            });

            // Only mux from cache when every requested channel of every track is available.
            if (cacheResults.length && SegmentLoader.hasAllCachedChannels(def, cacheResults)) {
              const muxedSegment = await AudioDemux.muxCachedAAcToAac(cacheResults, def);
              if (muxedSegment) {
                log.debug(`Audio Segment: ${segmentIndex} loaded from cached file.`);
                res.set('X-Type', 'audio');
                return res.send(muxedSegment);
              }
            }
          }
        }
      }

      // generate the audio segment
      if (isCustomAudioSegment(segment) || isAudioSegment(segment)) {
        await this.preWarmAudioSegments(segments, segmentIndex, res.locals.user as IUser, sourceFile, timeout, cacheBuster);

        const processedSegment = await this.loadAudioSegment(
          segment,
          segment.id,
          sourceFile,
          timeout,
          true,
          cacheBuster,
        ) as IAudioSegment;
        const s3Path = processedSegment.s3Path as string;

        res.set('X-Honeycomb-Id', trace['trace.trace_id']);
        res.set('X-Type', 'audio');

        try {
          let content;
          if (segment.totalStartTime !== processedSegment.totalStartTime) {
            // The built segment was cached for another start time: re-offset a copy of it.
            // NOTE(review): the file at segmentWithOffsetPath is never removed here — confirm
            // whether SegmentBuilderHelper cleans it up.
            const s3signedPath = await ownS3.signUrl(s3Path);
            const segmentWithOffsetPath = await SegmentBuilderHelper.ffmpegSetOffset(
              s3signedPath,
              segment.totalStartTime + 1,
              false,
            );
            // @ts-ignore
            content = await fs.readFileAsync(segmentWithOffsetPath);
          } else {
            const { bucket, key } = ownS3.parseUrl(s3Path);
            const request = s3.getObject({ Bucket: bucket, Key: key });
            content = (await request.promise()).Body;
          }
          return res.send(content);
        } catch (err) {
          if (SegmentLoader.isNotFoundError(err)) {
            return await SegmentLoader.handleInvalidCacheError(req, res, [segment]);
          }
          log.error(SegmentLoader.errorMessage(err));
          return next(err);
        }
      }
    } finally {
      Honeycomb.endSpan(segmentLoadSpan);
    }
  }

  /**
   * Splits an image segment into subsegments and queues a transcoding task for each subsegment
   * that is not cached yet (or for all of them when `cacheBuster` is set).
   *
   * @returns the subsegments (playback order), the queued tasks keyed by subsegment id, and the
   *   cached subsegments keyed by subsegment id
   */
  public async scheduleImageSubSegments(
    segment: IImageSegment,
    sourceFile: IFile,
    user?: IUser,
    timeout?: number,
    cacheBuster?: boolean,
  ): Promise<{ subsegments: IImageSegment[], tasks: Record<string, Task>, cachedSubsegments: Record<string, { s3Path: string }> }> {
    // split the segment and create smaller tasks
    const subsegments = SegmentLoader.generateSubsegments(segment, user, cacheBuster);
    const cachedS3Paths = await SegmentLoader.fetchCachedSubsegmentPaths(segment, subsegments);
    const cachedSubsegments = _.mapValues(cachedS3Paths, (s3Path) => ({ s3Path }));

    log.debug(sanitizeJson(subsegments));
    log.debug(`SEG COUNT ${SegmentLoader.SUBSEGMENTS_COUNT}`);
    log.debug(sanitizeJson(segment));

    const subsegmentsToBuild = subsegments.filter(
      (subsegment) => cacheBuster || !_.has(cachedSubsegments, subsegment.id as string),
    );
    const scheduledTasks = await bluebird.all(subsegmentsToBuild.map((subsegment) => {
      const transcodingTask: ITranscodingTask = {
        sourceFile,
        segment: subsegment,
      };
      return this.videoQueue.addTask(transcodingTask, subsegment.id, timeout);
    }));

    // `scheduledTasks` is index-aligned with `subsegmentsToBuild`, not with `subsegments`.
    const tasks: Record<string, Task> = {};
    scheduledTasks.forEach((task, idx) => {
      tasks[subsegmentsToBuild[idx].id as string] = task;
    });

    return { subsegments, tasks, cachedSubsegments };
  }

  /**
   * Returns the cached audio segment, or queues a transcoding task for it.
   *
   * @param wait - when true, waits (polling every 500 ms, up to `timeout`) for the task result;
   *   when false, only queues the task and resolves to `null`
   * @throws Error('Segment builder timeout') when no response arrives in time, or the task's error
   */
  public async loadAudioSegment(
    segment: IAudioSegment | ICustomAudioSegment,
    segmentId: string,
    sourceFile: IFile,
    timeout?: number,
    wait = true,
    cacheBuster?: boolean,
  ): Promise<Nullable<IAudioSegment>> {
    const cachedResponse = await CacheManager.getInstance().checkCache(segment.orgId, segment.fileId as string, segmentId);
    if (cachedResponse && !cacheBuster) {
      return cachedResponse as IAudioSegment;
    }

    // add a new task if the segment didn't exist
    const transcodingTask: ITranscodingTask = {
      segment,
      sourceFile,
    };
    const task = await this.audioQueue.addTask(transcodingTask, segmentId, timeout);
    if (!wait) {
      return null;
    }

    return Honeycomb.startAsyncSpan(path.basename(__filename), 'waitingForSegment', async (span) => {
      try {
        // wait until the segment is computed and return it
        const response = await task.waitForResponse(timeout, 500) as TaskResponse;
        if (response === null) {
          Honeycomb.addCustomContext({ error: 'timeout' });
          throw new Error('Segment builder timeout');
        }
        if (response.error) {
          Honeycomb.addCustomContext({ error: response.errorObject.message });
          throw response.errorObject;
        }
        return response.result;
      } finally {
        Honeycomb.endSpan(span);
      }
    });
  }

  /**
   * Queues (without waiting for their results) the next two segments after `segmentIndex`,
   * treating them as audio segments, so they are likely built before the player asks for them.
   */
  private async preWarmAudioSegments(
    segments: TranscodingSegment[],
    segmentIndex: number,
    user: IUser,
    sourceFile: IFile,
    timeout: number,
    cacheBuster: boolean,
  ): Promise<void> {
    const preWarmPromises = [];
    const endIndex = Math.min(segments.length, segmentIndex + 3);
    for (let i = segmentIndex + 1; i < endIndex; i += 1) {
      preWarmPromises.push(Honeycomb.startAsyncSpan(path.basename(__filename), 'preWarmAudioSegment', async (span) => {
        try {
          const preSegment = segments[i] as ICustomAudioSegment | IAudioSegment;
          preSegment.honeycombTraceId = span['trace.trace_id'];
          preSegment.honeycombParentId = span['trace.span_id'];
          if (isCustomAudioSegment(preSegment) && _.isEmpty(preSegment.fileId)) {
            preSegment.fileId = SegmentLoader.generateCustomAudioSegmentFileId(preSegment);
          }
          preSegment.id = SegmentLoader.generateSegmentId(preSegment);
          preSegment.user = user;
          await this.loadAudioSegment(preSegment, preSegment.id, sourceFile, timeout, false, cacheBuster);
        } finally {
          Honeycomb.endSpan(span);
        }
      }));
    }
    await bluebird.all(preWarmPromises);
  }

  /**
   * Returns the TS content of an image segment. It is served from the cache when possible
   * (re-offset if cached for another start time). Otherwise it is built from subsegment tasks,
   * concatenated, uploaded to S3 and cached.
   *
   * @throws InvalidCacheError when cached S3 objects turn out to be missing
   */
  private async loadImageSegment(
    segment: IImageSegment,
    sourceFile: IFile,
    timeout: number,
    user: IUser,
    cacheBuster?: boolean,
  ): Promise<Buffer> {
    const cachedSegment = await CacheManager.getInstance().checkCache(segment.orgId, segment.fileId as string, segment.id as string);
    if (cachedSegment && !cacheBuster) {
      try {
        log.debug(`Big segment cache found, segId: ${(segment.id as string)}`);
        if (segment.totalStartTime !== cachedSegment.totalStartTime) {
          log.debug(`Re-offsetting segment ${cachedSegment.totalStartTime} -> ${segment.totalStartTime}`);
          return await ConcatSegment.offsetSegmentStartTime(cachedSegment.s3Path as string, segment.totalStartTime);
        }
        const s3data = (await ownS3.getObjectByUrl(cachedSegment.s3Path as string)) as AWS.S3.Types.GetObjectOutput;
        return s3data.Body as Buffer;
      } catch (err) {
        if (SegmentLoader.isNotFoundError(err)) {
          log.error(`Invalid cache error: ${(segment.id as string)}`);
          const invalidCacheError = new InvalidCacheError(SegmentLoader.errorMessage(err));
          invalidCacheError.segments = [segment];
          log.error(sanitizeJson(invalidCacheError.stack));
          throw invalidCacheError;
        }
        throw err;
      }
    }

    const startTime = Date.now();
    const { subsegments, tasks, cachedSubsegments } = await this.scheduleImageSubSegments(segment, sourceFile, user, timeout, cacheBuster);

    // One S3 URL per subsegment, in playback order: a scheduled task's result, else the cached copy.
    // The remaining time budget is computed synchronously, before any wait starts.
    const urls = await bluebird.all(subsegments.map(async (subsegment) => {
      const subsegmentId = subsegment.id as string;
      const task = tasks[subsegmentId];
      if (!task) {
        const cached = cachedSubsegments[subsegmentId];
        if (!cached) {
          throw new Error(`Subsegment ${subsegmentId} is neither cached nor scheduled`);
        }
        return cached.s3Path;
      }
      const taskTimeout = timeout - (Date.now() - startTime);
      const taskResponse = await task.waitForResponse(taskTimeout) as Nullable<TaskResponse>;
      if (!taskResponse) {
        throw new Error(`Task ${task.id} response is missing`);
      }
      if (taskResponse.error) {
        throw taskResponse.errorObject;
      }
      return taskResponse.result.s3Path as string;
    }));

    // concat tasks' responses and set the offset
    const segmentOffsetSpan = Honeycomb.startSpan(path.basename(__filename), 'ffmpegSegmentOffset');
    try {
      const finalSegmentPath: string = await ConcatSegment.concatSegments(urls, segment.totalStartTime);
      const readSegmentFromDisk = Honeycomb.startSpan(path.basename(__filename), 'readSegmentFromDisk');
      try {
        // @ts-ignore
        const content = await fs.readFileAsync(finalSegmentPath);
        const s3Path = `${segment.cacheLocation}${segment.id as string}.ts`;
        await ownS3.putObjectAtUrl(s3Path, content);
        const cacheSegment = {
          ...segment,
          s3Path,
          size: content.length,
        };
        await CacheManager.getInstance().saveCache(segment.orgId, segment.id as string, cacheSegment);
        return content;
      } finally {
        Honeycomb.endSpan(readSegmentFromDisk);
        // The concatenated file is only a staging copy; it is uploaded to S3 above.
        await SegmentLoader.removeTemporaryFile(finalSegmentPath);
      }
    } catch (err) {
      if (SegmentLoader.isNotFoundError(err)) {
        const invalidCacheError = new InvalidCacheError(SegmentLoader.errorMessage(err));
        invalidCacheError.segments = [...subsegments];
        log.error(sanitizeJson(invalidCacheError.stack));
        throw invalidCacheError;
      }
      throw err;
    } finally {
      Honeycomb.endSpan(segmentOffsetSpan);
    }
  }
}