{"version":3,"file":"index.cjs","names":["EventSubscriber"],"sources":["../src/streaming-event-subscriber.ts"],"sourcesContent":["/**\n * Streaming Events Subscriber Base Class\n *\n * Specialized base class for high-throughput streaming platforms like\n * Kafka, Kinesis, Pub/Sub, etc.\n *\n * Extends EventSubscriber with streaming-specific features:\n * - Partitioning strategy for ordered delivery\n * - Buffer overflow handling (drop/block/disk)\n * - High-throughput optimizations\n * - Backpressure signaling\n *\n * @example Kafka Streaming Subscriber\n * ```typescript\n * import { StreamingEventSubscriber } from 'autotel-subscribers/streaming-event-subscriber';\n *\n * class KafkaSubscriber extends StreamingEventSubscriber {\n *   name = 'KafkaSubscriber';\n *   version = '1.0.0';\n *\n *   constructor(config: KafkaConfig) {\n *     super({\n *       maxBufferSize: 10000,\n *       bufferOverflowStrategy: 'block',\n *       maxBatchSize: 500\n *     });\n *   }\n *\n *   protected getPartitionKey(payload: EventPayload): string {\n *     // Partition by userId for ordered events per user\n *     return payload.attributes?.userId || 'default';\n *   }\n *\n *   protected async sendBatch(events: EventPayload[]): Promise<void> {\n *     await this.producer.send({\n *       topic: this.topic,\n *       messages: events.map(e => ({\n *         key: this.getPartitionKey(e),\n *         value: JSON.stringify(e)\n *       }))\n *     });\n *   }\n * }\n * ```\n */\n\nimport {\n  EventSubscriber,\n  type EventPayload,\n} from './event-subscriber-base';\n\n/**\n * Buffer overflow strategy\n *\n * - 'drop': Drop new events when buffer is full (prevents blocking, but loses data)\n * - 'block': Wait for space in buffer (backpressure, may slow application)\n * - 'disk': Spill to disk when memory buffer full (reliable, but complex - not implemented yet)\n */\nexport type BufferOverflowStrategy = 'drop' | 'block' | 'disk';\n\n/**\n * Streaming subscriber configuration\n */\nexport interface StreamingSubscriberConfig {\n  /** Maximum buffer size before triggering overflow strategy (default: 10000) */\n  maxBufferSize?: number;\n\n  /** Strategy when buffer is full (default: 'block') */\n  bufferOverflowStrategy?: BufferOverflowStrategy;\n\n  /** Maximum batch size for sending (default: 500) */\n  maxBatchSize?: number;\n\n  /** Flush interval in milliseconds (default: 1000) */\n  flushIntervalMs?: number;\n\n  /** Enable compression (default: false) */\n  compressionEnabled?: boolean;\n}\n\n/**\n * Buffer status for monitoring\n */\nexport interface BufferStatus {\n  /** Current number of events in buffer */\n  size: number;\n\n  /** Maximum capacity */\n  capacity: number;\n\n  /** Utilization percentage (0-100) */\n  utilization: number;\n\n  /** Is buffer near full (>80%) */\n  isNearFull: boolean;\n\n  /** Is buffer full (100%) */\n  isFull: boolean;\n}\n\n/**\n * Streaming Events Subscriber Base Class\n *\n * Provides streaming-specific patterns on top of EventSubscriber.\n */\nexport abstract class StreamingEventSubscriber extends EventSubscriber {\n  protected config: Required<StreamingSubscriberConfig>;\n  protected buffer: EventPayload[] = [];\n  protected flushIntervalHandle: NodeJS.Timeout | null = null;\n  private isShuttingDown = false;\n\n  constructor(config: StreamingSubscriberConfig = {}) {\n    super();\n\n    // Set defaults\n    this.config = {\n      maxBufferSize: config.maxBufferSize ?? 10_000,\n      bufferOverflowStrategy: config.bufferOverflowStrategy ?? 'block',\n      maxBatchSize: config.maxBatchSize ?? 500,\n      flushIntervalMs: config.flushIntervalMs ?? 1000,\n      compressionEnabled: config.compressionEnabled ?? false,\n    };\n\n    // Start periodic flushing\n    this.startFlushInterval();\n  }\n\n  /**\n   * Get partition key for event\n   *\n   * Override this to implement your partitioning strategy.\n   * Events with the same partition key go to the same partition/shard.\n   *\n   * Common strategies:\n   * - By userId: Ordered events per user\n   * - By tenantId: Isolate tenants\n   * - By eventType: Group similar events\n   * - Round-robin: Load balancing\n   *\n   * @param payload - Event payload\n   * @returns Partition key (string)\n   *\n   * @example Partition by userId\n   * ```typescript\n   * protected getPartitionKey(payload: EventPayload): string {\n   *   return payload.attributes?.userId || 'default';\n   * }\n   * ```\n   */\n  protected abstract getPartitionKey(payload: EventPayload): string;\n\n  /**\n   * Send batch of events to streaming platform\n   *\n   * Override this to implement platform-specific batch sending.\n   * Called when buffer reaches maxBatchSize or flush interval triggers.\n   *\n   * @param events - Batch of events to send\n   */\n  protected abstract sendBatch(events: EventPayload[]): Promise<void>;\n\n  /**\n   * Send single event to destination (from EventSubscriber)\n   *\n   * This buffers events and sends in batches for performance.\n   * Override sendBatch() instead of this method.\n   */\n  protected async sendToDestination(payload: EventPayload): Promise<void> {\n    // Check buffer capacity before adding\n    await this.ensureBufferCapacity();\n\n    // Add to buffer\n    this.buffer.push(payload);\n\n    // Auto-flush if batch size reached\n    if (this.buffer.length >= this.config.maxBatchSize) {\n      await this.flushBuffer();\n    }\n  }\n\n  /**\n   * Ensure buffer has capacity for new event\n   *\n   * Implements buffer overflow strategy:\n   * - 'drop': Returns immediately (event will be added, oldest may be dropped)\n   * - 'block': Waits until space available (backpressure)\n   * - 'disk': Not implemented yet (would spill to disk)\n   */\n  private async ensureBufferCapacity(): Promise<void> {\n    if (this.buffer.length < this.config.maxBufferSize) {\n      return; // Has space\n    }\n\n    // Buffer is full - apply overflow strategy\n    switch (this.config.bufferOverflowStrategy) {\n      case 'drop': {\n        // Drop oldest event to make space\n        this.buffer.shift();\n        console.warn(\n          `[${this.name}] Buffer full (${this.config.maxBufferSize}), dropped oldest event`\n        );\n        break;\n      }\n\n      case 'block': {\n        // Wait for flush to complete (backpressure)\n        console.warn(\n          `[${this.name}] Buffer full (${this.config.maxBufferSize}), blocking until space available`\n        );\n        await this.flushBuffer();\n\n        // If still full after flush, wait a bit and retry\n        if (this.buffer.length >= this.config.maxBufferSize) {\n          await new Promise((resolve) => setTimeout(resolve, 100));\n          await this.ensureBufferCapacity(); // Recursive retry\n        }\n        break;\n      }\n\n      case 'disk': {\n        throw new Error(\n          `[${this.name}] Disk overflow strategy not implemented yet`\n        );\n      }\n    }\n  }\n\n  /**\n   * Flush buffer to destination\n   */\n  private async flushBuffer(): Promise<void> {\n    if (this.buffer.length === 0) return;\n\n    const batch = [...this.buffer];\n    this.buffer = [];\n\n    try {\n      await this.sendBatch(batch);\n    } catch (error) {\n      console.error(\n        `[${this.name}] Failed to send batch of ${batch.length} events:`,\n        error\n      );\n\n      // On failure, put events back in buffer (at front)\n      this.buffer.unshift(...batch);\n\n      // If we're near capacity, we need to make a decision\n      if (this.buffer.length >= this.config.maxBufferSize * 0.9 && this.config.bufferOverflowStrategy === 'drop') {\n          // Drop oldest to prevent runaway growth\n          const toDrop = Math.floor(this.config.maxBufferSize * 0.1);\n          this.buffer.splice(0, toDrop);\n          console.warn(\n            `[${this.name}] After failed flush, dropped ${toDrop} oldest events to prevent overflow`\n          );\n        }\n    }\n  }\n\n  /**\n   * Start periodic flushing\n   */\n  private startFlushInterval(): void {\n    this.flushIntervalHandle = setInterval(() => {\n      if (!this.isShuttingDown) {\n        void this.flushBuffer();\n      }\n    }, this.config.flushIntervalMs);\n  }\n\n  /**\n   * Get current buffer status (for monitoring/observability)\n   */\n  public getBufferStatus(): BufferStatus {\n    const size = this.buffer.length;\n    const capacity = this.config.maxBufferSize;\n    const utilization = Math.round((size / capacity) * 100);\n\n    return {\n      size,\n      capacity,\n      utilization,\n      isNearFull: utilization > 80,\n      isFull: utilization >= 100,\n    };\n  }\n\n  /**\n   * Shutdown with proper buffer draining\n   */\n  async shutdown(): Promise<void> {\n    this.isShuttingDown = true;\n\n    // Stop flush interval (no more automatic flushes)\n    if (this.flushIntervalHandle) {\n      clearInterval(this.flushIntervalHandle);\n      this.flushIntervalHandle = null;\n    }\n\n    // Call parent shutdown FIRST to:\n    // 1. Set enabled = false (stop accepting new events)\n    // 2. Drain any pending sendToDestination() calls\n    await super.shutdown();\n\n    // THEN flush remaining buffer\n    // (no new events can arrive after super.shutdown() disabled the subscriber)\n    await this.flushBuffer();\n  }\n\n  /**\n   * Optional: Compress payload before sending\n   *\n   * Override this if your streaming platform supports compression.\n   * Only called if compressionEnabled = true.\n   */\n  protected async compressPayload(\n    payload: string\n  ): Promise<Buffer | string> {\n    // Default: no compression\n    // Override with gzip, snappy, lz4, etc.\n    return payload;\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAyGA,IAAsB,2BAAtB,cAAuDA,8CAAgB;CACrE,AAAU;CACV,AAAU,SAAyB,CAAC;CACpC,AAAU,sBAA6C;CACvD,AAAQ,iBAAiB;CAEzB,YAAY,SAAoC,CAAC,GAAG;EAClD,MAAM;EAGN,KAAK,SAAS;GACZ,eAAe,OAAO,iBAAiB;GACvC,wBAAwB,OAAO,0BAA0B;GACzD,cAAc,OAAO,gBAAgB;GACrC,iBAAiB,OAAO,mBAAmB;GAC3C,oBAAoB,OAAO,sBAAsB;EACnD;EAGA,KAAK,mBAAmB;CAC1B;;;;;;;CA0CA,MAAgB,kBAAkB,SAAsC;EAEtE,MAAM,KAAK,qBAAqB;EAGhC,KAAK,OAAO,KAAK,OAAO;EAGxB,IAAI,KAAK,OAAO,UAAU,KAAK,OAAO,cACpC,MAAM,KAAK,YAAY;CAE3B;;;;;;;;;CAUA,MAAc,uBAAsC;EAClD,IAAI,KAAK,OAAO,SAAS,KAAK,OAAO,eACnC;EAIF,QAAQ,KAAK,OAAO,wBAApB;GACE,KAAK;IAEH,KAAK,OAAO,MAAM;IAClB,QAAQ,KACN,IAAI,KAAK,KAAK,iBAAiB,KAAK,OAAO,cAAc,wBAC3D;IACA;GAGF,KAAK;IAEH,QAAQ,KACN,IAAI,KAAK,KAAK,iBAAiB,KAAK,OAAO,cAAc,kCAC3D;IACA,MAAM,KAAK,YAAY;IAGvB,IAAI,KAAK,OAAO,UAAU,KAAK,OAAO,eAAe;KACnD,MAAM,IAAI,SAAS,YAAY,WAAW,SAAS,GAAG,CAAC;KACvD,MAAM,KAAK,qBAAqB;IAClC;IACA;GAGF,KAAK,QACH,MAAM,IAAI,MACR,IAAI,KAAK,KAAK,6CAChB;EAEJ;CACF;;;;CAKA,MAAc,cAA6B;EACzC,IAAI,KAAK,OAAO,WAAW,GAAG;EAE9B,MAAM,QAAQ,CAAC,GAAG,KAAK,MAAM;EAC7B,KAAK,SAAS,CAAC;EAEf,IAAI;GACF,MAAM,KAAK,UAAU,KAAK;EAC5B,SAAS,OAAO;GACd,QAAQ,MACN,IAAI,KAAK,KAAK,4BAA4B,MAAM,OAAO,WACvD,KACF;GAGA,KAAK,OAAO,QAAQ,GAAG,KAAK;GAG5B,IAAI,KAAK,OAAO,UAAU,KAAK,OAAO,gBAAgB,MAAO,KAAK,OAAO,2BAA2B,QAAQ;IAExG,MAAM,SAAS,KAAK,MAAM,KAAK,OAAO,gBAAgB,EAAG;IACzD,KAAK,OAAO,OAAO,GAAG,MAAM;IAC5B,QAAQ,KACN,IAAI,KAAK,KAAK,gCAAgC,OAAO,mCACvD;GACF;EACJ;CACF;;;;CAKA,AAAQ,qBAA2B;EACjC,KAAK,sBAAsB,kBAAkB;GAC3C,IAAI,CAAC,KAAK,gBACR,AAAK,KAAK,YAAY;EAE1B,GAAG,KAAK,OAAO,eAAe;CAChC;;;;CAKA,AAAO,kBAAgC;EACrC,MAAM,OAAO,KAAK,OAAO;EACzB,MAAM,WAAW,KAAK,OAAO;EAC7B,MAAM,cAAc,KAAK,MAAO,OAAO,WAAY,GAAG;EAEtD,OAAO;GACL;GACA;GACA;GACA,YAAY,cAAc;GAC1B,QAAQ,eAAe;EACzB;CACF;;;;CAKA,MAAM,WAA0B;EAC9B,KAAK,iBAAiB;EAGtB,IAAI,KAAK,qBAAqB;GAC5B,cAAc,KAAK,mBAAmB;GACtC,KAAK,sBAAsB;EAC7B;EAKA,MAAM,MAAM,SAAS;EAIrB,MAAM,KAAK,YAAY;CACzB;;;;;;;CAQA,MAAgB,gBACd,SAC0B;EAG1B,OAAO;CACT;AACF"}