import path from 'path';
import fs from 'fs/promises';
import AWS from 'aws-sdk';
import { Storage } from '@google-cloud/storage';
import { BlobServiceClient } from '@azure/storage-blob';
import zlib from 'zlib';
import { promisify } from 'util';
import { Job, HourJob } from '../models/job';
import { Config } from '../types/config';
import { getLogger } from './logger';
import { generateCloudPath } from './folderStructure';

const gzip = promisify(zlib.gzip);
const brotliCompress = promisify(zlib.brotliCompress);

/** Files strictly larger than this many megabytes are sent to S3 with an explicit multipart upload. */
const MULTIPART_THRESHOLD_MB = 10;

/** Size in bytes of each part read from disk and sent during a multipart upload. */
const MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024;

/** Heap usage (MB) above which a warning is logged while uploading parts. */
const HIGH_MEMORY_WARN_MB = 200;

/** Content types keyed by lower-cased file extension (including the leading dot). */
const CONTENT_TYPES: Record<string, string> = {
  // Compressed outputs: the outermost extension decides the type (e.g. data.json.gz -> gzip).
  '.gz': 'application/gzip',
  '.br': 'application/x-brotli',
  '.json': 'application/json',
  '.csv': 'text/csv',
  '.txt': 'text/plain',
  '.log': 'text/plain',
};

/**
 * Uploads a local file to the given storage provider and, for non-local
 * providers, deletes the local file once the upload has succeeded.
 *
 * @param filePath - Path of the local file to upload.
 * @param provider - One of `'local'`, `'s3'`, `'gcs'` or `'azure'`.
 * @param job - Job whose `date` is used to build the remote path.
 * @param hourJob - Hour job whose `hour_range` is used to build the remote path.
 * @param config - Application configuration (provider credentials, compression, folder structure).
 * @returns The local path for `'local'`, otherwise the remote key/path that was written
 *          (for S3 this includes the compression extension when the file was compressed).
 * @throws Error when the provider is unknown or the upload fails; the error is logged and rethrown.
 */
export async function uploadFile(
  filePath: string,
  provider: string,
  job: Job,
  hourJob: HourJob,
  config: Config
): Promise<string> {
  const logger = getLogger();
  const fileName = path.basename(filePath);

  // Generate cloud path based on folder structure configuration
  const remotePath = generateCloudPath(job.date, fileName, config, hourJob.hour_range);

  logger.info(`Uploading file to ${provider}`, { filePath, remotePath });

  let result: string;
  try {
    switch (provider) {
      case 'local':
        result = await uploadToLocal(filePath);
        break;
      case 's3':
        result = await uploadToS3(filePath, remotePath, config);
        break;
      case 'gcs':
        result = await uploadToGCS(filePath, remotePath, config);
        break;
      case 'azure':
        result = await uploadToAzure(filePath, remotePath, config);
        break;
      default:
        throw new Error(`Unknown upload provider: ${provider}`);
    }
  } catch (error) {
    logger.error(`Failed to upload file to ${provider}`, { error, filePath, remotePath });
    throw error;
  }

  // ✅ Delete local file after successful upload (except for local provider).
  // Any compressed temp file is removed by uploadToS3 itself, so only the
  // original file has to be handled here.
  if (provider !== 'local') {
    try {
      await fs.unlink(filePath);
      logger.debug(`Local file deleted after upload: ${filePath}`);
    } catch (unlinkError) {
      // Don't throw - upload was successful
      logger.warn(`Failed to delete local file: ${filePath}`, { error: unlinkError });
    }
  }

  return result;
}

/**
 * "Uploads" to local storage, which is a no-op.
 *
 * @param filePath - Path of the local file.
 * @returns The same `filePath`.
 */
async function uploadToLocal(filePath: string): Promise<string> {
  // For local storage, just return the file path
  return filePath;
}

