/**
 * Memory Compression System
 * Provides automatic compression and decompression of memory entries
 */

import { gzip, gunzip } from 'node:zlib';
import { promisify } from 'node:util';
import type { ILogger } from '../core/logger.js';
import type { MemoryEntry } from '../utils/types.js';

const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);

/** Number of entries processed concurrently by the batch helpers. */
const BATCH_CONCURRENCY = 10;

/** Once more than this many timing samples are held, the sample list is trimmed. */
const MAX_TIMING_SAMPLES = 1000;

/** Number of most recent timing samples kept after a trim. */
const RETAINED_TIMING_SAMPLES = 500;

/** Settings controlling when and how entries are compressed. */
export interface CompressionConfig {
  /** When false, `compressEntry` returns entries untouched. */
  enabled: boolean;
  /** Minimum size in bytes to trigger compression. */
  threshold: number;
  /** Preferred algorithm; only consulted when `enableAdaptive` is false. */
  algorithm: 'gzip' | 'lz4' | 'brotli';
  /** Compression level (1-9). */
  level: number;
  /** Adaptive compression based on content type. */
  enableAdaptive: boolean;
}

/** Outcome of a single `compressEntry` call. */
export interface CompressionResult {
  /** True only when this call replaced the content with compressed data. */
  compressed: boolean;
  /** UTF-8 byte length of the content before compression. */
  originalSize: number;
  /** Byte length of the compressed buffer (equals `originalSize` when not compressed). */
  compressedSize: number;
  /** `originalSize / compressedSize`; 1 when not compressed. */
  compressionRatio: number;
  /** Algorithm label recorded for the entry, or `'none'`. */
  algorithm: string;
  /** Extra details: skip `reason`, `error` message, or `compressionTime`/`level`. */
  metadata: Record<string, unknown>;
}

/** Aggregate counters maintained by `MemoryCompressionEngine`. */
export interface CompressionMetrics {
  totalCompressions: number;
  totalDecompressions: number;
  /** Sum of `originalSize - compressedSize` over all successful compressions. */
  totalSpaceSaved: number;
  averageCompressionRatio: number;
  /** Successful compression count keyed by `entry.type`. */
  compressionsByType: Record<string, number>;
  /** Timings in milliseconds; `average` covers only the retained recent samples. */
  compressionTime: {
    average: number;
    min: number;
    max: number;
  };
  /** Timings in milliseconds; `average` covers only the retained recent samples. */
  decompressionTime: {
    average: number;
    min: number;
    max: number;
  };
}

type TimingStats = CompressionMetrics['compressionTime'];

/** Builds a zeroed metrics object; `min` starts at Infinity so the first sample always wins. */
function createEmptyMetrics(): CompressionMetrics {
  return {
    totalCompressions: 0,
    totalDecompressions: 0,
    totalSpaceSaved: 0,
    averageCompressionRatio: 0,
    compressionsByType: {},
    compressionTime: { average: 0, min: Infinity, max: 0 },
    decompressionTime: { average: 0, min: Infinity, max: 0 },
  };
}

/** Builds the result reported whenever an entry is returned without being compressed. */
function uncompressedResult(
  size: number,
  metadata: Record<string, unknown> = {}
): CompressionResult {
  return {
    compressed: false,
    originalSize: size,
    compressedSize: size,
    compressionRatio: 1,
    algorithm: 'none',
    metadata,
  };
}

/** Normalizes a caught value into a loggable message. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Records one timing sample into `stats` and returns the sample list to keep
 * (trimmed to the most recent samples once it grows past the cap).
 */
function recordTiming(stats: TimingStats, samples: number[], elapsedMs: number): number[] {
  samples.push(elapsedMs);
  stats.min = Math.min(stats.min, elapsedMs);
  stats.max = Math.max(stats.max, elapsedMs);
  stats.average = samples.reduce((sum, t) => sum + t, 0) / samples.length;

  // Keep only recent timing data
  return samples.length > MAX_TIMING_SAMPLES
    ? samples.slice(-RETAINED_TIMING_SAMPLES)
    : samples;
}

