import { Serializable } from "child_process";
import EventEmitter from "events";
import path from "path";

import { Child } from "../Structures/Child.js";
import { ClusterHandler } from "../Structures/IPCHandler.js";
import { BaseMessage, IPCMessage, RawMessage } from "../Structures/IPCMessage.js";
import { Worker } from "../Structures/Worker.js";
import { ClusterEvents, ClusterKillOptions, DjsDiscordClient, messageType } from "../types/shared"; // eslint-disable-line @typescript-eslint/no-unused-vars
import { delayFor, generateNonce } from "../Util/Util";
import { ClusterManager } from "./ClusterManager";

/**
 * A self-contained cluster created by the {@link ClusterManager}. Each cluster owns a child process or
 * worker thread (see {@link Cluster#thread}) that runs an instance of the bot and its {@link DjsDiscordClient}.
 * When that child process/worker exits, the cluster spawns a replacement, provided the manager has respawning
 * enabled and the restart budget has not been used up.
 * @augments EventEmitter
 */
export class Cluster extends EventEmitter {
    /**
     * Thread implementation used by this cluster: {@link Worker} when the manager's mode is `worker`,
     * {@link Child} otherwise
     */
    THREAD: typeof Worker | typeof Child;

    /**
     * Manager that created the cluster
     */
    manager: ClusterManager;

    /**
     * ID of the cluster in the manager
     */
    id: number;

    /**
     * Arguments for the cluster's process (only when {@link ClusterManager#mode} is `process`)
     */
    args: string[];

    /**
     * Arguments for the cluster's process executable (only when {@link ClusterManager#mode} is `process`)
     */
    execArgv: string[];

    /**
     * Internal shards which will get spawned in the cluster
     */
    shardList: number[];

    /**
     * The amount of real shards
     */
    totalShards: number;

    /**
     * Environment variables for the cluster's process, or workerData for the cluster's worker
     */
    env: NodeJS.ProcessEnv & {
        SHARD_LIST: number[];
        TOTAL_SHARDS: number;
        CLUSTER_MANAGER: boolean;
        CLUSTER: number;
        CLUSTER_COUNT: number;
        DISCORD_TOKEN: string;
    };

    /**
     * Child process or worker of the cluster; `null` while nothing is spawned
     */
    thread: null | Worker | Child;

    /**
     * Restart bookkeeping: the number of restarts counted so far, the allowed maximum, the interval after
     * which the counter is zeroed, and helpers to (re)arm or clear that reset timer
     */
    restarts: {
        current: number;
        max: number;
        interval: number;
        reset?: NodeJS.Timeout;
        resetRestarts: () => void;
        cleanup: () => void;
        append: () => void;
    };

    /**
     * IPC handler bound to the current thread; (re)created on every {@link Cluster#spawn}
     */
    messageHandler: any;

    /**
     * Whether the cluster's {@link DjsDiscordClient} is ready
     */
    ready: boolean;

    /**
     * @param manager Manager that is creating this cluster
     * @param id ID of this cluster
     * @param shardList Internal shards that this cluster is responsible for
     * @param totalShards Total amount of shards across all clusters
     */
    constructor(manager: ClusterManager, id: number, shardList: number[], totalShards: number) {
        super();

        this.THREAD = manager.mode === 'worker' ? Worker : Child;

        this.manager = manager;
        this.id = id;

        this.args = manager.shardArgs || [];
        this.execArgv = manager.execArgv;

        this.shardList = shardList;
        this.totalShards = totalShards;

        this.env = Object.assign({}, process.env, {
            SHARD_LIST: this.shardList,
            TOTAL_SHARDS: this.totalShards,
            CLUSTER_MANAGER: true,
            CLUSTER: this.id,
            CLUSTER_COUNT: this.manager.totalClusters,
            DISCORD_TOKEN: this.manager.token as string,
        });

        this.ready = false;
        this.thread = null;

        this.restarts = {
            current: this.manager.restarts.current ?? 0,
            max: this.manager.restarts.max,
            interval: this.manager.restarts.interval,
            reset: undefined,
            // Periodically zero the restart counter so only restarts within one interval count against `max`
            resetRestarts: () => {
                this.restarts.reset = setInterval(() => {
                    this.restarts.current = 0;
                }, this.manager.restarts.interval);
            },
            cleanup: () => {
                if (this.restarts.reset) clearInterval(this.restarts.reset);
            },
            append: () => {
                this.restarts.current++;
            },
        };
    }

