import fs from 'fs/promises';
import { createReadStream, createWriteStream } from 'fs';
import path from 'path';
import { pipeline } from 'stream';
import zlib from 'zlib';
import { promisify } from 'util';
import { Job, HourJob } from '../models/job';
import { Config } from '../types/config';
import { generateFolderPath } from './folderStructure';

const gzip = promisify(zlib.gzip);
const brotliCompress = promisify(zlib.brotliCompress);
const pipelineAsync = promisify(pipeline);

/** Arrays longer than this are written item by item through a write stream. */
const STREAMING_THRESHOLD = 300;
/** Arrays longer than this (up to the streaming threshold) are stringified item by item. */
const CHUNKED_THRESHOLD = 100;
/** How often (in items) progress and heap usage are logged. */
const PROGRESS_INTERVAL = 50;
/** Number of items written per event-loop turn by the streaming writer. */
const STREAM_BATCH_SIZE = 10;
/** Prefix added to every line of an item inside the hand-built JSON array. */
const ITEM_INDENT = ' ';
/** Placeholder written for an item that cannot be serialized. */
const NULL_ITEM = ' null';
/** Compression level used when the configuration does not provide one. */
const DEFAULT_COMPRESSION_LEVEL = 6;
/** Characters that force a CSV field to be quoted. */
const CSV_NEEDS_QUOTING = /[",\r\n]/;
const CSV_QUOTE = /"/g;

/** Current V8 heap usage, rounded to whole megabytes. */
function heapUsedMB(): number {
  return Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
}

/** Converts a byte count to megabytes with two decimals. */
function toMB(bytes: number): number {
  return Math.round((bytes / 1024 / 1024) * 100) / 100;
}

/** Runs the garbage collector when node was started with `--expose-gc`; returns whether it ran. */
function collectGarbageIfExposed(): boolean {
  if (!global.gc) {
    return false;
  }
  global.gc();
  return true;
}

/**
 * Serializes one array element as indented JSON.
 * On failure the error is logged and the `null` placeholder is returned, so a
 * single bad item never aborts the whole file.
 */
function serializeItem(item: unknown, index: number, action: string): string {
  try {
    const json = JSON.stringify(item, null, 2);
    // JSON.stringify yields undefined for values such as `undefined` or functions.
    if (json === undefined) {
      throw new TypeError('value is not JSON-serializable');
    }
    return ITEM_INDENT + json.split('\n').join('\n' + ITEM_INDENT);
  } catch (error) {
    console.error(`❌ Error ${action} item ${index}:`, error);
    return NULL_ITEM;
  }
}

/** Builds the JSON array text item by item, logging heap usage every PROGRESS_INTERVAL items. */
function buildChunkedJSON(dataArray: readonly unknown[]): string {
  const total = dataArray.length;
  const parts: string[] = ['[\n'];

  for (let i = 0; i < total; i++) {
    parts.push(serializeItem(dataArray[i], i, 'stringifying'));
    parts.push(i < total - 1 ? ',\n' : '\n');

    if (i > 0 && i % PROGRESS_INTERVAL === 0) {
      console.log(`💾 JSON processing: ${i}/${total} items, Memory: ${heapUsedMB()}MB`);
      collectGarbageIfExposed();
    }
  }

  parts.push(']\n');
  return parts.join('');
}

/**
 * Compresses text in memory.
 * `'brotli'` produces a `.br` payload; every other format value (including
 * `'gzip'` and `'zip'`) falls back to gzip and produces `.gz`.
 */
async function compressText(
  content: string,
  format: string,
  level: number
): Promise<{ data: Buffer; extension: string }> {
  if (format === 'brotli') {
    const data = await brotliCompress(Buffer.from(content, 'utf8'), {
      params: { [zlib.constants.BROTLI_PARAM_QUALITY]: level },
    });
    return { data, extension: '.br' };
  }
  const data = await gzip(content, { level });
  return { data, extension: '.gz' };
}

/** Deletes a file, ignoring any failure (used for best-effort cleanup only). */
async function removeQuietly(target: string): Promise<void> {
  try {
    await fs.unlink(target);
  } catch {
    // Nothing useful can be done if cleanup itself fails.
  }
}

/**
 * Serializes `data` in the requested format and writes it into the folder
 * derived from the job, optionally compressing the result.
 *
 * JSON output strategy depends on the number of items:
 * - more than 300: streamed to disk item by item, then compressed on disk;
 * - more than 100: stringified item by item and written in one call;
 * - otherwise: a single `JSON.stringify`.
 *
 * @param job      Job whose `date` selects the output folder.
 * @param hourJob  Supplies `hour_range` for the folder and `file_name` for the file.
 * @param data     Payload to serialize; non-array JSON payloads are treated as a one-item array for sizing.
 * @param format   `'json'`, `'csv'` or `'txt'`. Any other value produces an empty file.
 * @param config   Optional configuration; `config.compression` controls compression.
 * @returns The path of the file that was finally written (including `.gz`/`.br` when compressed).
 */
export async function saveFile(
  job: Job,
  hourJob: HourJob,
  data: unknown,
  format: string,
  config?: Config
): Promise<string> {
  const folder = generateFolderPath(job.date, config, hourJob.hour_range);
  await fs.mkdir(folder, { recursive: true });

  const fileName = hourJob.file_name.replace('.json', `.${format}`);
  const filePath = path.join(folder, fileName);

  let content = '';
  if (format === 'json') {
    const dataArray = Array.isArray(data) ? data : [data];
    const dataCount = dataArray.length;

    if (dataCount > STREAMING_THRESHOLD) {
      console.log(`🚨 Very large JSON dataset (${dataCount} items). Using streaming write to prevent memory overflow.`);
      const streamedPath = await writeStreamingJSON(dataArray, filePath, config);
      // The streamed file is only compressed once it is fully on disk.
      return applyCompressionIfNeeded(streamedPath, fileName, config);
    }

    if (dataCount > CHUNKED_THRESHOLD) {
      console.log(`⚠️ Large JSON dataset (${dataCount} items). Using memory-safe chunked writing.`);
      content = buildChunkedJSON(dataArray);
      console.log(`✅ Memory-safe JSON generation completed for ${dataCount} items`);
    } else {
      content = JSON.stringify(data, null, 2);
    }
  } else if (format === 'csv') {
    content = toCSV(data);
  } else if (format === 'txt') {
    content = toTxt(data);
  } else {
    console.warn(`⚠️ Unsupported format "${format}"; writing an empty file.`);
  }

  const contentSize = Buffer.byteLength(content, 'utf8');
  const contentSizeMB = toMB(contentSize);
  console.log(`📁 Writing file: ${fileName} (${contentSizeMB}MB)`);

  // When fileSize is set, only content of at least that many bytes is compressed;
  // when it is unset or 0, everything is compressed while compression is enabled.
  const compression = config?.compression;
  const minFileSize = compression?.fileSize ?? 0;
  const shouldCompress =
    Boolean(compression?.enabled) && (minFileSize === 0 || contentSize >= minFileSize);

  let finalPath = filePath;
  if (shouldCompress) {
    console.log(`🗜️ Compressing file: ${contentSizeMB}MB → compression enabled`);
    const compressionFormat = compression?.format || 'gzip';
    const compressionLevel = compression?.level || DEFAULT_COMPRESSION_LEVEL;

    console.log(`💾 Memory before compression: ${heapUsedMB()}MB`);
    const { data: compressedData, extension } = await compressText(
      content,
      compressionFormat,
      compressionLevel
    );
    console.log(`💾 Memory after compression: ${heapUsedMB()}MB`);
    console.log(`🗜️ Compression result: ${contentSizeMB}MB → ${toMB(compressedData.length)}MB`);

    finalPath = path.join(folder, fileName + extension);
    await fs.writeFile(finalPath, compressedData);

    if (collectGarbageIfExposed()) {
      console.log(`💾 Memory after cleanup: ${heapUsedMB()}MB`);
    }
  } else {
    await fs.writeFile(finalPath, content);
  }

  console.log(`✅ File written successfully: ${finalPath}`);
  return finalPath;
}

/**
 * 🌊 Streaming JSON writer for extremely large datasets.
 *
 * Writes the array to `filePath` a few items per event-loop turn and pauses
 * whenever the stream reports backpressure, so pending output stays bounded.
 * Items that cannot be serialized are replaced by `null`.
 *
 * @param dataArray Items to write as a JSON array.
 * @param filePath  Destination file.
 * @param config    Accepted for signature compatibility; not used here.
 * @returns Resolves with `filePath` once the stream has finished; rejects on a stream error.
 */
function writeStreamingJSON(
  dataArray: readonly unknown[],
  filePath: string,
  config?: Config
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const out = createWriteStream(filePath);
    const total = dataArray.length;
    let cursor = 0;
    let failed = false;

    out.on('error', (error) => {
      failed = true;
      reject(error);
    });
    out.on('finish', () => {
      console.log(`✅ Streaming JSON write completed: ${filePath}`);
      resolve(filePath);
    });

    out.write('[\n');

    const pump = (): void => {
      if (failed) {
        return;
      }

      let batchCount = 0;
      let canContinue = true;
      while (canContinue && cursor < total && batchCount < STREAM_BATCH_SIZE) {
        const separator = cursor < total - 1 ? ',\n' : '\n';
        // write() returns false once the internal buffer is full.
        canContinue = out.write(serializeItem(dataArray[cursor], cursor, 'writing') + separator);
        cursor++;
        batchCount++;
      }

      if (cursor % PROGRESS_INTERVAL === 0 || cursor >= total) {
        console.log(`🌊 Streaming progress: ${cursor}/${total} items, Memory: ${heapUsedMB()}MB`);
      }

      if (cursor >= total) {
        out.end(']\n');
      } else if (canContinue) {
        setImmediate(pump);
      } else {
        // Resume only after the buffered data has been flushed.
        out.once('drain', pump);
      }
    };

    pump();
  });
}

