import { MongoClient, Db, Document, FindCursor, WithId } from 'mongodb'; import Moment from 'moment'; import { logger } from './'; import { SDP } from 'saferxlib'; import cfg from '../config'; import { RabbitMQ } from './RabbitMQ'; import { environment } from '../config/model'; const mongoConfig = cfg.mongodb; const afterMinutes = cfg.afterMinutes; const rabbitConfig = cfg.rabbitmq; const env = cfg.env; const rabbitMQ = new RabbitMQ(); let mongoSdpc: MongoClient; export const zombieCleanupLambda = async (): Promise => { try { await rabbitMQ.connect(rabbitConfig.url); mongoSdpc = await connectToMongo(); // Create interval for zombie deletions - default = 60 minutes if (!afterMinutes) logger.error({ message: 'CANDIDATE_FOR_DELETION_AFTER_MINUTES is not defined!' }); const updatedAt: Date = Moment() .subtract(afterMinutes ?? 60, 'm') .toDate(); const [zombieCandidatesCount, activeAgentsCount]: [number, number] = await Promise.all([ mongoSdpc .db() .collection('SDPC_NetworkDeviceChannel') .countDocuments({ updatedAt: { $lt: updatedAt }, type: 'client' }), mongoSdpc .db() .collection('SDPC_NetworkDeviceChannel') .countDocuments({ updatedAt: { $gte: updatedAt }, type: 'client' }), ]); logger.info({ message: 'Zombie cleanup lambda candidate count', zombieCandidatesCount }); logger.info({ message: 'System wide active agents count', activeAgentsCount }); if (!zombieCandidatesCount) { logger.warn({ message: 'No zombie candidates found!' }); await rabbitMQ.disconnect(); return; } // Retrieve all "zombie" candidates const zombieCandidates: FindCursor> = await mongoSdpc .db() .collection('SDPC_NetworkDeviceChannel') .find( { updatedAt: { $lt: updatedAt }, type: 'client', }, { projection: { deviceId: 1, tenantId: 1, type: 1 } }, ) .limit(500); // Send each candidate as global event message for await (const doc of zombieCandidates) { logger.info({ message: 'sendCandidateToGlobalEventQueue', doc, time: new Date() }); sendCandidateToGlobalEventQueue(doc); } } catch (error) { logger.error({ message: `ZombieCleanupLambda#zombieCleanupLambda`, error }); } finally { await Promise.allSettled([rabbitMQ.disconnect(), mongoSdpc && mongoSdpc.close()]); } }; export const connectToMongo = async (): Promise => { const mongoUrl = env === environment.test ? process.env.MONGO_SDPC : mongoConfig.url; const client = new MongoClient(mongoUrl!, { maxPoolSize: 1, connectTimeoutMS: 8000, serverSelectionTimeoutMS: 8000, }); return client.connect(); }; /** * Sends a message to a RabbitMQ exchange with information about a "zombie candidate." * @param {Document} candidate - The candidate object containing tenantId, deviceId, and userId. * @returns {Promise} - A promise that resolves when the message is published. */ export const sendCandidateToGlobalEventQueue = async (candidate: Document): Promise => { // Destructure tenantId, deviceId, and userId from candidate object const { tenantId, deviceId, type } = candidate; logger.debug({ message: `Zombie candidate`, tenantId, deviceId, safeMode: cfg.safeMode, }); // Check if safe mode is not set if (!cfg.safeMode) { // Create payload object const payload = { device: { deviceId, tenantId, type }, stateReasonCode: 0x0e, }; // Publish message to RabbitMQ exchange try { await rabbitMQ.publish( rabbitConfig.globalEventChannel.exchangeName, rabbitConfig.globalEventChannel.routingKey, SDP.SystemEventMessage.V1(SDP.SystemEventEnum.USERSTATE_ONDELETE, payload), ); } catch (error) { logger.error({ message: `Error publishing message to RabbitMQ`, error }); } } };