    /**
     * Forks a child process or creates a worker thread for the cluster.
     * You should not need to call this manually.
     * @param spawnTimeout The amount in milliseconds to wait until the {@link DjsDiscordClient} has become ready
     * before resolving. (-1 or Infinity for no wait)
     * @returns The spawned child process/worker
     * @throws If the cluster is already spawned, if it dies before becoming ready, or if it does not become
     * ready within `spawnTimeout`
     */
    public async spawn(spawnTimeout = -1) {
        if (this.thread) throw new Error('CLUSTER ALREADY SPAWNED | ClusterId: ' + this.id);

        this.thread = new this.THREAD(path.resolve(this.manager.file), {
            ...this.manager.clusterOptions,
            execArgv: this.execArgv,
            env: this.env,
            /** Construct args with hooks, to provide parameters within the context of a cluster */
            args: this.manager.hooks.constructClusterArgs(this, this.args),
            clusterData: { ...this.env, ...this.manager.clusterData },
        });

        this.messageHandler = new ClusterHandler(this.manager, this, this.thread);

        this.thread
            .spawn()
            .on('message', this._handleMessage.bind(this))
            .on('exit', this._handleExit.bind(this))
            .on('error', this._handleError.bind(this));

        /**
         * Emitted upon the creation of the cluster's child process/worker.
         * @event Cluster#spawn
         * @param {Child|Worker} process Child process/worker that was created
         */
        this.emit('spawn', this.thread.process);

        await new Promise((resolve, reject) => {
            let spawnTimeoutTimer: NodeJS.Timeout | undefined = undefined;

            const cleanup = (death = false) => {
                clearTimeout(spawnTimeoutTimer);
                // Remove listeners if cluster died to prevent event emitter leaks
                if (death) {
                    this.off('ready', onReady);
                    this.off('death', onDeath);
                }
            };

            const onReady = () => {
                this.manager.emit('clusterReady', this);
                // Re-arm the restart-counter reset timer now that the cluster is up
                this.restarts.cleanup();
                this.restarts.resetRestarts();
                cleanup();
                resolve('Cluster is ready');
            };

            const onDeath = () => {
                cleanup(true);
                reject(new Error('CLUSTERING_READY_DIED | ClusterId: ' + this.id));
            };

            const onTimeout = () => {
                cleanup();
                reject(new Error('CLUSTERING_READY_TIMEOUT | ClusterId: ' + this.id));
            };

            // If there is a spawn timeout, wait and error if the cluster does not get ready
            if (spawnTimeout !== -1 && spawnTimeout !== Infinity) {
                spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout);
            } else {
                // No timeout, next cluster will be spawned without waiting for ready.
                // The listeners below are still registered so the ready bookkeeping runs later.
                resolve('Skipping ready check');
            }

            this.once('ready', onReady);
            this.once('death', onDeath);
        });