/**
 * Compresses and decompresses the `content` of memory entries and keeps
 * running metrics about the work done.
 *
 * Compressed content is stored as a base64 string and the entry's metadata is
 * tagged with `compressed`, `compressionAlgorithm`, `originalSize`,
 * `compressedSize` and `compressionRatio`.
 */
export class MemoryCompressionEngine {
  private config: CompressionConfig;
  private metrics: CompressionMetrics;
  private compressionTimes: number[] = [];
  private decompressionTimes: number[] = [];

  /**
   * @param config Overrides merged on top of the defaults
   *   (enabled, 1KB threshold, gzip, level 6, adaptive on).
   * @param logger Receives debug/info/error messages from the engine.
   */
  constructor(
    config: Partial<CompressionConfig>,
    private logger: ILogger
  ) {
    this.config = {
      enabled: true,
      threshold: 1024, // 1KB
      algorithm: 'gzip',
      level: 6,
      enableAdaptive: true,
      ...config,
    };
    this.metrics = createEmptyMetrics();
  }

  /**
   * Compresses an entry's content when that is worthwhile.
   *
   * The entry is returned unchanged (with `result.compressed === false`) when
   * the engine is disabled, the entry is already marked compressed, the content
   * is smaller than the threshold, the achieved ratio is below 1.1, or
   * compression throws. Failures are logged, never rethrown.
   *
   * @param entry Entry whose `content` should be compressed.
   * @returns The (possibly new) entry together with a description of what happened.
   */
  async compressEntry(
    entry: MemoryEntry
  ): Promise<{ entry: MemoryEntry; result: CompressionResult }> {
    // Sizes are always reported in UTF-8 bytes, including on the skip paths.
    const originalSize = Buffer.byteLength(entry.content, 'utf8');

    if (!this.config.enabled) {
      return { entry, result: uncompressedResult(originalSize) };
    }

    // Compressing twice would gzip the base64 payload and overwrite the
    // metadata, so a single decompress could no longer restore the content.
    if (entry.metadata?.compressed) {
      return {
        entry,
        result: uncompressedResult(originalSize, { reason: 'already_compressed' }),
      };
    }

    // Check if compression is needed
    if (originalSize < this.config.threshold) {
      return {
        entry,
        result: uncompressedResult(originalSize, { reason: 'below_threshold' }),
      };
    }

    const startTime = Date.now();

    try {
      // Determine best compression algorithm
      const algorithm = this.config.enableAdaptive
        ? this.selectOptimalAlgorithm(entry)
        : this.config.algorithm;

      // Compress content
      const compressedBuffer = await this.compressData(entry.content, algorithm);
      const compressedSize = compressedBuffer.length;
      const compressionRatio = originalSize / compressedSize;

      // Only use compression if it provides significant benefit
      if (compressionRatio < 1.1) {
        return {
          entry,
          result: uncompressedResult(originalSize, { reason: 'insufficient_benefit' }),
        };
      }

      // Create compressed entry
      const compressedEntry: MemoryEntry = {
        ...entry,
        content: compressedBuffer.toString('base64'),
        metadata: {
          ...entry.metadata,
          compressed: true,
          compressionAlgorithm: algorithm,
          originalSize,
          compressedSize,
          compressionRatio,
        },
      };

      // Update metrics
      const compressionTime = Date.now() - startTime;
      this.updateCompressionMetrics(compressionTime, originalSize, compressedSize, entry.type);

      const result: CompressionResult = {
        compressed: true,
        originalSize,
        compressedSize,
        compressionRatio,
        algorithm,
        metadata: {
          compressionTime,
          level: this.config.level,
        },
      };

      this.logger.debug('Memory entry compressed', {
        entryId: entry.id,
        originalSize,
        compressedSize,
        ratio: compressionRatio,
        algorithm,
        time: compressionTime,
      });

      return { entry: compressedEntry, result };
    } catch (error) {
      const message = describeError(error);
      this.logger.error('Compression failed', { entryId: entry.id, error: message });

      return { entry, result: uncompressedResult(originalSize, { error: message }) };
    }
  }

