/**
 * @module worker.ts
 * @description Implements a class for simple media view workers
 */

// npm
import * as path from 'path';
import assert from 'assert';
import * as Sentry from '@sentry/node';

// @ownzones
import { s3 } from '@ownzones/lib';
import { Task } from '@ownzones/rrtq';

// app
import { ConfigType, log } from '../../config';
import { Profiler } from '../profiler';
import {
  AudioSegment,
  ConcatSegment,
  helper,
  IAudioSegmentResult,
  TimedTextSegment,
  VideoSegment,
} from '../segments';
import { CacheManager } from '../cache';
import { Honeycomb } from '../tracing';
import {
  isAudioSegment,
  isCustomAudioSegment,
  isImageSegment,
  ITranscodingTask,
  SegmentType,
} from '../playlist-builder';
import { validateOrganizationSlug } from '../../utils/iam-service';
import { isInTuple } from '../../utils/types';
import { sanitizeJson } from '../../utils/sanitize-json';
import { BaseWorker } from './base-worker';

Honeycomb.initialize();

/**
 * Extracts a human readable message from an arbitrary thrown value.
 *
 * A `catch` clause can receive anything (not only `Error` instances), so the
 * message is read defensively instead of assuming `err.message` exists.
 * @param err - the value caught by a `catch` clause
 * @returns the `message` of an Error / error-like object, otherwise `String(err)`
 */
function extractErrorMessage(err: unknown): string {
  if (err instanceof Error) {
    return err.message;
  }
  if (typeof err === 'object' && err !== null) {
    const candidate = (err as { message?: unknown }).message;
    if (typeof candidate === 'string') {
      return candidate;
    }
  }
  return String(err);
}

/**
 * Class implementation of a worker which is responsible for transcoding a part of a MXF to a MPEG segment
 */
export class Worker extends BaseWorker {
  //----------------------------------------------------
  // #region - class methods
  //----------------------------------------------------

  /**
   * Creates a Worker instance. All setup is delegated to BaseWorker
   * (per the original documentation this includes subscribing to Rrtq).
   * @param configuration - the worker configuration
   * @param namespace - the associated Rrtq namespace
   * @param maxAllowedTasks - the max number of tasks allowed for the worker to run
   */
  public constructor(configuration: ConfigType, namespace: string, maxAllowedTasks?: number) {
    super(configuration, namespace, maxAllowedTasks);
  }

  /**
   * Worker function, handles the supplied task: validates the segment type and
   * the owning organization, transcodes the segment with the matching segment
   * builder, uploads the result to S3 and stores the response in the cache.
   *
   * NOTE(review): the type argument of the returned Promise is not visible in the
   * reviewed source; `unknown` is used here - confirm against BaseWorker.workerFunc.
   * @param queueTask - the task to be handled, should have a payload of type ITranscodingTask
   * @returns the segment description extended with the `s3Path` and `size` of the uploaded content
   * @throws Error when the segment type is not supported or the organization is invalid;
   * any failure raised while transcoding, uploading or caching is rethrown unchanged
   */
  /* eslint-disable class-methods-use-this */
  protected async workerFunc(queueTask: Task): Promise<unknown> {
    const { segment } = queueTask.payload as ITranscodingTask;

    const segmentType = segment.type;
    if (!isInTuple(segmentType, [SegmentType.Image, SegmentType.Audio, SegmentType.SubsegmentConcat, SegmentType.TimedText])) {
      throw new Error(`Wrong segment type ${segment.type} provided`);
    }

    const { organizationId, organizationSlug } = await validateOrganizationSlug(segment.orgId);
    if (organizationId === null || organizationSlug === null) {
      throw new Error(`Invalid organization id: ${segment.orgId}`);
    }

    try {
      const startedAt = Date.now();
      Profiler.tag(segment.id as string, 'start');

      // ToDo Looks like we don't need this
      // fetch segment's details from redis
      Profiler.tag(segment.id as string, 'fetchedRedis');

      // the segment content is blanked out so it does not end up in the logs
      log.debug(`Execute new task - segmentId ${queueTask.id} `, sanitizeJson({ ...segment, content: null }));

      // pick the builder matching the kind of segment
      let segmentBuilder;
      if (segment.type === SegmentType.SubsegmentConcat) {
        segmentBuilder = new ConcatSegment(segment);
      } else if (isImageSegment(segment)) {
        segmentBuilder = new VideoSegment(segment);
      } else if (isAudioSegment(segment) || isCustomAudioSegment(segment)) {
        segmentBuilder = new AudioSegment(segment);
      } else {
        segmentBuilder = new TimedTextSegment(segment);
      }

      const trace = Honeycomb.start({ name: segment.type }, segment.honeycombTraceId, segment.honeycombParentId);

      // tracing
      const spanTranscode = Honeycomb.startSpan(path.basename(__filename), 'transcode', {
        'segment.type': segment.type,
        'queueTask.id': queueTask.id,
      });
      Honeycomb.addCustomContext({
        'segment.duration': segment.duration,
        'segment.startTime': segment.startTime,
      });
      if (isImageSegment(segment)) {
        Honeycomb.addCustomContext({
          'segment.width': segment.width,
          'segment.height': segment.height,
          'segment.codec': segment.codec,
        });
      }
      // tracing

      Profiler.tag(segment.id as string, 'transcodeStart');
      const transcodeResult = await segmentBuilder.transcode();
      const { segmentData: content } = transcodeResult;
      // audio builders report their own extension, everything else is stored as '.ts'
      const extension = segmentBuilder instanceof AudioSegment
        ? (transcodeResult as IAudioSegmentResult).extension
        : '.ts';
      Profiler.tag(segment.id as string, 'transcodeEnd');

      // elapsed seconds since the task started; measured before the S3 upload
      const took = (Date.now() - startedAt) / 1000;
      Honeycomb.endSpan(spanTranscode);

      const spanS3 = Honeycomb.startSpan(path.basename(__filename), 'saveS3');
      Profiler.tag(segment.id as string, 'saveToS3Start');
      // fails when the task has no id; the segment id is used as the assertion message
      assert(queueTask.id, segment.id as string);
      const s3Path = `${segment.cacheLocation}${queueTask.id}${extension}`;
      await s3.putObjectAtUrl(s3Path, content);
      Profiler.tag(segment.id as string, 'saveToS3End');

      log.debug(`Segment ${queueTask.id} took ${took}s`, sanitizeJson({ took, id: queueTask.id, ...segment }));
      log.debug(Profiler.printable(segment.id as string));
      Profiler.clean(segment.id as string);
      Honeycomb.endSpan(spanS3);

      const size = content.length;
      const responseMessage = {
        ...segment,
        s3Path,
        size,
      };
      await CacheManager.getInstance().saveCache(segment.orgId, queueTask.id, responseMessage);
      Honeycomb.finishTrace(trace);

      return responseMessage;
    } catch (err) {
      // NOTE(review): Profiler.clean, Honeycomb.endSpan and Honeycomb.finishTrace are only
      // reached on the success path - confirm whether failed tasks should release them too.

      // The thrown value is not guaranteed to be an Error, so its message is read
      // defensively; otherwise this handler could itself throw and hide the real failure.
      const message = extractErrorMessage(err);
      if (message.includes('HTTP error 403') || message.includes('HTTP error 400')) {
        // HTTP 403/400 failures reset the signed urls cache and are not reported to Sentry
        helper.resetSignedUrlsCache();
      } else {
        Sentry.captureException(err);
      }
      throw err;
    }
  }

  //----------------------------------------------------
  // #endregion - class methods
  //----------------------------------------------------
}