import mongoose from 'mongoose';
import { getConfig } from '../src/main';

/**
 * Source of API log documents for a given hour. Implementations decide which
 * collection is read and which timestamp field(s) select documents.
 */
export interface FlexibleApiLogProvider {
  /**
   * Returns log documents whose timestamp falls within `hour` of `date`.
   * The result may be truncated for memory safety (see `resolveSafeLimit`).
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  getApiLogsByHour(date: Date, hour: number): Promise<any[]>;
  /** Name of the collection this provider reads from. */
  getCollectionName(): string;
  /** Primary timestamp field used to select and order documents. */
  getTimestampField(): string;
  /** Resolves `true` if the collection could be queried, `false` otherwise (never rejects). */
  validateConnection(): Promise<boolean>;
}

/** Above this many matches, only `LARGE_DATASET_LIMIT` documents are loaded. */
const LARGE_DATASET_THRESHOLD = 10000;
/** Above this many matches, only `HUGE_DATASET_LIMIT` documents are loaded. */
const HUGE_DATASET_THRESHOLD = 50000;
const LARGE_DATASET_LIMIT = 5000;
const HUGE_DATASET_LIMIT = 10000;

/** Per-source tuning and log wording for `drainCursorWithMemoryGuard`. */
interface StreamProfile {
  startLabel: string;
  progressLabel: string;
  brakeLabel: string;
  errorLabel: string;
  /** Heap usage is sampled (and progress logged) every `checkEvery` documents. */
  checkEvery: number;
  /** Streaming stops once heapUsed grows by more than this many MB since the start. */
  maxHeapGrowthMB: number;
}

const STREAM_PROFILES: Record<'existing' | 'model' | 'collection', StreamProfile> = {
  existing: {
    startLabel: 'streaming cursor',
    progressLabel: 'Processed',
    brakeLabel: 'MEMORY EMERGENCY BRAKE',
    errorLabel: 'Streaming cursor',
    checkEvery: 100,
    maxHeapGrowthMB: 200,
  },
  model: {
    startLabel: 'model cursor',
    progressLabel: 'Model processed',
    brakeLabel: 'MODEL MEMORY EMERGENCY BRAKE',
    errorLabel: 'Model cursor',
    checkEvery: 50,
    maxHeapGrowthMB: 100,
  },
  collection: {
    startLabel: 'collection cursor',
    progressLabel: 'Collection processed',
    brakeLabel: 'COLLECTION MEMORY EMERGENCY BRAKE',
    errorLabel: 'Collection cursor',
    checkEvery: 50,
    maxHeapGrowthMB: 80,
  },
};

/** Minimal shape shared by the MongoDB driver's find cursor and Mongoose's QueryCursor. */
interface ClosableAsyncCursor {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  [Symbol.asyncIterator](): AsyncIterator<any>;
  close(): Promise<unknown>;
}

/** Current V8 heap usage, rounded to whole megabytes. */
function heapUsedMB(): number {
  return Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
}

/**
 * Returns the half-open window [startDate, endDate) covering `hour` on the
 * calendar day of `date`. `setHours` interprets `hour` in the process's local
 * time zone; `hour + 1 === 24` rolls `endDate` over to the next midnight.
 */
function hourWindow(date: Date, hour: number): { startDate: Date; endDate: Date } {
  const startDate = new Date(date);
  startDate.setHours(hour, 0, 0, 0);
  const endDate = new Date(date);
  endDate.setHours(hour + 1, 0, 0, 0);
  return { startDate, endDate };
}

/**
 * Chooses how many of `totalCount` matching documents to load, logging when
 * the result is capped. Counts up to LARGE_DATASET_THRESHOLD are loaded fully.
 */
function resolveSafeLimit(totalCount: number): number {
  let safeLimit = totalCount;
  if (totalCount > HUGE_DATASET_THRESHOLD) {
    safeLimit = HUGE_DATASET_LIMIT;
  } else if (totalCount > LARGE_DATASET_THRESHOLD) {
    safeLimit = LARGE_DATASET_LIMIT;
  }
  if (safeLimit < totalCount) {
    console.log(`⚠️ Large dataset detected (${totalCount} logs). Using safe limit: ${safeLimit}`);
    console.log(`🛡️ Memory protection: Processing ${safeLimit} logs for complete hour coverage`);
  }
  return safeLimit;
}

