/** * Distributed memory system with sharing capabilities */ import { EventEmitter } from 'node:events'; import { ILogger } from "../core/logger.ts"; import { IEventBus } from "../core/event-bus.ts"; import { SwarmMemory, MemoryPartition, MemoryEntry, MemoryType, AccessLevel, ConsistencyLevel, MemoryPermissions, AgentId } from "../swarm/types.ts"; import { generateId } from "../utils/helpers.ts"; export interface DistributedMemoryConfig { namespace: string; distributed: boolean; consistency: ConsistencyLevel; replicationFactor: number; syncInterval: number; maxMemorySize: number; compressionEnabled: boolean; encryptionEnabled: boolean; backupEnabled: boolean; persistenceEnabled: boolean; shardingEnabled: boolean; cacheSize: number; cacheTtl: number; } export interface MemoryNode { id: string; address: string; port: number; status: 'online' | 'offline' | 'syncing' | 'failed'; lastSeen: Date; partitions: string[]; load: number; capacity: number; } export interface SyncOperation { id: string; type: 'create' | 'update' | 'delete' | 'batch' | 'partition-create' | 'entry-update' | 'entry-delete'; partition?: string; entry?: MemoryEntry; entries?: MemoryEntry[]; timestamp: Date; version: number; origin: string; targets: string[]; status: 'pending' | 'in_progress' | 'completed' | 'failed'; data?: any; nodeId?: string; } export interface MemoryQuery { namespace?: string; partition?: string; type?: MemoryType; tags?: string[]; owner?: AgentId; accessLevel?: AccessLevel; createdAfter?: Date; updatedAfter?: Date; limit?: number; offset?: number; sortBy?: string; sortOrder?: 'asc' | 'desc'; } export interface MemoryStatistics { totalEntries: number; totalSize: number; partitionCount: number; nodeCount: number; replicationHealth: number; syncOperations: { pending: number; completed: number; failed: number; }; performance: { readLatency: number; writeLatency: number; syncLatency: number; throughput: number; }; utilization: { memoryUsage: number; diskUsage: number; networkUsage: number; }; } /** * Distributed memory system for sharing data across swarm agents */ export class DistributedMemorySystem extends EventEmitter { private logger: ILogger; private eventBus: IEventBus; private config: DistributedMemoryConfig; // Storage private partitions = new Map(); private entries = new Map(); private cache = new Map(); // Distribution private nodes = new Map(); private localNodeId: string; private syncQueue: SyncOperation[] = []; private replicationMap = new Map(); // entryId -> nodeIds // Synchronization private syncInterval?: NodeJS.Timeout; private vectorClock = new Map(); private conflictResolver?: (local: MemoryEntry, remote: MemoryEntry) => MemoryEntry; // Performance tracking private statistics: MemoryStatistics; private operationMetrics = new Map(); constructor( config: Partial, logger: ILogger, eventBus: IEventBus ) { super(); this.logger = logger; this.eventBus = eventBus; this.config = { namespace: 'default', distributed: true, consistency: 'eventual', replicationFactor: 3, syncInterval: 5000, maxMemorySize: 1024 * 1024 * 1024, // 1GB compressionEnabled: true, encryptionEnabled: false, backupEnabled: true, persistenceEnabled: true, shardingEnabled: true, cacheSize: 10000, cacheTtl: 300000, // 5 minutes ...config }; this.localNodeId = generateId('memory-node'); this.statistics = this.initializeStatistics(); this.setupEventHandlers(); } private setupEventHandlers(): void { this.eventBus.on('memory:sync-request', (data) => { this.handleSyncRequest(data); }); this.eventBus.on('memory:node-joined', (data) => { this.handleNodeJoined(data); }); this.eventBus.on('memory:node-left', (data) => { this.handleNodeLeft(data); }); this.eventBus.on('memory:conflict-detected', (data) => { this.handleConflict(data); }); } async initialize(): Promise { this.logger.info('Initializing distributed memory system', { nodeId: this.localNodeId, namespace: this.config.namespace, distributed: this.config.distributed }); // Register local node const localNode: MemoryNode = { id: this.localNodeId, address: 'localhost', port: 8080, status: 'online', lastSeen: new Date(), partitions: [], load: 0, capacity: this.config.maxMemorySize }; this.nodes.set(this.localNodeId, localNode); // Initialize default partitions await this.createPartition('knowledge', 'knowledge'); await this.createPartition('state', 'state'); await this.createPartition('cache', 'cache'); await this.createPartition('results', 'results'); // Start synchronization if distributed if (this.config.distributed) { this.startSynchronization(); } // Load persisted data if enabled if (this.config.persistenceEnabled) { await this.loadPersistedData(); } this.emit('memory:initialized', { nodeId: this.localNodeId }); } async shutdown(): Promise { this.logger.info('Shutting down distributed memory system'); // Stop synchronization if (this.syncInterval) { clearInterval(this.syncInterval); } // Complete pending sync operations await this.completePendingSyncOperations(); // Persist data if enabled if (this.config.persistenceEnabled) { await this.persistData(); } // Clear caches this.cache.clear(); this.partitions.clear(); this.entries.clear(); this.emit('memory:shutdown', { nodeId: this.localNodeId }); } // === PARTITION MANAGEMENT === async createPartition( name: string, type: MemoryType, options: { maxSize?: number; ttl?: number; readOnly?: boolean; shared?: boolean; indexed?: boolean; compressed?: boolean; } = {} ): Promise { const partitionId = generateId('partition'); const partition: MemoryPartition = { id: partitionId, name, type, entries: [], maxSize: options.maxSize || this.config.maxMemorySize / 10, ttl: options.ttl, readOnly: options.readOnly || false, shared: options.shared !== false, // Default to shared indexed: options.indexed || false, compressed: options.compressed || this.config.compressionEnabled }; this.partitions.set(partitionId, partition); // Update local node partition list const localNode = this.nodes.get(this.localNodeId)!; localNode.partitions.push(partitionId); this.logger.info('Created partition', { partitionId, name, type }); this.emit('memory:partition-created', { partition }); // Sync with other nodes if distributed if (this.config.distributed) { await this.syncPartitionCreation(partition); } return partitionId; } async deletePartition(partitionId: string): Promise { const partition = this.partitions.get(partitionId); if (!partition) { throw new Error(`Partition ${partitionId} not found`); } // Delete all entries in partition const entriesToDelete = Array.from(this.entries.values()) .filter(entry => this.getEntryPartition(entry.id) === partitionId); for (const entry of entriesToDelete) { await this.deleteEntry(entry.id); } // Remove partition this.partitions.delete(partitionId); // Update local node const localNode = this.nodes.get(this.localNodeId)!; localNode.partitions = localNode.partitions.filter(p => p !== partitionId); this.logger.info('Deleted partition', { partitionId }); this.emit('memory:partition-deleted', { partitionId }); // Sync with other nodes if distributed if (this.config.distributed) { await this.syncPartitionDeletion(partitionId); } } // === ENTRY OPERATIONS === async store( key: string, value: any, options: { type?: string; tags?: string[]; owner?: AgentId; accessLevel?: AccessLevel; partition?: string; ttl?: number; replicate?: boolean; } = {} ): Promise { const startTime = Date.now(); try { const entryId = generateId('entry'); const now = new Date(); // Determine partition const partitionId = options.partition || this.selectPartition(options.type || 'knowledge'); const partition = this.partitions.get(partitionId); if (!partition) { throw new Error(`Partition ${partitionId} not found`); } if (partition.readOnly) { throw new Error('Cannot write to read-only partition'); } // Check partition capacity if (this.getPartitionSize(partitionId) >= partition.maxSize) { await this.evictOldEntries(partitionId); } // Create entry const entry: MemoryEntry = { id: entryId, key, value: await this.processValue(value, partition), type: options.type || 'data', tags: options.tags || [], owner: options.owner || { id: 'system', swarmId: '', type: 'coordinator', instance: 0 }, accessLevel: options.accessLevel || 'swarm', createdAt: now, updatedAt: now, expiresAt: options.ttl ? new Date(now.getTime() + options.ttl) : undefined, version: 1, references: [], dependencies: [] }; // Store entry this.entries.set(entryId, entry); partition.entries.push(entry); // Update cache this.updateCache(entryId, entry); // Update vector clock this.incrementVectorClock(this.localNodeId); this.logger.debug('Stored entry', { entryId, key, partition: partitionId }); this.emit('memory:entry-stored', { entry }); // Replicate if distributed and requested if (this.config.distributed && options.replicate !== false) { await this.replicateEntry(entry); } this.recordMetric('store', Date.now() - startTime); return entryId; } catch (error) { this.recordMetric('store-error', Date.now() - startTime); throw error; } } async retrieve( key: string, options: { partition?: string; consistency?: ConsistencyLevel; maxAge?: number; } = {} ): Promise { const startTime = Date.now(); try { // Check cache first const cached = this.getCachedEntry(key); if (cached && this.isCacheValid(cached)) { this.recordMetric('retrieve-cache', Date.now() - startTime); return cached.entry; } // Search in specified partition or all partitions const partitions = options.partition ? [this.partitions.get(options.partition)].filter(Boolean) : Array.from(this.partitions.values()); for (const partition of partitions) { const entry = partition!.entries.find(e => e.key === key); if (entry) { // Check if entry is expired if (entry.expiresAt && entry.expiresAt < new Date()) { await this.deleteEntry(entry.id); continue; } // Check access permissions if (!this.checkAccess(entry, 'read')) { continue; } // Apply consistency model if (this.config.distributed && options.consistency === 'strong') { const latestEntry = await this.ensureConsistency(entry); this.updateCache(latestEntry.id, latestEntry); this.recordMetric('retrieve', Date.now() - startTime); return latestEntry; } this.updateCache(entry.id, entry); this.recordMetric('retrieve', Date.now() - startTime); return entry; } } // Not found locally, try remote nodes if distributed if (this.config.distributed) { const remoteEntry = await this.retrieveFromRemote(key, options); if (remoteEntry) { this.recordMetric('retrieve-remote', Date.now() - startTime); return remoteEntry; } } this.recordMetric('retrieve-miss', Date.now() - startTime); return null; } catch (error) { this.recordMetric('retrieve-error', Date.now() - startTime); throw error; } } async update( key: string, value: any, options: { partition?: string; merge?: boolean; version?: number; } = {} ): Promise { const startTime = Date.now(); try { const entry = await this.retrieve(key, { partition: options.partition }); if (!entry) { this.recordMetric('update-not-found', Date.now() - startTime); return false; } // Check access permissions if (!this.checkAccess(entry, 'write')) { throw new Error('Access denied for update operation'); } // Version check for optimistic locking if (options.version && entry.version !== options.version) { throw new Error('Version conflict: entry has been modified'); } // Update entry const partition = this.partitions.get(this.getEntryPartition(entry.id))!; entry.value = options.merge ? await this.mergeValues(entry.value, value, partition) : await this.processValue(value, partition); entry.updatedAt = new Date(); entry.version++; // Update cache this.updateCache(entry.id, entry); // Update vector clock this.incrementVectorClock(this.localNodeId); this.logger.debug('Updated entry', { entryId: entry.id, key }); this.emit('memory:entry-updated', { entry }); // Sync with other nodes if distributed if (this.config.distributed) { await this.syncEntryUpdate(entry); } this.recordMetric('update', Date.now() - startTime); return true; } catch (error) { this.recordMetric('update-error', Date.now() - startTime); throw error; } } async deleteEntry(entryId: string): Promise { const startTime = Date.now(); try { const entry = this.entries.get(entryId); if (!entry) { this.recordMetric('delete-not-found', Date.now() - startTime); return false; } // Check access permissions if (!this.checkAccess(entry, 'delete')) { throw new Error('Access denied for delete operation'); } // Remove from partition const partitionId = this.getEntryPartition(entryId); const partition = this.partitions.get(partitionId); if (partition) { partition.entries = partition.entries.filter(e => e.id !== entryId); } // Remove from storage this.entries.delete(entryId); // Remove from cache this.removeFromCache(entry.key); // Update vector clock this.incrementVectorClock(this.localNodeId); this.logger.debug('Deleted entry', { entryId, key: entry.key }); this.emit('memory:entry-deleted', { entryId }); // Sync with other nodes if distributed if (this.config.distributed) { await this.syncEntryDeletion(entryId); } this.recordMetric('delete', Date.now() - startTime); return true; } catch (error) { this.recordMetric('delete-error', Date.now() - startTime); throw error; } } // === QUERY OPERATIONS === async query(query: MemoryQuery): Promise { const startTime = Date.now(); try { let results: MemoryEntry[] = []; // Get relevant partitions const partitions = query.partition ? [this.partitions.get(query.partition)].filter(Boolean) : Array.from(this.partitions.values()); for (const partition of partitions) { for (const entry of partition!.entries) { if (this.matchesQuery(entry, query)) { results.push(entry); } } } // Apply sorting if (query.sortBy) { results.sort((a, b) => { const aVal = this.getNestedProperty(a, query.sortBy!); const bVal = this.getNestedProperty(b, query.sortBy!); const order = query.sortOrder === 'desc' ? -1 : 1; if (aVal < bVal) return -1 * order; if (aVal > bVal) return 1 * order; return 0; }); } // Apply pagination const offset = query.offset || 0; const limit = query.limit || results.length; results = results.slice(offset, offset + limit); this.recordMetric('query', Date.now() - startTime); return results; } catch (error) { this.recordMetric('query-error', Date.now() - startTime); throw error; } } // === SYNCHRONIZATION === private startSynchronization(): void { this.syncInterval = setInterval(() => { this.performSync(); }, this.config.syncInterval); this.logger.info('Started synchronization', { interval: this.config.syncInterval, consistency: this.config.consistency }); } private async performSync(): Promise { try { // Process sync queue await this.processSyncQueue(); // Send heartbeat to other nodes await this.sendHeartbeat(); // Check for and resolve conflicts await this.detectAndResolveConflicts(); // Update statistics this.updateStatistics(); } catch (error) { this.logger.error('Sync operation failed', error); } } private async processSyncQueue(): Promise { const pendingOps = this.syncQueue.filter(op => op.status === 'pending'); for (const operation of pendingOps) { try { operation.status = 'in_progress'; await this.executeSyncOperation(operation); operation.status = 'completed'; this.statistics.syncOperations.completed++; } catch (error) { operation.status = 'failed'; this.statistics.syncOperations.failed++; this.logger.error('Sync operation failed', { operation, error }); } } // Remove completed/failed operations older than 1 hour const cutoff = new Date(Date.now() - 3600000); this.syncQueue = this.syncQueue.filter(op => op.status === 'pending' || op.timestamp > cutoff ); } private async sendHeartbeat(): Promise { const heartbeat = { nodeId: this.localNodeId, timestamp: new Date(), partitions: Array.from(this.partitions.keys()), load: this.calculateNodeLoad(), vectorClock: Object.fromEntries(this.vectorClock) }; // Send to all known nodes for (const nodeId of this.nodes.keys()) { if (nodeId !== this.localNodeId) { try { await this.sendToNode(nodeId, { type: 'heartbeat', data: heartbeat }); } catch (error) { this.logger.warn('Failed to send heartbeat', { nodeId, error }); } } } } private async detectAndResolveConflicts(): Promise { // Check for vector clock conflicts for (const [entryId, entry] of this.entries) { if (this.hasConflict(entry)) { await this.resolveConflict(entryId, entry); } } } private calculateNodeLoad(): number { const totalEntries = this.entries.size; const totalMemory = Array.from(this.entries.values()) .reduce((sum, entry) => sum + JSON.stringify(entry).length, 0); return totalMemory / (this.config.maxMemorySize || 1024 * 1024 * 100); // Default 100MB } private hasConflict(entry: MemoryEntry): boolean { // Simple conflict detection based on vector clocks return false; // Placeholder implementation } private async resolveConflict(entryId: string, entry: MemoryEntry): Promise { if (this.conflictResolver) { // Use custom conflict resolver if provided const resolved = this.conflictResolver(entry, entry); // Simplified this.entries.set(entryId, resolved); } else { // Default: use last-write-wins // Already handled by version comparison } } // === UTILITY METHODS === private async processValue(value: any, partition: MemoryPartition): Promise { if (partition.compressed && this.config.compressionEnabled) { return this.compressValue(value); } return value; } private async mergeValues(oldValue: any, newValue: any, partition: MemoryPartition): Promise { // Simple merge strategy - can be enhanced if (typeof oldValue === 'object' && typeof newValue === 'object') { return { ...oldValue, ...newValue }; } return newValue; } private compressValue(value: any): any { // Placeholder for compression logic return value; } private checkAccess(entry: MemoryEntry, operation: 'read' | 'write' | 'delete'): boolean { // Simplified access control - can be enhanced return true; } private selectPartition(type: string): string { // Simple partition selection based on type for (const [id, partition] of Array.from(this.partitions.entries())) { if (partition.type === type) { return id; } } // Default to first available partition return Array.from(this.partitions.keys())[0] || ''; } private getPartitionSize(partitionId: string): number { const partition = this.partitions.get(partitionId); if (!partition) return 0; return partition.entries.reduce((size, entry) => { return size + JSON.stringify(entry).length; }, 0); } private getEntryPartition(entryId: string): string { for (const [partitionId, partition] of Array.from(this.partitions.entries())) { if (partition.entries.some((e: MemoryEntry) => e.id === entryId)) { return partitionId; } } return ''; } private updateCache(entryId: string, entry: MemoryEntry): void { if (this.cache.size >= this.config.cacheSize) { this.evictCache(); } this.cache.set(entry.key, { entry: { ...entry }, expiry: Date.now() + this.config.cacheTtl }); } private getCachedEntry(key: string): { entry: MemoryEntry; expiry: number } | null { return this.cache.get(key) || null; } private isCacheValid(cached: { entry: MemoryEntry; expiry: number }): boolean { return cached.expiry > Date.now(); } private removeFromCache(key: string): void { this.cache.delete(key); } private evictCache(): void { // Simple LRU eviction - remove oldest entries const entries = Array.from(this.cache.entries()); entries.sort((a, b) => a[1].expiry - b[1].expiry); const toRemove = entries.slice(0, Math.floor(this.config.cacheSize * 0.1)); toRemove.forEach(([key]) => this.cache.delete(key)); } private async evictOldEntries(partitionId: string): Promise { const partition = this.partitions.get(partitionId); if (!partition) return; // Sort by last access time and remove oldest 10% const entries = partition.entries.sort((a, b) => a.updatedAt.getTime() - b.updatedAt.getTime() ); const toRemove = entries.slice(0, Math.floor(entries.length * 0.1)); for (const entry of toRemove) { await this.deleteEntry(entry.id); } } private matchesQuery(entry: MemoryEntry, query: MemoryQuery): boolean { if (query.type && entry.type !== query.type) return false; if (query.owner && entry.owner.id !== query.owner.id) return false; if (query.accessLevel && entry.accessLevel !== query.accessLevel) return false; if (query.createdAfter && entry.createdAt < query.createdAfter) return false; if (query.updatedAfter && entry.updatedAt < query.updatedAfter) return false; if (query.tags && query.tags.length > 0) { const hasAllTags = query.tags.every(tag => entry.tags.includes(tag)); if (!hasAllTags) return false; } return true; } private getNestedProperty(obj: any, path: string): any { return path.split('.').reduce((current, key) => current?.[key], obj); } private incrementVectorClock(nodeId: string): void { const current = this.vectorClock.get(nodeId) || 0; this.vectorClock.set(nodeId, current + 1); } private recordMetric(operation: string, duration: number): void { const current = this.operationMetrics.get(operation) || { count: 0, totalTime: 0 }; current.count++; current.totalTime += duration; this.operationMetrics.set(operation, current); } private initializeStatistics(): MemoryStatistics { return { totalEntries: 0, totalSize: 0, partitionCount: 0, nodeCount: 1, replicationHealth: 1.0, syncOperations: { pending: 0, completed: 0, failed: 0 }, performance: { readLatency: 0, writeLatency: 0, syncLatency: 0, throughput: 0 }, utilization: { memoryUsage: 0, diskUsage: 0, networkUsage: 0 } }; } private updateStatistics(): void { this.statistics.totalEntries = this.entries.size; this.statistics.partitionCount = this.partitions.size; this.statistics.nodeCount = this.nodes.size; // Calculate performance metrics const readMetrics = this.operationMetrics.get('retrieve') || { count: 0, totalTime: 0 }; const writeMetrics = this.operationMetrics.get('store') || { count: 0, totalTime: 0 }; this.statistics.performance.readLatency = readMetrics.count > 0 ? readMetrics.totalTime / readMetrics.count : 0; this.statistics.performance.writeLatency = writeMetrics.count > 0 ? writeMetrics.totalTime / writeMetrics.count : 0; } // === DISTRIBUTED OPERATIONS (Placeholders) === private async replicateEntry(entry: MemoryEntry): Promise { // Implementation for replication to other nodes if (!this.config.distributed) return; const targetNodes = this.selectReplicationNodes(entry.id); const replicationPromises = targetNodes.map(nodeId => this.sendReplicationRequest(nodeId, entry) ); try { await Promise.allSettled(replicationPromises); this.replicationMap.set(entry.id, targetNodes); this.logger.debug('Entry replicated', { entryId: entry.id, nodes: targetNodes }); } catch (error) { this.logger.error('Replication failed', { entryId: entry.id, error }); } } private async syncPartitionCreation(partition: MemoryPartition): Promise { const operation: SyncOperation = { id: generateId('sync'), type: 'partition-create', nodeId: this.localNodeId, timestamp: new Date(), data: partition, status: 'pending', version: this.vectorClock.get(this.localNodeId) || 0, origin: this.localNodeId, targets: Array.from(this.nodes.keys()).filter(id => id !== this.localNodeId) }; this.syncQueue.push(operation); this.logger.debug('Queued partition creation sync', { partition: partition.id, operation: operation.id }); } private async retrieveFromRemote(key: string, options: any): Promise { // Implementation for retrieving from remote nodes if (!this.config.distributed) return null; const remoteNodes = Array.from(this.nodes.values()).filter(node => node.id !== this.localNodeId && node.status === 'online' ); for (const node of remoteNodes) { try { const entry = await this.requestFromNode(node.id, key, options); if (entry) { // Cache locally for future access this.updateCache(entry.id, entry); return entry; } } catch (error) { this.logger.warn('Remote retrieval failed', { nodeId: node.id, error }); } } return null; } private async ensureConsistency(entry: MemoryEntry): Promise { // Implementation for ensuring strong consistency if (!this.config.distributed || this.config.consistency !== 'strong') { return entry; } const replicatedNodes = this.replicationMap.get(entry.id) || []; const consistencyPromises = replicatedNodes.map(nodeId => this.getVersionFromNode(nodeId, entry.id) ); try { const versions = await Promise.allSettled(consistencyPromises); const validVersions = versions .filter(result => result.status === 'fulfilled') .map(result => (result as PromiseFulfilledResult<{ version: number; entry: MemoryEntry }>).value); // Find the latest version const latestVersion = validVersions.reduce((latest, current) => current.version > latest.version ? current : latest, { version: entry.version, entry } ); if (latestVersion.version > entry.version) { // Update local entry with latest version this.entries.set(entry.id, latestVersion.entry); return latestVersion.entry; } return entry; } catch (error) { this.logger.error('Consistency check failed', { entryId: entry.id, error }); return entry; } } private async syncEntryUpdate(entry: MemoryEntry): Promise { const operation: SyncOperation = { id: generateId('sync'), type: 'entry-update', nodeId: this.localNodeId, timestamp: new Date(), data: entry, status: 'pending', version: this.vectorClock.get(this.localNodeId) || 0, origin: this.localNodeId, targets: Array.from(this.nodes.keys()).filter(id => id !== this.localNodeId) }; this.syncQueue.push(operation); this.logger.debug('Queued entry update sync', { entry: entry.id, operation: operation.id }); } private async syncEntryDeletion(entryId: string): Promise { const operation: SyncOperation = { id: generateId('sync'), type: 'entry-delete', nodeId: this.localNodeId, timestamp: new Date(), data: { entryId }, status: 'pending', version: this.vectorClock.get(this.localNodeId) || 0, origin: this.localNodeId, targets: Array.from(this.nodes.keys()).filter(id => id !== this.localNodeId) }; this.syncQueue.push(operation); this.logger.debug('Queued entry deletion sync', { entryId, operation: operation.id }); } private async executeSyncOperation(operation: SyncOperation): Promise { operation.status = 'in_progress'; try { switch (operation.type) { case 'partition-create': await this.broadcastPartitionCreation(operation.data); break; case 'entry-update': await this.broadcastEntryUpdate(operation.data); break; case 'entry-delete': await this.broadcastEntryDeletion(operation.data); break; default: this.logger.warn('Unknown sync operation type', { type: operation.type }); } operation.status = 'completed'; this.logger.debug('Sync operation completed', { operation: operation.id }); } catch (error) { operation.status = 'failed'; this.logger.error('Sync operation failed', { operation: operation.id, error }); throw error; } } private async completePendingSyncOperations(): Promise { // Implementation for completing pending operations const pendingOperations = this.syncQueue.filter(op => op.status === 'pending'); for (const operation of pendingOperations) { try { await this.executeSyncOperation(operation); } catch (error) { this.logger.error('Failed to complete sync operation', { operationId: operation.id, error }); } } // Remove completed and failed operations this.syncQueue = this.syncQueue.filter(op => op.status === 'pending'); } private async loadPersistedData(): Promise { // Implementation for loading persisted data try { // This would typically load from a persistent storage backend this.logger.info('Loading persisted data'); // Placeholder: In a real implementation, this would load from disk/database } catch (error) { this.logger.error('Failed to load persisted data', { error }); } } private async persistData(): Promise { // Implementation for persisting data try { // This would typically save to a persistent storage backend this.logger.info('Persisting data'); // Placeholder: In a real implementation, this would save to disk/database } catch (error) { this.logger.error('Failed to persist data', { error }); } } private handleSyncRequest(data: any): void { // Handle sync requests from other nodes try { const { operation, sourceNodeId } = data; switch (operation.type) { case 'partition-create': this.handleRemotePartitionCreation(operation.data, sourceNodeId); break; case 'entry-update': this.handleRemoteEntryUpdate(operation.data, sourceNodeId); break; case 'entry-delete': this.handleRemoteEntryDeletion(operation.data, sourceNodeId); break; default: this.logger.warn('Unknown sync request type', { type: operation.type }); } } catch (error) { this.logger.error('Failed to handle sync request', { error }); } } private handleNodeJoined(data: any): void { // Handle new node joining const nodeId = data.nodeId; this.handleNodeJoin(nodeId); } private handleNodeLeft(data: any): void { // Handle node leaving const nodeId = data.nodeId; this.handleNodeLeave(nodeId); } private handleConflict(data: any): void { // Handle conflict resolution // This method should be implemented to handle conflicts between nodes } // === HELPER METHODS === private selectReplicationNodes(entryId: string): string[] { const availableNodes = Array.from(this.nodes.values()) .filter(node => node.id !== this.localNodeId && node.status === 'online') .map(node => node.id); const replicationCount = Math.min(this.config.replicationFactor - 1, availableNodes.length); // Simple round-robin selection return availableNodes.slice(0, replicationCount); } private async sendReplicationRequest(nodeId: string, entry: MemoryEntry): Promise { // Simulate sending replication request this.logger.debug('Sending replication request', { nodeId, entryId: entry.id }); // In a real implementation, this would send over network } private async requestFromNode(nodeId: string, key: string, options: any): Promise { // Simulate requesting data from remote node this.logger.debug('Requesting from remote node', { nodeId, key }); // In a real implementation, this would make network request return null; } private async getVersionFromNode(nodeId: string, entryId: string): Promise<{ version: number; entry: MemoryEntry }> { // Simulate getting version from remote node this.logger.debug('Getting version from remote node', { nodeId, entryId }); // In a real implementation, this would make network request throw new Error('Remote node not available'); } private async broadcastPartitionCreation(partition: MemoryPartition): Promise { // Broadcast partition creation to all nodes const nodes = Array.from(this.nodes.values()).filter(node => node.id !== this.localNodeId && node.status === 'online' ); for (const node of nodes) { try { await this.sendToNode(node.id, { type: 'partition-create', data: partition }); } catch (error) { this.logger.warn('Failed to broadcast partition creation', { nodeId: node.id, error }); } } } private async broadcastEntryUpdate(entry: MemoryEntry): Promise { // Broadcast entry update to replicated nodes const replicatedNodes = this.replicationMap.get(entry.id) || []; for (const nodeId of replicatedNodes) { try { await this.sendToNode(nodeId, { type: 'entry-update', data: entry }); } catch (error) { this.logger.warn('Failed to broadcast entry update', { nodeId, error }); } } } private async broadcastEntryDeletion(data: { entryId: string }): Promise { // Broadcast entry deletion to replicated nodes const replicatedNodes = this.replicationMap.get(data.entryId) || []; for (const nodeId of replicatedNodes) { try { await this.sendToNode(nodeId, { type: 'entry-delete', data }); } catch (error) { this.logger.warn('Failed to broadcast entry deletion', { nodeId, error }); } } } private async sendToNode(nodeId: string, message: any): Promise { // Send message to specific node this.logger.debug('Sending message to node', { nodeId, type: message.type }); // In a real implementation, this would send over network } private handleRemotePartitionCreation(partition: MemoryPartition, sourceNodeId: string): void { // Handle partition creation from remote node if (!this.partitions.has(partition.id)) { this.partitions.set(partition.id, partition); this.logger.debug('Received remote partition creation', { partitionId: partition.id, sourceNodeId }); } } private handleRemoteEntryUpdate(entry: MemoryEntry, sourceNodeId: string): void { // Handle entry update from remote node const existingEntry = this.entries.get(entry.id); if (!existingEntry || entry.version > existingEntry.version) { this.entries.set(entry.id, entry); this.updateCache(entry.id, entry); this.logger.debug('Received remote entry update', { entryId: entry.id, sourceNodeId }); } } private handleRemoteEntryDeletion(data: { entryId: string }, sourceNodeId: string): void { // Handle entry deletion from remote node const entry = this.entries.get(data.entryId); if (entry) { this.entries.delete(data.entryId); this.removeFromCache(entry.key); // Remove from partition const partitionId = this.getEntryPartition(data.entryId); const partition = this.partitions.get(partitionId); if (partition) { partition.entries = partition.entries.filter(e => e.id !== data.entryId); } this.logger.debug('Received remote entry deletion', { entryId: data.entryId, sourceNodeId }); } } private handleNodeJoin(nodeId: string): void { // Handle new node joining the network this.logger.info('Node joined network', { nodeId }); // Send current state to new node this.sendStateSnapshot(nodeId); } private handleNodeLeave(nodeId: string): void { // Handle node leaving the network this.logger.info('Node left network', { nodeId }); // Remove from nodes list this.nodes.delete(nodeId); // Update replication map for (const [entryId, nodes] of Array.from(this.replicationMap.entries())) { const updatedNodes = nodes.filter(id => id !== nodeId); if (updatedNodes.length > 0) { this.replicationMap.set(entryId, updatedNodes); } else { this.replicationMap.delete(entryId); } } } private sendStateSnapshot(nodeId: string): void { // Send current state snapshot to new node const snapshot = { partitions: Array.from(this.partitions.values()), entries: Array.from(this.entries.values()), vectorClock: Array.from(this.vectorClock.entries()) }; this.sendToNode(nodeId, { type: 'state-snapshot', data: snapshot }); this.logger.debug('Sent state snapshot to new node', { nodeId }); } // === PUBLIC API === getStatistics(): MemoryStatistics { this.updateStatistics(); return { ...this.statistics }; } getPartitions(): MemoryPartition[] { return Array.from(this.partitions.values()); } getNodes(): MemoryNode[] { return Array.from(this.nodes.values()); } async backup(): Promise { // Create backup of all data const backup = { timestamp: new Date(), partitions: Array.from(this.partitions.values()), entries: Array.from(this.entries.values()), metadata: { version: '1.0', nodeId: this.localNodeId, config: this.config } }; return JSON.stringify(backup); } async restore(backupData: string): Promise { // Restore from backup const backup = JSON.parse(backupData); // Clear current data this.partitions.clear(); this.entries.clear(); this.cache.clear(); // Restore partitions for (const partition of backup.partitions) { this.partitions.set(partition.id, partition); } // Restore entries for (const entry of backup.entries) { this.entries.set(entry.id, entry); } this.logger.info('Restored from backup', { partitions: backup.partitions.length, entries: backup.entries.length }); } async clear(): Promise { this.partitions.clear(); this.entries.clear(); this.cache.clear(); this.syncQueue = []; this.statistics = this.initializeStatistics(); this.logger.info('Cleared all memory data'); this.emit('memory:cleared'); } private async syncPartitionDeletion(partitionId: string): Promise { const operation: SyncOperation = { id: generateId('sync'), type: 'delete', partition: partitionId, timestamp: new Date(), version: this.vectorClock.get(this.localNodeId) || 0, origin: this.localNodeId, targets: Array.from(this.nodes.keys()).filter(id => id !== this.localNodeId), status: 'pending', data: { partitionId } }; this.syncQueue.push(operation); this.logger.debug('Queued partition deletion sync', { partitionId, operation: operation.id }); } }