// npm
import 'reflect-metadata';
import * as AWS from 'aws-sdk';
import e from 'express';
import * as express from 'express';
import cookieParser from 'cookie-parser';
import cors from 'cors';
import * as helmet from 'helmet';
import * as _ from 'lodash';
import compression from 'compression';
import bluebird from 'bluebird';
import * as rp from 'request-promise';
import nocache from 'nocache';
import blocked from 'blocked';
// @ownzones
import { Rrtq, Queue } from '@ownzones/rrtq';
import {
  ICognitoTokenPayload,
  isIpWhitelisted,
  WhitelistType,
  getOrganizationBySlug,
  setContext,
  CognitoTokenVerifier,
} from '@ownzones/iam-service';
import { AncillaryDataType } from '@ownzones/imf/dist/lib/st0377/writer/mxf-file-writer';
import { cloudWatchMetrics } from '@ownzones/ts-log';
import * as Sentry from '@sentry/node';
import { Honeycomb } from './lib/tracing';
// app
import { config, log } from './config';
import { SegmentLoader } from './lib/segment-loader';
import {
  CustomAudioTrack,
  IPlaylistBuildInput,
  PlaylistBuilder,
  TranscodingSegment,
} from './lib/playlist-builder';
import {
  IFile,
  IUser,
} from './lib/connect-api';
import { CacheManager } from './lib/cache';
import { getAncillaryResource } from './lib/ancillary-resource';
import { NestedObject } from './utils/types';
import { sanitizeJson } from './utils/sanitize-json';

AWS.config.setPromisesDependency(bluebird);

/**
 * Initializes the Sentry SDK with the environment and DSN taken from `config`
 * and logs the values that were used.
 */
export function sentryInit(): void {
  Sentry.init({ environment: config.namespace, dsn: config.logs.sentry.dsn });
  log.info(`Sentry initialized, env ${config.namespace} dsn ${config.logs.sentry.dsn}`);
}

/**
 * Computes the queue-length metric for one rrtq namespace.
 *
 * The metric is the sum of in-progress and pending tasks over every queue
 * discovered in `namespace`. When Keda is disabled the sum is divided by the
 * number of workers, which is read from Prometheus
 * (`kube_deployment_spec_replicas`) and, failing that, from rrtq itself.
 *
 * @param rrtq - rrtq client used to discover queues and count workers.
 * @param namespace - rrtq namespace whose queues are measured.
 * @param deployment - deployment name used in the Prometheus query.
 * @param kedaEnabled - when `true` the raw task count is returned undivided.
 * @returns the metric value; always `1` when `config.mock` is set.
 */
async function getRrtqNamespaceMetrics(
  rrtq: Rrtq,
  namespace: string,
  deployment: string,
  kedaEnabled: boolean,
): Promise<number> {
  if (config.mock) {
    return 1;
  }

  const queueNames = await rrtq.discoverQueues(namespace) as string[];
  let taskCount = 0;

  // Keda disabled: get number of workers to divide the metric
  let numberOfWorkers = -1;
  if (!kedaEnabled) {
    try {
      const promQuery = await rp.get(
        `${config.prometheus.url}/api/v1/query?query=${encodeURIComponent(
          `kube_deployment_spec_replicas{namespace="${config.namespace}", deployment="${deployment}"}`,
        )}`,
        { json: true },
      ) as NestedObject;
      const stringValue = _.get(promQuery, 'data.result.0.value.1') as string;
      // A missing or unparseable value must not leak NaN into the metric,
      // so both cases fall back to the worker count reported by rrtq.
      const replicas = stringValue ? Number.parseInt(stringValue, 10) : Number.NaN;
      if (Number.isFinite(replicas)) {
        numberOfWorkers = _.clamp(replicas, 1, 10000);
      } else {
        log.warn(`Prometheus ${namespace} metric failed, getting number of workers from rrtq`);
        numberOfWorkers = _.clamp(await rrtq.getNumberOfWorkers(namespace), 1, 10000);
      }
    } catch (err) {
      log.error(err);
    }
  }

  // Number of tasks
  await bluebird.map(queueNames, async (queueName) => {
    const queue = await Queue.fromName(rrtq.redisClient, `${namespace}:${queueName}`);
    if (queue) {
      const inProgress = await queue.getInProgressTasksCount();
      const inPending = await queue.getPendingTasksCount();
      taskCount += inProgress + inPending;
    } else {
      log.info(`Cant get the queue with name : ${queueName}`);
    }
  }, { concurrency: 20 });

  let nsMetric = taskCount;

  // Keda disabled: divide the metric
  if (!kedaEnabled) {
    // If the number of workers could not be determined, make the metric
    // evaluate to 1 (or 0 when idle) so that it does not change the number
    // of workers.
    if (!(numberOfWorkers > 0)) {
      numberOfWorkers = taskCount || 1;
    }
    nsMetric /= numberOfWorkers;
  }

  return nsMetric;
}

