# ETL Patterns Template
# Purpose: Comprehensive collection of ETL design patterns and best practices
# Version: 1.0.0
# Last Updated: 2025-01-23

metadata:
  template_id: "etl-patterns"
  version: "1.0.0"
  name: "ETL Patterns Template"
  description: "Design patterns and best practices for ETL pipeline development"
  category: "data-engineering"
  tags:
    - etl-patterns
    - data-pipeline
    - design-patterns
    - best-practices
    - data-integration
  owner: "Data Engineering Team"
  created_date: "2025-01-23"
  last_modified: "2025-01-23"
  compliance:
    - ISO-27001
    - SOC2
  dependencies:
    - data-pipeline-tmpl
    - quality-checks-tmpl

template:
  structure:
    - pattern_overview
    - extraction_patterns
    - transformation_patterns
    - loading_patterns
    - error_handling_patterns
    - monitoring_patterns
    - performance_patterns
    - scalability_patterns
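    - integration_patterns  # NOTE: no sections.integration_patterns defined yet
    - governance_patterns  # NOTE: no sections.governance_patterns defined yet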

sections:
  pattern_overview:
    pattern_classification:
      architectural_patterns:
        - "Batch Processing Patterns"
        - "Stream Processing Patterns"
        - "Hybrid Processing Patterns"
        - "Lambda Architecture Patterns"
        - "Kappa Architecture Patterns"
      
      functional_patterns:
        - "Data Extraction Patterns"
        - "Data Transformation Patterns"
        - "Data Loading Patterns"
        - "Data Quality Patterns"
        - "Error Handling Patterns"
      
      operational_patterns:
        - "Monitoring and Alerting Patterns"
        - "Recovery and Restart Patterns"
        - "Performance Optimization Patterns"
        - "Scalability Patterns"
    
    pattern_selection_criteria:
      data_characteristics:
        - volume: "Small/Medium/Large/Big Data"
        - velocity: "Batch/Micro-batch/Real-time/Streaming"
        - variety: "Structured/Semi-structured/Unstructured"
        - veracity: "High/Medium/Low quality"
      
      business_requirements:
        - latency_requirements: "Real-time/Near real-time/Batch"
        - consistency_requirements: "Strong/Eventual consistency"
        - availability_requirements: "High/Standard availability"
        - compliance_requirements: "Regulatory compliance needs"
      
      technical_constraints:
        - infrastructure: "On-premises/Cloud/Hybrid"
        - resources: "Available compute and storage"
        - skills: "Team expertise and capabilities"
        - budget: "Cost constraints and optimization"

  extraction_patterns:
    full_extraction_pattern:
      pattern_name: "Full Table Extract"
      description: "Extract complete dataset from source system"
      
      use_cases:
        - "Small to medium datasets"
        - "Initial data loads"
        - "Systems without change tracking"
        - "Reference data synchronization"
      
      implementation:
        approach: "SELECT * FROM source_table"
        advantages:
          - "Simple to implement"
          - "Ensures data completeness"
          - "No dependency on change tracking"
        disadvantages:
          - "High resource consumption"
          - "Long processing times"
          - "Network bandwidth intensive"
      
      best_practices:
        - "Schedule during low-usage periods"
        - "Implement data compression"
        - "Use parallel extraction for large tables"
        - "Monitor extraction performance"
      
      code_example: |
        ```sql
        -- Full extraction: every row of the source table, each stamped with
        -- the extraction time. No WHERE clause: filtering would silently drop
        -- rows and defeat the completeness guarantee of a full extract.
        SELECT
            *,
            CURRENT_TIMESTAMP AS extraction_timestamp
        FROM source_system.large_table
        -- A stable key order lets very large tables be pulled in chunks
        -- (add WHERE primary_key > :last_key ... LIMIT :chunk_size per chunk).
        ORDER BY primary_key
        ```
    
    incremental_extraction_pattern:
      pattern_name: "Incremental Extract"
      description: "Extract only changed or new records since last extraction"
      
      variations:
        timestamp_based:
          description: "Use timestamp columns to identify changes"
          implementation: "WHERE last_modified > :last_extraction_time"
          advantages:
            - "Efficient data transfer"
            - "Reduced processing time"
            - "Lower system impact"
          challenges:
            - "Requires reliable timestamp columns"
            - "Handling of deletions"
            - "Clock synchronization issues"
        
        change_data_capture:
          description: "Capture changes using database CDC features"
          implementation: "Database transaction log mining"
          advantages:
            - "Real-time change detection"
            - "Captures all DML operations"
            - "Minimal source system impact"
          challenges:
            - "Complex setup and configuration"
            - "Database-specific implementations"
            - "Log retention management"
        
        version_based:
          description: "Use version numbers to track changes"
          implementation: "WHERE version > :last_processed_version"
          advantages:
            - "Reliable change tracking"
            - "Support for concurrent updates"
            - "Clear audit trail"
          challenges:
            - "Requires application changes"
            - "Version number management"
            - "Schema modifications needed"
      
      best_practices:
        - "Maintain extraction state/watermarks"
        - "Handle edge cases (clock drift, timezone issues)"
        - "Implement overlap windows for safety"
        - "Monitor extraction completeness"
      
      code_example: |
        ```python
        def incremental_extract(last_extraction_timestamp):
            """Fetch rows modified after the previous run's watermark.

            Args:
                last_extraction_timestamp: exclusive lower bound on
                    last_modified (the watermark saved by the previous run).

            Returns:
                Whatever execute_query returns for the query; rows are ordered
                by last_modified ascending.

            The upper bound trails the database clock by five minutes, so rows
            stamped within the most recent five minutes are left for a later run.
            NOTE(review): no new watermark is returned; the caller presumably
            derives it from the last returned row.
            """
            sql = """
            SELECT *
            FROM source_table 
            WHERE last_modified > %s
            AND last_modified <= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
            ORDER BY last_modified
            """
            bind_values = [last_extraction_timestamp]
            return execute_query(sql, bind_values)
        ```
    
    streaming_extraction_pattern:
      pattern_name: "Stream Processing Extract"
      description: "Continuous extraction from streaming data sources"
      
      implementation_approaches:
        event_driven:
          description: "Process events as they arrive"
          technologies: ["Apache Kafka", "Amazon Kinesis", "Azure Event Hubs"]
          pattern: "Subscribe to event streams and process in real-time"
        
        micro_batch:
          description: "Process small batches at frequent intervals"
          technologies: ["Apache Spark Streaming", "Apache Flink"]
          pattern: "Collect events in small time windows"
      
      considerations:
        - "Event ordering and delivery guarantees"
        - "Backpressure handling"
        - "Fault tolerance and recovery"
        - "Exactly-once processing semantics"

  transformation_patterns:
    data_mapping_pattern:
      pattern_name: "Field Mapping and Conversion"
      description: "Transform source fields to target schema"
      
      mapping_types:
        direct_mapping:
          description: "One-to-one field mapping"
          example: "source.customer_id -> target.cust_id"
        
        calculated_mapping:
          description: "Derived fields using expressions"
          example: "CONCAT(first_name, ' ', last_name) -> full_name"
        
        lookup_mapping:
          description: "Reference data enrichment"
          example: "country_code -> country_name via lookup table"
      
      code_example: |
        ```python
        def apply_field_mappings(source_record, mapping_config, lookup_table=None):
            """Build a target record from a source record using a mapping config.

            Args:
                source_record: dict of source field name -> value.
                mapping_config: dict of target field name -> mapping spec, where
                    spec['type'] is one of:
                      'direct'     -- copy source_record[spec['source_field']]
                      'calculated' -- spec['expression'] is either a callable
                                      taking the source record, or a Python
                                      expression string that can use the source
                                      fields as bare names or `source_record`
                      'lookup'     -- map source_record[spec['key']] through
                                      spec['lookup_table'] (a dict), falling back
                                      to the `lookup_table` argument
                lookup_table: default dict used by 'lookup' mappings that do not
                    carry their own table.

            Returns:
                dict of target field name -> value.

            Raises:
                KeyError: a referenced source field or lookup key is missing.
                ValueError: a mapping has an unknown type, or a lookup mapping
                    has no table.
            """
            target_record = {}
            for target_field, mapping in mapping_config.items():
                mapping_type = mapping['type']
                if mapping_type == 'direct':
                    value = source_record[mapping['source_field']]
                elif mapping_type == 'calculated':
                    expression = mapping['expression']
                    if callable(expression):
                        value = expression(source_record)
                    else:
                        # WARNING: eval() executes arbitrary Python. Only use
                        # string expressions from trusted, reviewed mapping
                        # config; prefer callables for anything else.
                        namespace = dict(source_record)
                        namespace['source_record'] = source_record
                        value = eval(expression, namespace)
                elif mapping_type == 'lookup':
                    table = mapping.get('lookup_table', lookup_table)
                    if table is None:
                        raise ValueError(
                            f"lookup mapping for {target_field!r} has no lookup table")
                    value = table[source_record[mapping['key']]]
                else:
                    # Silently skipping would drop the field from every record.
                    raise ValueError(
                        f"unknown mapping type {mapping_type!r} for {target_field!r}")
                target_record[target_field] = value
            return target_record
        ```
    
    data_validation_pattern:
      pattern_name: "Data Quality Validation"
      description: "Validate data during transformation process"
      
      validation_types:
        schema_validation:
          description: "Validate data types and structure"
          checks:
            - "Data type conformity"
            - "Required field presence"
            - "Field length constraints"
            - "Format validation (email, phone, etc.)"
        
        business_rule_validation:
          description: "Validate business logic constraints"
          checks:
            - "Range validations"
            - "Cross-field validations"
            - "Referential integrity"
            - "Custom business rules"
        
        statistical_validation:
          description: "Detect anomalies using statistical methods"
          checks:
            - "Outlier detection"
            - "Distribution analysis"
            - "Trend validation"
            - "Volume checks"
      
      error_handling_strategies:
        - reject_record: "Remove invalid records"
        - quarantine_record: "Store in error table for review"
        - default_value: "Apply default values for missing data"
        - log_and_continue: "Log errors but continue processing"
    
    aggregation_pattern:
      pattern_name: "Data Aggregation and Summarization"
      description: "Aggregate source data for analytical purposes"
      
      aggregation_strategies:
        temporal_aggregation:
          description: "Aggregate by time periods"
          examples:
            - "Daily/Weekly/Monthly summaries"
            - "Rolling window aggregations"
            - "Time-based metrics calculation"
        
        dimensional_aggregation:
          description: "Aggregate by business dimensions"
          examples:
            - "Customer-level aggregations"
            - "Product category summaries"
            - "Geographic rollups"
      
      implementation_patterns:
        pre_aggregation:
          description: "Calculate aggregates during ETL"
          advantages: "Faster query performance"
          disadvantages: "Storage overhead"
        
        on_demand_aggregation:
          description: "Calculate aggregates at query time"
          advantages: "Fresh data, flexible"
          disadvantages: "Query performance impact"
      
      code_example: |
        ```sql
        -- Daily per-customer transaction summary over a trailing window that
        -- starts at midnight 30 days ago and has no upper bound (today so far
        -- is included).
        SELECT
            DATE_TRUNC('day', transaction_date) AS day,
            customer_id,
            COUNT(*)    AS transaction_count,
            SUM(amount) AS total_amount,
            AVG(amount) AS avg_amount
        FROM transactions
        WHERE transaction_date >= CURRENT_DATE - INTERVAL '30 days'
        -- Positional references: 1 = truncated day, 2 = customer_id.
        GROUP BY 1, 2
        ```
    
    slowly_changing_dimension_pattern:
      pattern_name: "Slowly Changing Dimensions (SCD)"
      description: "Handle dimension changes over time"
      
      scd_types:
        type_1_overwrite:
          description: "Overwrite existing values"
          use_case: "When historical values are not important"
          implementation: "UPDATE existing records with new values"
        
        type_2_versioning:
          description: "Create new records for changes"
          use_case: "When history must be preserved"
          implementation: "Insert new record, update effective dates"
        
        type_3_partial_history:
          description: "Store previous value in separate column"
          use_case: "When limited history is sufficient"
          implementation: "Add previous_value columns"
      
      code_example: |
        ```sql
        -- SCD Type 2 implementation (run both statements in one transaction).
        -- Tracked (versioned) attributes: name, address.
        -- Assumes staging_customer holds at most one row per customer_id.

        -- Step 1: expire the current version of every customer whose tracked
        -- attributes changed. This must run BEFORE the insert; run afterwards
        -- it would also expire the new versions created in step 2.
        -- IS DISTINCT FROM treats NULL -> value and value -> NULL as changes.
        UPDATE dim_customer d
        SET end_date = CURRENT_DATE - 1,
            is_current = false
        FROM staging_customer s
        WHERE d.customer_id = s.customer_id
          AND d.is_current = true
          AND (s.name IS DISTINCT FROM d.name
               OR s.address IS DISTINCT FROM d.address);

        -- Step 2: insert a new current version for every staged customer that
        -- has no current row: brand-new customers and those expired in step 1.
        -- Unchanged customers still have a current row and are skipped.
        INSERT INTO dim_customer (
            customer_key, customer_id, name, address,
            effective_date, end_date, is_current
        )
        SELECT
            nextval('customer_key_seq'),
            s.customer_id,
            s.name,
            s.address,
            CURRENT_DATE,
            '9999-12-31',
            true
        FROM staging_customer s
        LEFT JOIN dim_customer d
            ON s.customer_id = d.customer_id AND d.is_current = true
        WHERE d.customer_id IS NULL;
        ```

  loading_patterns:
    bulk_loading_pattern:
      pattern_name: "Bulk Data Loading"
      description: "Efficiently load large volumes of data"
      
      loading_strategies:
        bulk_insert:
          description: "Use database bulk loading utilities"
          technologies: ["COPY command", "BULK INSERT", "LOAD DATA"]
          advantages:
            - "High performance"
            - "Optimized for large datasets"
            - "Minimal logging overhead"
          considerations:
            - "Requires exclusive table access"
            - "Limited error handling"
            - "Rollback complexity"
        
        batch_insert:
          description: "Insert multiple records in single statement"
          implementation: "INSERT INTO ... VALUES (...), (...), (...)"
          advantages:
            - "Better than single-row inserts"
            - "Good error handling"
            - "Transactional support"
          considerations:
            - "Batch size optimization"
            - "Memory usage management"
            - "Network packet size limits"
      
      optimization_techniques:
        - "Disable indexes during load, rebuild after"
        - "Use parallel loading processes"
        - "Partition tables for parallel access"
        - "Optimize database configuration for loading"
      
      code_example: |
        ```python
        def bulk_load_data(data_file, table_name, connection, index_columns=("id",)):
            """Bulk-load a CSV file into a table with triggers and indexes disabled.

            Args:
                data_file: CSV file (header row, comma-delimited) to load.
                    COPY ... FROM '<path>' is read by the database server,
                    not by the client.
                table_name: target table. It is interpolated into the SQL text,
                    so it must be a trusted identifier, never external input.
                connection: object with an execute(sql) method.
                index_columns: columns whose idx_<table>_<column> index is
                    dropped before the load and rebuilt after it.

            If any statement raises, the remaining statements are skipped. Run
            the load in a transaction and roll back on error; PostgreSQL DDL is
            transactional, so the trigger and index changes are undone with it.
            """
            index_specs = [(column, f"idx_{table_name}_{column}") for column in index_columns]

            # DISABLE TRIGGER ALL also disables the internal triggers that
            # enforce foreign keys; rows loaded meanwhile are not re-checked.
            connection.execute(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL")
            # DROP INDEX has no wildcard form: drop, by name, exactly the
            # indexes that are rebuilt below.
            for _, index_name in index_specs:
                connection.execute(f"DROP INDEX IF EXISTS {index_name}")

            # Double any single quote so the path stays one SQL string literal.
            quoted_path = data_file.replace("'", "''")
            connection.execute(
                f"COPY {table_name} FROM '{quoted_path}' "
                "WITH (FORMAT CSV, HEADER TRUE, DELIMITER ',')"
            )

            # IF NOT EXISTS keeps a re-run from failing on a surviving index.
            for column, index_name in index_specs:
                connection.execute(
                    f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name}({column})"
                )
            connection.execute(f"ALTER TABLE {table_name} ENABLE TRIGGER ALL")
        ```
    
    upsert_pattern:
      pattern_name: "Insert or Update (Upsert)"
      description: "Handle both new records and updates to existing records"
      
      implementation_approaches:
        merge_statement:
          description: "Use SQL MERGE statement"
          database_support: ["SQL Server", "Oracle", "PostgreSQL 15+"]
          example: "MERGE target USING source ON key WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT"
        
        insert_on_duplicate_key:
          description: "Use INSERT with duplicate key handling"
          database_support: ["MySQL", "MariaDB", "PostgreSQL", "SQLite"]
          example: "MySQL/MariaDB: INSERT ... ON DUPLICATE KEY UPDATE; PostgreSQL/SQLite: INSERT ... ON CONFLICT (key) DO UPDATE"
        
        application_logic:
          description: "Handle upsert logic in application code"
          approach: "Check existence, then INSERT or UPDATE"
          advantages: "Database-agnostic, flexible logic"
          disadvantages: "Higher complexity, performance overhead"
      
      considerations:
        - "Concurrent access handling"
        - "Performance with large datasets"
        - "Key selection and uniqueness"
        - "Partial update strategies"
    
    partition_loading_pattern:
      pattern_name: "Partition-Based Loading"
      description: "Load data into partitioned tables efficiently"
      
      partitioning_strategies:
        time_based_partitioning:
          description: "Partition by date/time columns"
          benefits:
            - "Efficient historical data queries"
            - "Easy data archival and purging"
            - "Parallel loading by partition"
          implementation: "Partition by day/month/year"
        
        hash_partitioning:
          description: "Partition by hash of key column"
          benefits:
            - "Even data distribution"
            - "Parallel processing"
            - "Scalable access patterns"
          implementation: "Partition by hash(customer_id)"
      
      loading_optimization:
        - "Load into specific partitions"
        - "Use partition-wise joins"
        - "Implement partition pruning"
        - "Manage partition metadata"

  error_handling_patterns:
    circuit_breaker_pattern:
      pattern_name: "Circuit Breaker"
      description: "Prevent cascade failures in ETL pipelines"
      
      implementation:
        states:
          - "Closed: Normal operation"
          - "Open: Failing fast, not attempting calls"
          - "Half-Open: Testing if service recovered"
        
        configuration:
          failure_threshold: "Number of failures before opening"
          timeout: "Time to wait before trying again"
          success_threshold: "Successes needed to close circuit"
      
      code_example: |
        ```python
        import time


        class CircuitBreakerOpenError(Exception):
            """Raised instead of calling the protected function while OPEN."""


        class CircuitBreaker:
            """Fail fast after repeated failures, then probe for recovery.

            States:
                CLOSED    -- calls pass through; consecutive failures are counted.
                OPEN      -- calls are rejected with CircuitBreakerOpenError until
                             `timeout` seconds have passed since the last failure.
                HALF_OPEN -- calls pass through as probes; `success_threshold`
                             consecutive successes close the circuit and any
                             failure reopens it.

            Args:
                failure_threshold: consecutive failures (while CLOSED) that open
                    the circuit.
                timeout: seconds to stay OPEN before allowing a probe call.
                success_threshold: consecutive HALF_OPEN successes needed to
                    close the circuit again.
            """

            def __init__(self, failure_threshold=5, timeout=60, success_threshold=1):
                self.failure_threshold = failure_threshold
                self.timeout = timeout
                self.success_threshold = success_threshold
                self.failure_count = 0
                self.success_count = 0
                self.last_failure_time = None
                self.state = 'CLOSED'

            def call(self, func, *args, **kwargs):
                """Invoke func(*args, **kwargs) through the breaker and return its result.

                Raises:
                    CircuitBreakerOpenError: the circuit is OPEN and the timeout
                        has not yet elapsed.
                    Any exception raised by func, after it has been recorded.
                """
                if self.state == 'OPEN':
                    # monotonic() is not affected by system clock updates.
                    if time.monotonic() - self.last_failure_time >= self.timeout:
                        self.state = 'HALF_OPEN'
                        self.success_count = 0
                    else:
                        raise CircuitBreakerOpenError("Circuit breaker is OPEN")

                try:
                    result = func(*args, **kwargs)
                except Exception:
                    self.on_failure()
                    raise  # bare raise keeps the original traceback intact
                self.on_success()
                return result

            def on_success(self):
                """Record a successful call."""
                if self.state == 'HALF_OPEN':
                    self.success_count += 1
                    if self.success_count >= self.success_threshold:
                        self._close()
                else:
                    # Failures are counted consecutively: a success resets them.
                    self.failure_count = 0

            def on_failure(self):
                """Record a failed call and open the circuit when warranted."""
                self.last_failure_time = time.monotonic()
                if self.state == 'HALF_OPEN':
                    # A failed probe means the service has not recovered yet.
                    self._open()
                    return
                self.failure_count += 1
                if self.failure_count >= self.failure_threshold:
                    self._open()

            def _open(self):
                """Switch to OPEN so calls fail fast until the timeout passes."""
                self.state = 'OPEN'
                self.success_count = 0

            def _close(self):
                """Switch back to CLOSED and clear all counters."""
                self.state = 'CLOSED'
                self.failure_count = 0
                self.success_count = 0
        ```
    
    retry_pattern:
      pattern_name: "Retry with Backoff"
      description: "Retry failed operations with increasing delays"
      
      retry_strategies:
        fixed_interval:
          description: "Wait fixed time between retries"
          use_case: "Simple scenarios with predictable failures"
        
        exponential_backoff:
          description: "Exponentially increase wait time"
          use_case: "Network failures, rate limiting"
          formula: "wait_time = base_delay * (2 ^ attempt_number)"
        
        jittered_backoff:
          description: "Add randomness to prevent thundering herd"
          use_case: "Multiple concurrent processes"
          formula: "wait_time = base_delay * (2 ^ attempt_number) + random(0, jitter_amount)"
      
      implementation_considerations:
        - "Maximum retry attempts"
        - "Timeout configurations"
        - "Idempotent operations only"
        - "Error categorization (retryable vs non-retryable)"
    
    dead_letter_queue_pattern:
      pattern_name: "Dead Letter Queue"
      description: "Handle permanently failed messages"
      
      implementation:
        message_flow:
          1: "Process message from main queue"
          2: "If processing fails, increment retry count"
          3: "If retry limit exceeded, move to DLQ"
          4: "Monitor and process DLQ messages separately"
        
        dlq_processing:
          - "Manual review and correction"
          - "Automated reprocessing after fixes"
          - "Alert generation for critical failures"
          - "Analytics on failure patterns"

  monitoring_patterns:
    data_lineage_tracking_pattern:
      pattern_name: "Data Lineage Tracking"
      description: "Track data flow through ETL processes"
      
      tracking_levels:
        job_level:
          description: "Track ETL job executions"
          information: "Start time, end time, status, record counts"
        
        dataset_level:
          description: "Track dataset transformations"
          information: "Source datasets, transformations applied, output datasets"
        
        record_level:
          description: "Track individual record transformations"
          information: "Field mappings, transformations, data quality results"
      
      implementation_approach:
        metadata_capture:
          - "Instrument ETL code with logging"
          - "Capture transformation metadata"
          - "Store lineage information in metadata repository"
        
        visualization:
          - "Create lineage diagrams"
          - "Build impact analysis tools"
          - "Provide search and discovery interfaces"
    
    performance_monitoring_pattern:
      pattern_name: "ETL Performance Monitoring"
      description: "Monitor and optimize ETL performance"
      
      key_metrics:
        throughput_metrics:
          - "Records processed per hour"
          - "Data volume processed"
          - "Processing rate trends"
        
        latency_metrics:
          - "End-to-end processing time"
          - "Step-by-step timing"
          - "Bottleneck identification"
        
        resource_metrics:
          - "CPU and memory utilization"
          - "I/O operations and throughput"
          - "Network bandwidth usage"
      
      alerting_strategies:
        threshold_based:
          description: "Alert when metrics exceed thresholds"
          examples: "Processing time > 2 hours, Error rate > 5%"
        
        anomaly_based:
          description: "Alert on unusual patterns"
          examples: "Unexpected data volume changes, Performance degradation"

  performance_patterns:
    parallel_processing_pattern:
      pattern_name: "Parallel ETL Processing"
      description: "Process data in parallel to improve performance"
      
      parallelization_strategies:
        data_parallelism:
          description: "Split data across multiple workers"
          approaches:
            - "Partition by date ranges"
            - "Hash-based data splitting"
            - "Round-robin distribution"
        
        task_parallelism:
          description: "Execute different tasks simultaneously"
          approaches:
            - "Independent pipeline stages"
            - "Parallel dimension loading"
            - "Concurrent quality checks"
      
      coordination_mechanisms:
        - "Message queues for task distribution"
        - "Shared state management"
        - "Dependency resolution"
        - "Result aggregation"
      
      code_example: |
        ```python
        from concurrent.futures import ThreadPoolExecutor
        import multiprocessing as mp
        
        def parallel_etl_processing(data_partitions):
            """Run process_partition over every partition on a thread pool.

            The pool has one worker thread per CPU reported by
            multiprocessing.cpu_count(). Every partition is submitted before
            any result is read; results are collected in input order and passed
            to combine_results. If a partition raised, the exception of the
            first such partition (in input order) propagates.
            """
            worker_count = mp.cpu_count()
            with ThreadPoolExecutor(max_workers=worker_count) as pool:
                pending = [pool.submit(process_partition, part) for part in data_partitions]
                ordered_results = [job.result() for job in pending]
                return combine_results(ordered_results)
        ```
    
    caching_pattern:
      pattern_name: "Data Caching"
      description: "Cache frequently accessed data to improve performance"
      
      caching_strategies:
        lookup_data_caching:
          description: "Cache reference/lookup tables"
          implementation: "Load lookup tables into memory at start"
          benefits: "Avoid repeated database lookups"
        
        result_caching:
          description: "Cache transformation results"
          implementation: "Store computed results with TTL"
          benefits: "Avoid redundant calculations"
        
        connection_pooling:
          description: "Cache database connections"
          implementation: "Maintain pool of reusable connections"
          benefits: "Reduce connection overhead"
      
      cache_management:
        - "Cache invalidation strategies"
        - "Memory usage monitoring"
        - "Cache hit ratio optimization"
        - "Distributed caching considerations"

  scalability_patterns:
    horizontal_scaling_pattern:
      pattern_name: "Scale-Out ETL Architecture"
      description: "Scale ETL processes across multiple nodes"
      
      distribution_strategies:
        master_worker:
          description: "Central coordinator with worker nodes"
          components:
            - "Master node: Task distribution and coordination"
            - "Worker nodes: Data processing execution"
            - "Shared storage: Data and metadata"
        
        peer_to_peer:
          description: "Distributed processing without central coordinator"
          components:
            - "Each node handles subset of data"
            - "Consensus mechanisms for coordination"
            - "Fault tolerance through replication"
      
      data_distribution:
        - "Consistent hashing for data partitioning"
        - "Dynamic load balancing"
        - "Fault tolerance and recovery"
        - "State synchronization"
    
    elastic_scaling_pattern:
      pattern_name: "Auto-Scaling ETL Resources"
      description: "Automatically adjust resources based on workload"
      
      scaling_triggers:
        - "Queue depth monitoring"
        - "Processing latency thresholds"
        - "Resource utilization metrics"
        - "Scheduled scaling for known peaks"
      
      scaling_strategies:
        reactive_scaling:
          description: "Scale after detecting load changes"
          advantages: "Cost-effective, responsive"
          disadvantages: "Delay in scaling response"
        
        predictive_scaling:
          description: "Scale based on predicted load"
          advantages: "Proactive, no delay"
          disadvantages: "Complex, may over-provision"

template_guidance:
  pattern_selection_guide:
    small_datasets:
      recommended_patterns:
        - "Full extraction pattern"
        - "Simple transformation patterns"
        - "Bulk loading pattern"
      considerations:
        - "Simplicity over optimization"
        - "Lower operational overhead"
        - "Standard SQL transformations"
    
    large_datasets:
      recommended_patterns:
        - "Incremental extraction pattern"
        - "Parallel processing pattern"
        - "Partition loading pattern"
      considerations:
        - "Performance optimization critical"
        - "Resource management important"
        - "Error handling complexity"
    
    real_time_requirements:
      recommended_patterns:
        - "Streaming extraction pattern"
        - "Micro-batch processing"
        - "Event-driven architecture"
      considerations:
        - "Low latency requirements"
        - "Fault tolerance essential"
        - "Exactly-once processing"
  
  integration_points:
    - monitoring_setup: "Implement recommended monitoring patterns"
    - quality_checks: "Integrate data validation patterns"
    - infrastructure: "Align with scalability patterns"
    - governance: "Apply governance patterns consistently"

template_metadata:
  recommended_review_cycle: "Quarterly pattern assessment"
  minimum_fields:
    - pattern_overview
    - extraction_patterns
    - transformation_patterns
    - loading_patterns
    - error_handling_patterns
  
  automation_potential:
    - "Pattern template generation"
    - "Code scaffolding from patterns"
    - "Performance pattern suggestions"
    - "Best practice validation"
  
  success_metrics:
    - "Pattern adoption rate"
    - "ETL performance improvements"
    - "Error reduction percentage"
    - "Development time reduction"