import type { IDatabaseConnection } from '@pgtyped/runtime';
import type { Logger } from 'pino';
import type { HeaderRecord } from 'undici-types/header.js';
import type { AbortableAsyncIterator } from './abortable-async-iterator.ts';
import type { IAssertSubscriptionParams, IFindEventsParams, IFindEventsResult, IReadNextEventsParams, IReadNextEventsResult } from './queries.ts';
import type { PgClientLike } from './query-types.ts';

/*
 * NOTE(review): the generic parameter lists and type arguments in this module
 * were reconstructed from how each type variable is used in the declarations
 * below. Spots that could not be derived from this file alone carry their own
 * `NOTE(review)` comment and should be confirmed against the call sites.
 */

/**
 * Splits a single read event into any number of read events carrying the
 * same event type.
 */
export type ISplitFn<T extends IEventData = IEventData> = (event: IReadEvent<T>) => IReadEvent<T>[];

/** Wire representation of an event: the body plus its content type. */
export type SerialisedEvent = {
    body: Buffer | string;
    contentType: string;
};

/**
 * Description of a webhook endpoint. `T` contributes extra fields that are
 * intersected with the required ones; the default adds none.
 */
export type WebhookInfo<T = unknown> = T & {
    id: string;
    url: string | URL;
    /**
     * Optional signing secret for this webhook endpoint.
     * When provided, each request will be signed using the
     * Standard Webhooks spec (HMAC-SHA256).
     * Accepts any format supported by the `standardwebhooks` library
     * (e.g. raw base64 or the `whsec_...` prefixed format).
     *
     * Requires the `standardwebhooks` package to be installed:
     * `npm install standardwebhooks`
     */
    signingSecret?: string;
};

/**
 * Looks up webhook endpoints for the given subscription IDs. The result is
 * a map from ID to a list of endpoints, returned either synchronously or
 * as a promise.
 */
export type GetWebhookInfoFn = (subscriptionIds: string[]) => Promise<{
    [id: string]: WebhookInfo[];
}> | {
    [id: string]: WebhookInfo[];
};

/** Converts a read event into its serialised (body + content type) form. */
export type SerialiseReadEventFn<T extends IEventData = IEventData> = (ev: IReadEvent<T>, logger: Logger) => SerialisedEvent;

/** Options controlling how events are delivered to webhook endpoints. */
export type PgmbWebhookOpts<T extends IEventData = IEventData> = {
    /**
     * Maximum time to wait for webhook request to complete
     * @default 5 seconds
     */
    timeoutMs?: number;
    headers?: HeaderRecord;
    /**
     * Configure retry intervals in seconds for failed webhook requests.
     * If null, a failed handler will fail the event processor. Use carefully.
     */
    retryOpts?: IRetryHandlerOpts | null;
    splitBy?: ISplitFn<T>;
    jsonifier?: JSONifier;
    serialiseEvent?: SerialiseReadEventFn<T>;
    /**
     * Minimum size in bytes for the payload to be compressed.
     * @default 1024
     */
    minCompressSizeBytes?: number;
    /**
     * Compress the webhook payload before sending.
     * By default, uses gzip compression via the `zlib` library.
     */
    compress?(data: Uint8Array | string): Promise<{
        data: Uint8Array | string;
        contentEncoding: string;
    }>;
};

/** Minimal shape of a publishable event. */
export interface IEventData {
    topic: string;
    payload: unknown;
    metadata?: unknown;
}

/** An event of type `T` together with its `id`. */
export type IEvent<T extends IEventData = IEventData> = (T & {
    id: string;
});

/** Options for the batcher that buffers messages before publishing them. */
export type PGMBEventBatcherOpts<T extends IEventData = IEventData> = {
    /**
     * Whether a particular published message should be logged.
     * By default, all messages are logged -- in case of certain
     * failures, the logs can be used to replay the messages.
     */
    shouldLog?(msg: T): boolean;
    publish(...msgs: T[]): Promise<{
        id: string;
    }[]>;
    logger?: Logger;
    /**
     * Automatically flush after this interval.
     * Set to undefined or 0 to disable. Will need to
     * manually call `flush()` to publish messages.
     * @default undefined
     */
    flushIntervalMs?: number;
    /**
     * Max number of messages to send in a batch
     * @default 2500
     */
    maxBatchSize?: number;
};