/**
 * 🗜️ Compresses an already-written file when compression is enabled.
 *
 * Used for streaming writes, where compression happens after the file is on
 * disk. The file is piped through a zlib stream into `<filePath>.gz` or
 * `<filePath>.br`, and the uncompressed original is deleted on success.
 * Compression is best-effort: on any failure the error is logged, a partial
 * output file is removed, and the original path is returned.
 *
 * @param filePath         File to compress.
 * @param originalFileName Accepted for signature compatibility; not used here.
 * @param config           Optional configuration; `config.compression` controls the behavior.
 * @returns The compressed file path, or `filePath` when compression was skipped or failed.
 */
async function applyCompressionIfNeeded(
  filePath: string,
  originalFileName: string,
  config?: Config
): Promise<string> {
  const compression = config?.compression;
  if (!compression?.enabled) {
    return filePath;
  }

  let compressedFilePath: string | undefined;
  try {
    const { size: fileSize } = await fs.stat(filePath);
    const minFileSize = compression.fileSize ?? 0;

    if (minFileSize > 0 && fileSize < minFileSize) {
      console.log(`📦 File size (${fileSize} bytes) is below minimum compression size (${minFileSize} bytes). Skipping compression.`);
      return filePath;
    }

    const fileSizeMB = toMB(fileSize);
    console.log(`🗜️ Compressing streamed file: ${fileSizeMB}MB → compression enabled`);

    const compressionFormat = compression.format || 'gzip';
    const compressionLevel = compression.level || DEFAULT_COMPRESSION_LEVEL;
    const useBrotli = compressionFormat === 'brotli';

    // Every format other than brotli (gzip, zip, unknown) is written as gzip.
    const compressor = useBrotli
      ? zlib.createBrotliCompress({
          params: { [zlib.constants.BROTLI_PARAM_QUALITY]: compressionLevel },
        })
      : zlib.createGzip({ level: compressionLevel });
    compressedFilePath = filePath + (useBrotli ? '.br' : '.gz');

    console.log(`💾 Memory before compression: ${heapUsedMB()}MB`);
    await pipelineAsync(
      createReadStream(filePath),
      compressor,
      createWriteStream(compressedFilePath)
    );
    const { size: compressedSize } = await fs.stat(compressedFilePath);
    console.log(`💾 Memory after compression: ${heapUsedMB()}MB`);

    // Guard against dividing by zero for an empty source file.
    const reduction = fileSize > 0 ? (1 - compressedSize / fileSize) * 100 : 0;
    console.log(`🗜️ Compression result: ${fileSizeMB}MB → ${toMB(compressedSize)}MB (${reduction.toFixed(1)}% reduction)`);

    await fs.unlink(filePath);

    if (collectGarbageIfExposed()) {
      console.log(`💾 Memory after cleanup: ${heapUsedMB()}MB`);
    }

    console.log(`✅ File compressed successfully: ${compressedFilePath}`);
    return compressedFilePath;
  } catch (error) {
    console.error(`❌ Error compressing streamed file: ${error}`);
    if (compressedFilePath !== undefined) {
      await removeQuietly(compressedFilePath);
    }
    return filePath;
  }
}

/** Renders one CSV field: null/undefined become empty, special characters force RFC 4180 quoting. */
function escapeCsvField(value: unknown): string {
  if (value === null || value === undefined) {
    return '';
  }
  const text = String(value);
  return CSV_NEEDS_QUOTING.test(text) ? `"${text.replace(CSV_QUOTE, '""')}"` : text;
}

/**
 * Converts an array of records to CSV text.
 * The header is taken from the keys of the first element, and every row is
 * rendered with those same keys. Non-array input yields an empty string.
 */
function toCSV(data: unknown): string {
  if (!Array.isArray(data)) {
    return '';
  }

  const first: unknown = data[0];
  const keys = first === null || first === undefined ? [] : Object.keys(first as object);

  const lines = [keys.map(escapeCsvField).join(',')];
  for (const row of data) {
    const record = (row ?? {}) as Record<string, unknown>;
    lines.push(keys.map((key) => escapeCsvField(record[key])).join(','));
  }
  return lines.join('\n');
}

/** Converts data to plain text: one `String(item)` per line for arrays, `String(data)` otherwise. */
function toTxt(data: unknown): string {
  if (Array.isArray(data)) {
    return data.map(String).join('\n');
  }
  return String(data);
}