  /**
   * Restores the original content of an entry produced by `compressEntry`.
   *
   * Entries not marked `metadata.compressed` are returned as-is. If
   * decompression fails the error is logged and the original (still
   * compressed) entry is returned.
   *
   * @param entry Entry to decompress.
   * @returns A new entry with decoded content, `compressed: false` and a
   *   `decompressedAt` timestamp, or the input entry when nothing was done.
   */
  async decompressEntry(entry: MemoryEntry): Promise<MemoryEntry> {
    if (!entry.metadata?.compressed) {
      return entry;
    }

    const startTime = Date.now();

    try {
      const algorithm = entry.metadata.compressionAlgorithm as string;
      const compressedBuffer = Buffer.from(entry.content, 'base64');

      // Decompress content
      const decompressedContent = await this.decompressData(compressedBuffer, algorithm);

      // Create decompressed entry
      const decompressedEntry: MemoryEntry = {
        ...entry,
        content: decompressedContent,
        metadata: {
          ...entry.metadata,
          compressed: false,
          decompressedAt: new Date().toISOString(),
        },
      };

      // Update metrics
      const decompressionTime = Date.now() - startTime;
      this.updateDecompressionMetrics(decompressionTime);

      this.logger.debug('Memory entry decompressed', {
        entryId: entry.id,
        algorithm,
        time: decompressionTime,
      });

      return decompressedEntry;
    } catch (error) {
      this.logger.error('Decompression failed', {
        entryId: entry.id,
        error: describeError(error),
      });

      // Return original entry if decompression fails
      return entry;
    }
  }

  /**
   * Compresses many entries, at most `BATCH_CONCURRENCY` at a time.
   *
   * @param entries Entries to compress.
   * @returns One `{ entry, result }` pair per input, in input order.
   */
  async compressBatch(
    entries: MemoryEntry[]
  ): Promise<Array<{ entry: MemoryEntry; result: CompressionResult }>> {
    const results: Array<{ entry: MemoryEntry; result: CompressionResult }> = [];

    // Process entries in parallel with concurrency limit
    for (let i = 0; i < entries.length; i += BATCH_CONCURRENCY) {
      const batch = entries.slice(i, i + BATCH_CONCURRENCY);
      const batchResults = await Promise.all(batch.map((entry) => this.compressEntry(entry)));
      results.push(...batchResults);
    }

    return results;
  }

  /**
   * Decompresses every compressed entry in the list, at most
   * `BATCH_CONCURRENCY` at a time, leaving uncompressed entries untouched.
   *
   * Results are matched back to inputs by `entry.id`.
   *
   * @param entries Entries to decompress.
   * @returns The entries in input order with compressed ones replaced.
   */
  async decompressBatch(entries: MemoryEntry[]): Promise<MemoryEntry[]> {
    const compressedEntries = entries.filter((e) => e.metadata?.compressed);

    if (compressedEntries.length === 0) {
      return entries;
    }

    // Process compressed entries in parallel
    const decompressedEntries = new Map<string, MemoryEntry>();

    for (let i = 0; i < compressedEntries.length; i += BATCH_CONCURRENCY) {
      const batch = compressedEntries.slice(i, i + BATCH_CONCURRENCY);
      const batchResults = await Promise.all(batch.map((entry) => this.decompressEntry(entry)));

      for (const entry of batchResults) {
        decompressedEntries.set(entry.id, entry);
      }
    }

    // Return all entries with compressed ones replaced
    return entries.map((entry) => decompressedEntries.get(entry.id) ?? entry);
  }

