/**
 * Advanced Event Patterns Example
 *
 * This example demonstrates how to use the advanced event patterns
 * implemented in SoapJS without being tied to any specific framework.
 */

import {
  // Event Replay
  BaseEventReplayManager,
  EventReplayHandler,
  ReplayOptions,
  ReplayStrategy,
  // Event Versioning
  BaseEventVersionManager,
  InMemoryEventVersionRegistry,
  createEventVersion,
  // Saga Orchestration
  BaseSagaOrchestrator,
  createSagaDefinition,
  SagaOrchestrationStrategy,
  SagaOrchestrationStep,
  // Event Correlation
  BaseEventCorrelationManager,
  createCorrelationRule,
  CorrelationType,
  ConditionType,
  // Event Sourcing Snapshots
  BaseSnapshotManager,
  createSnapshotConfiguration,
  SnapshotStrategy,
  CompressionAlgorithm
} from '../src/cqrs';
import { DomainEvent } from '../src/domain/domain-event';
import { EventStore } from '../src/cqrs/event-store';
import { Result } from '../src/common/result';

// Mock implementations for demonstration

/**
 * In-memory event store used only by the demos in this file.
 *
 * All events live in a single array. Fields that are read through an `any`
 * cast (`aggregateId`, `version`, `correlationId`) are not guaranteed by the
 * `DomainEvent` type as used here; events lacking them simply never match.
 *
 * NOTE(review): the `Result<...>` type arguments below were reconstructed from
 * what each method returns — confirm against the `EventStore` interface.
 */
class MockEventStore implements EventStore {
  private events: DomainEvent[] = [];

  /**
   * Appends the given events to the in-memory list.
   *
   * `aggregateId` and `expectedVersion` are accepted but not used by this
   * mock, so no version check is performed.
   */
  async appendEvents(
    aggregateId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<Result<void>> {
    this.events.push(...events);
    return Result.withSuccess();
  }

  /** Returns every stored event whose `aggregateId` equals the given id. */
  async getEvents(aggregateId: string): Promise<Result<DomainEvent[]>> {
    const filteredEvents = this.events.filter(
      e => (e as any).aggregateId === aggregateId
    );
    return Result.withSuccess(filteredEvents);
  }

  /**
   * Returns the events of one aggregate whose `version` is greater than or
   * equal to `fromVersion` (inclusive lower bound).
   */
  async getEventsFromVersion(
    aggregateId: string,
    fromVersion: number
  ): Promise<Result<DomainEvent[]>> {
    const filteredEvents = this.events.filter(
      e =>
        (e as any).aggregateId === aggregateId &&
        (e as any).version >= fromVersion
    );
    return Result.withSuccess(filteredEvents);
  }

  /** Returns every stored event whose `type` equals `eventType` exactly. */
  async getEventsByType(eventType: string): Promise<Result<DomainEvent[]>> {
    const filteredEvents = this.events.filter(e => e.type === eventType);
    return Result.withSuccess(filteredEvents);
  }

  /** Returns every stored event whose `correlationId` equals the given id. */
  async getEventsByCorrelationId(
    correlationId: string
  ): Promise<Result<DomainEvent[]>> {
    const filteredEvents = this.events.filter(
      e => (e as any).correlationId === correlationId
    );
    return Result.withSuccess(filteredEvents);
  }

  /**
   * Returns every stored event whose `timestamp` lies within
   * `[fromDate, toDate]`; both bounds are inclusive.
   */
  async getEventsInTimeRange(
    fromDate: Date,
    toDate: Date
  ): Promise<Result<DomainEvent[]>> {
    const filteredEvents = this.events.filter(
      e => e.timestamp >= fromDate && e.timestamp <= toDate
    );
    return Result.withSuccess(filteredEvents);
  }
}

/**
 * In-memory snapshot store used only by the demos in this file.
 *
 * Snapshots are untyped records; the methods read `aggregateId`, `version`
 * and `snapshotId` from them.
 *
 * NOTE(review): the `Result<...>` type arguments were reconstructed and use
 * `any` payloads to stay compatible with whatever store interface
 * `BaseSnapshotManager` expects — tighten once that interface is confirmed.
 */
class MockSnapshotStore {
  private snapshots: any[] = [];

  /** Stores the snapshot as-is; no de-duplication is performed. */
  async saveSnapshot(snapshot: any): Promise<Result<void>> {
    this.snapshots.push(snapshot);
    return Result.withSuccess();
  }

