/// <reference types="node" />
import { EventBase } from "./event-base";
/**
 * Position descriptor used for `startFrom` in the consumer options.
 *
 * Discriminated on `type`:
 * - `"beginning"` and `"latest"` carry no further data.
 * - `"timestamp"` carries a numeric `timestamp`
 *   (NOTE(review): unit is not stated here — presumably epoch milliseconds; confirm).
 * - `"offset"` names a `partition` together with an `offset`, which is a string.
 */
export type StartPosition =
  | { type: "beginning" }
  | { type: "latest" }
  | { type: "timestamp"; timestamp: number }
  | { type: "offset"; partition: number; offset: string };
/**
 * Target of a seek operation, discriminated on `type`.
 *
 * - `"offset"` requires both `partition` and a string `offset`.
 * - `"timestamp"` requires a numeric `timestamp`; `partition` is optional.
 * - `"beginning"` and `"latest"` take only an optional `partition`.
 *
 * NOTE(review): what an omitted `partition` means (presumably "all assigned
 * partitions") is decided by the consumer implementation — confirm there.
 */
export type SeekTarget =
  | { type: "timestamp"; timestamp: number; partition?: number }
  | { type: "offset"; partition: number; offset: string }
  | { type: "beginning"; partition?: number }
  | { type: "latest"; partition?: number };
/**
 * Names of the supported consumption modes. The literals are the same ones
 * used as the `scenario` discriminant on the consumer option interfaces in
 * this file.
 */
export type ConsumptionScenario =
  | "single"
  | "concurrent"
  | "batch";
/**
 * Optional flow-control settings, supplied as `backpressure` in the consumer
 * options. Every field is optional; defaults are not defined in this file.
 *
 * NOTE(review): `maxBytes`, `maxBytesPerPartition`, `minBytes`,
 * `maxWaitTimeMs` and `partitionsConsumedConcurrently` share their names with
 * KafkaJS consumer settings and are presumably forwarded to them — confirm
 * against the consumer implementation.
 */
export interface BackpressureOptions {
  /** Byte ceiling (presumably per fetch response — confirm). */
  maxBytes?: number;
  /** Byte ceiling scoped to a single partition (presumably per fetch — confirm). */
  maxBytesPerPartition?: number;
  /** Byte floor (presumably the minimum a fetch waits to accumulate — confirm). */
  minBytes?: number;
  /** A wait limit in milliseconds, going by the `Ms` suffix. */
  maxWaitTimeMs?: number;
  /** Number of partitions processed at the same time, going by the name. */
  partitionsConsumedConcurrently?: number;
  /** Cap on in-flight work (presumably messages being handled — confirm). */
  maxInFlight?: number;
  /** Nested memory-based pause/resume settings. */
  memoryGuard?: MemoryGuardOptions;
}
/**
 * Settings for the memory guard nested under `BackpressureOptions.memoryGuard`.
 * Every field is optional; defaults are not defined in this file.
 *
 * NOTE(review): how the byte thresholds, the heap-fraction thresholds and the
 * custom `shouldPause` predicate are prioritised when several are set is not
 * visible here — confirm against the implementation.
 */
export interface MemoryGuardOptions {
  /** Switch for the guard as a whole. */
  enabled?: boolean;
  /** Memory figure to evaluate; each literal is a field of `NodeJS.MemoryUsage`. */
  metric?: "rss" | "heapUsed" | "external";
  /** Interval between checks, in milliseconds going by the `Ms` suffix. */
  checkIntervalMs?: number;
  /** Absolute upper threshold in bytes (presumably pauses consumption above it). */
  pauseAboveBytes?: number;
  /** Absolute lower threshold in bytes (presumably resumes consumption below it). */
  resumeBelowBytes?: number;
  /** Upper threshold as a fraction of the heap (presumably in the range 0–1 — confirm). */
  pauseAboveHeapFraction?: number;
  /** Lower threshold as a fraction of the heap (presumably in the range 0–1 — confirm). */
  resumeBelowHeapFraction?: number;
  /** Caller-supplied predicate over a `NodeJS.MemoryUsage` snapshot. */
  shouldPause?: (usage: NodeJS.MemoryUsage) => boolean;
}
/**
 * Offset-commit settings, supplied as `commit` in the consumer options.
 * Every field is optional; defaults are not defined in this file.
 */
