// npm import 'reflect-metadata'; import * as _ from 'lodash'; import bluebird from 'bluebird'; import blocked from 'blocked'; // @ownzones import { common } from '@ownzones/lib'; import { cloudWatchMetrics } from '@ownzones/ts-log'; import { Consumer, Queue, Rrtq, Task, } from '@ownzones/rrtq'; // Sentry import * as Sentry from '@sentry/node'; // app import { config, ConfigType, log } from './config'; import { CacheManager } from './lib/cache'; import { IJob, IUser, signalMediaviewPrecacheTask } from './lib/connect-api'; import { Honeycomb } from './lib/tracing'; import { SegmentLoader } from './lib/segment-loader'; import { IAudioSegment, IImageSegment, IPlaylist, IPlaylistBuildInput, isAudioSegment, isCustomAudioSegment, isImageSegment, ITranscodingTask, PlaylistBuilder, SegmentType, TranscodingSegment, } from './lib/playlist-builder'; import { AudioDemux, IAudioDemuxTask } from './lib/workers/audio-demux'; import { sentryInit } from './server'; import { TaskResponses } from './utils/types'; import { sanitizeJson } from './utils/sanitize-json'; export class Driver { public static async create(configuration: ConfigType): Promise { const rrtq = new Rrtq({ redis: configuration.redis }); const audioQueue = await rrtq.createQueue( 'mediaview', { ...configuration.rrtq, namespace: configuration.rrtq.audioNamespace }, ); const driver = new Driver(configuration, rrtq, audioQueue); driver.subscribeConsumer(); return driver; } public rrtq: Rrtq; public consumer: Consumer; public audioQueue: Queue; private constructor(configuration: ConfigType, rrtq: Rrtq, audioQueue: Queue) { this.rrtq = rrtq; this.audioQueue = audioQueue; this.consumer = this.rrtq.getConsumerForNamespace( configuration.rrtq.driverNamespace, this.handler.bind(this), { consumerName: configuration.podName, cleanupQueuesIntervalTimeout: configuration.rrtq.cleanupQueuesIntervalTimeout, }, ); } public async stop(): Promise { await this.consumer.stop(); await this.rrtq.stop(); } private subscribeConsumer(): void { this.consumer.subscribe(); } private async handler(task: Task) { let progressReporter: NodeJS.Timeout | null = null; let preCacheQueue: Queue | null = null; let channelExtractQueue: Queue | null = null; let segmentCacheQueue: Queue | null = null; const job = task.payload as IJob; const { fileId, user, compositionDefinition, organization, } = job; // If true the whole task will fail if a segment fails const ensureCacheIntegrity = _.get(organization, 'organizationOptions.ensureCacheIntegrity.enabled', false) as boolean; try { preCacheQueue = await this.rrtq.createQueue( `pre-${task.id}`, { ...config.rrtq, ...config.preCacheRrtq, namespace: config.rrtq.videoNamespacePreCache, uniqueTasks: true, }, ); segmentCacheQueue = await this.rrtq.createQueue( `segment-${task.id}`, { ...config.rrtq, ...config.preCacheRrtq, namespace: config.rrtq.videoNamespacePreCache, uniqueTasks: true, }, ); const cacheLocation = common.ensureTrailingSlash(organization.mediaViewCacheFileLocator.url); const segmentLoader = new SegmentLoader(preCacheQueue, preCacheQueue); const trace = Honeycomb.start({ name: 'serverSegment' }); const options: IPlaylistBuildInput = { fileId, user, cacheLocation, orgSlug: organization.slug, orgId: organization.id, definition: compositionDefinition, }; const { segments, sourceFile } = (await PlaylistBuilder.build({ ...options, preCache: job.cacheAllAudio ? 'all' : 'just_one_audio', })).playlist as IPlaylist & { segments: TranscodingSegment[] }; const hasUnknownChannelLayout = segments .some((segment) => isAudioSegment(segment) && segment.channels > 8); // The extract channels task is used if there is an explicit true parameter on the job or // if there is at least one segment from an audio track with an unknown channel layout (number of channels > 8). // If the extract channels cache is already generated don't generate it again. if ((job.extractChannels || (!job.fileIsCpl && hasUnknownChannelLayout)) && await AudioDemux.needsChannelExtract(compositionDefinition, cacheLocation)) { channelExtractQueue = await this.rrtq.createQueue( `extract-channels-${task.id}`, { ...config.rrtq, ...config.preCacheRrtq, namespace: config.rrtq.audioNamespaceChannelExtract, taskTimeout: config.worker.extractChannels.timeout, }, ); // We reuse the channel extract task for a driver task if the channel extract task // is in progress or it has finished successfully let addTask: boolean; if ((await channelExtractQueue.getInProgressTasksCount()) + (await channelExtractQueue.getPendingTasksCount()) > 0) { addTask = false; } else { const taskResponses = (await channelExtractQueue.getResponses()) as TaskResponses; addTask = Object.values(taskResponses).every((response: { error?: any }) => response.error); } if (addTask) { const audioDemuxTask: IAudioDemuxTask = { fileId, user, compositionDefinition, cacheLocation, type: SegmentType.ExtractAudioChannels, orgId: organization.id, orgSlug: organization.slug, }; await channelExtractQueue.addTask(audioDemuxTask); } } log.debug(`[DRIVER] ${task.id} --> Schedule`); await bluebird.map(segments, async (segment) => { // ToDo Guard seems useless. Investigate. if (segment) { const transcodeSegment = _.cloneDeep(segment); const segmentPrefetchSpan = Honeycomb.startSpan('server', 'segmentPrefetch'); transcodeSegment.honeycombTraceId = segmentPrefetchSpan['trace.trace_id']; transcodeSegment.honeycombParentId = segmentPrefetchSpan['trace.span_id']; transcodeSegment.id = SegmentLoader.generateSegmentId(segment); if (isImageSegment(transcodeSegment)) { await segmentLoader.scheduleImageSubSegments(transcodeSegment, sourceFile, user); } else if ((isAudioSegment(transcodeSegment) && transcodeSegment.channels <= 8) || isCustomAudioSegment(transcodeSegment)) { await segmentLoader.loadAudioSegment( transcodeSegment, transcodeSegment.id, sourceFile, undefined, false, false, ); } Honeycomb.endSpan(segmentPrefetchSpan); } }, { concurrency: config.cacheCheckConcurrency }); log.debug(`[DRIVER] ${task.id} <-- Schedule`); let precacheQueueDone = false; let segmentQueueDone = false; progressReporter = setInterval(() => { void (async () => { const precacheProgress = precacheQueueDone ? 100 : (await preCacheQueue?.getProgress() || 0); const segmentProgress = segmentQueueDone ? 100 : (await segmentCacheQueue?.getProgress() || 0); const progress = Math.min((precacheProgress * 2) / 3 + segmentProgress / 3, 99); await task.reportProgress(progress); })(); }, 5000); if (job.task) { await signalMediaviewPrecacheTask(job.task.id, job.task.workflowId, user as IUser, organization.slug); } await preCacheQueue.waitTasks(); const responses = await preCacheQueue.getResponses() as TaskResponses; let segmentFailed = Object.values(responses).some((response) => response.error); if (channelExtractQueue) { await channelExtractQueue.waitTasks(); } precacheQueueDone = true; // start Big segments cache const imageSegments = segments.filter((segment) => isImageSegment(segment)); const bigImageSegments = (await bluebird.map(imageSegments, (segment) => SegmentLoader .scheduleImageSegmentsCache(segment as IImageSegment, responses, false, user))) .filter((segment) => segment != null) as IImageSegment[]; const cachedImageSegments = await CacheManager.getInstance().checkCacheBatch(bigImageSegments.map((segment) => ({ orgId: segment.orgId, fileId: segment.fileId as string, segmentId: segment.id as string, }))) ?? []; const cachedImageSegmentsSet = new Set(cachedImageSegments.map((segment) => segment.id)); await bluebird.map(bigImageSegments, async (segment) => { if (!cachedImageSegmentsSet.has(segment.id)) { const transcodingTask: ITranscodingTask = { segment, sourceFile, }; await segmentCacheQueue?.addTask(transcodingTask, segment.id); } }, { concurrency: 4 }); await segmentCacheQueue.waitTasks(); segmentQueueDone = true; const imageCacheResponses = await segmentCacheQueue.getResponses() as TaskResponses; log.debug(sanitizeJson(imageCacheResponses)); segmentFailed = segmentFailed || Object.values(imageCacheResponses).some((response) => response.error); // end big segments cache // We only want to hit billing for the segments that we actually transcoded log.debug(`[DRIVER] ${task.id} --> Cache results - ${task.id}`); const eventsCache: Record = {}; await bluebird.map(Object.keys(responses), async (segmentId) => { const resp = responses[segmentId]; if (!resp.error) { const result = resp.result; await CacheManager.getInstance().wasBilled(organization.slug, result.fileId as string, segmentId); if (!Object.prototype.hasOwnProperty.call(eventsCache, result.type)) { eventsCache[result.type] = 0; } eventsCache[result.type] += result.duration; } }, { concurrency: config.cacheCheckConcurrency }); log.debug(`[DRIVER] ${task.id} <-- Cache results`); // Send events - 1 event / segment type log.debug(`[DRIVER] ${task.id} <-- Send results`); if (ensureCacheIntegrity && segmentFailed) { throw new Error('A segment has failed to be decoded.'); } Honeycomb.finishTrace(trace); } catch (e) { preCacheQueue = null; Sentry.captureException(e); throw e; } finally { if (progressReporter) { clearInterval(progressReporter); } if (preCacheQueue) { await preCacheQueue.deleteQueue(); } if (channelExtractQueue) { await channelExtractQueue.deleteQueue(); } if (segmentCacheQueue) { await segmentCacheQueue.deleteQueue(); } } } } if (require.main === module) { void (async () => { Honeycomb.initialize(); sentryInit(); const driver = await Driver.create(config); // Capture event loop latency let eventLoopTimer: NodeJS.Timeout; if (config.monitoring.eventLoopLatency.enabled) { eventLoopTimer = blocked((ms: number) => log.infoMetrics( 'CaptureEventLoopLag', { Environment: config.envName, Service: config.app, Type: config.serviceName || 'default', Latency: ms, }, [cloudWatchMetrics.eventLoopLatency], ), { threshold: config.monitoring.eventLoopLatency.threshold, interval: config.monitoring.eventLoopLatency.interval, }); } // Graceful stop process.on('SIGTERM', () => { void driver.stop(); // Clear interval if (eventLoopTimer) { clearInterval(eventLoopTimer); } }); })(); }