  /**
   * Returns the snapshot with the highest `version` for the aggregate, or a
   * failure result when the aggregate has no snapshots.
   */
  async getLatestSnapshot(aggregateId: string): Promise<Result<any>> {
    // `filter` returns a fresh array, so sorting does not reorder the store.
    const snapshot = this.snapshots
      .filter(s => s.aggregateId === aggregateId)
      .sort((a, b) => b.version - a.version)[0];

    if (!snapshot) {
      return Result.withFailure(new Error('No snapshot found'));
    }

    return Result.withSuccess(snapshot);
  }

  /** Returns all snapshots of the aggregate, in insertion order. */
  async getSnapshots(aggregateId: string): Promise<Result<any[]>> {
    const filteredSnapshots = this.snapshots.filter(
      s => s.aggregateId === aggregateId
    );
    return Result.withSuccess(filteredSnapshots);
  }

  /** Looks a snapshot up by `snapshotId`; fails when none matches. */
  async getSnapshot(snapshotId: string): Promise<Result<any>> {
    const snapshot = this.snapshots.find(s => s.snapshotId === snapshotId);

    if (!snapshot) {
      return Result.withFailure(new Error('Snapshot not found'));
    }

    return Result.withSuccess(snapshot);
  }

  /**
   * Returns the aggregate's snapshots whose `version` lies within
   * `[fromVersion, toVersion]`; both bounds are inclusive.
   */
  async getSnapshotsInRange(
    aggregateId: string,
    fromVersion: number,
    toVersion: number
  ): Promise<Result<any[]>> {
    const filteredSnapshots = this.snapshots.filter(
      s =>
        s.aggregateId === aggregateId &&
        s.version >= fromVersion &&
        s.version <= toVersion
    );
    return Result.withSuccess(filteredSnapshots);
  }

  /**
   * Removes the first snapshot with the given id. Succeeds even when no
   * snapshot matches.
   */
  async deleteSnapshot(snapshotId: string): Promise<Result<void>> {
    const index = this.snapshots.findIndex(s => s.snapshotId === snapshotId);
    if (index >= 0) {
      this.snapshots.splice(index, 1);
    }
    return Result.withSuccess();
  }

  /**
   * Keeps the `keepCount` highest-version snapshots of the aggregate and
   * deletes the rest.
   */
  async deleteOldSnapshots(
    aggregateId: string,
    keepCount: number
  ): Promise<Result<void>> {
    const newestFirst = this.snapshots
      .filter(s => s.aggregateId === aggregateId)
      .sort((a, b) => b.version - a.version);

    if (newestFirst.length > keepCount) {
      const toDelete = newestFirst.slice(keepCount);
      for (const snapshot of toDelete) {
        await this.deleteSnapshot(snapshot.snapshotId);
      }
    }

    return Result.withSuccess();
  }