/**
 * Streams every document from `cursor` into an array, stopping early if heap
 * usage grows past the profile's budget. Errors raised while iterating are
 * logged and the documents collected so far are returned (best effort).
 * The cursor is always closed.
 *
 * @param expected  document count shown in progress logs
 * @param transform optional per-document conversion applied before storing
 */
async function drainCursorWithMemoryGuard(
  cursor: ClosableAsyncCursor,
  profile: StreamProfile,
  expected: number,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  transform?: (doc: any) => any,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<any[]> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const collected: any[] = [];
  let processedCount = 0;
  const memInitialMB = heapUsedMB();
  console.log(`🌊 Starting ${profile.startLabel} processing. Initial memory: ${memInitialMB}MB`);

  try {
    for await (const doc of cursor) {
      collected.push(transform ? transform(doc) : doc);
      processedCount++;

      if (processedCount % profile.checkEvery === 0) {
        const memUsedMB = heapUsedMB();
        console.log(`🌊 ${profile.progressLabel} ${processedCount}/${expected} logs, Memory: ${memUsedMB}MB`);
        if (memUsedMB > memInitialMB + profile.maxHeapGrowthMB) {
          console.log(`🚨 ${profile.brakeLabel}: Stopping at ${processedCount} logs (Memory: ${memUsedMB}MB)`);
          break;
        }
      }
    }
  } catch (error) {
    console.error(`❌ ${profile.errorLabel} error at document ${processedCount}:`, error);
  } finally {
    await cursor.close();
  }
  return collected;
}

/**
 * Counts all documents in `collectionName` as a connectivity check.
 * @param description wording used in the success log (e.g. "existing collection")
 */
async function probeCollection(collectionName: string, description: string): Promise<boolean> {
  try {
    const count = await mongoose.connection.collection(collectionName).countDocuments();
    console.log(`✅ Connected to ${description} "${collectionName}" with ${count} documents`);
    return true;
  } catch (error) {
    console.error(`❌ Failed to connect to collection "${collectionName}":`, error);
    return false;
  }
}

/**
 * Reads API logs from a collection that already exists. Documents match if the
 * configured timestamp field, or any of several common fallback timestamp
 * fields, falls within the requested hour.
 */
export class ExistingCollectionProvider implements FlexibleApiLogProvider {
  private readonly collectionName: string;
  private readonly timestampField: string;
  // Kept from configuration; not consulted by any query in this class.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private readonly requiredFields: any;

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  constructor(collectionName: string, timestampField: string, requiredFields?: any) {
    this.collectionName = collectionName;
    this.timestampField = timestampField;
    this.requiredFields = requiredFields ?? {};
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async getApiLogsByHour(date: Date, hour: number): Promise<any[]> {
    const { startDate, endDate } = hourWindow(date, hour);
    const collection = mongoose.connection.collection(this.collectionName);

    // Configured field first, then fallbacks for other common timestamp names.
    const query = {
      $or: [
        { [this.timestampField]: { $gte: startDate, $lt: endDate } },
        { request_time: { $gte: startDate, $lt: endDate } },
        { createdAt: { $gte: startDate, $lt: endDate } },
        { created_at: { $gte: startDate, $lt: endDate } },
        { timestamp: { $gte: startDate, $lt: endDate } },
      ],
    };

    // Count first so the amount loaded into memory can be capped.
    const totalCount = await collection.countDocuments(query);
    console.log(`📊 Found ${totalCount} logs for ${date.toDateString()} hour ${hour} from collection "${this.collectionName}"`);
    if (totalCount === 0) {
      // Returning here also avoids limit(0), which MongoDB treats as "no limit".
      return [];
    }

    const safeLimit = resolveSafeLimit(totalCount);
    const cursor = collection
      .find(query)
      .sort({ [this.timestampField]: 1 }) // oldest first
      .limit(safeLimit);

    const logs = await drainCursorWithMemoryGuard(cursor, STREAM_PROFILES.existing, safeLimit);

    console.log(`📋 Streaming completed: ${logs.length}/${totalCount} logs, Final memory: ${heapUsedMB()}MB`);
    if (logs.length < totalCount) {
      console.log(`💾 Memory usage optimized - processed ${logs.length}/${totalCount} logs`);
    }
    return logs;
  }

  getCollectionName(): string {
    return this.collectionName;
  }

  getTimestampField(): string {
    return this.timestampField;
  }

  async validateConnection(): Promise<boolean> {
    return probeCollection(this.collectionName, 'existing collection');
  }
}

/**
 * Reads API logs written by this application, matching on `request_time` or
 * `createdAt`. Uses `ApiLogModel` when it can be loaded; otherwise it queries
 * the named collection directly.
 */
export class NewCollectionProvider implements FlexibleApiLogProvider {
  private readonly collectionName: string;
  // Mongoose model from '../models/apiLog', or undefined if it failed to load.
  // NOTE(review): the model path queries the model's own collection, which may
  // differ from `collectionName` — confirm they agree.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private model: any;