/**
 * Uploads a file to S3, compressing it first when compression is enabled.
 * Files larger than {@link MULTIPART_THRESHOLD_MB} use an explicit multipart upload.
 * A compressed temp file created here is always removed again, whether the
 * upload succeeds or fails.
 *
 * @param filePath - Path of the local (uncompressed) file.
 * @param remotePath - Target object key, without any compression extension.
 * @param config - Configuration; `config.s3` must be present.
 * @returns The S3 object key that was written (not a full URL).
 * @throws Error when `config.s3` is missing or the upload fails.
 */
async function uploadToS3(filePath: string, remotePath: string, config: Config): Promise<string> {
  if (!config.s3) throw new Error('S3 configuration is missing');
  const s3Settings = config.s3;

  const s3Config: AWS.S3.ClientConfiguration = {
    region: s3Settings.region,
  };

  // Use IAM role or access keys based on configuration
  if (!s3Settings.useIAMRole) {
    // Traditional access key method (default)
    s3Config.accessKeyId = s3Settings.accessKeyId;
    s3Config.secretAccessKey = s3Settings.secretAccessKey;
  }
  // If useIAMRole is true, AWS SDK will automatically use IAM role from environment/instance metadata

  if (s3Settings.endpoint) {
    s3Config.endpoint = s3Settings.endpoint;
  }

  const s3 = new AWS.S3(s3Config);
  const logger = getLogger();

  // 🗜️ Apply compression BEFORE upload if enabled
  let actualFilePath = filePath;
  let actualRemotePath = remotePath;

  if (config.compression?.enabled) {
    logger.info(`🗜️ Compression enabled - applying compression before S3 upload`);

    const compressedPath = await applyFileCompression(filePath, config);
    // applyFileCompression returns the original path when it skipped or failed.
    if (compressedPath !== filePath) {
      actualFilePath = compressedPath;
      // Update remote path to include compression extension
      actualRemotePath = remotePath + getCompressionExtension(config.compression.format ?? 'gzip');
      logger.info(`📦 File compressed: ${filePath} → ${actualFilePath}`);
    }
  }

  const isCompressed = actualFilePath !== filePath;

  try {
    // Check file size to determine upload method
    const fileStats = await fs.stat(actualFilePath);
    const fileSizeMB = fileStats.size / (1024 * 1024);
    const useMultipart = fileSizeMB > MULTIPART_THRESHOLD_MB;

    logger.info(`Uploading file to S3`, {
      originalFile: filePath,
      actualFile: actualFilePath,
      remotePath: actualRemotePath,
      fileSizeMB: fileSizeMB.toFixed(2),
      multipart: useMultipart,
      compressed: isCompressed,
    });

    if (useMultipart) {
      logger.debug(
        `Using multipart upload: ${fileSizeMB.toFixed(2)}MB exceeds ${MULTIPART_THRESHOLD_MB}MB threshold`
      );
      return await uploadToS3Multipart(s3, actualFilePath, actualRemotePath, config, fileSizeMB);
    }

    logger.debug(`Using simple upload: ${fileSizeMB.toFixed(2)}MB`);
    const fileContent = await fs.readFile(actualFilePath);

    await s3
      .upload({
        Bucket: s3Settings.bucket,
        Key: actualRemotePath,
        Body: fileContent,
        ContentType: getContentType(actualFilePath),
      })
      .promise();

    logger.info(`✅ Simple S3 upload completed`, {
      remotePath: actualRemotePath,
      fileSizeMB: fileSizeMB.toFixed(2),
    });

    // Return only the S3 key path, not the complete URL
    return actualRemotePath;
  } finally {
    // The compressed file is a temp artifact owned by this function: remove it
    // on success and on failure so retries do not accumulate stale copies.
    if (isCompressed) {
      try {
        await fs.unlink(actualFilePath);
        logger.debug(`Compressed temp file removed: ${actualFilePath}`);
      } catch (unlinkError) {
        logger.warn(`Failed to delete compressed file: ${actualFilePath}`, { error: unlinkError });
      }
    }
  }
}

