/* eslint-disable no-underscore-dangle */
/**
 * @module base-worker.ts
 * @description Implements a base class for media view workers
 */

// @ownzones
import { Consumer, Rrtq, Task } from '@ownzones/rrtq';
import { EventEmitter } from 'events';

// app
import { ConfigType } from '../../config';

//----------------------------------------------------
// #region - types
//----------------------------------------------------

/**
 * Lifecycle states of a worker.
 */
export enum WorkerState {
  /** Initial value of the state field, before the constructor has finished. */
  new = 'new',
  /** Subscribed to the queue and not currently inside the task wrapper. */
  ready = 'ready',
  /** A task is currently being processed by the task wrapper. */
  working = 'working',
  /** `stop()` has been called; tasks handed to the wrapper are skipped. */
  stopped = 'stopped',
}

/**
 * Events emitted by a worker.
 */
export enum WorkerEvents {
  /** Emitted after every processed task, whether it succeeded or failed. */
  taskDone = 'taskDone',
}

//----------------------------------------------------
// #endregion - types
//----------------------------------------------------

/**
 * Base class for media view workers.
 *
 * Owns an Rrtq instance and a consumer for one namespace, and tracks a coarse
 * worker state around every task handed to the derived class.
 */
export abstract class BaseWorker extends EventEmitter {
  //----------------------------------------------------
  // #region - class members
  //----------------------------------------------------

  // Associated Redis task queue
  protected rrtq: Rrtq;

  // Associated task queue Consumer
  protected consumer: Consumer;

  // Worker state
  protected _state: WorkerState = WorkerState.new;

  //----------------------------------------------------
  // #endregion - class members
  //----------------------------------------------------

  //----------------------------------------------------
  // #region - class methods
  //----------------------------------------------------

  /**
   * Creates a BaseWorker instance and subscribes it to Rrtq.
   * The worker state is `ready` once the constructor returns.
   *
   * @param configuration - the worker configuration; also passed to the Rrtq constructor
   * @param namespace - the associated Rrtq namespace
   * @param maxAllowedTasks - the max number of tasks allowed for the worker to run
   *   (forwarded unchanged to `consumer.subscribe`)
   */
  protected constructor(configuration: ConfigType, namespace: string, maxAllowedTasks?: number) {
    super();

    this.rrtq = new Rrtq(configuration);
    this.consumer = this.rrtq.getConsumerForNamespace(
      namespace,
      this.workerFuncWrapper.bind(this),
      {
        consumerName: configuration.podName,
        cleanupQueuesIntervalTimeout: configuration.rrtq.cleanupQueuesIntervalTimeout,
      },
    );
    this.consumer.subscribe(maxAllowedTasks);

    this._state = WorkerState.ready;
  }

  /**
   * Get the worker state.
   */
  public get state(): WorkerState {
    return this._state;
  }

  /**
   * Stops the worker and un-registers it from the task queue.
   *
   * The state is switched to `stopped` first, so any task handed to the
   * wrapper from then on is skipped.
   *
   * @param gracefulTimeout - milliseconds to wait after stopping the consumer and
   *   before shutting down Rrtq, giving a running task time to complete. This is a
   *   fixed delay: it does not end early when the task finishes and does not
   *   extend if the task is still running. Omitted or `0` means no delay.
   */
  public async stop(gracefulTimeout?: number): Promise<void> {
    this._state = WorkerState.stopped;
    await this.consumer.stop();

    if (gracefulTimeout) {
      await new Promise<void>((resolve) => setTimeout(resolve, gracefulTimeout));
    }

    await this.rrtq.stop();
  }

  /**
   * Worker function wrapper, used for worker state management.
   *
   * Marks the worker as `working`, delegates to `workerFunc`, then emits
   * `taskDone` and moves the state back to `ready` unless the worker was
   * stopped in the meantime.
   *
   * @param queueTask - the task received from the queue consumer
   * @returns the value resolved by `workerFunc`, or `null` when the worker is
   *   already stopped and the task is skipped
   * @throws whatever `workerFunc` rejects with, unchanged - including falsy
   *   values such as `undefined` or `0`
   * @emits WorkerEvents.taskDone
   */
  private async workerFuncWrapper(queueTask: Task): Promise<unknown> {
    if (this.state === WorkerState.stopped) {
      return null;
    }

    this._state = WorkerState.working;

    try {
      // try/finally (rather than catch + `if (error) throw`) so that a rejection
      // with a falsy value is still propagated instead of being reported as success.
      return await this.workerFunc(queueTask);
    } finally {
      try {
        // Emitted before the state is reset, so listeners still observe `working`.
        this.emit(WorkerEvents.taskDone);
      } finally {
        // Reset even if a listener threw; never overwrite `stopped` set by stop().
        if (this.state === WorkerState.working) {
          this._state = WorkerState.ready;
        }
      }
    }
  }

  /**
   * Worker function, needs to be implemented in derived classes.
   *
   * @param queueTask - the task to process
   * @returns a promise for the task result; a rejection is propagated to the
   *   queue consumer by the wrapper
   */
  protected abstract workerFunc(queueTask: Task): Promise<unknown>;

  //----------------------------------------------------
  // #endregion - class methods
  //----------------------------------------------------
}