  constructor(collectionName: string = 'apilogs') {
    this.collectionName = collectionName;
    // Loaded via require() at construction time to avoid a circular import.
    try {
      // eslint-disable-next-line @typescript-eslint/no-var-requires
      const { ApiLogModel } = require('../models/apiLog');
      this.model = ApiLogModel;
    } catch {
      console.warn('ApiLogModel not available, using direct collection access');
    }
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async getApiLogsByHour(date: Date, hour: number): Promise<any[]> {
    const { startDate, endDate } = hourWindow(date, hour);
    const query = {
      $or: [
        { request_time: { $gte: startDate, $lt: endDate } },
        { createdAt: { $gte: startDate, $lt: endDate } },
      ],
    };

    const totalCount: number = this.model
      ? await this.model.countDocuments(query)
      : await mongoose.connection.collection(this.collectionName).countDocuments(query);
    console.log(`📊 Found ${totalCount} logs for ${date.toDateString()} hour ${hour} from new collection "${this.collectionName}"`);
    if (totalCount === 0) {
      // Returning here also avoids limit(0), which MongoDB treats as "no limit".
      return [];
    }

    const safeLimit = resolveSafeLimit(totalCount);

    // Both paths stream through a cursor rather than buffering via lean()/toArray().
    const logs = this.model
      ? await drainCursorWithMemoryGuard(
          this.model
            .find(query)
            .sort({ request_time: 1, createdAt: 1 }) // oldest first
            .limit(safeLimit)
            .cursor(),
          STREAM_PROFILES.model,
          safeLimit,
          (doc) => (doc.toObject ? doc.toObject() : doc),
        )
      : await drainCursorWithMemoryGuard(
          mongoose.connection
            .collection(this.collectionName)
            .find(query)
            .sort({ request_time: 1, createdAt: 1 }) // oldest first
            .limit(safeLimit),
          STREAM_PROFILES.collection,
          safeLimit,
        );

    console.log(`📋 Retrieved ${logs.length}/${totalCount} logs from new collection "${this.collectionName}"`);
    if (logs.length < totalCount) {
      console.log(`💾 Memory usage optimized - processed ${logs.length}/${totalCount} logs`);
    }
    return logs;
  }

  getCollectionName(): string {
    return this.collectionName;
  }

  getTimestampField(): string {
    return 'request_time'; // Default for new collections
  }

  async validateConnection(): Promise<boolean> {
    return probeCollection(this.collectionName, 'collection');
  }
}

/**
 * Builds the provider selected by `config.apiLogs`:
 * `existingCollection` wins over `createNew`; with neither, the default
 * "apilogs" collection is used.
 */
export function createApiLogProvider(): FlexibleApiLogProvider {
  const config = getConfig();

  if (config.apiLogs?.existingCollection) {
    const { name, timestampField, requiredFields } = config.apiLogs.existingCollection;
    console.log(`🔌 Using existing collection: "${name}" with timestamp field: "${timestampField}"`);
    return new ExistingCollectionProvider(name, timestampField, requiredFields);
  }

  if (config.apiLogs?.createNew) {
    // `||` on purpose: an empty configured name also falls back to the default.
    const collectionName = config.apiLogs.createNew.collectionName || 'apilogs';
    console.log(`🆕 Using new collection: "${collectionName}"`);
    return new NewCollectionProvider(collectionName);
  }

  console.log('📝 No API logs configuration found, using default new collection: "apilogs"');
  return new NewCollectionProvider();
}