/**
 * Uploads a file to S3 in {@link MULTIPART_CHUNK_SIZE}-byte parts, sequentially.
 * If anything fails after the multipart upload was created, the upload is
 * aborted (best effort) so S3 does not keep the already-uploaded parts.
 *
 * @param s3 - Configured S3 client.
 * @param filePath - Local file to read.
 * @param remotePath - Target object key.
 * @param config - Configuration; `config.s3.bucket` names the bucket.
 * @param fileSizeMB - File size in megabytes, used for logging only.
 * @returns The S3 object key that was written.
 * @throws Error on read mismatch, missing UploadId, or any S3 failure (logged and rethrown).
 */
async function uploadToS3Multipart(
  s3: AWS.S3,
  filePath: string,
  remotePath: string,
  config: Config,
  fileSizeMB: number
): Promise<string> {
  if (!config.s3) throw new Error('S3 configuration is missing');
  const bucket = config.s3.bucket;
  const logger = getLogger();

  // Set once S3 has issued an id; cleared again after a successful completion
  // so the catch block only aborts uploads that are still open.
  let pendingUploadId: string | undefined;

  try {
    // Open the file first: if it is missing we fail before creating an upload on S3.
    const fileHandle = await fs.open(filePath, 'r');

    try {
      // Step 1: Initiate multipart upload
      logger.info(`🚀 Starting S3 multipart upload`, {
        remotePath,
        fileSizeMB: fileSizeMB.toFixed(2),
      });

      const createResult = await s3
        .createMultipartUpload({
          Bucket: bucket,
          Key: remotePath,
          ContentType: getContentType(filePath),
        })
        .promise();

      if (!createResult.UploadId) {
        throw new Error('S3 did not return an UploadId for the multipart upload');
      }
      const uploadId = createResult.UploadId;
      pendingUploadId = uploadId;

      logger.info(`📋 Multipart upload initiated`, { uploadId, remotePath });

      // Step 2: Read file and upload parts
      const fileStats = await fileHandle.stat();
      const totalParts = Math.ceil(fileStats.size / MULTIPART_CHUNK_SIZE);
      const uploadParts: AWS.S3.CompletedPart[] = [];
      let uploadedBytes = 0;

      logger.info(`📊 File analysis`, {
        totalSize: fileStats.size,
        totalParts,
        chunkSizeMB: (MULTIPART_CHUNK_SIZE / 1024 / 1024).toFixed(1),
      });

      for (let partNumber = 1; partNumber <= totalParts; partNumber++) {
        const start = (partNumber - 1) * MULTIPART_CHUNK_SIZE;
        const end = Math.min(start + MULTIPART_CHUNK_SIZE, fileStats.size);
        const chunkSize = end - start;

        // Read chunk from file
        const buffer = Buffer.alloc(chunkSize);
        const { bytesRead } = await fileHandle.read(buffer, 0, chunkSize, start);

        if (bytesRead !== chunkSize) {
          throw new Error(`Read mismatch: expected ${chunkSize}, got ${bytesRead}`);
        }

        logger.debug(`📤 Uploading part ${partNumber}/${totalParts}`, {
          chunkSizeMB: (chunkSize / 1024 / 1024).toFixed(2),
        });

        const partResult = await s3
          .uploadPart({
            Bucket: bucket,
            Key: remotePath,
            PartNumber: partNumber,
            UploadId: uploadId,
            Body: buffer,
          })
          .promise();

        uploadParts.push({
          ETag: partResult.ETag,
          PartNumber: partNumber,
        });

        uploadedBytes += chunkSize;
        const progress = ((uploadedBytes / fileStats.size) * 100).toFixed(1);

        logger.info(`📊 Part ${partNumber}/${totalParts} uploaded`, {
          progress: `${progress}%`,
          uploadedMB: (uploadedBytes / 1024 / 1024).toFixed(2),
          totalMB: (fileStats.size / 1024 / 1024).toFixed(2),
        });

        // Memory monitoring during upload
        const memUsageMB = process.memoryUsage().heapUsed / 1024 / 1024;
        if (memUsageMB > HIGH_MEMORY_WARN_MB) {
          logger.warn(`⚠️ High memory usage during multipart upload`, {
            memUsageMB: memUsageMB.toFixed(2),
            part: partNumber,
          });
        }
      }

      // Step 3: Complete multipart upload
      logger.info(`🔗 Completing multipart upload`, { totalParts: uploadParts.length });

      await s3
        .completeMultipartUpload({
          Bucket: bucket,
          Key: remotePath,
          UploadId: uploadId,
          MultipartUpload: { Parts: uploadParts },
        })
        .promise();

      // Nothing left to abort from here on.
      pendingUploadId = undefined;

      logger.info(`✅ S3 multipart upload completed successfully`, {
        remotePath,
        fileSizeMB: fileSizeMB.toFixed(2),
        totalParts,
        finalMemoryMB: (process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2),
      });

      return remotePath;
    } finally {
      // Always release the descriptor; a close failure must not mask the
      // upload outcome, so it is only logged.
      try {
        await fileHandle.close();
      } catch (closeError) {
        logger.warn(`Failed to close file after multipart upload: ${filePath}`, {
          error: closeError,
        });
      }
    }
  } catch (error) {
    logger.error(`❌ S3 multipart upload failed`, { error, remotePath, fileSizeMB });

    // Try to abort the multipart upload to clean up already-uploaded parts
    if (pendingUploadId !== undefined) {
      try {
        await s3
          .abortMultipartUpload({
            Bucket: bucket,
            Key: remotePath,
            UploadId: pendingUploadId,
          })
          .promise();
        logger.info(`🧹 Aborted incomplete multipart upload`);
      } catch (abortError) {
        logger.warn(`⚠️ Failed to abort multipart upload`, { abortError });
      }
    }

    throw error;
  }
}

