#!/usr/bin/env node
/**
 * Worker entry point.
 *
 * Parses `--mode`, builds the worker that matches the requested mode,
 * optionally starts event-loop latency reporting, and installs SIGINT/SIGTERM
 * handlers that stop the worker before the process exits.
 */
// npm
import 'reflect-metadata';
import * as commander from 'commander';
import blocked from 'blocked';
// @ownzones
import { cloudWatchMetrics } from '@ownzones/ts-log';
// app
import { config, log } from './config';
import { WorkerEvents, WorkerState } from './lib/workers/base-worker';
import { LongWorker } from './lib/workers/long-worker';
import { Worker } from './lib/workers/worker';
import { sentryInit } from './server';
import { Honeycomb } from './lib/tracing';
import { isInTuple } from './utils/types';

const command = new commander.Command();
command
  // The `<mode>` placeholder makes commander capture the value that follows the
  // flag; without it the option is a boolean and can never match `allowedModes`.
  .option('--mode <mode>', 'The type of worker to start')
  .parse(process.argv);

/** Modes accepted on the command line. */
const allowedModes = ['video', 'audio', 'precache', 'extract-audio-channels'] as const;

/** Which worker class a mode is served by. */
enum WorkerType {
  normal = 'normal',
  long = 'long',
}

const usage = `Usage: node_modules/ts-node/dist/bin.js worker.ts --mode ${allowedModes.join('|')}`;

if (require.main === module) {
  if (isInTuple(command.mode, allowedModes)) {
    const mode = command.mode;

    Honeycomb.initialize();
    sentryInit();

    let namespace;
    let workerType: WorkerType;
    let maxAllowedTasks;

    switch (mode) {
      case 'video':
        namespace = config.rrtq.videoNamespace;
        workerType = WorkerType.normal;
        break;
      case 'audio':
        namespace = config.rrtq.audioNamespace;
        workerType = WorkerType.normal;
        break;
      case 'precache':
        namespace = config.rrtq.videoNamespacePreCache;
        workerType = WorkerType.normal;
        // Only the precache mode caps the number of tasks.
        maxAllowedTasks = config.worker.precache.maxAllowedTasks;
        break;
      case 'extract-audio-channels':
        namespace = config.rrtq.audioNamespaceChannelExtract;
        workerType = WorkerType.long;
        break;
      default: {
        // Exhaustiveness check: fails to compile if a mode is added to
        // `allowedModes` without a matching case above.
        const unreachable: never = mode;
        throw new Error(`Unsupported worker mode: ${String(unreachable)}`);
      }
    }

    const worker: LongWorker | Worker = workerType === WorkerType.long
      ? new LongWorker(config, namespace)
      : new Worker(config, namespace, maxAllowedTasks);

    // Capture event loop latency
    let eventLoopTimer: NodeJS.Timeout | undefined;
    if (config.monitoring.eventLoopLatency.enabled) {
      eventLoopTimer = blocked(
        (ms: number) => log.infoMetrics(
          'CaptureEventLoopLag',
          {
            Environment: config.envName,
            Service: config.app,
            Type: config.serviceName || 'default',
            Latency: ms,
          },
          [cloudWatchMetrics.eventLoopLatency],
        ),
        {
          threshold: config.monitoring.eventLoopLatency.threshold,
          interval: config.monitoring.eventLoopLatency.interval,
        },
      );
    }

    /**
     * Waits for a pending `worker.stop(...)` call and then exits the process.
     * A failed stop is logged rather than silently dropped; the process still
     * exits with status 0 afterwards, so shutdown stays best-effort.
     */
    const exitAfter = (stopping: Promise<unknown>): void => {
      void stopping
        .catch((err: unknown) => {
          log.error(`Worker failed to stop cleanly: ${String(err)}`);
        })
        .then(() => setImmediate(() => process.exit(0)));
    };

    // Set on the first termination signal so that a repeated signal does not
    // register a second taskDone listener or call stop() a second time.
    let shuttingDown = false;

    // Graceful stop
    (['SIGINT', 'SIGTERM'] as const).forEach((signal: NodeJS.Signals) => {
      process.on(signal, () => {
        if (shuttingDown) {
          return;
        }
        shuttingDown = true;

        if (worker.state === WorkerState.working) {
          log.info(`Worker received ${signal} signal. Scheduling termination once running task is complete.`);
          // quit once task is done
          // HACK: this could cause a task to fail to report its state before the
          // consumer stops - use a graceful timeout
          worker.addListener(WorkerEvents.taskDone, () => {
            log.info(`Stopping worker due to ${signal} scheduled termination...`);
            exitAfter(worker.stop(5000));
          });
        } else {
          log.info(`Worker received ${signal} signal. Stopping...`);
          // quit after stopping
          exitAfter(worker.stop());
        }

        // Clear interval
        if (eventLoopTimer) {
          clearInterval(eventLoopTimer);
        }
      });
    });
  } else {
    log.error(usage);
    // A missing or unknown mode is a usage error: exit non-zero so callers and
    // process supervisors can detect that no worker was started.
    process.exit(1);
  }
}