  /**
   * Returns a snapshot of the current metrics.
   *
   * The snapshot is a deep copy, so adjusting it (including the `min`
   * normalization below) never alters the engine's internal counters.
   */
  getMetrics(): CompressionMetrics {
    const snapshot: CompressionMetrics = {
      ...this.metrics,
      compressionsByType: { ...this.metrics.compressionsByType },
      compressionTime: { ...this.metrics.compressionTime },
      decompressionTime: { ...this.metrics.decompressionTime },
    };

    // Handle case where no compressions have been performed: report 0
    // instead of the internal Infinity sentinel.
    if (snapshot.totalCompressions === 0) {
      snapshot.compressionTime.min = 0;
    }

    if (snapshot.totalDecompressions === 0) {
      snapshot.decompressionTime.min = 0;
    }

    return snapshot;
  }

  /** Clears all counters and timing samples. */
  resetMetrics(): void {
    this.metrics = createEmptyMetrics();
    this.compressionTimes = [];
    this.decompressionTimes = [];
  }

  /**
   * Merges the given overrides into the current configuration and logs the result.
   *
   * @param config Settings to change; omitted keys keep their current value.
   */
  updateConfig(config: Partial<CompressionConfig>): void {
    this.config = { ...this.config, ...config };
    this.logger.info('Compression config updated', { config: this.config });
  }

  /**
   * Adaptive algorithm selection based on content type and size.
   *
   * Every case currently resolves to gzip. The intended heuristics are gzip
   * for structured entries ('artifact', 'decision') and for small entries, and
   * a more aggressive algorithm such as brotli for large ones once available.
   * The parameter is kept so those heuristics can be added in place.
   */
  private selectOptimalAlgorithm(_entry: MemoryEntry): CompressionConfig['algorithm'] {
    return 'gzip';
  }

  /**
   * Compresses a UTF-8 string with the named algorithm at the configured level.
   *
   * @throws Error when `algorithm` is not one of gzip, lz4 or brotli.
   */
  private async compressData(data: string, algorithm: string): Promise<Buffer> {
    const buffer = Buffer.from(data, 'utf8');

    switch (algorithm) {
      case 'gzip':
      // LZ4 and Brotli compression would be implemented here.
      // For now, both fall back to gzip.
      case 'lz4':
      case 'brotli':
        return gzipAsync(buffer, { level: this.config.level });

      default:
        throw new Error(`Unsupported compression algorithm: ${algorithm}`);
    }
  }

  /**
   * Decompresses a buffer produced by `compressData` back into a UTF-8 string.
   *
   * @throws Error when `algorithm` is not one of gzip, lz4 or brotli, or when
   *   the buffer is not valid gzip data.
   */
  private async decompressData(buffer: Buffer, algorithm: string): Promise<string> {
    switch (algorithm) {
      case 'gzip':
      // LZ4 and Brotli decompression would be implemented here.
      // For now, both fall back to gzip, mirroring compressData.
      case 'lz4':
      case 'brotli': {
        const decompressed = await gunzipAsync(buffer);
        return decompressed.toString('utf8');
      }

      default:
        throw new Error(`Unsupported decompression algorithm: ${algorithm}`);
    }
  }

  /** Folds one successful compression into the running counters and timings. */
  private updateCompressionMetrics(
    time: number,
    originalSize: number,
    compressedSize: number,
    type: string
  ): void {
    const metrics = this.metrics;

    metrics.totalCompressions++;
    metrics.totalSpaceSaved += originalSize - compressedSize;

    // Update compression ratio (incremental mean over all compressions)
    const ratio = originalSize / compressedSize;
    metrics.averageCompressionRatio =
      (metrics.averageCompressionRatio * (metrics.totalCompressions - 1) + ratio) /
      metrics.totalCompressions;

    // Update type statistics
    metrics.compressionsByType[type] = (metrics.compressionsByType[type] ?? 0) + 1;

    // Update timing metrics
    this.compressionTimes = recordTiming(metrics.compressionTime, this.compressionTimes, time);
  }

  /** Folds one successful decompression into the running counters and timings. */
  private updateDecompressionMetrics(time: number): void {
    this.metrics.totalDecompressions++;

    // Update timing metrics
    this.decompressionTimes = recordTiming(
      this.metrics.decompressionTime,
      this.decompressionTimes,
      time
    );
  }
}