  /**
   * Returns placeholder statistics: only `totalSnapshots` reflects the store
   * contents, every other figure is a fixed zero or an empty object.
   */
  async getSnapshotStatistics(): Promise<Result<any>> {
    return Result.withSuccess({
      totalSnapshots: this.snapshots.length,
      totalSizeInBytes: 0,
      averageSizeInBytes: 0,
      snapshotsByStrategy: {},
      snapshotsByAggregateType: {},
      compressionStats: {
        compressedSnapshots: 0,
        averageCompressionRatio: 0,
        totalSpaceSaved: 0
      }
    });
  }
}

// Example usage functions

/**
 * Starts a chronological replay over calendar year 2024 against an empty
 * mock event store, using a handler that only logs what it receives.
 */
export async function demonstrateEventReplay(): Promise<void> {
  console.log('🔄 Event Replay Example');

  const eventStore = new MockEventStore();
  const replayManager = new BaseEventReplayManager(eventStore);

  // Create a custom replay handler
  class CustomReplayHandler implements EventReplayHandler {
    async handleEvent(event: DomainEvent): Promise<Result<void>> {
      console.log(`Processing event: ${event.type} at ${event.timestamp}`);
      return Result.withSuccess();
    }

    async handleBatch(events: DomainEvent[]): Promise<Result<void>> {
      console.log(`Processing batch of ${events.length} events`);
      return Result.withSuccess();
    }

    async onReplayStart(options: ReplayOptions): Promise<Result<void>> {
      console.log(`Starting replay with strategy: ${options.strategy}`);
      return Result.withSuccess();
    }

    async onReplayComplete(progress: any): Promise<Result<void>> {
      console.log(
        `Replay completed: ${progress.processedEvents}/${progress.totalEvents} events`
      );
      return Result.withSuccess();
    }
  }

  // Configure replay options
  const replayOptions: ReplayOptions = {
    strategy: ReplayStrategy.CHRONOLOGICAL,
    fromDate: new Date('2024-01-01'),
    toDate: new Date('2024-12-31'),
    batchSize: 50,
    includeSnapshots: true
  };

  // Start replay
  const replayResult = await replayManager.startReplay(
    replayOptions,
    new CustomReplayHandler()
  );

  if (replayResult.isSuccess) {
    console.log(`Replay started with ID: ${replayResult.data}`);
  }
}

/**
 * Registers two schema versions of `UserCreated` and migrates a v1 event
 * (single `name` field) to v2 (`firstName` / `lastName`).
 */
export async function demonstrateEventVersioning(): Promise<void> {
  console.log('📋 Event Versioning Example');

  const registry = new InMemoryEventVersionRegistry();
  const versionManager = new BaseEventVersionManager(registry);

  // Create event version schemas
  const userCreatedV1 = createEventVersion('UserCreated')
    .version(1)
    .schema({
      type: 'object',
      properties: {
        userId: { type: 'string' },
        email: { type: 'string' },
        name: { type: 'string' }
      },
      required: ['userId', 'email', 'name']
    })
    .build();

  const userCreatedV2 = createEventVersion('UserCreated')
    .version(2)
    .schema({
      type: 'object',
      properties: {
        userId: { type: 'string' },
        email: { type: 'string' },
        firstName: { type: 'string' },
        lastName: { type: 'string' }
      },
      required: ['userId', 'email', 'firstName', 'lastName']
    })
    .migration((data, fromVersion, toVersion) => {
      // Migrate from v1 to v2
      if (fromVersion === 1 && toVersion === 2) {
        // Pull `name` out so the old field is really absent from the result,
        // rather than present with the value `undefined`.
        const { name, ...rest } = data;
        // A missing or non-string name yields empty first/last names
        // instead of throwing on `.split`.
        const nameParts = typeof name === 'string' ? name.split(' ') : [];
        return {
          ...rest,
          firstName: nameParts[0] || '',
          lastName: nameParts.slice(1).join(' ')
        };
      }
      return data;
    })
    .build();

  // Register versions
  await versionManager.registerVersion(userCreatedV1);
  await versionManager.registerVersion(userCreatedV2);

  // Create an old version event
  const oldEvent: DomainEvent = {
    id: 'event-1',
    type: 'UserCreated',
    timestamp: new Date(),
    data: {
      userId: 'user-123',
      email: 'john@example.com',
      name: 'John Doe'
    }
  };

  // Migrate to latest version
  const migrationResult = await versionManager.migrateToLatest(oldEvent);

  if (migrationResult.isSuccess) {
    console.log('Event migrated successfully:', migrationResult.data.data);
  }
}

/**
 * Builds a two-step user-onboarding saga (create user, send welcome email)
 * and starts it on a `BaseSagaOrchestrator`.
 */
export async function demonstrateSagaOrchestration(): Promise<void> {
  console.log('🎭 Saga Orchestration Example');

  const sagaOrchestrator = new BaseSagaOrchestrator();

  // Create saga steps
  const steps: SagaOrchestrationStep[] = [
    {
      stepId: 'step-1',
      name: 'Create User',
      command: { type: 'CreateUser', data: { email: 'user@example.com' } } as any,
      compensation: { type: 'DeleteUser', data: { email: 'user@example.com' } } as any,
      completed: false,
      compensated: false,
      timeout: 30000,
      retryConfig: {
        maxRetries: 3,
        retryDelay: 1000,
        backoffMultiplier: 2
      }
    },
    {
      // This step defines no compensation command.
      stepId: 'step-2',
      name: 'Send Welcome Email',
      command: {
        type: 'SendEmail',
        data: { to: 'user@example.com', template: 'welcome' }
      } as any,
      completed: false,
      compensated: false,
      timeout: 10000
    }
  ];

  // Create saga definition
  const sagaDefinition = createSagaDefinition('UserOnboardingSaga')
    .version('1.0.0')
    .strategy(SagaOrchestrationStrategy.ORCHESTRATION)
    .addStep(steps[0])
    .addStep(steps[1])
    .globalTimeout(60000)
    .compensationStrategy('compensate_all' as any)
    .build();

  // Start saga
  const sagaResult = await sagaOrchestrator.startSaga(sagaDefinition, {
    userEmail: 'user@example.com',
    userId: 'user-123'
  });

  if (sagaResult.isSuccess) {
    console.log(`Saga started with ID: ${sagaResult.data}`);
  }
}

/**
 * Registers a session-based correlation rule for `UserAction*` events, feeds
 * two events sharing a user and session through the manager, and logs how
 * many correlations the second event produced.
 */
export async function demonstrateEventCorrelation(): Promise<void> {
  console.log('🔗 Event Correlation Example');

  const correlationManager = new BaseEventCorrelationManager();

  // Create correlation rule
  // The '${userId}' / '${sessionId}' values are plain single-quoted strings
  // passed to the rule builder, not JavaScript template literals.
  const correlationRule = createCorrelationRule(
    'user-session-rule',
    'User Session Correlation'
  )
    .description('Correlate events within the same user session')
    .sourceEventPattern({
      eventType: 'UserAction*',
      dataPatterns: { userId: '${userId}' }
    })
    .targetEventPattern({
      eventType: 'UserAction*',
      dataPatterns: { userId: '${userId}' }
    })
    .correlationType(CorrelationType.CONTEXTUAL)
    .addCondition({
      type: ConditionType.EQUALS,
      fieldPath: 'data.sessionId',
      expectedValue: '${sessionId}',
      operator: 'and' as any
    })
    .build();

  // Register correlation rule
  await correlationManager.registerCorrelationRule(correlationRule);

  // Process events for correlation
  const event1: DomainEvent = {
    id: 'event-1',
    type: 'UserActionLogin',
    timestamp: new Date(),
    data: {
      userId: 'user-123',
      sessionId: 'session-456',
      action: 'login'
    }
  };

  const event2: DomainEvent = {
    id: 'event-2',
    type: 'UserActionViewPage',
    timestamp: new Date(),
    data: {
      userId: 'user-123',
      sessionId: 'session-456',
      action: 'view_page',
      page: '/dashboard'
    }
  };

  // Process events
  await correlationManager.processEvent(event1);
  const correlationResult = await correlationManager.processEvent(event2);

  if (correlationResult.isSuccess) {
    console.log(`Found ${correlationResult.data.length} correlations`);
  }
}

/**
 * Configures an event-count snapshot strategy for the `User` aggregate type,
 * creates one snapshot at version 100, and restores the aggregate from it.
 */
export async function demonstrateEventSourcingSnapshots(): Promise<void> {
  console.log('📸 Event Sourcing Snapshots Example');

  const eventStore = new MockEventStore();
  const snapshotStore = new MockSnapshotStore();
  const snapshotManager = new BaseSnapshotManager(snapshotStore, eventStore);

  // Configure snapshot strategy
  const snapshotConfig = createSnapshotConfiguration('User')
    .strategy(SnapshotStrategy.EVENT_COUNT)
    .eventCountThreshold(50)
    .compress(true)
    .compressionAlgorithm(CompressionAlgorithm.GZIP)
    .maxSnapshots(5)
    .autoCleanup(true)
    .build();

  // Set configuration
  await snapshotManager.setSnapshotConfiguration('User', snapshotConfig);

  // Create a snapshot
  const aggregateState = {
    userId: 'user-123',
    email: 'user@example.com',
    profile: {
      firstName: 'John',
      lastName: 'Doe',
      preferences: {
        theme: 'dark',
        notifications: true
      }
    },
    lastLoginAt: new Date(),
    version: 100
  };

  const snapshotResult = await snapshotManager.createSnapshot(
    'user-123',
    'User',
    aggregateState,
    100,
    snapshotConfig
  );

  if (snapshotResult.isSuccess) {
    console.log(`Snapshot created: ${snapshotResult.data.snapshotId}`);
  }

  // Restore aggregate from snapshot
  const restoreResult = await snapshotManager.restoreAggregate('user-123');

  if (restoreResult.isSuccess) {
    console.log(
      'Aggregate restored from snapshot:',
      restoreResult.data.restoredState
    );
  }
}

// Main demonstration function

/**
 * Runs all five demonstrations in sequence, separated by blank lines.
 *
 * Any exception thrown by a demonstration is caught and logged here, so the
 * returned promise does not reject because of it. Demonstrations that obtain
 * a non-success result log nothing for it and do not stop the sequence.
 */
export async function runAdvancedEventPatternsDemo(): Promise<void> {
  console.log('🚀 Advanced Event Patterns Demonstration');
  console.log('==========================================\n');

  try {
    await demonstrateEventReplay();
    console.log('');

    await demonstrateEventVersioning();
    console.log('');

    await demonstrateSagaOrchestration();
    console.log('');

    await demonstrateEventCorrelation();
    console.log('');

    await demonstrateEventSourcingSnapshots();
    console.log('');

    console.log('✅ All advanced event patterns demonstrated successfully!');
  } catch (error) {
    console.error('❌ Error during demonstration:', error);
  }
}

// Run the demo if this file is executed directly
if (require.main === module) {
  runAdvancedEventPatternsDemo().catch(console.error);
}