import { type CommonLogger, type Either, type ErrorReporter, type ErrorResolver } from '@lokalise/node-core';
import type { MakeRequired } from '@lokalise/universal-ts-utils/node';
import type { ZodSchema, ZodType } from 'zod/v4';
import type { MessageCodecHandler } from '../codec/messageCodec.ts';
import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts';
import { type AcquireLockTimeoutError } from '../message-deduplication/AcquireLockTimeoutError.ts';
import { type DeduplicationRequester, type MessageDeduplicationConfig, type ReleasableLock } from '../message-deduplication/messageDeduplicationTypes.ts';
import { type OffloadedPayloadPointerPayload } from '../payload-store/offloadedPayloadMessageSchemas.ts';
import type { MultiPayloadStoreConfig, SinglePayloadStoreConfig } from '../payload-store/payloadStoreTypes.ts';
import type { MessageProcessingResult } from '../types/MessageQueueTypes.ts';
import type { DeletionConfig, MessageMetricsManager, ProcessedMessageMetadata, QueueDependencies, QueueOptions } from '../types/queueOptionsTypes.ts';
import type { BarrierCallback, BarrierResult, MessageHandlerConfig, PreHandlingOutputs, Prehandler, PrehandlerResult } from './HandlerContainer.ts';
import type { HandlerSpy, PublicHandlerSpy } from './HandlerSpy.ts';
import { MessageSchemaContainer } from './MessageSchemaContainer.ts';
import { type MessageTypeResolverConfig } from './MessageTypeResolver.ts';

/**
 * Function type that receives a raw `message`, the Zod `type` to check it against and an
 * `ErrorResolver`, and returns an `Either`.
 *
 * NOTE(review): `Either` carries no type arguments here — they appear to have been stripped
 * from this copy of the declaration; restore them from the generated `.d.ts` before use.
 */
export type Deserializer = (message: unknown, type: ZodType, errorProcessor: ErrorResolver) => Either;

/**
 * Locator shape that identifies a queue by name only. Not exported.
 * NOTE(review): presumably the default for the `QueueLocatorType` type parameter — confirm
 * once the class type-parameter list is restored.
 */
type CommonQueueLocator = { queueName: string; };

/**
 * A message body paired with optional message-level attributes.
 * NOTE(review): `Record` is missing its key/value type arguments in this copy.
 */
export type ResolvedMessage = { body: unknown; attributes?: Record; };

/**
 * Declaration of the abstract base class for queue services. The member documentation below
 * refers to both publisher subclasses (e.g. SQS, SNS) and consumers.
 *
 * NOTE(review): the generic type-parameter list of this class has been lost in this copy.
 * The header keeps a closing `>` without an opening `<`, and `MessagePayloadSchemas`,
 * `MessageEnvelopeType`, `DependenciesType`, `OptionsType`, `QueueConfiguration` and
 * `QueueLocatorType` are referenced without being declared or imported. Bare `Record`,
 * `Either`, `Promise`, `Prehandler`, ... usages (including the stray `>` after some of them)
 * are the same damage. Regenerate or restore the original declaration before compiling
 * against this file.
 *
 * Members declared as `private name;` have their signatures elided, as is usual for private
 * members in emitted declaration files.
 */
