import { Message, SQSClient } from '@aws-sdk/client-sqs';
import { TimeProvider } from '@paradoxical-io/common';
import { Brand, Milliseconds, Seconds } from '@paradoxical-io/types';
import { Monitoring } from '../monitoring';
import { SQSConfig } from './config';
import { ProxyQueueProvider } from './proxy/proxyProvider';
import { SQSEvent } from './publisher';

// NOTE(review): the generic type arguments of this declaration file had been
// stripped (e.g. `Brand;`, `Array>`, `Omit;`, `Promise;`), which left it
// syntactically invalid. They are reconstructed below from the surrounding
// signatures. Every spot tagged NOTE(review) is a best guess that must be
// confirmed against the implementation in consumer.ts.

/**
 * Allowed batch sizes for a single receive call.
 *
 * `8` was missing from the union, so a perfectly valid batch size was rejected
 * by the type checker. `0` is kept only for backward compatibility.
 */
export type Max10 = 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10;

// NOTE(review): brand arguments reconstructed; confirm base type and tag names.
export type QueueName = Brand<string, 'QueueName'>;
export type QueueUrl = Brand<string, 'QueueUrl'>;

export interface Options {
  /**
   * Long poll wait seconds, default is 20 seconds
   */
  longPollWaitTimeSeconds: Seconds;
  /**
   * Max number of messages to get at a time. Limit is 10
   */
  maxNumberOfMessages: Max10;
  /**
   * The sqs client instance. If not set the default one will be used
   */
  sqs: SQSClient;
  /**
   * If set to true will make a message immediately visible on any unhandled error,
   * otherwise messages will time out and re-process at their timeout
   */
  makeAvailableOnError: boolean;
  /**
   * Provider to determine now
   */
  timeProvider: TimeProvider;
  maxVisibilityTimeoutSeconds?: Seconds;
  proxyProvider?: ProxyQueueProvider;
  monitoring?: Monitoring;
}

/**
 * If a consumer wants to defer a message, return this from the consumer
 *
 * Warning: retries do count against instances of delivery, so if you retry X times and the redrive policy
 * is to DLQ after X deliveries, the message will DLQ
 */
export interface RetryMessageLater {
  type: 'retry-later';
  reason: string;
  retryInSeconds: Seconds;
}

/**
 * A different version of retry which re-publishes the original message (potentially with a delay)
 * and ACKs the original message. This means that retrying a message doesn't count towards DLQ counts
 */
export interface RepublishMessage {
  type: 'republish-later';
  reason: string;
  /**
   * How long to delay visibility of this message
   */
  retryInSeconds:
    | Seconds
    | {
        type: 'exponential-backoff';
        max: Seconds;
        min: Seconds;
      };
  /**
   * Whether this message should stop being delivered ms time after the _first_ republish
   */
  expireFromFirstPublishMS?: Milliseconds;
}

/**
 * Starts the consumers and safely hooks into the signal shutdown to gracefully stop them
 *
 * The returned promise will not resolve until the consumers are all gracefully stopped
 * @param consumers
 * @param options
 */
export declare function runSQS(
  // NOTE(review): element type argument was lost; `any` is the most permissive
  // reconstruction so that no existing caller is rejected.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  consumers: Array<SQSConsumer<any>>,
  options?: {
    monitoring?: Monitoring;
  }
): Promise<void>;

/**
 * What a processor may resolve with: nothing, or a request to retry / republish the message.
 */
export type MessageProcessorResult = void | RetryMessageLater | RepublishMessage;

export interface MessageProcessor<T> {
  process(data: T): Promise<MessageProcessorResult>;
}

export interface MessageProcessorRaw<T> {
  // NOTE(review): assumes SQSEvent is generic over its payload — confirm in ./publisher.
  process(data: SQSEvent<T>): Promise<MessageProcessorResult>;
}

export declare abstract class SQSConsumer<T> implements MessageProcessor<T> {
  private queueUrl;

  // NOTE(review): the omitted key list was lost; 'sqs' is a guess — confirm.
  static defaultOptions: Omit<Options, 'sqs'>;

  private stopped?;

  private readonly opts;

  private readonly sqs;

  private readonly queueName;

  private readonly timeProvider;

  private readonly logger;

  private readonly metrics;

  constructor(queueUrl: QueueUrl, opts?: Partial<Options>);

  /**
   * Adhoc processes a message of a raw sqs queue (lambda/etc)
   * @param message
   */
  // NOTE(review): originally `Pick<Message, ...>` with a key list that was lost.
  // `Message` (all fields optional) accepts every value such a Pick accepted.
  adhoc(message: Message): Promise<void>;

  /**
   * Implement to handle each instance of a message
   * @param data
   */
  abstract process(data: T): Promise<MessageProcessorResult>;

  /**
   * Start will resolve once stop has been called (or shortly after)
   */
  start(): Promise<void>;

  /**
   * Tells the consumer to stop processing. Will first finish processing
   */
  stop({
    timeoutMilli,
    flush,
  }?: {
    timeoutMilli?: number;
    flush?: boolean;
  }): void;

  // NOTE(review): parameter was `Pick<Message, ...>` (keys lost) and the resolved
  // type was lost as well; `void` is assumed — confirm.
  protected handleMessage(message: Message): Promise<void>;

  protected processRaw(data: SQSEvent<T>): Promise<MessageProcessorResult>;

  private receive;

  private ack;

  private republishLater;

  /**
   * Returns true or false depending on if we should ack the message by handing the message off to the consumer
   * or deferring the message if necessary
   * @param event
   * @param message
   * @private
   */
  private handMessageToConsumer;

  private proxy;
}

export declare function determineRetryDelay(result: RepublishMessage, publishCount: number | undefined): Seconds;

/**
 * Utility class to create a consumer from a method
 * @param method
 * @param queue
 * @param opts
 */
export declare class FunctionalConsumer<T> extends SQSConsumer<T> {
  private handler;

  constructor(handler: (data: T) => Promise<MessageProcessorResult>, queue: QueueUrl, opts?: Partial<Options>);

  process(data: T): Promise<MessageProcessorResult>;
}

export declare class FunctionalConsumerRaw<T> extends SQSConsumer<T> {
  private handler;

  constructor(
    handler: (data: SQSEvent<T>) => Promise<MessageProcessorResult>,
    queue: QueueUrl,
    opts?: Partial<Options>
  );

  process(_: T): Promise<MessageProcessorResult>;

  protected processRaw(data: SQSEvent<T>): Promise<MessageProcessorResult>;
}

export declare function newConsumer<T>(
  method: (event: T) => Promise<MessageProcessorResult>,
  config: SQSConfig,
  opts?: {
    monitoring?: Monitoring;
    proxyProvider?: ProxyQueueProvider;
  }
): FunctionalConsumer<T>;

export declare function newRawConsumer<T>(
  method: (event: SQSEvent<T>) => Promise<MessageProcessorResult>,
  config: SQSConfig,
  opts?: {
    monitoring?: Monitoring;
    proxyProvider?: ProxyQueueProvider;
  }
): FunctionalConsumerRaw<T>;
//# sourceMappingURL=consumer.d.ts.map