/* eslint-disable no-console */ import cluster, { Worker } from "cluster"; import { Invokable } from "./Invokable.js"; import { availableParallelism } from "os"; import sleep from "./sleep.js"; export class RecycledWorker { public get worker() { return this.currentWorker; } private currentWorker: Worker; private destroyed: boolean; private eventMap: Map = new Map(); constructor(env?) { this.currentWorker = cluster.fork(env); this.currentWorker.on("exit" , () => { if (this.destroyed) { return; } this.currentWorker = cluster.fork(env); for (const [msg, handler] of this.eventMap) { this.currentWorker.on(msg, handler); } }); } public on(msg: string, handler) { this.eventMap.set(msg, handler); if (this.destroyed) { return; } this.currentWorker.on(msg, handler); } public send(a) { if (this.destroyed) { return; } this.currentWorker.send(a); } public destroy() { this.destroyed = true; const { currentWorker } = this; this.currentWorker = null; if(!currentWorker) { return; } currentWorker.destroy(); setTimeout(() => { try { currentWorker.kill("SIGTERM"); } catch { // do nothing } }, 1000); } } const numCPUs = availableParallelism(); export default abstract class ClusterInstance extends Invokable { public get maxWorkerCount() { return numCPUs; } protected isPrimary: boolean; protected readonly workers: RecycledWorker[] = []; public run(arg: T) { this.isPrimary = cluster.isPrimary; if (cluster.isPrimary) { console.log(`Initializing Primary Cluster`); this.setupPrimary(arg).catch(console.error); } else { this.setupWorker(arg).catch(console.error); console.log(`Initializing Cluster Worker`); } } protected abstract runPrimary(arg: T): Promise; protected abstract runWorker(arg: T): Promise; protected fork(env?) { const worker = new RecycledWorker(env); this.install(worker); this.workers.push(worker); return worker; } protected async setupPrimary(arg: T) { try { await this.runPrimary(arg); console.log(`Starting Clusters`); while (true) { const workers = this.workers; workers.length = 0; // Start workers and listen for messages containing notifyRequest const n = this.maxWorkerCount; for (let i = 0; i < n; i++) { const worker = this.fork(); workers.push(worker); } await this.keepAlive(); for (const worker of workers) { worker.destroy(); } } } catch (error) { console.error(error); } } /** * keep worker process alive for 4 hours by default * you can choose your own recycle schedule. * * Recycling process is important to flush out dead connections. */ protected async keepAlive() { // sleep for 4 hour await sleep(4 * 60 * 60 * 1000); } protected async setupWorker(arg: T) { try { this.install(process); await this.runWorker(arg); process.send({ cmd: "ready"}); } catch (error) { console.error(error); } } protected invoke(invoke: string, ...args: any[]): Promise { if (!this.isPrimary) { return Invokable.invoke(this, process, invoke, args); } return Promise.all(this.workers.map((worker) => Invokable.invoke(this, worker, invoke, args))); } }