/**
 * Uploads a file to Google Cloud Storage. The whole file is read into memory.
 *
 * @param filePath - Local file to upload.
 * @param remotePath - Target object name inside the bucket.
 * @param config - Configuration; `config.gcs` must be present.
 * @returns The GCS object path that was written (not a full URL).
 * @throws Error when `config.gcs` is missing or the upload fails.
 */
async function uploadToGCS(filePath: string, remotePath: string, config: Config): Promise<string> {
  if (!config.gcs) throw new Error('GCS configuration is missing');

  // Kept loosely typed on purpose: the shape of config.gcs.credentials is
  // declared elsewhere and is passed through to the client unchanged.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const storageOptions: any = {
    projectId: config.gcs.projectId,
  };

  // A key file takes precedence over inline credentials.
  if (config.gcs.keyFilename) {
    storageOptions.keyFilename = config.gcs.keyFilename;
  } else if (config.gcs.credentials) {
    storageOptions.credentials = config.gcs.credentials;
  }

  const storage = new Storage(storageOptions);
  const bucket = storage.bucket(config.gcs.bucket);
  const file = bucket.file(remotePath);

  await file.save(await fs.readFile(filePath), {
    metadata: {
      contentType: getContentType(filePath),
    },
  });

  // Return only the GCS path, not the complete URL
  return remotePath;
}

/**
 * Uploads a file to Azure Blob Storage, creating the container if needed.
 * The whole file is read into memory.
 *
 * @param filePath - Local file to upload.
 * @param remotePath - Target blob name inside the container.
 * @param config - Configuration; `config.azure` must be present.
 * @returns The blob path that was written (not a full URL).
 * @throws Error when `config.azure` is missing or the upload fails.
 */
async function uploadToAzure(filePath: string, remotePath: string, config: Config): Promise<string> {
  if (!config.azure) throw new Error('Azure configuration is missing');

  const blobServiceClient = BlobServiceClient.fromConnectionString(config.azure.connectionString);
  const containerClient = blobServiceClient.getContainerClient(config.azure.containerName);

  // Ensure container exists
  await containerClient.createIfNotExists();

  const blockBlobClient = containerClient.getBlockBlobClient(remotePath);
  const fileContent = await fs.readFile(filePath);

  await blockBlobClient.upload(fileContent, fileContent.length, {
    blobHTTPHeaders: {
      blobContentType: getContentType(filePath),
    },
  });

  // Return only the Azure blob path, not the complete URL
  return remotePath;
}

