import { EventEmitter } from 'events'; import { requestFromIPC, subscribeIPC, subscribeWithTimeout } from './IPC.ts'; import { type Type, Deferred, generateId, merge, retry, MAX_CONCURRENT_CREATE_ROOM_WAIT_TIME, REMOTE_ROOM_SHORT_TIMEOUT, type MethodName, type ExtractMethodOrPropertyType } from './utils/Utils.ts'; import { isDevMode, cacheRoomHistory, getPreviousProcessId, getRoomRestoreListKey, reloadFromCache } from './utils/DevMode.ts'; import { RegisteredHandler } from './matchmaker/RegisteredHandler.ts'; import { type OnCreateOptions, Room, RoomInternalState } from './Room.ts'; import { LocalPresence } from './presence/LocalPresence.ts'; import { createScopedPresence, type Presence } from './presence/Presence.ts'; import { debugAndPrintError, debugMatchMaking } from './Debug.ts'; import { SeatReservationError } from './errors/SeatReservationError.ts'; import { ServerError } from './errors/ServerError.ts'; import { type IRoomCache, type MatchMakerDriver, type SortOptions, LocalDriver } from './matchmaker/LocalDriver/LocalDriver.ts'; import { controller } from './matchmaker/controller.ts'; import * as stats from './Stats.ts'; import { logger } from './Logger.ts'; import type { AuthContext, Client } from './Transport.ts'; import { getLockId, initializeRoomCache, type ExtractRoomCacheMetadata } from './matchmaker/driver.ts'; import { type ISeatReservation, CloseCode, ErrorCode } from '@colyseus/shared-types'; import { getDefaultDriver, getDefaultPresence, getDefaultPublicAddress } from './utils/Env.ts'; export type { ISeatReservation, ExtractRoomCacheMetadata }; export { controller, stats, type MatchMakerDriver }; export type ClientOptions = any; export type SelectProcessIdCallback = (roomName: string, clientOptions: ClientOptions) => Promise; const handlers: {[id: string]: RegisteredHandler} = {}; const rooms: {[roomId: string]: Room} = {}; const events = new EventEmitter(); export let publicAddress: string; export let processId: string; export let presence: Presence; export let driver: MatchMakerDriver; /** * Function to select the processId to create the room on. * By default, returns the process with least amount of rooms created. * @returns The processId to create the room on. */ export let selectProcessIdToCreateRoom: SelectProcessIdCallback = async function () { return (await stats.fetchAll()) .sort((p1, p2) => p1.roomCount > p2.roomCount ? 1 : -1)[0]?.processId || processId; }; /** * Whether health checks are enabled or not. (default: true) * * Health checks are automatically performed on theses scenarios: * - At startup, to check for leftover/invalid processId's * - When a remote room creation request times out * - When a remote seat reservation request times out */ let enableHealthChecks: boolean = true; export function setHealthChecksEnabled(value: boolean) { enableHealthChecks = value; } export let onReady: Deferred = new Deferred(); // onReady needs to be immediately available to @colyseus/auth integration. export const MatchMakerState = { INITIALIZING: 0, READY: 1, SHUTTING_DOWN: 2, } as const; export type MatchMakerState = (typeof MatchMakerState)[keyof typeof MatchMakerState]; /** * Internal MatchMaker state */ export let state: MatchMakerState; /** * @private */ export async function setup( _presence?: Presence, _driver?: MatchMakerDriver, _publicAddress?: string, _selectProcessIdToCreateRoom?: SelectProcessIdCallback, ) { if (onReady === undefined) { // // for testing purposes only: onReady is turned into undefined on shutdown // (needs refactoring.) // onReady = new Deferred(); } state = MatchMakerState.INITIALIZING; presence = _presence || await getDefaultPresence(); driver = _driver || await getDefaultDriver(); publicAddress = _publicAddress || getDefaultPublicAddress(); stats.reset(false); // devMode: try to retrieve previous processId if (isDevMode) { processId = await getPreviousProcessId(); } // ensure processId is set if (!processId) { processId = generateId(); } /** * Override default `selectProcessIdToCreateRoom` function. */ if (_selectProcessIdToCreateRoom) { selectProcessIdToCreateRoom = _selectProcessIdToCreateRoom; } // boot driver if necessary (e.g. RedisDriver/PostgresDriver) if (driver.boot) { await driver.boot(); } onReady.resolve(); } /** * - Accept receiving remote room creation requests * - Check for leftover/invalid processId's on startup * @private */ export async function accept(isStandalone: boolean = false) { await onReady; // make sure "processId" is available /** * Skip setting up IPC and health checks if this process is running as a * standalone match-maker. * * When in "standalone" mode, this process will not spawn rooms and will only * be responsible for matchmaking. */ if (isStandalone) { state = MatchMakerState.READY; return; } /** * Process-level subscription * - handle remote process healthcheck * - handle remote room creation */ await subscribeIPC(presence, getProcessChannel(), (method: string, args: any) => { if (method === 'healthcheck') { // health check for this processId return true; } else { // handle room creation return handleCreateRoom.apply(undefined, args); } }); /** * Check for leftover/invalid processId's on startup */ if (enableHealthChecks) { await healthCheckAllProcesses(); /* * persist processId every 1 minute * * FIXME: this is a workaround in case this `processId` gets excluded * (`stats.excludeProcess()`) by mistake due to health-check failure */ stats.setAutoPersistInterval(); } state = MatchMakerState.READY; await stats.persist(); if (isDevMode) { await reloadFromCache(); } } /** * Join or create into a room and return seat reservation */ export async function joinOrCreate(roomName: string, clientOptions: ClientOptions = {}, authContext?: AuthContext) { return await retry>(async () => { const authData = await callOnAuth(roomName, clientOptions, authContext); let room: IRoomCache = await findOneRoomAvailable(roomName, clientOptions); if (!room) { const handler = getHandler(roomName); const filterOptions = handler.getFilterOptions(clientOptions); const concurrencyKey = getLockId(filterOptions); // // Prevent multiple rooms of same filter from being created concurrently // await concurrentJoinOrCreateRoomLock(handler, concurrencyKey, async (roomId?: string) => { if (roomId) { room = await driver.findOne({ roomId }) } // If the room is not found or is already locked, try to find a new one if (!room || room.locked) { room = await findOneRoomAvailable(roomName, clientOptions); } if (!room) { // // TODO [?] // should we expose the "creator" auth data of the room during `onCreate()`? // it would be useful, though it could be accessed via `onJoin()` for now. // room = await createRoom(roomName, clientOptions); // Notify waiting concurrent requests about the new room presence.publish(`concurrent:${handler.name}:${concurrencyKey}`, room.roomId); } return room; }); } return await reserveSeatFor(room, clientOptions, authData); }, 5, [SeatReservationError]); } /** * Create a room and return seat reservation */ export async function create(roomName: string, clientOptions: ClientOptions = {}, authContext?: AuthContext) { const authData = await callOnAuth(roomName, clientOptions, authContext); const room = await createRoom(roomName, clientOptions); return reserveSeatFor(room, clientOptions, authData); } /** * Join a room and return seat reservation */ export async function join(roomName: string, clientOptions: ClientOptions = {}, authContext?: AuthContext) { return await retry>(async () => { const authData = await callOnAuth(roomName, clientOptions, authContext); const room = await findOneRoomAvailable(roomName, clientOptions); if (!room) { throw new ServerError(ErrorCode.MATCHMAKE_INVALID_CRITERIA, `no rooms found with provided criteria`); } return reserveSeatFor(room, clientOptions, authData); }); } /** * Join a room by id and return seat reservation */ export async function reconnect(roomId: string, clientOptions: ClientOptions = {}) { const room = await driver.findOne({ roomId }); if (!room) { // TODO: support a "logLevel" out of the box? if (process.env.NODE_ENV !== 'production') { logger.info(`āŒ room "${roomId}" has been disposed. Did you miss .allowReconnection()?\nšŸ‘‰ https://docs.colyseus.io/room#allow-reconnection`); } throw new ServerError(ErrorCode.MATCHMAKE_INVALID_ROOM_ID, `room "${roomId}" has been disposed.`); } // check for reconnection const reconnectionToken = clientOptions.reconnectionToken; if (!reconnectionToken) { throw new ServerError(ErrorCode.MATCHMAKE_UNHANDLED, `'reconnectionToken' must be provided for reconnection.`); } // respond to re-connection! const sessionId = await remoteRoomCall(room.roomId, 'checkReconnectionToken', [reconnectionToken]); if (sessionId) { return buildSeatReservation(room, sessionId); } else { // TODO: support a "logLevel" out of the box? if (process.env.NODE_ENV !== 'production') { logger.info(`āŒ reconnection token invalid or expired. Did you miss .allowReconnection()?\nšŸ‘‰ https://docs.colyseus.io/room#allow-reconnection`); } throw new ServerError(ErrorCode.MATCHMAKE_EXPIRED, `reconnection token invalid or expired.`); } } /** * Join a room by id and return client seat reservation. An exception is thrown if a room is not found for roomId. * * @param roomId - The Id of the specific room instance. * @param clientOptions - Options for the client seat reservation (for `onJoin`/`onAuth`) * @param authContext - Optional authentication token * * @returns Promise - A promise which contains `sessionId` and `IRoomCache`. */ export async function joinById(roomId: string, clientOptions: ClientOptions = {}, authContext?: AuthContext) { const room = await driver.findOne({ roomId }); if (!room) { throw new ServerError(ErrorCode.MATCHMAKE_INVALID_ROOM_ID, `room "${roomId}" not found`); } else if (room.locked) { throw new ServerError(ErrorCode.MATCHMAKE_INVALID_ROOM_ID, `room "${roomId}" is locked`); } const authData = await callOnAuth(room.name, clientOptions, authContext); return reserveSeatFor(room, clientOptions, authData); } /** * Perform a query for all cached rooms */ export async function query( conditions: Partial> = {}, sortOptions?: SortOptions, ) { return await driver.query(conditions, sortOptions); } /** * Find for a public and unlocked room available. * * @param roomName - The Id of the specific room. * @param filterOptions - Filter options. * @param sortOptions - Sorting options. * * @returns Promise - A promise contaning an object which includes room metadata and configurations. */ export async function findOneRoomAvailable( roomName: string, filterOptions: ClientOptions, additionalSortOptions?: SortOptions, ) { const handler = getHandler(roomName); const sortOptions = Object.assign({}, handler.sortOptions ?? {}); if (additionalSortOptions) { Object.assign(sortOptions, additionalSortOptions); } return await driver.findOne({ locked: false, name: roomName, private: false, ...handler.getFilterOptions(filterOptions), }, sortOptions); } /** * Call a method or return a property on a remote room. * * @param roomId - The Id of the specific room instance. * @param method - Method or attribute to call or retrive. * @param args - Array of arguments for the method * * @returns Promise - Returned value from the called or retrieved method/attribute. */ export async function remoteRoomCall( roomId: string, method: keyof TRoom, args?: any[], rejectionTimeout = REMOTE_ROOM_SHORT_TIMEOUT, ): Promise> { const room = rooms[roomId] as TRoom; if (!room) { try { return await requestFromIPC(presence, getRoomChannel(roomId), method as string, args, rejectionTimeout); } catch (e: any) { // // the room cache from an unavailable process might've been used here. // perform a health-check on the process before proceeding. // (this is a broken state when a process wasn't gracefully shut down) // if (method === '_reserveSeat' && e.message === "ipc_timeout") { throw e; } // TODO: for 1.0, consider always throwing previous error directly. const request = `${String(method)}${args && ' with args ' + JSON.stringify(args) || ''}`; throw new ServerError( ErrorCode.MATCHMAKE_UNHANDLED, `remote room (${roomId}) timed out, requesting "${request}". (${rejectionTimeout}ms exceeded)`, ); } } else { return (!args && typeof (room[method]) !== 'function') ? room[method as string] : (await room[method as string].apply(room, args && JSON.parse(JSON.stringify(args)))); } } export function defineRoomType>( roomName: string, klass: T, defaultOptions?: OnCreateOptions, ): RegisteredHandler> { const registeredHandler = new RegisteredHandler(klass, defaultOptions) as unknown as RegisteredHandler>; registeredHandler.name = roomName; handlers[roomName] = registeredHandler; if (klass.prototype['onAuth'] !== Room.prototype['onAuth']) { // TODO: soft-deprecate instance level `onAuth` on 0.16 // logger.warn("DEPRECATION WARNING: onAuth() at the instance level will be deprecated soon. Please use static onAuth() instead."); if (klass['onAuth'] !== Room['onAuth']) { logger.info(`āŒ "${roomName}"'s onAuth() defined at the instance level will be ignored.`); } } return registeredHandler; } export function addRoomType(handler: RegisteredHandler) { handlers[handler.name] = handler; } export function removeRoomType(roomName: string) { delete handlers[roomName]; } export function getAllHandlers() { return handlers; } export function getHandler(roomName: string) { const handler = handlers[roomName]; if (!handler) { throw new ServerError(ErrorCode.MATCHMAKE_NO_HANDLER, `provided room name "${roomName}" not defined`); } return handler; } export function getRoomClass(roomName: string): Type { return handlers[roomName]?.klass; } /** * Creates a new room. * * @param roomName - The identifier you defined on `gameServer.define()` * @param clientOptions - Options for `onCreate` * * @returns Promise - A promise contaning an object which includes room metadata and configurations. */ export async function createRoom(roomName: string, clientOptions: ClientOptions): Promise { // // - select a process to create the room // - use local processId if MatchMaker is not ready yet // const selectedProcessId = (state === MatchMakerState.READY) ? await selectProcessIdToCreateRoom(roomName, clientOptions) : processId; let room: IRoomCache; if (selectedProcessId === undefined) { if (isDevMode && processId === undefined) { // // WORKAROUND: wait for processId to be available // TODO: Remove this check on 1.0 // // - This is a workaround when using matchMaker.createRoom() before the processId is available. // - We need to use top-level await to retrieve processId // await onReady; return createRoom(roomName, clientOptions); } else { throw new ServerError(ErrorCode.MATCHMAKE_UNHANDLED, `no processId available to create room ${roomName}`); } } else if (selectedProcessId === processId) { // create the room on this process! room = await handleCreateRoom(roomName, clientOptions); } else { // ask other process to create the room! try { room = await requestFromIPC( presence, getProcessChannel(selectedProcessId), undefined, [roomName, clientOptions], REMOTE_ROOM_SHORT_TIMEOUT, ); } catch (e: any) { if (e.message === "ipc_timeout") { debugAndPrintError(`${e.message}: create room request timed out for ${roomName} on processId ${selectedProcessId}.`); // // clean-up possibly stale process from redis. // when a process disconnects ungracefully, it may leave its previous processId under "roomcount" // if the process is still alive, it will re-add itself shortly after the load-balancer selects it again. // if (enableHealthChecks) { await stats.excludeProcess(selectedProcessId); } // if other process failed to respond, create the room on this process room = await handleCreateRoom(roomName, clientOptions); } else { // re-throw intentional exception thrown during remote onCreate() throw e; } } } if (isDevMode) { presence.hset(getRoomRestoreListKey(), room.roomId, JSON.stringify({ "clientOptions": clientOptions, "roomName": roomName, "processId": processId })); } return room; } export async function handleCreateRoom(roomName: string, clientOptions: ClientOptions, restoringRoomId?: string): Promise { const handler = getHandler(roomName); const room: Room = new handler.klass(); // set room public attributes if (restoringRoomId && isDevMode) { room.roomId = restoringRoomId; } else { room.roomId = generateId(); } // // Initialize .state (if set). // // Define getters and setters for: // - autoDispose // - patchRate // room['__init'](); room.roomName = roomName; room.presence = createScopedPresence(room, presence); // initialize a RoomCache instance room['_listing'] = initializeRoomCache({ name: roomName, processId, ...handler.getMetadataFromOptions(clientOptions) }); // assign public host if (publicAddress) { room['_listing'].publicAddress = publicAddress; } if (room.onCreate) { try { await room.onCreate(merge({}, clientOptions, handler.options)); } catch (e: any) { debugAndPrintError(e); throw new ServerError( e.code || ErrorCode.MATCHMAKE_UNHANDLED, e.message, ); } } room['_internalState'] = RoomInternalState.CREATED; room['_listing'].roomId = room.roomId; room['_listing'].maxClients = room.maxClients; // imediatelly ask client to join the room debugMatchMaking('creating room \'%s\', roomId: \'%s\', processId: \'%s\'', roomName, room.roomId, processId); // increment amount of rooms this process is handling stats.local.roomCount++; stats.persist(); room['_events'].on('lock', lockRoom.bind(undefined, room)); room['_events'].on('unlock', unlockRoom.bind(undefined, room)); room['_events'].on('join', onClientJoinRoom.bind(undefined, room)); room['_events'].on('leave', onClientLeaveRoom.bind(undefined, room)); room['_events'].once('dispose', disposeRoom.bind(undefined, roomName, room)); if (handler.realtimeListingEnabled) { room['_events'].on('visibility-change', onVisibilityChange.bind(undefined, room)); room['_events'].on('metadata-change', onMetadataChange.bind(undefined, room)); } // when disconnect()'ing, keep only join/leave events for stat counting room['_events'].once('disconnect', () => { room['_events'].removeAllListeners('lock'); room['_events'].removeAllListeners('unlock'); room['_events'].removeAllListeners('dispose'); if (handler.realtimeListingEnabled) { room['_events'].removeAllListeners('visibility-change'); room['_events'].removeAllListeners('metadata-change'); } // // emit "no active rooms" event when there are no more rooms in this process // (used during graceful shutdown) // if (stats.local.roomCount <= 0) { events.emit('no-active-rooms'); } }); // room always start unlocked await createRoomReferences(room, true); // persist room data only if match-making is enabled if (state !== MatchMakerState.SHUTTING_DOWN) { await driver.persist(room['_listing'], true); } handler.emit('create', room); return room['_listing']; } /** * Get room data by roomId. * This method does not return the actual room instance, use `getLocalRoomById` for that. */ export function getRoomById(roomId: string) { return driver.findOne({ roomId }); } /** * Get local room instance by roomId. (Can return "undefined" if the room is not available on this process) */ export function getLocalRoomById(roomId: string) { return rooms[roomId]; } /** * Disconnects every client on every room in the current process. */ export function disconnectAll(closeCode?: number) { const promises: Array> = []; for (const roomId in rooms) { if (!rooms.hasOwnProperty(roomId)) { continue; } promises.push(rooms[roomId].disconnect(closeCode)); } return promises; } async function lockAndDisposeAll(): Promise { // remove processId from room count key // (stops accepting new rooms on this process) await stats.excludeProcess(processId); // clear auto-persisting stats interval if (enableHealthChecks) { stats.clearAutoPersistInterval(); } const noActiveRooms = new Deferred(); if (stats.local.roomCount <= 0) { // no active rooms to dispose noActiveRooms.resolve(); } else { // wait for all rooms to be disposed // TODO: set generous timeout in case events.once('no-active-rooms', () => noActiveRooms.resolve()); } // - lock all local rooms to prevent new joins // - trigger `onBeforeShutdown()` on each room for (const roomId in rooms) { if (!rooms.hasOwnProperty(roomId)) { continue; } const room = rooms[roomId]; room.lock(); if (isDevMode) { // call default implementation of onBeforeShutdown() in dev mode Room.prototype.onBeforeShutdown.call(room); } else { // call custom implementation of onBeforeShutdown() in production room.onBeforeShutdown(); } } await noActiveRooms; } export async function gracefullyShutdown(): Promise { if (state === MatchMakerState.SHUTTING_DOWN) { return Promise.reject('already_shutting_down'); } debugMatchMaking(`${processId} is shutting down!`); state = MatchMakerState.SHUTTING_DOWN; onReady = undefined; if (isDevMode) { // Reject pending allowReconnection() deferreds BEFORE caching. // This should trigger a state cleanup via user's onLeave (e.g. players.delete) // so the cached state doesn't contain stale player data from clients for (const roomId in rooms) { if (!rooms.hasOwnProperty(roomId)) { continue; } rooms[roomId]['_rejectPendingReconnections']?.("devmode_restart"); } // Wait for async onLeave handlers to finish state cleanup. await new Promise(resolve => setTimeout(resolve, 50)); await cacheRoomHistory(rooms); } // - lock existing rooms // - stop accepting new rooms on this process // - wait for all rooms to be disposed await lockAndDisposeAll(); // make sure rooms are removed from cache await removeRoomsByProcessId(processId); // unsubscribe from process id channel presence.unsubscribe(getProcessChannel()); // make sure all rooms are disposed return Promise.all(disconnectAll( (isDevMode) ? CloseCode.MAY_TRY_RECONNECT : CloseCode.SERVER_SHUTDOWN )); } /** * DO NOT USE THIS IN PRODUCTION. * THIS METHOD IS MEANT TO BE USED FOR VITE DEV SERVER ONLY. * --------------------------------------------------------- * * * Lightweight HMR reload for dev mode. * * Unlike gracefullyShutdown() + setup() + accept(), this preserves the * matchMaker infrastructure (presence, driver, IPC subscriptions, processId) * and only cycles room instances: cache state → dispose → restore. */ export async function hotReload(): Promise { state = MatchMakerState.SHUTTING_DOWN; // Reject pending allowReconnection() deferreds BEFORE caching. // Triggers onLeave state cleanup so cached state is clean. for (const roomId in rooms) { if (!rooms.hasOwnProperty(roomId)) { continue; } rooms[roomId]['_rejectPendingReconnections']?.("devmode_restart"); } // Wait for async onLeave handlers to finish state cleanup. await new Promise(resolve => setTimeout(resolve, 50)); await cacheRoomHistory(rooms); // Lock all rooms and trigger default onBeforeShutdown (dev mode impl). const noActiveRooms = new Deferred(); if (stats.local.roomCount <= 0) { noActiveRooms.resolve(); } else { events.once('no-active-rooms', () => noActiveRooms.resolve()); } for (const roomId in rooms) { if (!rooms.hasOwnProperty(roomId)) { continue; } const room = rooms[roomId]; room.lock(); Room.prototype.onBeforeShutdown.call(room); } await noActiveRooms; // Disconnect all clients — they will auto-reconnect. await Promise.all(disconnectAll(CloseCode.MAY_TRY_RECONNECT)); // Clear driver cache so reloadFromCache() can recreate rooms. await removeRoomsByProcessId(processId); // Restore rooms from cached state. state = MatchMakerState.READY; await reloadFromCache(); await stats.persist(); } /** * Reserve a seat for a client in a room */ export async function reserveSeatFor(room: IRoomCache, options: ClientOptions, authData?: any) { const sessionId: string = authData?.sessionId || generateId(); let successfulSeatReservation: boolean; try { successfulSeatReservation = await remoteRoomCall( room.roomId, '_reserveSeat' as keyof Room, [sessionId, options, authData], REMOTE_ROOM_SHORT_TIMEOUT, ); } catch (e: any) { debugMatchMaking(e); // // the room cache from an unavailable process might've been used here. // (this is a broken state when a process wasn't gracefully shut down) // perform a health-check on the process before proceeding. // if ( e.message === "ipc_timeout" && !( enableHealthChecks && await healthCheckProcessId(room.processId) ) ) { throw new SeatReservationError(`process ${room.processId} is not available.`); } else { successfulSeatReservation = false; } } if (!successfulSeatReservation) { throw new SeatReservationError(`${room.roomId} is already full.`); } return buildSeatReservation(room, sessionId); } /** * Reserve multiple seats for clients in a room */ export async function reserveMultipleSeatsFor(room: IRoomCache, clientsData: Array<{ sessionId: string, options: ClientOptions, auth: any }>) { let sessionIds: string[] = []; let options: ClientOptions[] = []; let authData: any[] = []; for (const clientData of clientsData) { sessionIds.push(clientData.sessionId); options.push(clientData.options); authData.push(clientData.auth); } debugMatchMaking( 'reserving multiple seats. sessionIds: \'%s\', roomId: \'%s\', processId: \'%s\'', sessionIds.join(', '), room.roomId, processId, ); let successfulSeatReservations: boolean[]; try { successfulSeatReservations = await remoteRoomCall( room.roomId, '_reserveMultipleSeats' as keyof Room, [sessionIds, options, authData], REMOTE_ROOM_SHORT_TIMEOUT, ); } catch (e: any) { debugMatchMaking(e); // // the room cache from an unavailable process might've been used here. // (this is a broken state when a process wasn't gracefully shut down) // perform a health-check on the process before proceeding. // if ( e.message === "ipc_timeout" && !( enableHealthChecks && await healthCheckProcessId(room.processId) ) ) { throw new SeatReservationError(`process ${room.processId} is not available.`); } else { throw new SeatReservationError(`${room.roomId} is already full.`); } } return successfulSeatReservations; } /** * Build a seat reservation object. * @param room - The room to build a seat reservation for. * @param sessionId - The session ID of the client. * @returns A seat reservation object. */ export function buildSeatReservation(room: IRoomCache, sessionId: string) { const seatReservation: ISeatReservation = { name: room.name, sessionId, roomId: room.roomId, processId: room.processId, }; if (isDevMode) { seatReservation.devMode = isDevMode; } if (room.publicAddress) { seatReservation.publicAddress = room.publicAddress; } return seatReservation; } async function callOnAuth(roomName: string, clientOptions?: ClientOptions, authContext?: AuthContext) { const roomClass = getRoomClass(roomName); if (roomClass && roomClass['onAuth'] && roomClass['onAuth'] !== Room['onAuth']) { const result = await roomClass['onAuth'](authContext.token, clientOptions, authContext) if (!result) { throw new ServerError(ErrorCode.AUTH_FAILED, 'onAuth failed'); } return result; } } /** * Perform health check on all processes */ export async function healthCheckAllProcesses() { const allStats = await stats.fetchAll(); const activeProcessChannels = (await presence.channels("p:*")).map(c => c.substring(2)); if (allStats.length > 0) { await Promise.all( allStats .filter(stat => ( stat.processId !== processId && // skip current process !activeProcessChannels.includes(stat.processId) // skip if channel is still listening )) .map(stat => healthCheckProcessId(stat.processId)) ); } } /** * Perform health check on a remote process * @param processId */ const _healthCheckByProcessId: { [processId: string]: Promise } = {}; export function healthCheckProcessId(processId: string) { // // re-use the same promise if health-check is already in progress // (may occur when _reserveSeat() fails multiple times for the same 'processId') // if (_healthCheckByProcessId[processId] !== undefined) { return _healthCheckByProcessId[processId]; } _healthCheckByProcessId[processId] = new Promise(async (resolve, reject) => { logger.debug(`> Performing health-check against processId: '${processId}'...`); try { const requestTime = Date.now(); await requestFromIPC( presence, getProcessChannel(processId), 'healthcheck', [], REMOTE_ROOM_SHORT_TIMEOUT, ); logger.debug(`āœ… Process '${processId}' successfully responded (${Date.now() - requestTime}ms)`); // succeeded to respond resolve(true) } catch (e) { // process failed to respond - remove it from stats logger.debug(`āŒ Process '${processId}' failed to respond. Cleaning it up.`); await stats.excludeProcess(processId); // clean-up possibly stale room ids if (!isDevMode) { await removeRoomsByProcessId(processId); } resolve(false); } finally { delete _healthCheckByProcessId[processId]; } }); return _healthCheckByProcessId[processId]; } /** * Remove cached rooms by processId * @param processId */ async function removeRoomsByProcessId(processId: string) { // // clean-up possibly stale room ids // (ungraceful shutdowns using Redis can result on stale room ids still on memory.) // await driver.cleanup(processId); } async function createRoomReferences(room: Room, init: boolean = false): Promise { rooms[room.roomId] = room; if (init) { await subscribeIPC( presence, getRoomChannel(room.roomId), (method, args) => { return (!args && typeof (room[method]) !== 'function') ? room[method] : room[method].apply(room, args); }, ); } return true; } /** * Used only during `joinOrCreate` to handle concurrent requests for creating a room. */ async function concurrentJoinOrCreateRoomLock( handler: RegisteredHandler, concurrencyKey: string, callback: (roomId?: string) => Promise ): Promise { return new Promise(async (resolve, reject) => { const hkey = getConcurrencyHashKey(handler.name); const concurrency = await presence.hincrbyex( hkey, concurrencyKey, 1, // increment by 1 MAX_CONCURRENT_CREATE_ROOM_WAIT_TIME * 2 // expire in 2x the time of MAX_CONCURRENT_CREATE_ROOM_WAIT_TIME ) - 1; // do not consider the current request const fulfill = async (roomId?: string) => { try { resolve(await callback(roomId)); } catch (e) { reject(e); } finally { await presence.hincrbyex(hkey, concurrencyKey, -1, MAX_CONCURRENT_CREATE_ROOM_WAIT_TIME * 2); } }; if (concurrency > 0) { debugMatchMaking( 'receiving %d concurrent joinOrCreate for \'%s\' (%s)', concurrency, handler.name, concurrencyKey ); try { const roomId = await subscribeWithTimeout( presence, `concurrent:${handler.name}:${concurrencyKey}`, (MAX_CONCURRENT_CREATE_ROOM_WAIT_TIME + (Math.min(concurrency, 3) * 0.2)) * 1000 // convert to milliseconds ); return await fulfill(roomId); } catch (error) { // Ignore ipc_timeout error } } return await fulfill(); }); } function onClientJoinRoom(room: Room, client: Client) { // increment local CCU stats.local.ccu++; stats.persist(); handlers[room.roomName].emit('join', room, client); } function onClientLeaveRoom(room: Room, client: Client, willDispose: boolean) { // decrement local CCU stats.local.ccu--; stats.persist(); handlers[room.roomName].emit('leave', room, client, willDispose); } function lockRoom(room: Room): void { // emit public event on registered handler handlers[room.roomName].emit('lock', room); } async function unlockRoom(room: Room) { if (await createRoomReferences(room)) { // emit public event on registered handler handlers[room.roomName].emit('unlock', room); } } function onVisibilityChange(room: Room, isInvisible: boolean): void { handlers[room.roomName].emit('visibility-change', room, isInvisible); } function onMetadataChange(room: Room): void { handlers[room.roomName].emit('metadata-change', room); } async function disposeRoom(roomName: string, room: Room) { debugMatchMaking('disposing \'%s\' (%s) on processId \'%s\' (graceful shutdown: %s)', roomName, room.roomId, processId, state === MatchMakerState.SHUTTING_DOWN); // // FIXME: this call should not be necessary. // // there's an unidentified edge case using LocalDriver where Room._dispose() // doesn't seem to be called [?], but "disposeRoom" is, leaving the matchmaker // in a broken state. (repeated ipc_timeout's for seat reservation on // non-existing rooms) // driver.remove(room['_listing'].roomId); stats.local.roomCount--; // decrease amount of rooms this process is handling if (state !== MatchMakerState.SHUTTING_DOWN) { stats.persist(); // remove from devMode restore list if (isDevMode) { await presence.hdel(getRoomRestoreListKey(), room.roomId); } } // emit disposal on registered session handler handlers[roomName].emit('dispose', room); // unsubscribe from remote connections presence.unsubscribe(getRoomChannel(room.roomId)); // remove actual room reference delete rooms[room.roomId]; } // // Presence keys // function getRoomChannel(roomId: string) { return `$${roomId}`; } function getConcurrencyHashKey(roomName: string) { // concurrency hash return `ch:${roomName}`; } function getProcessChannel(id: string = processId) { return `p:${id}`; }