{"version":3,"file":"event-subscriber-base-C_F3Ycbe.cjs","names":[],"sources":["../src/event-subscriber-base.ts"],"sourcesContent":["/**\n * EventSubscriber - Standard base class for building custom subscribers\n *\n * This is the recommended base class for creating custom events subscribers.\n * It provides production-ready features out of the box:\n *\n * **Built-in Features:**\n * - **Error Handling**: Automatic error catching with customizable handlers\n * - **Pending Request Tracking**: Ensures all requests complete during shutdown\n * - **Graceful Shutdown**: Drains pending requests before closing\n * - **Enable/Disable**: Runtime control to turn subscriber on/off\n * - **Normalized Payload**: Consistent event structure across all event types\n *\n * **When to use:**\n * - Building custom subscribers for any platform\n * - Production deployments requiring reliability\n * - Need graceful shutdown and error handling\n *\n * @example Basic usage\n * ```typescript\n * import { EventSubscriber, EventPayload } from 'autotel-subscribers';\n *\n * class SnowflakeSubscriber extends EventSubscriber {\n *   name = 'SnowflakeSubscriber';\n *   version = '1.0.0';\n *\n *   protected async sendToDestination(payload: EventPayload): Promise<void> {\n *     await snowflakeClient.execute(\n *       `INSERT INTO events VALUES (?, ?, ?)`,\n *       [payload.type, payload.name, JSON.stringify(payload.attributes)]\n *     );\n *   }\n * }\n * ```\n *\n * @example With buffering\n * ```typescript\n * class BufferedSubscriber extends EventSubscriber {\n *   name = 'BufferedSubscriber';\n *   private buffer: EventPayload[] = [];\n *\n *   protected async sendToDestination(payload: EventPayload): Promise<void> {\n *     this.buffer.push(payload);\n *\n *     if (this.buffer.length >= 100) {\n *       await this.flush();\n *     }\n *   }\n *\n *   async shutdown(): Promise<void> {\n *     await super.shutdown(); // Drain pending requests first\n *     await this.flush(); // Then flush buffer\n *   }\n *\n *   private async flush(): Promise<void> {\n *     if (this.buffer.length === 0) return;\n *\n *     const batch = [...this.buffer];\n *     this.buffer = [];\n *\n *     await apiClient.sendBatch(batch);\n *   }\n * }\n * ```\n */\n\nimport type {\n  EventSubscriber as IEventSubscriber,\n  EventAttributes,\n  EventAttributesInput,\n  FunnelStatus,\n  OutcomeStatus,\n  AutotelEventContext,\n  EventTrackingOptions,\n} from 'autotel/event-subscriber';\n\n// Re-export types for convenience\nexport type { AutotelEventContext, EventTrackingOptions } from 'autotel/event-subscriber';\n\n/**\n * Payload sent to destination\n */\nexport interface EventPayload {\n  /** Event type: 'event', 'funnel', 'outcome', or 'value' */\n  type: 'event' | 'funnel' | 'outcome' | 'value';\n\n  /** Event name or metric name */\n  name: string;\n\n  /** Optional attributes */\n  attributes?: EventAttributes;\n\n  /** For funnel events: funnel name */\n  funnel?: string;\n\n  /** For funnel events: step status (from FunnelStatus enum) */\n  step?: FunnelStatus | string;\n\n  /** For funnel events: custom step name (from trackFunnelProgression) */\n  stepName?: string;\n\n  /** For funnel events: numeric position in funnel */\n  stepNumber?: number;\n\n  /** For outcome events: operation name */\n  operation?: string;\n\n  /** For outcome events: outcome status */\n  outcome?: OutcomeStatus;\n\n  /** For value events: numeric value */\n  value?: number;\n\n  /** Timestamp (ISO 8601) */\n  timestamp: string;\n\n  /**\n   * Autotel trace context (present when events.includeTraceContext is enabled)\n   *\n   * Subscribers should map these to platform-specific field names:\n   * - PostHog: autotel.trace_id → $trace_id\n   * - Mixpanel: autotel.trace_id → trace_id\n   */\n  autotel?: AutotelEventContext;\n  /** Optional schema metadata for contract-aware subscribers. */\n  schema?: EventTrackingOptions['schema'];\n}\n\n/**\n * Standard base class for building custom events subscribers\n *\n * **What it provides:**\n * - Consistent payload structure (normalized across all event types)\n * - Enable/disable flag (runtime control)\n * - Automatic error handling (with customizable error handlers)\n * - Pending requests tracking (ensures no lost events during shutdown)\n * - Graceful shutdown (drains pending requests before closing)\n *\n * **Usage:**\n * Extend this class and implement `sendToDestination()`. All other methods\n * (trackEvent, trackFunnelStep, trackOutcome, trackValue, shutdown) are handled automatically.\n *\n * For high-throughput streaming platforms (Kafka, Kinesis, Pub/Sub), use `StreamingEventSubscriber` instead.\n */\nexport abstract class EventSubscriber implements IEventSubscriber {\n  /**\n   * Subscriber name (required for debugging)\n   */\n  abstract readonly name: string;\n\n  /**\n   * Subscriber version (optional)\n   */\n  readonly version?: string;\n\n  /**\n   * Enable/disable the subscriber (default: true)\n   */\n  protected enabled: boolean = true;\n\n  /**\n   * Track pending requests for graceful shutdown\n   */\n  private pendingRequests: Set<Promise<void>> = new Set();\n\n  /**\n   * Send payload to destination\n   *\n   * Override this method to implement your destination-specific logic.\n   * This is called for all event types (event, funnel, outcome, value).\n   *\n   * @param payload - Normalized event payload\n   */\n  protected abstract sendToDestination(payload: EventPayload): Promise<void>;\n\n  /**\n   * Optional: Handle errors\n   *\n   * Override this to customize error handling (logging, retries, etc.).\n   * Default behavior: log to console.error\n   *\n   * @param error - Error that occurred\n   * @param payload - Event payload that failed\n   */\n  protected handleError(error: Error, payload: EventPayload): void {\n    console.error(\n      `[${this.name}] Failed to send ${payload.type}:`,\n      error,\n      payload,\n    );\n  }\n\n  /**\n   * Filter out undefined and null values from attributes\n   *\n   * This improves DX by allowing callers to pass objects with optional properties\n   * without having to manually filter them first.\n   *\n   * @param attributes - Input attributes (may contain undefined/null)\n   * @returns Filtered attributes with only defined values, or undefined if empty\n   *\n   * @example\n   * ```typescript\n   * const filtered = this.filterAttributes({\n   *   userId: user.id,\n   *   email: user.email,      // might be undefined\n   *   plan: null,             // will be filtered out\n   * });\n   * // Result: { userId: 'abc', email: 'test@example.com' } or { userId: 'abc' }\n   * ```\n   */\n  protected filterAttributes(\n    attributes?: EventAttributesInput,\n  ): EventAttributes | undefined {\n    if (!attributes) return undefined;\n\n    const filtered: EventAttributes = {};\n    for (const [key, value] of Object.entries(attributes)) {\n      if (value !== undefined && value !== null) {\n        filtered[key] = value;\n      }\n    }\n\n    // Return undefined if no attributes remain after filtering\n    return Object.keys(filtered).length > 0 ? filtered : undefined;\n  }\n\n  /**\n   * Track an event\n   */\n  async trackEvent(\n    name: string,\n    attributes?: EventAttributes,\n    options?: EventTrackingOptions,\n  ): Promise<void> {\n    if (!this.enabled) return;\n\n    const payload: EventPayload = {\n      type: 'event',\n      name,\n      attributes,\n      timestamp: new Date().toISOString(),\n      autotel: options?.autotel,\n      schema: options?.schema,\n    };\n\n    await this.send(payload);\n  }\n\n  /**\n   * Track a funnel step\n   */\n  async trackFunnelStep(\n    funnelName: string,\n    step: FunnelStatus,\n    attributes?: EventAttributes,\n    options?: EventTrackingOptions,\n  ): Promise<void> {\n    if (!this.enabled) return;\n\n    const payload: EventPayload = {\n      type: 'funnel',\n      name: `${funnelName}.${step}`,\n      funnel: funnelName,\n      step,\n      attributes,\n      timestamp: new Date().toISOString(),\n      autotel: options?.autotel,\n    };\n\n    await this.send(payload);\n  }\n\n  /**\n   * Track an outcome\n   */\n  async trackOutcome(\n    operationName: string,\n    outcome: OutcomeStatus,\n    attributes?: EventAttributes,\n    options?: EventTrackingOptions,\n  ): Promise<void> {\n    if (!this.enabled) return;\n\n    const payload: EventPayload = {\n      type: 'outcome',\n      name: `${operationName}.${outcome}`,\n      operation: operationName,\n      outcome,\n      attributes,\n      timestamp: new Date().toISOString(),\n      autotel: options?.autotel,\n    };\n\n    await this.send(payload);\n  }\n\n  /**\n   * Track a value/metric\n   */\n  async trackValue(\n    name: string,\n    value: number,\n    attributes?: EventAttributes,\n    options?: EventTrackingOptions,\n  ): Promise<void> {\n    if (!this.enabled) return;\n\n    const payload: EventPayload = {\n      type: 'value',\n      name,\n      value,\n      attributes,\n      timestamp: new Date().toISOString(),\n      autotel: options?.autotel,\n    };\n\n    await this.send(payload);\n  }\n\n  /**\n   * Track funnel progression with custom step names\n   *\n   * Unlike trackFunnelStep which uses FunnelStatus enum values,\n   * this method allows any string as the step name for flexible funnel tracking.\n   *\n   * @param funnelName - Name of the funnel (e.g., \"checkout\", \"onboarding\")\n   * @param stepName - Custom step name (e.g., \"cart_viewed\", \"payment_entered\")\n   * @param stepNumber - Optional numeric position in the funnel\n   * @param attributes - Optional event attributes\n   * @param options - Optional tracking options including autotel context\n   */\n  async trackFunnelProgression(\n    funnelName: string,\n    stepName: string,\n    stepNumber?: number,\n    attributes?: EventAttributes,\n    options?: EventTrackingOptions,\n  ): Promise<void> {\n    if (!this.enabled) return;\n\n    const payload: EventPayload = {\n      type: 'funnel',\n      name: `${funnelName}.${stepName}`,\n      funnel: funnelName,\n      step: stepName,\n      stepName,\n      stepNumber,\n      attributes,\n      timestamp: new Date().toISOString(),\n      autotel: options?.autotel,\n    };\n\n    await this.send(payload);\n  }\n\n  /**\n   * Flush pending requests and clean up\n   *\n   * CRITICAL: Prevents race condition during shutdown\n   * 1. Disables subscriber to stop new events\n   * 2. Drains all pending requests (with retry logic)\n   * 3. Ensures flush guarantee\n   *\n   * Override this if you need custom cleanup logic (close connections, flush buffers, etc.),\n   * but ALWAYS call super.shutdown() first to drain pending requests.\n   */\n  async shutdown(): Promise<void> {\n    // 1. Stop accepting new events (prevents race condition)\n    this.enabled = false;\n\n    // 2. Drain pending requests with retry logic\n    // Loop until empty to handle race where new requests added during Promise.allSettled\n    const maxDrainAttempts = 10;\n    const drainIntervalMs = 50;\n\n    for (let attempt = 0; attempt < maxDrainAttempts; attempt++) {\n      if (this.pendingRequests.size === 0) {\n        break;\n      }\n\n      // Wait for current batch\n      await Promise.allSettled(this.pendingRequests);\n\n      // Small delay to catch any stragglers added during allSettled\n      if (this.pendingRequests.size > 0 && attempt < maxDrainAttempts - 1) {\n        await new Promise((resolve) => setTimeout(resolve, drainIntervalMs));\n      }\n    }\n\n    // 3. Warn if we still have pending requests (shouldn't happen, but be defensive)\n    if (this.pendingRequests.size > 0) {\n      console.warn(\n        `[${this.name}] Shutdown completed with ${this.pendingRequests.size} pending requests still in-flight. ` +\n        `This may indicate a bug in the subscriber or extremely slow destination.`\n      );\n    }\n  }\n\n  /**\n   * Internal: Send payload and track request\n   */\n  private async send(payload: EventPayload): Promise<void> {\n    const request = this.sendWithErrorHandling(payload);\n    this.pendingRequests.add(request);\n\n    void request.finally(() => {\n      this.pendingRequests.delete(request);\n    });\n\n    return request;\n  }\n\n  /**\n   * Internal: Send with error handling\n   */\n  private async sendWithErrorHandling(\n    payload: EventPayload,\n  ): Promise<void> {\n    try {\n      await this.sendToDestination(payload);\n    } catch (error) {\n      this.handleError(error as Error, payload);\n    }\n  }\n}\n\nexport {\n  type EventAttributes,\n  type EventAttributesInput,\n  type FunnelStatus,\n  type OutcomeStatus,\n} from 'autotel/event-subscriber';\n"],"mappings":";;;;;;;;;;;;;;;;;;AAgJA,IAAsB,kBAAtB,MAAkE;;;;CAShE,AAAS;;;;CAKT,AAAU,UAAmB;;;;CAK7B,AAAQ,kCAAsC,IAAI,IAAI;;;;;;;;;;CAqBtD,AAAU,YAAY,OAAc,SAA6B;EAC/D,QAAQ,MACN,IAAI,KAAK,KAAK,mBAAmB,QAAQ,KAAK,IAC9C,OACA,OACF;CACF;;;;;;;;;;;;;;;;;;;;CAqBA,AAAU,iBACR,YAC6B;EAC7B,IAAI,CAAC,YAAY,OAAO;EAExB,MAAM,WAA4B,CAAC;EACnC,KAAK,MAAM,CAAC,KAAK,UAAU,OAAO,QAAQ,UAAU,GAClD,IAAI,UAAU,UAAa,UAAU,MACnC,SAAS,OAAO;EAKpB,OAAO,OAAO,KAAK,QAAQ,CAAC,CAAC,SAAS,IAAI,WAAW;CACvD;;;;CAKA,MAAM,WACJ,MACA,YACA,SACe;EACf,IAAI,CAAC,KAAK,SAAS;EAEnB,MAAM,UAAwB;GAC5B,MAAM;GACN;GACA;GACA,4BAAW,IAAI,KAAK,EAAC,CAAC,YAAY;GAClC,SAAS,SAAS;GAClB,QAAQ,SAAS;EACnB;EAEA,MAAM,KAAK,KAAK,OAAO;CACzB;;;;CAKA,MAAM,gBACJ,YACA,MACA,YACA,SACe;EACf,IAAI,CAAC,KAAK,SAAS;EAEnB,MAAM,UAAwB;GAC5B,MAAM;GACN,MAAM,GAAG,WAAW,GAAG;GACvB,QAAQ;GACR;GACA;GACA,4BAAW,IAAI,KAAK,EAAC,CAAC,YAAY;GAClC,SAAS,SAAS;EACpB;EAEA,MAAM,KAAK,KAAK,OAAO;CACzB;;;;CAKA,MAAM,aACJ,eACA,SACA,YACA,SACe;EACf,IAAI,CAAC,KAAK,SAAS;EAEnB,MAAM,UAAwB;GAC5B,MAAM;GACN,MAAM,GAAG,cAAc,GAAG;GAC1B,WAAW;GACX;GACA;GACA,4BAAW,IAAI,KAAK,EAAC,CAAC,YAAY;GAClC,SAAS,SAAS;EACpB;EAEA,MAAM,KAAK,KAAK,OAAO;CACzB;;;;CAKA,MAAM,WACJ,MACA,OACA,YACA,SACe;EACf,IAAI,CAAC,KAAK,SAAS;EAEnB,MAAM,UAAwB;GAC5B,MAAM;GACN;GACA;GACA;GACA,4BAAW,IAAI,KAAK,EAAC,CAAC,YAAY;GAClC,SAAS,SAAS;EACpB;EAEA,MAAM,KAAK,KAAK,OAAO;CACzB;;;;;;;;;;;;;CAcA,MAAM,uBACJ,YACA,UACA,YACA,YACA,SACe;EACf,IAAI,CAAC,KAAK,SAAS;EAEnB,MAAM,UAAwB;GAC5B,MAAM;GACN,MAAM,GAAG,WAAW,GAAG;GACvB,QAAQ;GACR,MAAM;GACN;GACA;GACA;GACA,4BAAW,IAAI,KAAK,EAAC,CAAC,YAAY;GAClC,SAAS,SAAS;EACpB;EAEA,MAAM,KAAK,KAAK,OAAO;CACzB;;;;;;;;;;;;CAaA,MAAM,WAA0B;EAE9B,KAAK,UAAU;EAIf,MAAM,mBAAmB;EACzB,MAAM,kBAAkB;EAExB,KAAK,IAAI,UAAU,GAAG,UAAU,kBAAkB,WAAW;GAC3D,IAAI,KAAK,gBAAgB,SAAS,GAChC;GAIF,MAAM,QAAQ,WAAW,KAAK,eAAe;GAG7C,IAAI,KAAK,gBAAgB,OAAO,KAAK,UAAU,mBAAmB,GAChE,MAAM,IAAI,SAAS,YAAY,WAAW,SAAS,eAAe,CAAC;EAEvE;EAGA,IAAI,KAAK,gBAAgB,OAAO,GAC9B,QAAQ,KACN,IAAI,KAAK,KAAK,4BAA4B,KAAK,gBAAgB,KAAK,4GAEtE;CAEJ;;;;CAKA,MAAc,KAAK,SAAsC;EACvD,MAAM,UAAU,KAAK,sBAAsB,OAAO;EAClD,KAAK,gBAAgB,IAAI,OAAO;EAEhC,AAAK,QAAQ,cAAc;GACzB,KAAK,gBAAgB,OAAO,OAAO;EACrC,CAAC;EAED,OAAO;CACT;;;;CAKA,MAAc,sBACZ,SACe;EACf,IAAI;GACF,MAAM,KAAK,kBAAkB,OAAO;EACtC,SAAS,OAAO;GACd,KAAK,YAAY,OAAgB,OAAO;EAC1C;CACF;AACF"}