import { ClusterClientEvents, EvalOptions, MessageTypes, Serialized, Awaitable, ValidIfSerializable, SerializableInput, ClusterClientData, PackageType } from '../types'; import { BaseMessage, BaseMessageInput, DataType, ProcessMessage } from '../other/message'; import { BrokerMessage, IPCBrokerClient } from '../handlers/broker'; import { detectLibraryFromClient, getInfo } from '../other/utils'; import { ClusterClientHandler } from '../handlers/message'; import type { RefShardingCoreClient } from './coreClient'; import { ShardingUtils } from '../other/shardingUtils'; import { RefClusterManager } from './clusterManager'; import { PromiseHandler } from '../handlers/promise'; import { WorkerClient } from '../classes/worker'; import { ChildClient } from '../classes/child'; import { Serializable } from 'child_process'; import { RefShardingClient } from './client'; import { Guild } from 'discord.js'; import EventEmitter from 'events'; export type ClientRefType = RefShardingClient | RefShardingCoreClient; /** Simplified Cluster instance available on the {@link ClusterClient}. */ export class ClusterClient< InternalClient extends ClientRefType = ClientRefType, InternalManager extends RefClusterManager = RefClusterManager, > extends EventEmitter { /** Ready state of the cluster. */ public ready: boolean; /** Handler that resolves sent messages and requests. */ public promise: PromiseHandler; /** Client that manages broker tunnels. */ readonly broker: IPCBrokerClient; // IPC Broker for the ClusterManager. /** Client that manages the cluster process. */ readonly process: ChildClient | WorkerClient | null; /** Handler that handles messages from the ClusterManager and the Cluster. */ private messageHandler: ClusterClientHandler; /** Package type. */ private packageType: PackageType | null; /** Creates an instance of ClusterClient. */ constructor (public client: InternalClient) { super(); this.ready = false; this.packageType = detectLibraryFromClient(client); this.broker = new IPCBrokerClient(this); this.process = (this.info.ClusterManagerMode === 'process' ? new ChildClient() : this.info.ClusterManagerMode === 'worker' ? new WorkerClient() : null); this.messageHandler = new ClusterClientHandler(this); // Handle messages from the ClusterManager. if (!this.process?.ipc) throw new Error('CLUSTERING_NO_PROCESS | No process to handle messages from.'); this.process.ipc.on('message', this._handleMessage.bind(this)); this.promise = new PromiseHandler(this); } /** Current cluster id. */ public get id(): number { return this.info.ClusterId; } /** Total number of shards. */ public get totalShards(): number { return this.info.TotalShards; } /** Total number of clusters. */ public get totalClusters(): number { return this.info.ClusterCount; } /** Utility function to get some info about the cluster. */ public get info(): ClusterClientData { return getInfo(); } /** Checks if the internal client is ready enough to acknowledge heartbeat pings. */ public async isReadyForHeartbeatAck(): Promise { switch (this.packageType) { case 'discord.js': { const discordClient = this.client as unknown as { isReady?: () => boolean; readyAt?: Date | null; }; if (typeof discordClient.isReady === 'function') return discordClient.isReady(); return Boolean(discordClient.readyAt); } case '@discordjs/core': { const coreClient = this.client as unknown as RefShardingCoreClient; const statuses = await coreClient.gateway.fetchStatus(); if (!statuses?.size) return false; return Array.from(statuses.values()).every((status) => Number(status) === 3); } default: { return this.ready; } } } /** Sends a message to the Cluster as child. (goes to Cluster on _handleMessage). */ public send(message: SerializableInput): Promise { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#1).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#1).')); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); return this.process.send({ data: message, _type: MessageTypes.CustomMessage, } as BaseMessage<'normal'>) as Promise; } /** Broadcasts a message to all clusters. */ public broadcast(message: SerializableInput, sendSelf: boolean = false): Promise { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#2).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#2).')); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); return this.process.send>({ data: { message, ignore: sendSelf ? undefined : this.id, }, _type: MessageTypes.ClientBroadcast, }) as Promise; } /** Sends a message to the Cluster. */ public _sendInstance(message: BaseMessage): Promise { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#3).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#3).')); else if (!('_type' in message) || !('data' in message)) return Promise.reject(new Error('CLUSTERING_INVALID_MESSAGE | Invalid message object.' + JSON.stringify(message))); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); return this.process.send(message) as Promise; } /** Evaluates a script on the master process, in the context of the {@link ClusterManager}. */ public async evalOnManager(script: ((manager: M, context: Serialized

) => Awaitable), options?: { context?: P, timeout?: number }): Promise> { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#4).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#4).')); else if (typeof script !== 'function') return Promise.reject(new Error('CLUSTERING_INVALID_EVAL_SCRIPT | Eval script is not a function (#1).')); const nonce = ShardingUtils.generateNonce(); this.process.send>({ data: { options, script: `(${script})(this,${options?.context ? JSON.stringify(options.context) : undefined})`, }, _nonce: nonce, _type: MessageTypes.ClientManagerEvalRequest, }); return this.promise.create(nonce, options?.timeout); } /** Evaluates a script on all clusters in parallel. */ public async broadcastEval(script: string | ((client: C, context: Serialized

) => Awaitable), options?: EvalOptions

): Promise[]> { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#5).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#5).')); else if (typeof script !== 'string' && typeof script !== 'function') return Promise.reject(new Error('CLUSTERING_INVALID_EVAL_SCRIPT | Eval script is not a function or string.')); const nonce = ShardingUtils.generateNonce(); this.process.send({ data: { options, script: typeof script === 'string' ? script : `(${script})(this,${options?.context ? JSON.stringify(options.context) : undefined})`, }, _nonce: nonce, _type: MessageTypes.ClientBroadcastRequest, } as BaseMessage<'eval'>); return this.promise.create(nonce, options?.timeout); } /** Evaluates a script on specific guild. */ public async evalOnGuild(guildId: string, script: (client: C, context: Serialized

, guild: E extends true ? Guild : Guild | undefined) => Awaitable, options?: EvalOptions

): Promise> { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#6).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#6).')); else if (typeof script !== 'function') return Promise.reject(new Error('CLUSTERING_INVALID_EVAL_SCRIPT | Eval script is not a function (#2).')); else if (typeof guildId !== 'string') return Promise.reject(new TypeError('CLUSTERING_GUILD_ID_INVALID | Guild Id must be a string.')); else if (this.packageType !== 'discord.js') return Promise.reject(new Error('CLUSTERING_EVAL_GUILD_UNSUPPORTED | evalOnGuild is only supported in discord.js package type.')); const nonce = ShardingUtils.generateNonce(); this.process.send({ _type: MessageTypes.ClientBroadcastRequest, _nonce: nonce, data: { script: ShardingUtils.parseInput(script, options?.context, this.packageType, `this?.guilds?.cache?.get('${guildId}')`), options: { ...options, guildId }, }, } as BaseMessage<'eval'>); return this.promise.create(nonce, options?.timeout).then((data) => (data as unknown as T[])?.find((v) => v !== undefined)) as Promise>; } /** Evaluates a script on a current client, in the context of the {@link ShardingClient}. */ public async evalOnClient(script: string | ((client: C, context: Serialized

) => Awaitable), options?: EvalOptions

): Promise> { type EvalObject = { _eval: (script: string) => T; }; switch (this.packageType) { case 'discord.js': { const parsedScript = ShardingUtils.parseInput(script, options?.context, this.packageType); if (!(this.client as unknown as EvalObject)._eval) (this.client as unknown as EvalObject)._eval = function (_: string) { return (0, eval)(_); }.bind(this.client); return await (this.client as unknown as EvalObject)._eval(parsedScript); } case '@discordjs/core': { if (typeof script === 'function') return await script(this.client as unknown as C, options?.context as Serialized

) as Promise>; const fixedScript = script.replace(/\(this,/, '(client,'); const evalFunction = new Function('client', `return (${fixedScript})`); return await evalFunction(this.client, options?.context); } default: { return Promise.reject(new Error('CLUSTERING_EVAL_CLIENT_UNSUPPORTED | evalOnClient is only supported in discord.js and @discordjs/core package types.')); } } } /** Sends a request to the Cluster (cluster has to respond with a reply (cluster.on('message', (message) => message.reply('reply')))). */ public request(message: SerializableInput, options: { timeout?: number } = {}): Promise> { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#7).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#7).')); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); const nonce = ShardingUtils.generateNonce(); this.process.send>({ _type: MessageTypes.CustomRequest, _nonce: nonce, data: message, }); return this.promise.create(nonce, options.timeout); } /** Kills all running clusters and respawns them. */ public respawnAll(clusterDelay: number = 8000, respawnDelay: number = 5500, timeout: number = -1, except: number[] = []): Promise { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#8).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#8).')); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); return this.process.send({ _type: MessageTypes.ClientRespawnAll, data: { clusterDelay, respawnDelay, timeout, except, }, } as BaseMessage<'respawnAll'>); } /** Kills specific clusters and respawns them. */ public async respawnClusters(clusters: number[], clusterDelay: number = 8000, respawnDelay: number = 5500, timeout: number = -1): Promise { if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#8).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#8).')); this.emit('debug', `[IPC] [Child ${this.id}] Sending message to cluster.`); return this.process.send({ _type: MessageTypes.ClientRespawnSpecific, data: { clusterIds: clusters, clusterDelay, respawnDelay, timeout, }, } as BaseMessage<'respawnSome'>); } /** Handles a message from the ClusterManager. */ private _handleMessage(message: BaseMessage<'normal'> | BrokerMessage | unknown): void { if (!message || typeof message !== 'object') return; if ('_data' in message) return this.broker.handleMessage(message as BrokerMessage); if (!('_type' in message)) return; const ipcMessage = message as BaseMessage<'normal'>; // Debug. this.emit('debug', `[IPC] [Child ${this.id}] Received message from cluster.`); this.messageHandler.handleMessage(ipcMessage); // Emitted upon receiving a message from the child process/worker. if ([MessageTypes.CustomMessage, MessageTypes.CustomRequest].includes(ipcMessage._type)) { this.emit('message', new ProcessMessage(this, ipcMessage)); } } /** Sends a message to the master process. */ public _respond(message: BaseMessage): void { if (!this.process) throw new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#9).'); this.process.send(message); } /** Triggers the ready event, do not use this unless you know what you are doing. */ public triggerReady(): boolean { if (this.ready) return this.ready; else if (!this.process) throw new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#10).'); this.ready = true; this.process.send({ _type: MessageTypes.ClientReady, data: { packageType: this.packageType }, } as BaseMessage<'readyOrSpawn'>); this.emit('ready', this); return this.ready; } /** Spawns the next cluster, when queue mode is on 'manual'. */ public spawnNextCluster(): Promise { if (this.info.ClusterQueueMode === 'auto') throw new Error('Next Cluster can just be spawned when the queue is not on auto mode.'); else if (!this.process) return Promise.reject(new Error('CLUSTERING_NO_PROCESS_TO_SEND_TO | No process to send the message to (#12).')); else if (!this.ready) return Promise.reject(new Error('CLUSTERING_NOT_READY | Cluster is not ready yet (#9).')); return this.process.send({ _type: MessageTypes.ClientSpawnNextCluster, } as BaseMessage<'readyOrSpawn'>); } /** Kills the cluster. */ public _debug(message: string): void { this.emit('debug', message); } } export type RefClusterClient = ClusterClient; export declare interface ClusterClient { /** Emit an event. */ emit: ((event: K, ...args: ClusterClientEvents[K]) => boolean) & ((event: Exclude, ...args: unknown[]) => boolean); /** Remove an event listener. */ off: ((event: K, listener: (...args: ClusterClientEvents[K]) => void) => this) & ((event: Exclude, listener: (...args: unknown[]) => void) => this); /** Listen for an event. */ on: ((event: K, listener: (...args: ClusterClientEvents[K]) => void) => this) & ((event: Exclude, listener: (...args: unknown[]) => void) => this); /** Listen for an event once. */ once: ((event: K, listener: (...args: ClusterClientEvents[K]) => void) => this) & ((event: Exclude, listener: (...args: unknown[]) => void) => this); /** Remove all listeners for an event. */ removeAllListeners: ((event?: K) => this) & ((event?: Exclude) => this); }