/**
 * Signature of the "read next events" query.
 *
 * NOTE(review): the resolved type is assumed to be an array of result rows
 * (`IReadNextEventsResult` is imported above and not used anywhere else) --
 * confirm against the query implementation.
 */
export type IReadNextEventsFn = (parmas: IReadNextEventsParams, db: IDatabaseConnection) => Promise<IReadNextEventsResult[]>;

/**
 * Signature of the "find events" query.
 *
 * NOTE(review): the resolved type is assumed to be an array of result rows
 * (`IFindEventsResult` is imported above and not used anywhere else) --
 * confirm against the query implementation.
 */
export type IFindEventsFn = (parmas: IFindEventsParams, db: IDatabaseConnection) => Promise<IFindEventsResult[]>;

/**
 * Options for constructing a client. Also accepts the `flushIntervalMs`,
 * `maxBatchSize` and `shouldLog` options of {@link PGMBEventBatcherOpts}.
 */
export type Pgmb2ClientOpts<T extends IEventData = IEventData> = {
    client: PgClientLike;
    /**
     * Globally unique identifier for this Pgmb2Client instance. All subs
     * registered with this client will use this groupId.
     */
    groupId: string;
    logger?: Logger;
    /**
     * How long to sleep between polling for new events from
     * the global events table.
     * Only one global call is required across all clients.
     * Set to 0 to disable polling.
     *
     * @default 1 second
     */
    pollEventsIntervalMs?: number;
    /**
     * Group level configuration for how often to read new events
     * relevant to the group's subscriptions.
     * @default 1 second
     */
    readEventsIntervalMs?: number;
    /**
     * How often to mark subscriptions as active,
     * and remove expired ones.
     * @default 1 minute
     */
    subscriptionMaintenanceMs?: number;
    /**
     * How often to maintain the events tables
     * (drop old partitions, create new ones, etc)
     * Set to 0 to disable automatic maintenance.
     *
     * @default 5 minutes
     */
    tableMaintainanceMs?: number;
    readChunkSize?: number;
    /**
     * As we process in batches, a single handler taking time to finish
     * can lead to buildup of unprocessed checkpoints. To avoid this,
     * we keep moving forward while handlers run in the background, but
     * to avoid an unbounded number of items being backlogged, we limit
     * how much further we can go ahead from the earliest uncompleted checkpoint.
     * @default 10
     */
    maxActiveCheckpoints?: number;
    webhookHandlerOpts?: Partial<PgmbWebhookOpts<T>>;
    getWebhookInfo?: GetWebhookInfoFn;
    /**
     * Override the default readNextEvents implementation
     */
    readNextEvents?: IReadNextEventsFn;
    /**
     * Override the default findEvents implementation
     */
    findEvents?: IFindEventsFn;
} & Pick<PGMBEventBatcherOpts<T>, 'flushIntervalMs' | 'maxBatchSize' | 'shouldLog'>;

/**
 * A batch of events handed to a handler. `retry` is optional; its fields
 * are described by {@link IRetryEventPayload}.
 */
export type IReadEvent<T extends IEventData = IEventData> = {
    items: IEvent<T>[];
    retry?: IRetryEventPayload;
};

/**
 * Parameters for registering a subscription.
 *
 * NOTE(review): the omitted key is assumed to be `groupId`, since the group
 * is configured once on `Pgmb2ClientOpts` -- confirm against
 * `IAssertSubscriptionParams`.
 */
export type RegisterSubscriptionParams = Omit<IAssertSubscriptionParams, 'groupId'>;

/** Subscription parameters plus the options of a retrying handler. */
export type registerReliableHandlerParams<T extends IEventData = IEventData> = RegisterSubscriptionParams & {
    /**
     * Name for the retry handler, used to ensure retries for a particular
     * handler are not mixed with another handler. This name need only be
     * unique for a particular subscription.
     */
    name?: string;
    retryOpts?: IRetryHandlerOpts;
    /**
     * If provided, will split an incoming event into multiple events
     * as determined by the function.
     */
    splitBy?: ISplitFn<T>;
};

