// NOTE(review): reconstructed directive. `NodeJS.MemoryUsage` is referenced
// below, which needs Node's ambient type declarations — confirm this matches
// the directive the file originally carried.
/// <reference types="node" />
import type { EventBase } from "./event-base";

// NOTE(review): the generic parameter lists in this file are reconstructed.
// `T` is assumed to be the type argument passed to `EventBase`; the
// `Record<string, unknown>` constraint and default should be confirmed against
// the declaration of `EventBase` in "./event-base".

/** Starting position for a consumer, discriminated by `type`. */
export type StartPosition =
  | { type: "beginning" }
  | { type: "latest" }
  | { type: "timestamp"; timestamp: number }
  | { type: "offset"; partition: number; offset: string };

/**
 * Seek target, discriminated by `type`.
 *
 * Differs from {@link StartPosition} in that every variant except `"offset"`
 * accepts an optional `partition`; `"offset"` always requires one.
 */
export type SeekTarget =
  | { type: "timestamp"; timestamp: number; partition?: number }
  | { type: "offset"; partition: number; offset: string }
  | { type: "beginning"; partition?: number }
  | { type: "latest"; partition?: number };

/** Discriminant used by the `scenario` field of the consumer option types. */
export type ConsumptionScenario = "single" | "concurrent" | "batch";

/**
 * Optional flow-control settings (see `BaseConsumerOptions.backpressure`).
 *
 * All fields are optional; defaults are not defined in this file.
 * NOTE(review): units are implied by the field names only (bytes, ms) —
 * confirm in the consumer implementation.
 */
export interface BackpressureOptions {
  maxBytes?: number;
  maxBytesPerPartition?: number;
  minBytes?: number;
  maxWaitTimeMs?: number;
  partitionsConsumedConcurrently?: number;
  maxInFlight?: number;
  memoryGuard?: MemoryGuardOptions;
}

/**
 * Settings for the memory guard nested under {@link BackpressureOptions}.
 *
 * Offers absolute byte thresholds, heap-fraction thresholds, and a custom
 * `shouldPause` predicate that receives a `NodeJS.MemoryUsage` snapshot.
 * NOTE(review): how these combine when several are set is not specified
 * here — confirm in the implementation.
 */
export interface MemoryGuardOptions {
  enabled?: boolean;
  /** Which `NodeJS.MemoryUsage` field is inspected. */
  metric?: "rss" | "heapUsed" | "external";
  checkIntervalMs?: number;
  pauseAboveBytes?: number;
  resumeBelowBytes?: number;
  pauseAboveHeapFraction?: number;
  resumeBelowHeapFraction?: number;
  shouldPause?: (usage: NodeJS.MemoryUsage) => boolean;
}

/** Optional offset-commit settings (see `BaseConsumerOptions.commit`). */
export interface CommitOptions {
  autoCommit?: boolean;
  autoCommitIntervalMs?: number;
  autoCommitThreshold?: number;
}

/** Optional batch sizing settings (see `BatchConsumerOptions.batch`). */
export interface BatchOptions {
  maxSize?: number;
  maxWaitMs?: number;
}

/**
 * Per-message control handle passed as the second argument of a
 * {@link SingleMessageHandler}.
 *
 * `ack`, `nack` and `heartbeat` are asynchronous; `pause` and `resume` are
 * synchronous.
 */
export interface MessageControls {
  ack(): Promise<void>;
  nack(requeue?: boolean): Promise<void>;
  heartbeat(): Promise<void>;
  pause(): void;
  resume(): void;
}

/**
 * Argument passed to a {@link BatchMessageHandler}: the batch's events plus
 * the operations available while handling it.
 *
 * NOTE(review): the member names mirror the payload of KafkaJS `eachBatch`
 * (resolveOffset / commitOffsetsIfNecessary / heartbeat / isRunning /
 * isStale / pause) — presumably this wraps it; confirm.
 */
export interface BatchContext<
  T extends Record<string, unknown> = Record<string, unknown>,