        return this.thread.process;
    }

    /**
     * Immediately kills the cluster's process/worker, stops its heartbeat and clears the restart reset timer.
     * This method does not respawn the cluster.
     * @param options Options for the kill; `options.reason` is included in the debug log
     */
    public kill(options: ClusterKillOptions) {
        this.thread?.kill();
        this.thread = null;

        this.manager.heartbeat?.clusters.get(this.id)?.stop();
        this.restarts.cleanup();

        this.manager._debug('[KILL] Cluster killed with reason: ' + (options?.reason || 'not given'), this.id);
    }

    /**
     * Kills and restarts the cluster's process/worker.
     * @param options Options for respawning the cluster (defaults to the manager's spawn options)
     * @param options.delay Milliseconds to wait between killing and spawning
     * @param options.timeout Ready timeout forwarded to {@link Cluster#spawn}
     * @returns The newly spawned child process/worker
     */
    public async respawn({ delay = 5500, timeout = -1 } = this.manager.spawnOptions) {
        if (this.thread) this.kill({ force: true });
        if (delay > 0) await delayFor(delay);

        this.manager.heartbeat?.clusters.get(this.id)?.stop();
        return this.spawn(timeout);
    }

    /**
     * Sends a message to the cluster's process/worker.
     * Object messages are wrapped in a {@link BaseMessage} before sending; anything else is sent as-is.
     * @param message Message to send to the cluster
     * @returns Whatever the underlying thread's `send` returns, or `undefined` when no thread is spawned
     */
    public send(message: RawMessage) {
        // Return in both branches so callers that await or return `send()` observe the underlying send
        if (typeof message === 'object') return this.thread?.send(new BaseMessage(message).toJSON());
        return this.thread?.send(message);
    }

    /**
     * Sends a Request to the ClusterClient and returns the reply
     * @param message Message, which should be sent as request (its `_type` is overwritten)
     * @returns Reply of the Message
     * @example
     * client.cluster.request({content: 'hello'})
     *   .then(result => console.log(result)) //hi
     *   .catch(console.error);
     * @see {@link IPCMessage#reply}
     */
    public request(message: RawMessage) {
        message._type = messageType.CUSTOM_REQUEST;
        this.send(message);
        return this.manager.promise.create(message, message.options);
    }

    /**
     * Evaluates a script or function on the cluster, in the context of the {@link DjsDiscordClient}.
     * @param script JavaScript to run on the cluster; a function is stringified and invoked with
     * `this` and the JSON-serialized `context`
     * @param context Value passed as second argument when `script` is a function (must be JSON-serializable)
     * @param timeout Timeout forwarded as option to the manager's promise handler
     * @returns Result of the script execution
     */
    public async eval(script: string | ((...args: any[]) => unknown), context: any, timeout: number) {
        // Stringify the script if it's a Function
        const _eval = typeof script === 'function' ? `(${script})(this, ${JSON.stringify(context)})` : script;

        // cluster is dead (maybe respawning), don't cache anything and error immediately
        if (!this.thread) return Promise.reject(new Error('CLUSTERING_NO_CHILD_EXISTS | ClusterId: ' + this.id));

        const nonce = generateNonce();
        const message = { nonce, _eval, options: { timeout }, _type: messageType.CLIENT_EVAL_REQUEST };
        await this.send(message);
        return await this.manager.promise.create(message, message.options);
    }

    /**
     * Enables or disables maintenance on the cluster.
     * @param reason Reason for enabling maintenance; when omitted (or empty), maintenance is disabled
     * @returns Result of {@link Cluster#send}
     */
    public triggerMaintenance(reason?: string) {
        const _type = reason ? messageType.CLIENT_MAINTENANCE_ENABLE : messageType.CLIENT_MAINTENANCE_DISABLE;
        return this.send({ _type, maintenance: reason });
    }

    /**
     * Handles a message received from the child process/worker.
     * @param message Message received
     * @private
     */
    private _handleMessage(message: Serializable) {
        if (!message) return;

        // The IPC handler decides whether the message should also be emitted on this cluster
        const emit = this.messageHandler.handleMessage(message);
        if (!emit) return;

        let emitMessage;
        if (typeof message === 'object') {
            emitMessage = new IPCMessage(this, message);
            if (emitMessage._type === messageType.CUSTOM_REQUEST) this.manager.emit('clientRequest', emitMessage);
        } else emitMessage = message;

        /**
         * Emitted upon receiving a message from the child process/worker.
         * @event Cluster#message
         * @param {*|IPCMessage} message Message that was received
         */
        this.emit('message', emitMessage);
    }

    /**
     * Handles the cluster's process/worker exiting.
     * @private
     * @param {Number} exitCode
     */
    private _handleExit(exitCode: number) { // eslint-disable-line @typescript-eslint/no-unused-vars
        const respawn = this.manager.respawn;

        // Cleanup functions
        this.manager.heartbeat?.clusters.get(this.id)?.stop();
        this.restarts.cleanup();

        /**
         * Emitted upon the cluster's child process/worker exiting.
         * @event Cluster#death
         * @param {Cluster} cluster Cluster whose child process/worker exited
         * @param {Child|Worker} process Child process/worker that exited
         */
        this.emit('death', this, this.thread?.process);

        this.manager._debug(
            '[DEATH] Cluster died, attempting respawn | Restarts Left: ' + (this.restarts.max - this.restarts.current),
            this.id,
        );

        this.ready = false;
        this.thread = null;

        if (!respawn) return;

        if (this.restarts.current >= this.restarts.max)
            this.manager._debug(
                '[ATTEMPTED_RESPAWN] Attempted Respawn Declined | Max Restarts have been exceeded',
                this.id,
            );
        if (this.restarts.current < this.restarts.max) this.spawn().catch(err => this.emit('error', err));

        // Count this exit against the restart budget (also when the respawn was declined)
        this.restarts.append();
    }

    /**
     * Handles the cluster's process/worker error by forwarding it to the manager's `error` event.
     * @param error the error, which occurred on the worker/child process
     * @private
     */
    private _handleError(error: Error) {
        this.manager.emit('error', error);
    }
}

// Credits for EventEmitter typings: https://github.com/discordjs/discord.js/blob/main/packages/rest/src/lib/RequestManager.ts#L159 | See attached license
// Each member is an intersection of two call signatures: a typed one for the events declared in
// ClusterEvents, and a loose fallback for any other string/symbol event name.
export interface Cluster {
    emit: (<K extends keyof ClusterEvents>(event: K, ...args: ClusterEvents[K]) => boolean) &
        (<S extends string | symbol>(event: Exclude<S, keyof ClusterEvents>, ...args: any[]) => boolean);

    off: (<K extends keyof ClusterEvents>(event: K, listener: (...args: ClusterEvents[K]) => void) => this) &
        (<S extends string | symbol>(
            event: Exclude<S, keyof ClusterEvents>,
            listener: (...args: any[]) => void,
        ) => this);

    on: (<K extends keyof ClusterEvents>(event: K, listener: (...args: ClusterEvents[K]) => void) => this) &
        (<S extends string | symbol>(
            event: Exclude<S, keyof ClusterEvents>,
            listener: (...args: any[]) => void,
        ) => this);

    once: (<K extends keyof ClusterEvents>(event: K, listener: (...args: ClusterEvents[K]) => void) => this) &
        (<S extends string | symbol>(
            event: Exclude<S, keyof ClusterEvents>,
            listener: (...args: any[]) => void,
        ) => this);

    removeAllListeners: (<K extends keyof ClusterEvents>(event?: K) => this) &
        (<S extends string | symbol>(event?: Exclude<S, keyof ClusterEvents>) => this);
}