export declare abstract class AbstractQueueService = QueueOptions, ExecutionContext = undefined, PrehandlerOutput = undefined> {
    /**
     * Used to keep track of the number of `retryLater` results received for a message to be
     * able to calculate the delay for the next retry.
     */
    private readonly messageRetryLaterCountField;
    /**
     * Used to know when the message was sent initially so we can have a max retry date and
     * avoid an infinite `retryLater` loop.
     */
    protected readonly messageTimestampField: string;
    /**
     * Used to know the message deduplication id.
     */
    protected readonly messageDeduplicationIdField: string;
    /**
     * Used to know the store-based message deduplication options.
     */
    protected readonly messageDeduplicationOptionsField: string;
    /**
     * Used to know where metadata is stored - for debug logging purposes only.
     */
    protected readonly messageMetadataField: string;
    /** Error reporter; presumably the `errorReporter` received through the constructor dependencies. */
    protected readonly errorReporter: ErrorReporter;
    /** Logger; publicly readable. Presumably the `logger` received through the constructor dependencies. */
    readonly logger: CommonLogger;
    /** Name of the message id field (one of the configured identity fields, see `collectPreservedFields`). */
    protected readonly messageIdField: string;
    /**
     * Configuration for resolving message types.
     */
    protected readonly messageTypeResolver?: MessageTypeResolverConfig;
    /** NOTE(review): presumably toggles logging of processed messages — confirm in the implementation. */
    protected readonly logMessages: boolean;
    /** Optional queue creation configuration. */
    protected readonly creationConfig?: QueueConfiguration;
    /** Optional queue locator configuration. */
    protected readonly locatorConfig?: QueueLocatorType;
    /** Optional deletion configuration. */
    protected readonly deletionConfig?: DeletionConfig;
    /**
     * Optional payload store configuration (single-store or multi-store, per the imported
     * `SinglePayloadStoreConfig` / `MultiPayloadStoreConfig` types).
     * NOTE(review): both `MakeRequired` operands have lost their type arguments in this copy.
     */
    protected readonly payloadStoreConfig?: MakeRequired | MakeRequired;
    /** Optional message deduplication configuration; the deduplication methods below describe their behaviour when it is absent. */
    protected readonly messageDeduplicationConfig?: MessageDeduplicationConfig;
    /** Optional metrics manager; presumably the `messageMetricsManager` received through the constructor dependencies. */
    protected readonly messageMetricsManager?: MessageMetricsManager;
    /** Optional handler spy backing field; see the `handlerSpy` getter. */
    protected readonly _handlerSpy?: HandlerSpy;
    /** Size floor below which compression is skipped (see `compressAndOffloadPayload`). */
    protected readonly skipCompressionBelow: number;
    /** NOTE(review): presumably disables automatic codec detection on incoming messages — confirm in the implementation. */
    protected readonly disableCodecAutoDetection: boolean;
    /**
     * Codec handler resolved from the `codec` option, used by `prepareOutgoingPayload`.
     * Undefined when no codec is configured (the common case).
     */
    protected readonly resolvedCodecHandler?: MessageCodecHandler;
    /**
     * Codec name matching `resolvedCodecHandler`, written into every codec envelope.
     */
    protected readonly resolvedCodecName?: string;
    /** Mutable initialisation flag. NOTE(review): who sets it is not visible in this declaration. */
    protected isInitted: boolean;
    /**
     * Public view of the handler spy.
     * NOTE(review): behaviour when `_handlerSpy` is undefined is not visible here — confirm.
     */
    get handlerSpy(): PublicHandlerSpy;
    /**
     * @param dependencies - destructured for `errorReporter`, `logger` and `messageMetricsManager`
     * @param options - service options
     */
    constructor({ errorReporter, logger, messageMetricsManager }: DependenciesType, options: OptionsType);
    /** Produces a `MessageSchemaContainer` from consumer handler configs and an optional type resolver. */
    protected resolveConsumerMessageSchemaContainer(options: { handlers: MessageHandlerConfig[]; messageTypeResolver?: MessageTypeResolverConfig; }): MessageSchemaContainer;
    /** Produces a `MessageSchemaContainer` from publisher message schemas and an optional type resolver. */
    protected resolvePublisherMessageSchemaContainer(options: { messageSchemas: readonly ZodSchema[]; messageTypeResolver?: MessageTypeResolverConfig; }): MessageSchemaContainer;
    /**
     * Resolves message type from message data and optional attributes using messageTypeResolver.
     *
     * @param messageData - The parsed message data
     * @param messageAttributes - Optional message-level attributes (e.g., PubSub attributes)
     * @returns The resolved message type, or undefined if not configured
     */
    protected resolveMessageTypeFromMessage(messageData: unknown, messageAttributes?: Record): string | undefined;
    /** Implemented by subclasses. NOTE(review): the `Either` type arguments are missing in this copy. */
    protected abstract resolveSchema(message: MessagePayloadSchemas): Either>;
    /** Implemented by subclasses; takes the transport envelope. NOTE(review): the `Either` type arguments are missing in this copy. */
    protected abstract resolveMessage(message: MessageEnvelopeType): Either;
    /**
     * Format message for logging.
     */
    protected resolveMessageLog(_processedMessageMetadata: ProcessedMessageMetadata): unknown | null;
    /** NOTE(review): presumably logs the given processed-message metadata — confirm in the implementation. */
    protected logMessageProcessed(processedMessageMetadata: ProcessedMessageMetadata): void;
    /**
     * @param err - the error value, of unknown type
     * @param context - optional additional context
     */
    protected handleError(err: unknown, context?: Record): void;
    /**
     * Called with the outcome of processing a message.
     *
     * @param params - the message (or `null`), its processing result, the processing start
     *   timestamp, the queue name and an optional message id
     */
    protected handleMessageProcessed(params: { message: MessagePayloadSchemas | null; processingResult: MessageProcessingResult; messageProcessingStartTimestamp: number; queueName: string; messageId?: string; }): void;
    private resolveProcessedMessageMetadata;
    /** NOTE(review): `Prehandler` and `Promise` have lost their type arguments in this copy. */
    protected processPrehandlersInternal(preHandlers: Prehandler[], message: MessagePayloadSchemas): Promise;
    /** Accepts an optional barrier callback (`undefined` is allowed). NOTE(review): return type arguments are missing in this copy. */
    protected preHandlerBarrierInternal(barrier: BarrierCallback | undefined, message: MessagePayloadSchemas, executionContext: ExecutionContext, preHandlerOutput: PrehandlerOutput): Promise>;
    /**
     * Public. NOTE(review): presumably compares the message timestamp against
     * `maxRetryDuration`; the unit of `maxRetryDuration` is not visible here — confirm.
     */
    shouldBeRetried(message: MessagePayloadSchemas, maxRetryDuration: number): boolean;
    /** Returns the retry delay for the message, in seconds (per the method name). */
    protected getMessageRetryDelayInSeconds(message: MessagePayloadSchemas): number;
    /** Returns a message of the same type. NOTE(review): which internal fields are updated is not visible here. */
    protected updateInternalProperties(message: MessagePayloadSchemas): MessagePayloadSchemas;
    private tryToExtractTimestamp;
    private tryToExtractNumberOfRetries;
    /**
     * Implemented by subclasses. Returns a callback that receives a `PrehandlerResult`.
     * `resolve` / `reject` have the shape of promise executor callbacks.
     */
    protected abstract resolveNextFunction(preHandlers: Prehandler[], message: MessagePayloadSchemas, index: number, preHandlerOutput: PrehandlerOutput, resolve: (value: PrehandlerOutput | PromiseLike) => void, reject: (err: Error) => void): (preHandlerResult: PrehandlerResult) => void;
    /**
     * Same parameters as `resolveNextFunction`, with an additional `executionContext` in
     * second position.
     */
    protected resolveNextPreHandlerFunctionInternal(preHandlers: Prehandler[], executionContext: ExecutionContext, message: MessagePayloadSchemas, index: number, preHandlerOutput: PrehandlerOutput, resolve: (value: PrehandlerOutput | PromiseLike) => void, reject: (err: Error) => void): (preHandlerResult: PrehandlerResult) => void;
    /** Implemented by subclasses. */
    protected abstract processPrehandlers(message: MessagePayloadSchemas, messageType: string): Promise;
    /** Implemented by subclasses. */
    protected abstract preHandlerBarrier(message: MessagePayloadSchemas, messageType: string, preHandlerOutput: PrehandlerOutput): Promise>;
    /** Implemented by subclasses. */
    protected abstract processMessage(message: MessagePayloadSchemas, messageType: string, preHandlingOutputs: PreHandlingOutputs): Promise>;
    /** Implemented by subclasses; asynchronous. */
    abstract close(): Promise;
    /**
     * Resolves the store and store name for outgoing (publishing) messages.
     * For multi-store: uses outgoingStore from config.
     * For single-store: uses the configured store and storeName.
     * @throws Error if payloadStoreConfig is not configured or the named store is not found.
     */
    private resolveOutgoingStore;
    /**
     * Resolves store from payloadRef (new format).
     */
    private resolveStoreFromPayloadRef;
    /**
     * Resolves store from legacy pointer (old format).
     */
    private resolveStoreFromLegacyPointer;
    /**
     * Resolves the store for incoming (consuming) messages based on payload reference.
     * For multi-store with payloadRef: uses the store specified in payloadRef.
     * For multi-store with legacy format: uses defaultIncomingStore.
     * For single-store: always uses the configured store.
     */
    private resolveIncomingStore;
    /**
     * Collects the identity/routing fields that must remain visible in plaintext on the
     * wire even when the message body is compressed or offloaded: the configured id,
     * timestamp and deduplication fields, plus the message `type` (resolved via
     * `messageTypeResolver`, defaulting to the conventional top-level `type` path).
     *
     * Keeping these as plaintext siblings of an offload pointer / codec envelope is what
     * lets routing and downstream filtering keep working. Without the `type` fallback,
     * `messageTypeResolver` modes that don't specify a body path silently strip `type`,
     * breaking downstream SNS subscription FilterPolicy filters (`FilterPolicyScope:
     * 'MessageBody'`). Shared by `buildPointer` (offload) and the codec-envelope path
     * (`prepareOutgoingPayload`) so both behave identically.
     */
    protected collectPreservedFields(message: MessagePayloadSchemas): Record;
    /**
     * Builds an OffloadedPayloadPointerPayload from the given message and storage metadata.
     * Identity/routing fields are preserved through offloading via {@link collectPreservedFields}.
     */
    private buildPointer;
    /**
     * Offloads the message payload to the configured store if it exceeds the size threshold.
     * Returns null if no offloading is needed (store not configured or message fits within threshold).
     *
     * For multi-store configuration, uses the configured outgoingStore.
     * For single-store configuration, uses the single store.
     *
     * The returned pointer includes both the new payloadRef format and legacy fields for backward
     * compatibility. The message type field is always preserved through offloading.
     *
     * NOTE(review): the `Promise` type argument is missing in this copy; per the text above it
     * should resolve to a pointer payload or `null`.
     */
    protected offloadPayload(message: MessagePayloadSchemas, messageSizeFn: () => number): Promise;
    /**
     * Compresses (when codec is configured) or offloads (when a payload store is configured)
     * the outgoing message. Shared by all publisher subclasses via the `resolvedCodecHandler`
     * / `resolvedCodecName` fields resolved from the `codec` option in the base constructor.
     *
     * Returns:
     * - `{ payload, preBuiltBody }` — `preBuiltBody` is a ready-to-send wire body string
     *   (codec envelope, or plain JSON when compression was skipped); `sendMessage` must
     *   use it as-is.
     * - `{ payload }` — the payload is sent through the normal `JSON.stringify` path
     *   (no codec configured), optionally replaced by an offloaded-payload pointer.
     */
    protected prepareOutgoingPayload(message: MessagePayloadSchemas): Promise<{ payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload; preBuiltBody?: string; }>;
    /**
     * Codec branch of {@link prepareOutgoingPayload}, extracted so each method stays within
     * the cognitive-complexity budget.
     *
     * With a payload store, serialization is delegated to {@link compressAndOffloadPayload},
     * which uses the store serializer's reported size to honor `skipCompressionBelow` — the
     * message is never separately `JSON.stringify`'d, so a streaming serializer is not forced
     * to fully materialize a large payload. Without a payload store, the wire body is bounded
     * by the SQS/SNS 256 KB transport limit (and is a string parameter that cannot be
     * streamed), so a single in-memory `JSON.stringify` is both necessary and safe.
     */
    private prepareCompressedOutgoingPayload;
    /**
     * Estimates the wire size in bytes of the codec envelope wrapping `compressedSize`
     * compressed bytes. The envelope is
     * `{...preservedFields,"__mqtCodec":"<codecName>","__mqtData":"<base64>"}`:
     * base64 expands the payload to `⌈N/3⌉×4`, the fixed JSON framing adds 32 chars plus the
     * codec name length, and any preserved sibling fields add their serialized length.
     *
     * Note: this measures the envelope body only — transport-specific message attributes
     * (small, and identical with or without codec) are not included.
     */
    protected estimateCodecEnvelopeSize(compressedSize: number, codecName: string, preservedFields?: Record): number;
    /**
     * Returns the wire size of the outgoing message in bytes, used by `offloadPayload` to decide
     * whether the payload exceeds `messageSizeThreshold`.
     *
     * Overridden by publisher subclasses (SQS, SNS) to call their transport-specific utility.
     * Not called on the consumer path; consumers do not override this method.
     */
    protected calculateOutgoingMessageSize(_message: MessagePayloadSchemas): number;
    /**
     * Store-path handler for codec publishers (both `codec` and `payloadStoreConfig` set).
     *
     * Serializes the message **once**, through the payload store's serializer (which may
     * stream large payloads). `skipCompressionBelow` is evaluated against the size the
     * serializer reports — there is no separate `JSON.stringify`, so a streaming serializer
     * is never forced to fully materialize the payload just to evaluate the floor.
     *
     * - **Below `skipCompressionBelow`:** compression is skipped — the payload is sent inline
     *   as plain JSON, or offloaded uncompressed if it still exceeds `messageSizeThreshold`
     *   (possible only when the threshold is configured below the floor).
     * - **Otherwise:** the payload is compressed and either inlined as a codec envelope or
     *   offloaded as raw compressed bytes (see {@link compressSerializedPayload}).
     *
     * Returns the same `{ payload, preBuiltBody? }` shape as {@link prepareOutgoingPayload}.
     */
    protected compressAndOffloadPayload(message: MessagePayloadSchemas, handler: MessageCodecHandler, codecName: string): Promise<{ payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload; preBuiltBody?: string; }>;
    /**
     * Compresses an already-serialized payload and decides — against the **codec envelope
     * wire size** (base64-encoded compressed bytes + JSON framing), not the raw compressed
     * byte count — whether to send it inline as a codec envelope or offload the compressed
     * bytes. Compression does not always shrink data, so the base64 envelope can exceed
     * `messageSizeThreshold` even when the raw payload did not.
     *
     * - **In-memory fast path:** a string payload below `messageSizeThreshold` is compressed
     *   directly into a Buffer — no temp file is created, no disk I/O occurs.
     * - **Streaming path:** a stream payload (or large string) is piped through the codec
     *   Transform into a temp file, so no full-payload buffer is materialized.
     *
     * Split out of {@link compressAndOffloadPayload} for the cognitive-complexity budget.
     */
    private compressSerializedPayload;
    /**
     * Retrieve previously offloaded message payload using provided pointer payload.
     * Returns the original payload or an error if the payload was not found or could not be parsed.
     *
     * Supports both new multi-store format (payloadRef) and legacy format (offloadedPayloadPointer).
     *
     * When `resolveDecompressor` is provided and the pointer's `payloadRef.codec` is set, the
     * fetched bytes are treated as raw compressed binary and decompressed before JSON parsing.
     * `resolveDecompressor` is invoked *outside* the catch block: if it throws (e.g. the codec
     * named in the pointer is not registered on this consumer — a deployment misconfiguration,
     * not a bad message), the throw propagates as a retriable error so the message stays on the
     * queue instead of being silently routed to the DLQ.
     */
    protected retrieveOffloadedMessagePayload(maybeOffloadedPayloadPointerPayload: unknown, resolveDecompressor?: (codec: string) => (data: Buffer) => Promise): Promise>;
    /**
     * Checks if the message is duplicated against the deduplication store.
     * Returns true if the message is duplicated.
     * Returns false if message is not duplicated or deduplication config is missing.
     */
    protected isMessageDuplicated(message: MessagePayloadSchemas, requester: DeduplicationRequester): Promise;
    /**
     * Checks if the message is duplicated.
     * If it's not, stores the deduplication key in the deduplication store and returns false.
     * If it is, returns true.
     * If deduplication config is not provided, always returns false to allow further processing
     * of the message.
     */
    protected deduplicateMessage(message: MessagePayloadSchemas, requester: DeduplicationRequester): Promise<{ isDuplicated: boolean; }>;
    /**
     * Acquires exclusive lock for the message to prevent concurrent processing.
     * If lock was acquired successfully, returns a lock object that should be released after
     * processing.
     * If lock couldn't be acquired due to timeout (meaning another process acquired it earlier),
     * returns AcquireLockTimeoutError.
     * If lock couldn't be acquired for any other reasons or if deduplication config is not
     * provided, always returns a lock object that does nothing, so message processing can continue.
     */
    protected acquireLockForMessage(message: MessagePayloadSchemas): Promise>;
    /** NOTE(review): the criteria that enable deduplication for a message are not visible in this declaration. */
    protected isDeduplicationEnabledForMessage(message: MessagePayloadSchemas): boolean;
    /** Returns the message's deduplication id, or `undefined` when there is none. */
    protected getMessageDeduplicationId(message: MessagePayloadSchemas): string | undefined;
    private getParsedMessageDeduplicationOptions;
}
export {};