/** Resolves after `miliseconds` milliseconds. */
const delay = (miliseconds: number) => new Promise<void>((resolve) => setTimeout(resolve, miliseconds));

/**
 * Builds the Express application, registers middleware and routes, starts
 * listening on port 3000 and starts a background loop that refreshes the
 * `/metrics` payload every 10 seconds.
 *
 * @param cognitoTokenVerifier - verifier used to validate access tokens.
 * @param waitTimeout - passed to the segment loader as its `timeout` option.
 * @param segmentSize - passed to `PlaylistBuilder.build` as `segmentSize`.
 * @returns a handle whose `close()` stops the metrics loop, closes the HTTP
 *          server and stops rrtq.
 */
export async function init(
  cognitoTokenVerifier: CognitoTokenVerifier,
  waitTimeout = 21000,
  segmentSize = 7,
): Promise<{ close: () => Promise<void> }> {
  let resBodyArr: string[] = [];
  const app = e();

  app.use(Sentry.Handlers.requestHandler() as express.RequestHandler);
  app.use(compression({
    filter: () => true,
  }));
  app.use(cookieParser());

  // NOTE(review): this callback declares four parameters, so Express registers
  // it as an error-handling middleware. It therefore does NOT run for regular
  // requests, and when it does run it calls `next()` without the error.
  // If the origin check is meant to apply to every request, drop the first
  // parameter - confirm that `config.cors.origin` is a valid regular
  // expression for all deployments before enabling it.
  app.use((_err: any, req: express.Request, _res: express.Response, next: express.NextFunction) => {
    log.info('Error before error sentry');
    // check if requested origin is valid based on cors settings
    const requestOrigin = req.get('Origin');
    if (requestOrigin && !(new RegExp(config.cors.origin).exec(requestOrigin))) {
      log.info('Throwing exception there');
      throw new Error(`Invalid Origin header for '${requestOrigin}'.`);
    }
    log.info('Going to next');
    return next();
  });

  app.use(cors({
    maxAge: config.cors.maxAge, // to avoid unnecessary browser pre flight calls
    allowedHeaders: [
      'authorization',
      'x-honeycomb-trace',
      'x-organization-slug',
    ],
    methods: config.cors.allowMethods,
    credentials: true,
    origin: true,
  }));

  // security headers
  app.use(helmet.frameguard());
  app.use(nocache());
  app.use(helmet.noSniff());
  app.use(helmet.xssFilter());

  // Authentication: validates the access token, resolves the organization and
  // enforces the IP whitelist. `/metrics` and `/org-cache` are let through
  // without authentication.
  app.use(async (req, res, next) => {
    if (req.url === '/metrics') {
      return next();
    }
    if (req.url.startsWith('/org-cache?')) {
      return next();
    }

    // Ancillary resources carry their credentials in the query string
    // instead of in headers.
    const isAncillaryRequest = req.url.startsWith('/ancillary-resource?');

    if (!req.headers.authorization && !isAncillaryRequest) {
      log.error('Authorization header error');
      return next(new Error('Authorization header is required. Code: 13'));
    }

    try {
      const orgSlug = isAncillaryRequest
        ? req.query.orgSlug as string
        : req.headers['x-organization-slug'] as string;
      const auth = isAncillaryRequest
        ? req.query.accessToken as string
        : req.headers.authorization;
      const token = auth?.replace('Bearer ', '');
      let tokenPayload: ICognitoTokenPayload;

      if (!token) {
        throw new Error('Access token is required');
      }

      try {
        tokenPayload = await cognitoTokenVerifier.verify(token);
      } catch (err) {
        log.error('Error validating token', err);
        throw err;
      }

      const org = await getOrganizationBySlug(orgSlug, setContext());
      if (!org) {
        throw new Error(`Invalid org slug '${orgSlug}'`);
      }

      const ip = req.headers['x-forwarded-for'];
      if (!config.mock && ip && !isIpWhitelisted(
        org,
        ip as string,
        WhitelistType.VideoPlayer,
      )) {
        res.status(403);
        res.send('Error: Permission denied to perform current action.');
        // Stop here: falling through would hand the rejected request to the
        // route handler even though a 403 has already been sent.
        return;
      }

      res.locals.user = {
        id: tokenPayload.userId,
        email: tokenPayload.email,
        // Org fields are only exposed globally for backwards-compatibility.
        // Because they existed on the validate-token response `user` shape, they can still be referenced indirectly
        organizationId: org.id,
        organization: {
          id: org.id,
          orgId: org.id,
          slug: org.slug,
          mediaViewCacheFileLocator: org.mediaViewCacheFileLocator,
          organizationOptions: {
            cacheCapacity: org.organizationOptions.cacheCapacity,
          },
        },
      };
      return next();
    } catch (err) {
      return next(err);
    }
  });

  const rrtq = new Rrtq({ redis: config.redis });
  const videoQueue = await rrtq.createQueue(
    'mediaview',
    { ...config.rrtq, namespace: config.rrtq.videoNamespace },
  );
  const audioQueue = await rrtq.createQueue(
    'mediaview',
    { ...config.rrtq, namespace: config.rrtq.audioNamespace },
  );
  const segmentLoader = new SegmentLoader(videoQueue, audioQueue);

  // endpoint which returns an HLS media playlist. It receives a CPL url
  // and builds the equivalent M3U8 file
  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  app.get('/media-playlist', async (req, res, next) => {
    const trace = Honeycomb.startWithHeaders({ name: 'media-playlist' }, req.get('X-Honeycomb-Trace'));
    const { virtualTrackId, fileId, customAudioTrack } = req.query;
    const cookies = req.cookies as Record<string, string>;
    try {
      const options = {
        segmentSize,
        fileId: fileId as string,
        virtualTrackId: virtualTrackId as string | undefined,
        user: res.locals.user as IUser,
        accessToken: req.headers.authorization || cookies['zl-accessToken'],
        orgSlug: req.headers['x-organization-slug'] as string || cookies['zl-organizationSlug'] || 'test',
        customAudioTrack: customAudioTrack
          ? (JSON.parse(customAudioTrack as string) as CustomAudioTrack[])
          : undefined,
      };
      const playlist = (await PlaylistBuilder.build(options)).playlist.mediaPlayList;
      res.set('Content-Type', 'application/x-mpegURL');
      return res.send(playlist);
    } catch (error) {
      log.error(sanitizeJson(error));
      return next(error);
    } finally {
      Honeycomb.finishTrace(trace);
    }
  });

  // endpoint which returns an MPEG-TS segment
  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  app.get('/segment', async (req, res, next) => {
    const {
      virtualTrackId,
      fileId,
      customAudioTrack,
      cacheBuster,
      index,
    } = req.query;
    const cookies = req.cookies as Record<string, string>;
    const orgSlug = req.headers['x-organization-slug'] as string || cookies['zl-organizationSlug'];
    const accessToken = req.headers.authorization || cookies['zl-accessToken'];
    const trace = Honeycomb.startWithHeaders({ name: 'serverSegment' }, req.get('X-Honeycomb-Trace'));
    try {
      const options: IPlaylistBuildInput = {
        segmentSize,
        orgSlug,
        accessToken,
        virtualTrackId: virtualTrackId as string,
        fileId: fileId as string,
        user: res.locals.user as IUser,
        customAudioTrack: customAudioTrack
          ? (JSON.parse(customAudioTrack as string) as CustomAudioTrack[])
          : undefined,
        buildCustomSegments: !!customAudioTrack,
      };
      const playlists = await PlaylistBuilder.build(options);
      const { segments, sourceFile } = playlists.playlist as {segments: TranscodingSegment[], sourceFile: IFile};
      const noMappingSegments = playlists.customSegments ? playlists.customSegments : null;
      const segmentIndex = Number.parseInt(index as string, 10);
      const origin = req.headers.origin
        ? (Array.isArray(req.headers.origin) ? req.headers.origin.join(',') : req.headers.origin)
        : '*';
      res.setHeader('Content-Type', 'video/MP2T');
      await segmentLoader.load({
        res,
        req,
        next,
        orgSlug,
        accessToken,
        origin,
        sourceFile,
        trace,
        segments,
        segmentIndex,
        noMappingSegments, // ToDo When do we get this?
        cacheBuster: Boolean(cacheBuster),
        fileId: fileId as string,
        timeout: waitTimeout,
      });
    } catch (error) {
      log.error(sanitizeJson(error));
      next(error);
    } finally {
      Honeycomb.finishTrace(trace);
    }
  });

  // endpoint which returns an ancillary resource for a timed text MXF.
  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  app.get('/ancillary-resource', async (req, res, next) => {
    const {
      fileId,
      type,
      fileName,
      orgSlug,
    } = req.query;
    const trace = Honeycomb.startWithHeaders({ name: 'ancillary-resource' }, req.get('X-Honeycomb-Trace'));
    try {
      const options = {
        fileId: fileId as string,
        type: type as AncillaryDataType,
        fileName: fileName as string,
        orgSlug: orgSlug as string || 'test',
      };
      const body = await getAncillaryResource(options);
      res.set('Content-Type', type === AncillaryDataType.Image ? 'image/png' : 'application/x-fontopentype');
      return res.send(body);
    } catch (error) {
      log.error(error);
      return next(error);
    } finally {
      Honeycomb.finishTrace(trace);
    }
  });

  // Serves the most recently computed queue metrics as plain text.
  app.get('/metrics', (_req, res) => {
    res.set('Content-Type', 'text/plain');
    res.send(resBodyArr.join(''));
  });

  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  app.get('/org-cache', async (req, res) => {
    const { orgSlug } = req.query;
    if (!orgSlug) {
      res.status(400).json({ error: 'Organization option missing' });
    } else {
      try {
        res.json({ cacheUsed: await CacheManager.getInstance().getStorage(orgSlug.toString()) });
      } catch (err) {
        log.error(err);
        res.status(400).json({ error: 'Cannot retrieve organization cache storage' });
      }
    }
  });

  app.use(Sentry.Handlers.errorHandler() as express.ErrorRequestHandler);

  // Express only recognises an error handler by its four-parameter signature,
  // so `next` must be declared even though it is used only for delegation.
  app.use((err: unknown, _req: express.Request, res: express.Response, next: express.NextFunction) => {
    // Once a response has started it cannot be replaced; let Express' default
    // handler terminate the connection.
    if (res.headersSent) {
      return next(err);
    }
    // The error id is attached to `res.sentry` to be returned
    // and optionally displayed to the user for support.
    const sentryId = (res as express.Response & { sentry?: string }).sentry;
    const message = err instanceof Error ? err.message : String(err);
    res.status(500).json({ error: message, sentry: sentryId });
    return undefined;
  });

  const port = 3000;
  const server = app.listen(port, () => {
    log.info(`Example app listening on port ${port}!`);
  });
  server.on('error', (error) => {
    if (error) {
      log.error(`Failed to start the server on port ${port}`, error);
    }
  });
  server.setTimeout(10 * 60 * 1000);
  server.keepAliveTimeout = config.server.keepAliveTimeout;

  let recurse = true;

  // Refreshes `resBodyArr` every 10 seconds until `close()` is called.
  // Implemented as a loop (not recursion) so no chain of pending promises
  // builds up, and a failed refresh is logged instead of ending the loop.
  const computeMetrics = async (): Promise<void> => {
    while (recurse) {
      await delay(10 * 1000);
      if (!recurse) {
        break;
      }
      try {
        const videoMetric = await getRrtqNamespaceMetrics(rrtq, config.rrtq.videoNamespace, 'mediaview-worker-video', config.keda.enabled);
        const audioMetric = await getRrtqNamespaceMetrics(rrtq, config.rrtq.audioNamespace, 'mediaview-worker-audio', config.keda.enabled);
        const precacheMetric = await getRrtqNamespaceMetrics(rrtq, config.rrtq.videoNamespacePreCache, 'mediaview-worker-precache', config.keda.enabled);
        const extractAudioChannelsMetric = await getRrtqNamespaceMetrics(rrtq, config.rrtq.audioNamespaceChannelExtract, 'mediaview-worker-extract-audio-channels', config.keda.enabled);
        const driverMetric = await getRrtqNamespaceMetrics(rrtq, config.rrtq.driverNamespace, 'mediaview-driver', false);
        resBodyArr = [
          `# TYPE mv_video_queue_length gauge\nmv_video_queue_length ${videoMetric}\n`,
          `# TYPE mv_audio_queue_length gauge\nmv_audio_queue_length ${audioMetric}\n`,
          `# TYPE mv_precache_queue_length gauge\nmv_precache_queue_length ${precacheMetric}\n`,
          `# TYPE mv_driver_queue_length gauge\nmv_driver_queue_length ${driverMetric}\n`,
          `# TYPE mv_extract_audio_channels_queue_length gauge\nmv_extract_audio_channels_queue_length ${extractAudioChannelsMetric}\n`,
        ];
      } catch (err) {
        // Keep serving the previous metrics and retry on the next tick.
        log.error(err);
      }
    }
  };
  void computeMetrics();

  return {
    close: async () => {
      // Stop the metrics loop first so it does not query rrtq after it stops.
      recurse = false;
      server.close();
      await rrtq.stop();
    },
  };
}

if (require.main === module) {
  Honeycomb.initialize();
  sentryInit();
  init(new CognitoTokenVerifier(config.cognito)).catch((err: unknown) => {
    // A server that failed to initialize cannot serve anything; log and exit
    // rather than leaving an unhandled rejection.
    log.error('Failed to initialize the server', err);
    process.exit(1);
  });

  // Capture event loop latency
  if (config.monitoring.eventLoopLatency.enabled) {
    blocked((ms: number) => log.infoMetrics(
      'CaptureEventLoopLag',
      {
        Environment: config.envName,
        Service: config.app,
        Type: config.serviceName || 'default',
        Latency: ms,
      },
      [cloudWatchMetrics.eventLoopLatency],
    ), {
      threshold: config.monitoring.eventLoopLatency.threshold,
      interval: config.monitoring.eventLoopLatency.interval,
    });
  }
}