/**
 * 🗜️ Compresses a file before upload and writes the result next to it as
 * `<filePath><extension>`.
 *
 * Compression is best effort: when it is disabled, the file is smaller than
 * `config.compression.fileSize`, or anything goes wrong, the original
 * `filePath` is returned and the caller uploads the uncompressed file.
 *
 * @param filePath - Local file to compress.
 * @param config - Configuration; reads `compression.enabled`, `fileSize`, `format` and `level`.
 * @returns Path of the compressed file, or `filePath` when no compressed file was produced.
 */
async function applyFileCompression(filePath: string, config: Config): Promise<string> {
  const logger = getLogger();
  const compression = config.compression;

  if (!compression?.enabled) {
    return filePath; // No compression needed
  }

  try {
    const { size: fileSize } = await fs.stat(filePath);
    const minFileSize = compression.fileSize ?? 0;

    // Check if file meets size requirements
    if (minFileSize > 0 && fileSize < minFileSize) {
      logger.info(
        `📦 File size (${fileSize} bytes) is below minimum compression size (${minFileSize} bytes). Skipping compression.`
      );
      return filePath;
    }

    const fileSizeMB = Math.round((fileSize / 1024 / 1024) * 100) / 100;
    logger.info(`🗜️ Compressing file before upload: ${fileSizeMB}MB`);

    const fileContent = await fs.readFile(filePath);

    const compressionFormat = compression.format ?? 'gzip';
    // `??` rather than `||`: level 0 is a valid setting and must not become 6.
    const compressionLevel = compression.level ?? 6;

    const memBeforeMB = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
    logger.debug(`💾 Memory before compression: ${memBeforeMB}MB`);

    // Only brotli has its own codec; 'gzip', 'zip' and unknown formats all
    // produce gzip output, matching getCompressionExtension.
    const compressedData: Buffer =
      compressionFormat === 'brotli'
        ? await brotliCompress(fileContent, {
            params: { [zlib.constants.BROTLI_PARAM_QUALITY]: compressionLevel },
          })
        : await gzip(fileContent, { level: compressionLevel });

    const memAfterMB = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
    const compressedSizeMB = Math.round((compressedData.length / 1024 / 1024) * 100) / 100;
    // Guard the ratio so an empty input file does not log Infinity/NaN.
    const reductionPercent = fileSize > 0 ? (1 - compressedData.length / fileSize) * 100 : 0;

    logger.info(
      `🗜️ Compression result: ${fileSizeMB}MB → ${compressedSizeMB}MB (${reductionPercent.toFixed(1)}% reduction)`
    );
    logger.debug(`💾 Memory after compression: ${memAfterMB}MB`);

    const compressedFilePath = filePath + getCompressionExtension(compressionFormat);
    await fs.writeFile(compressedFilePath, compressedData);

    logger.info(`✅ File compressed successfully: ${compressedFilePath}`);
    return compressedFilePath;
  } catch (error) {
    logger.error(`❌ Error compressing file: ${error}`);
    // Return original file path if compression fails
    return filePath;
  }
}

/**
 * Maps a compression format name to the file extension of the produced file.
 *
 * @param format - Compression format name (`'gzip'`, `'brotli'`, `'zip'`, ...).
 * @returns `'.br'` for brotli, `'.gz'` for everything else (zip is emitted as gzip).
 */
function getCompressionExtension(format: string): string {
  switch (format) {
    case 'brotli':
      return '.br';
    case 'gzip':
    case 'zip': // Using gzip for zip format
    default:
      return '.gz';
  }
}

/**
 * Derives a content type from the file's last extension (case-insensitive).
 *
 * @param filePath - File path or name.
 * @returns The matching content type, or `'application/octet-stream'` when the extension is unknown.
 */
function getContentType(filePath: string): string {
  const ext = path.extname(filePath).toLowerCase();
  return CONTENT_TYPES[ext] ?? 'application/octet-stream';
}