export interface CommitOptions {
  /** Turns automatic committing on or off. */
  autoCommit?: boolean;
  /** Auto-commit interval, in milliseconds going by the `Ms` suffix. */
  autoCommitIntervalMs?: number;
  /** Auto-commit threshold (presumably a count of resolved messages — confirm). */
  autoCommitThreshold?: number;
}
/**
 * Batch sizing settings, supplied as `batch` in the batch consumer options.
 * Both fields are optional; defaults are not defined in this file.
 */
export interface BatchOptions {
  /** Upper bound on batch size (presumably a message count — confirm). */
  maxSize?: number;
  /** Upper bound on waiting time, in milliseconds going by the `Ms` suffix. */
  maxWaitMs?: number;
}
/**
 * Per-message control handle; it is the second argument of the
 * single-message handler type declared in this file.
 *
 * The asynchronous members resolve with no value. The original declaration
 * used a bare `Promise`, which does not compile because `Promise` requires a
 * type argument.
 * NOTE(review): `Promise<void>` is the assumed original — confirm.
 */
export interface MessageControls {
  /** Acknowledges the message. */
  ack(): Promise<void>;
  /**
   * Negatively acknowledges the message.
   * @param requeue Optional flag (presumably asks for redelivery — confirm).
   */
  nack(requeue?: boolean): Promise<void>;
  /** Sends a heartbeat (presumably to keep the group session alive — confirm). */
  heartbeat(): Promise<void>;
  /** Synchronous pause request. */
  pause(): void;
  /** Synchronous resume request. */
  resume(): void;
}
// NOTE(review): the type-parameter lists of every declaration in this group
// had been stripped (headers such as `BatchContext> {`, bare `Promise`), so
// the group did not parse. They are reconstructed here as a single payload
// parameter, defaulting to `Record<string, unknown>`, that is passed to
// `EventBase<TPayload>` and threaded through the dependent types. The exact
// constraint/default and the arity of `EventBase` cannot be seen from this
// file — confirm against "./event-base" before relying on them.

/**
 * Handle passed to a batch handler.
 *
 * NOTE(review): the member set resembles the KafkaJS `eachBatch` payload;
 * the per-member notes marked "presumably" are inferred from names only.
 *
 * @typeParam TPayload Payload type forwarded to `EventBase`.
 */
export interface BatchContext<TPayload = Record<string, unknown>> {
  /** Events that make up the batch. */
  events: EventBase<TPayload>[];
  /** Marks one event as resolved (presumably advancing the committable offset). */
  resolve(event: EventBase<TPayload>): void;
  /** Asynchronous commit request. */
  commit(): Promise<void>;
  /** Asynchronous heartbeat request. */
  heartbeat(): Promise<void>;
  /** Reports whether the consumer is running. */
  isRunning(): boolean;
  /** Reports whether the batch is stale (presumably after a seek or rebalance). */
  isStale(): boolean;
  /** Pauses and returns a zero-argument function (presumably used to resume). */
  pause(): () => void;
}

/**
 * Handler invoked with one event plus its control handle; shared by the
 * `"single"` and `"concurrent"` option shapes.
 */
export type SingleMessageHandler<TPayload = Record<string, unknown>> = (
  event: EventBase<TPayload>,
  controls: MessageControls
) => Promise<void>;

/** Handler invoked with a whole batch context; used by the `"batch"` option shape. */
export type BatchMessageHandler<TPayload = Record<string, unknown>> = (
  context: BatchContext<TPayload>
) => Promise<void>;

