import { IsolationLevel, sql, transaction, TxnClient, vals } from 'zapatos/db';
import { OwnerPgPool } from '../common/types';

/**
 * Increments the redelivery counter stored for `key` and returns its new value.
 *
 * Runs in a single serializable transaction: a row for `key` is inserted into
 * `app_private.messaging_counter`; if a row with that key already exists its
 * `counter` is incremented by one and its `expiration_date` is pushed to one
 * day from now. The value of a freshly inserted counter comes from the table
 * definition, which is not visible from this module.
 *
 * @param key identifier of the message whose redeliveries are counted
 * @param ownerPool database pool used to open the transaction
 * @returns the counter value stored for `key` after the upsert
 */
const incrementAndGetCounter = async (
  key: string,
  ownerPool: OwnerPgPool,
): Promise<number> => {
  return transaction(
    ownerPool,
    IsolationLevel.Serializable,
    async (txnClient) => {
      const counterKey = { key };
      const [{ counter }] = await sql`
        INSERT INTO app_private.messaging_counter (key)
        VALUES (${vals(counterKey)})
        ON CONFLICT (key) DO UPDATE
          SET counter = app_private.messaging_counter.counter + 1,
              expiration_date = (now() + INTERVAL '1 days') at time zone 'utc'
        RETURNING counter;
      `.run(txnClient);

      if (Math.random() > 0.95) {
        // Every now and then (roughly 5% of the calls) clean up counters that are expired.
        // In addition, the `cleanupExpiredMessagingCounters` function can be called from a CRON job/task scheduler.
        await deleteExpiredMessagingCounters(txnClient);
      }
      return counter;
    },
  );
};

/**
 * Deletes every row of `app_private.messaging_counter` whose `expiration_date`
 * lies in the past, using the already opened transaction.
 *
 * @param txnClient client of the surrounding serializable transaction
 */
const deleteExpiredMessagingCounters = async (
  txnClient: TxnClient<IsolationLevel.Serializable>,
): Promise<void> => {
  await sql`DELETE FROM app_private.messaging_counter WHERE expiration_date < current_timestamp`.run(
    txnClient,
  );
};

/**
 * Callback through which the counter reports its result: either an error, or
 * `null`/`undefined` together with the current number of redeliveries.
 */
export interface NextRedeliveriesFunction {
  (error: Error | undefined | null, redeliveries?: number): void;
}

/**
 * This method can be called from a CRON job/task scheduler to regularly clean up expired messaging counters.
 * @param ownerPool database pool with select and delete permissions to `app_private.messaging_counter`
 */
export const cleanupExpiredMessagingCounters = async (
  ownerPool: OwnerPgPool,
): Promise<void> => {
  await transaction(
    ownerPool,
    IsolationLevel.Serializable,
    async (txnClient) => {
      await deleteExpiredMessagingCounters(txnClient);
    },
  );
};

/**
 * ## PostgreSQL messaging counter for Rascal redeliveries
 *
 * ### Rascal retry and redelivery concept
 *
 * Rascal has the following two concepts for retries and redeliveries:
 *
 * - **retry/recovery**: this happens when a message handler threw an error that
 *   got caught and a retry is done based on a configured recovery strategy.
 *   This scenario is fully managed by Rascal code.
 * - **redeliveries**: this concept is when a message was received from RabbitMQ and
 *   an error could not be caught and the application crashes. The message
 *   counter implementation tries to solve this issue.
 *
 * ### Redeliveries
 *
 * When the application crashes Rascal is not able to start a retry mechanism,
 * add some message headers, or track something elsewhere.
 * RabbitMQ marks a message that was delivered but then the channel closed
 * without the consumer sending a nack or ack with a boolean "redelivery" flag.
 * The message is then made available again - meaning it is at the head of the
 * queue (not sent to the back).
 * When the application is restarted, rascal will thus get the message again.
 * Only on this second attempt this redeliveries counter will be called from
 * rascal. The counter will then track for any further attempt to process the
 * same message an incremented redelivery count in a PostgreSQL database table.
 * After the maximum amount of retries is reached, rascal moves the message to
 * the configured dead letter queue and the app should be able to run again.
 *
 * Crashing applications should normally not occur due to message handling as
 * Rascal is using a very robust approach. But some messages may be too big and
 * cause out-of-memory exceptions or other errors. Those messages are called
 * poisonous messages.
 *
 * To test that the message redeliveries counter works add the following code to
 * any message handlers `onMessage` function to trigger an application crash
 * by throwing an exception that is not caught.
 * ```
 * setImmediate(function() { throw new Error('Crashing...'); });
 * ```
 * When the message handler is triggered by an incoming message the application
 * will crash. After restarting the application five times, rascal moves the
 * message to the dead letter queue.
 *
 * ### Rascal configuration
 *
 * The following settings are managed by the message-bus library and should not
 * be altered from customizable code. Each Rascal subscription configuration
 * defines the redeliveries counter with the name 'mosaic':
 * ```
 * "<subscription_name>": {
 *   queue: "<queue_name>",
 *   ...
 *   redeliveries: {
 *     limit: 5,
 *     counter: 'mosaic',
 *     timeout: 10000,
 *   },
 * }
 * ```
 * And the rascal configuration contains the following section in order to use
 * the PostgreSQL based counter:
 * ```js
 * redeliveries: {
 *   counters: {
 *     mosaic: {
 *       type: 'postgresCounter',
 *     }
 *   },
 * }
 * ```
 * When creating a message-bus BrokerProxy, the counter must be added as part of
 * the components parameter with `postgresCounter` as counter name:
 * ```
 * const components = { counters: { postgresCounter: counter } };
 * const broker = await BrokerProxy.create(cfg, logger, components);
 * ```
 * @param ownerPool database pool with full read/modify permissions to `app_private.messaging_counter`
 * @returns a factory function that creates the counter object; its
 * `incrementAndGet` reports the incremented redelivery count (or an error)
 * through the supplied `next` callback
 */
export const initMessagingCounter = (ownerPool: OwnerPgPool) => {
  return function init(): {
    incrementAndGet: (key: string, next: NextRedeliveriesFunction) => void;
  } {
    return {
      incrementAndGet: function (key: string, next: NextRedeliveriesFunction) {
        incrementAndGetCounter(key, ownerPool).then(
          (counter: number) => {
            next(null, counter);
          },
          (reason: unknown) => {
            // Always report the failure: skipping `next` for an empty rejection
            // reason would leave the caller waiting for a result that never comes.
            next(
              reason instanceof Error
                ? reason
                : new Error(
                    `Incrementing the messaging counter failed: ${String(reason)}`,
                  ),
            );
          },
        );
      },
    };
  };
};