> {
  events: EventBase<T>[];
  resolve(event: EventBase<T>): void;
  commit(): Promise<void>;
  heartbeat(): Promise<void>;
  isRunning(): boolean;
  isStale(): boolean;
  /** Returns a function; presumably calling it resumes — confirm. */
  pause(): () => void;
}

/** Async handler invoked with one event and its {@link MessageControls}. */
export type SingleMessageHandler<
  T extends Record<string, unknown> = Record<string, unknown>,
> = (event: EventBase<T>, controls: MessageControls) => Promise<void>;

/** Async handler invoked with a {@link BatchContext}. */
export type BatchMessageHandler<
  T extends Record<string, unknown> = Record<string, unknown>,
> = (context: BatchContext<T>) => Promise<void>;

/** Optional notification hooks (see `BaseConsumerOptions.callbacks`). */
export interface ConsumerCallbacks<
  T extends Record<string, unknown> = Record<string, unknown>,
> {
  /** `event` is optional: not every error is tied to a specific event. */
  onError?: (error: Error, event?: EventBase<T>) => void;
  onRebalance?: (info: { assigned?: unknown; revoked?: unknown }) => void;
  onPause?: (reason: "backpressure" | "memory" | "manual") => void;
  onResume?: (reason: "backpressure" | "memory" | "manual") => void;
}

/** Fields shared by every consumer option variant. All are optional. */
export interface BaseConsumerOptions<
  T extends Record<string, unknown> = Record<string, unknown>,
> {
  groupId?: string;
  startFrom?: StartPosition;
  backpressure?: BackpressureOptions;
  commit?: CommitOptions;
  callbacks?: ConsumerCallbacks<T>;
}

/**
 * Options for the `"single"` scenario.
 *
 * `scenario` is optional here, so an options object without a `scenario`
 * field type-checks as this variant.
 */
export interface SingleConsumerOptions<
  T extends Record<string, unknown> = Record<string, unknown>,
> extends BaseConsumerOptions<T> {
  scenario?: "single";
  handler: SingleMessageHandler<T>;
}

/** Options for the `"concurrent"` scenario; `concurrency` is required. */
export interface ConcurrentConsumerOptions<
  T extends Record<string, unknown> = Record<string, unknown>,
> extends BaseConsumerOptions<T> {
  scenario: "concurrent";
  concurrency: number;
  handler: SingleMessageHandler<T>;
}

/** Options for the `"batch"` scenario; takes a {@link BatchMessageHandler}. */
export interface BatchConsumerOptions<
  T extends Record<string, unknown> = Record<string, unknown>,
> extends BaseConsumerOptions<T> {
  scenario: "batch";
  batch?: BatchOptions;
  handler: BatchMessageHandler<T>;
}

/** Union of the three option variants, discriminated by `scenario`. */
export type ConsumerOptions<
  T extends Record<string, unknown> = Record<string, unknown>,
> =
  | SingleConsumerOptions<T>
  | ConcurrentConsumerOptions<T>
  | BatchConsumerOptions<T>;

/** Lag figures for one topic partition. Offsets are carried as strings. */
export interface PartitionLag {
  topic: string;
  partition: number;
  lag: number;
  committedOffset?: string;
  logEndOffset?: string;
}

/**
 * Consumer counters snapshot. `inFlight`, `processed`, `failed` and `retried`
 * are always present; everything else is optional.
 */
export interface ConsumerMetrics {
  topic?: string;
  groupId?: string;
  inFlight: number;
  processed: number;
  failed: number;
  retried: number;
  lastCommittedAt?: number;
  throughputPerSec?: number;
  lag?: PartitionLag[];
  totalLag?: number;
}

/** Optional settings and hooks for shutting a consumer down. */
export interface GracefulShutdownOptions {
  timeoutMs?: number;
  commitOnShutdown?: boolean;
  onDraining?: () => void;
  /** Receives the in-flight count at the time of the call. */
  onTimeout?: (inFlight: number) => void;
}