/** Optional lifecycle callbacks, supplied as `callbacks` in the consumer options. */
export interface ConsumerCallbacks<TPayload = Record<string, unknown>> {
  /** Error hook; `event` is optional, so it may be absent. */
  onError?: (error: Error, event?: EventBase<TPayload>) => void;
  /** Rebalance hook; the `assigned` / `revoked` payloads are untyped (`unknown`). */
  onRebalance?: (info: {
    assigned?: unknown;
    revoked?: unknown;
  }) => void;
  /** Pause hook, told which of the three listed reasons applied. */
  onPause?: (reason: "backpressure" | "memory" | "manual") => void;
  /** Resume hook, told which of the three listed reasons applied. */
  onResume?: (reason: "backpressure" | "memory" | "manual") => void;
}

/** Options common to every consumption scenario; all fields are optional. */
export interface BaseConsumerOptions<TPayload = Record<string, unknown>> {
  groupId?: string;
  startFrom?: StartPosition;
  backpressure?: BackpressureOptions;
  commit?: CommitOptions;
  callbacks?: ConsumerCallbacks<TPayload>;
}

/** Options for the `"single"` scenario; `scenario` may be omitted for this shape only. */
export interface SingleConsumerOptions<TPayload = Record<string, unknown>>
  extends BaseConsumerOptions<TPayload> {
  scenario?: "single";
  handler: SingleMessageHandler<TPayload>;
}

/** Options for the `"concurrent"` scenario; `concurrency` is required. */
export interface ConcurrentConsumerOptions<TPayload = Record<string, unknown>>
  extends BaseConsumerOptions<TPayload> {
  scenario: "concurrent";
  concurrency: number;
  handler: SingleMessageHandler<TPayload>;
}

/** Options for the `"batch"` scenario; the handler receives a `BatchContext`. */
export interface BatchConsumerOptions<TPayload = Record<string, unknown>>
  extends BaseConsumerOptions<TPayload> {
  scenario: "batch";
  batch?: BatchOptions;
  handler: BatchMessageHandler<TPayload>;
}

/** Union of the three option shapes, discriminated on `scenario`. */
export type ConsumerOptions<TPayload = Record<string, unknown>> =
  | SingleConsumerOptions<TPayload>
  | ConcurrentConsumerOptions<TPayload>
  | BatchConsumerOptions<TPayload>;
/**
 * Lag figures for one topic partition; appears as the element type of
 * `ConsumerMetrics.lag`.
 */
export interface PartitionLag {
  /** Topic name. */
  topic: string;
  /** Partition number. */
  partition: number;
  /** Lag value (presumably log-end offset minus committed offset — confirm). */
  lag: number;
  /** Committed offset as a string, when available. */
  committedOffset?: string;
  /** Log-end offset as a string, when available. */
  logEndOffset?: string;
}
/**
 * Snapshot of consumer counters. `inFlight`, `processed`, `failed` and
 * `retried` are always present; every other field is optional.
 */
export interface ConsumerMetrics {
  /** Topic the snapshot refers to, when known. */
  topic?: string;
  /** Consumer group id, when known. */
  groupId?: string;
  /** In-flight count. */
  inFlight: number;
  /** Processed count. */
  processed: number;
  /** Failed count. */
  failed: number;
  /** Retried count. */
  retried: number;
  /** Time of the last commit (NOTE(review): presumably epoch milliseconds — confirm). */
  lastCommittedAt?: number;
  /** Throughput per second, going by the name. */
  throughputPerSec?: number;
  /** Per-partition lag entries. */
  lag?: PartitionLag[];
  /** Aggregate lag (presumably the sum over `lag` — confirm). */
  totalLag?: number;
}
/**
 * Settings for a graceful shutdown. Every field is optional; defaults are not
 * defined in this file.
 */
export interface GracefulShutdownOptions {
  /** Time limit in milliseconds, going by the `Ms` suffix. */
  timeoutMs?: number;
  /** Whether to commit as part of shutting down. */
  commitOnShutdown?: boolean;
  /** Hook with no arguments (presumably called when draining starts — confirm). */
  onDraining?: () => void;
  /** Hook that receives an in-flight count (presumably called when the time limit is hit). */
  onTimeout?: (inFlight: number) => void;
}