/** Options for creating a subscription that is filtered by topic. */
export type CreateTopicalSubscriptionOpts<T extends IEventData = IEventData> = {
    /**
     * The topics to subscribe to.
     */
    topics: T['topic'][];
    /**
     * To scale out processing, you can partition the subscriptions.
     * For example, with `current: 0, total: 3`, only messages
     * where `hashtext(e.id) % 3 == 0` will be received by this subscription.
     * This will result in an approximate even split for all processors, the only
     * caveat being it requires knowing the number of event processors on this
     * subscription beforehand.
     */
    partition?: {
        current: number;
        total: number;
    };
    /**
     * Add any additional params to filter by.
     * i.e "s.params @> jsonb_build_object(...additionalFilters)"
     * The value should be a valid SQL snippet.
     */
    additionalFilters?: Record<string, string>;
    /**
     * JSON to populate params
     *
     * NOTE(review): the value type is assumed to be `unknown` -- confirm.
     */
    additionalParams?: Record<string, unknown>;
    expiryInterval?: RegisterSubscriptionParams['expiryInterval'];
};

/** An abortable async iterator of read events that also exposes an `id`. */
export interface IEphemeralListener<T extends IEventData = IEventData> extends AbortableAsyncIterator<IReadEvent<T>> {
    id: string;
}

/**
 * Context object passed to an event handler alongside the event.
 *
 * NOTE(review): the default for `E` is assumed to be `unknown` -- confirm.
 */
export type IEventHandlerContext<E = unknown> = {
    logger: Logger;
    client: PgClientLike;
    subscriptionId: string;
    /** registered name of the handler */
    name: string;
    extra?: E;
};

/** Asynchronous handler invoked with a read event and its context. */
export type IEventHandler<T extends IEventData = IEventData> = (item: IReadEvent<T>, ctx: IEventHandlerContext) => Promise<void>;

/** Retry information attached to a read event. */
export type IRetryEventPayload = {
    ids: string[];
    handlerName: string;
    retryNumber: number;
};

/*
 * NOTE(review): the picked keys are assumed to be `conditionsSql` and
 * `params` -- confirm against `IAssertSubscriptionParams`.
 */
type SSESubscriptionOpts = Pick<RegisterSubscriptionParams, 'conditionsSql' | 'params'>;

/**
 * Options for the SSE request handler. `R` is the incoming request type.
 *
 * NOTE(review): the type parameter order (`T`, then `R`) is assumed --
 * confirm against the call sites.
 */
export type SSERequestHandlerOpts<T extends IEventData = IEventData, R = unknown> = {
    getSubscriptionOpts(req: R): Promise<SSESubscriptionOpts> | SSESubscriptionOpts;
    /**
     * If provided, will determine the event ID to replay from for an SSE
     * subscription based on the incoming request. Will fallback to the
     * `last-event-id` header if not provided, or undefined.
     */
    getEventIdToReplayFrom?(req: R): string | undefined;
    /**
     * Maximum interval to replay events for an SSE subscription.
     * @default 5 minutes
     */
    maxReplayIntervalMs?: number;
    /**
     * Max number of events to replay for an SSE subscription.
     * Set to 0 to disable replaying events.
     * @default 1000
     */
    maxReplayEvents?: number;
    jsonifier?: JSONifier;
    /**
     * custom function to serialise an event for sending to the SSE client.
     * By default, uses the provided `jsonifier` to stringify the event's payload.
     * Return undefined to skip sending this event to the client.
     */
    serialiseEvent?(item: IEvent<T>, req: R): string | undefined | Promise<string | undefined>;
};

/** Retry configuration: the list of retry intervals. */
export type IRetryHandlerOpts = {
    retriesS: number[];
};

/** Pluggable JSON implementation. */
export interface JSONifier {
    stringify(data: unknown): string;
    parse(data: string): unknown;
}

/**
 * Event shapes for mutations of a row type `T`, with topics prefixed by `N`.
 *
 * NOTE(review): the type parameter order (`T`, then `N`) is assumed --
 * confirm against the call sites.
 */
export type ITableMutationEventData<T, N extends string> = {
    topic: `${N}.insert`;
    payload: T;
    metadata: {};
} | {
    topic: `${N}.delete`;
    payload: T;
    metadata: {};
} | {
    topic: `${N}.update`;
    /**
     * The fields that were updated in the row
     */
    payload: Partial<T>;
    metadata: {
        old: T;
    };
};

export {};