{"version":3,"file":"mongodb-queue.cjs","names":["crypto"],"sources":["../src/mongodb-queue.ts"],"sourcesContent":["/*\n * Copyright (c) 2020-2026 Filipe Guerra\n * https://github.com/openwar/mongodb-queue\n *\n * For the full copyright and license information, please view the LICENSE\n * file that was distributed with this source code.\n */\n\n/**\n * A lightweight MongoDB-based message queue with TypeScript support, atomic\n * operations, and message deduplication.\n *\n * @example\n * ```ts\n * import { MongoClient } from 'mongodb';\n * import mongoDbQueue from '@openwar/mongodb-queue';\n *\n * const client = new MongoClient('mongodb://localhost:27017/');\n * await client.connect();\n *\n * const db = client.db('my-app');\n * const queue = mongoDbQueue(db, 'my-queue');\n *\n * // Add a message\n * await queue.add({ task: 'process-image', id: 123 });\n *\n * // Get and process a message\n * const msg = await queue.get();\n * if (msg) {\n *   console.log(msg.payload);\n *   await queue.ack(msg.ack);\n * }\n * ```\n *\n * @module\n */\n\nimport crypto from 'node:crypto';\nimport type { Db, Filter, UpdateFilter } from 'mongodb';\n\n// some helper functions\nfunction id() {\n  return crypto.randomBytes(16).toString('hex');\n}\n\ntype MessageSchema = {\n  createdAt: Date;\n  updatedAt?: Date;\n  visible: Date;\n  payload: unknown;\n  ack?: string;\n  tries: number;\n  occurrences?: number;\n};\n\n/**\n * A message retrieved from the queue.\n *\n * @typeParam T - The type of the message payload\n */\nexport type Message<T = unknown> = {\n  /**\n   * Unique identifier for the message\n   */\n  id: string;\n  /**\n   * Acknowledgement token used for {@link MongoDbQueue.ack} and\n   * {@link MongoDbQueue.ping}\n   */\n  ack: string;\n  /**\n   * When the message was first added to the queue\n   */\n  createdAt: Date;\n  /**\n   * When the message was last updated\n   */\n  updatedAt: Date;\n  /**\n   * The message payload\n   */\n  payload: T;\n  /**\n   * Number of times this message has been retrieved without being acknowledged\n   */\n  tries: number;\n  /**\n   * Number of times a duplicate message was added (when using hashKey)\n   */\n  occurrences?: number;\n};\n\ntype AddOptions<T> = {\n  hashKey?: keyof T | T;\n  delay?: number;\n};\n\n/**\n * A MongoDB-backed message queue interface.\n *\n * @typeParam T - The type of message payloads in this queue\n */\nexport interface MongoDbQueue<T = unknown> {\n  /**\n   * Creates the required indexes for optimal queue performance.\n   * Call this once when setting up your queue.\n   */\n  createIndexes(): Promise<void>;\n\n  /**\n   * Adds a message to the queue.\n   *\n   * @param payload - The message payload\n   * @param options - Optional settings for delay and deduplication\n   * @returns The ID of the added message\n   */\n  add(payload: T, options?: AddOptions<T>): Promise<string>;\n\n  /**\n   * Retrieves the next available message from the queue.\n   * The message becomes invisible to other consumers for the visibility\n   * timeout.\n   *\n   * @param options - Optional visibility timeout override\n   * @returns The message, or undefined if the queue is empty\n   */\n  get(options?: { visibility?: number }): Promise<Message<T> | undefined>;\n\n  /**\n   * Extends the visibility timeout for a message being processed.\n   * Use this for long-running tasks to prevent the message from being\n   * redelivered.\n   *\n   * @param ack - The acknowledgement token from the message\n   * @param options - Optional visibility timeout override\n   * @returns The message ID\n   */\n  ping(ack: string, options?: { visibility?: number }): Promise<string>;\n\n  /**\n   * Acknowledges successful processing of a message, removing it from the\n   * queue.\n   *\n   * @param ack - The acknowledgement token from the message\n   * @returns The message ID\n   */\n  ack(ack: string): Promise<string>;\n\n  /** Returns the total number of messages ever added to the queue. */\n  total(): Promise<number>;\n\n  /** Returns the number of messages waiting to be processed. */\n  size(): Promise<number>;\n\n  /** Returns the number of messages currently being processed. */\n  inFlight(): Promise<number>;\n\n  /** Returns the number of successfully processed messages. */\n  done(): Promise<number>;\n}\n\nclass MongoDbQueueImpl implements MongoDbQueue {\n  private _db: Db;\n  private _name: string;\n  private _visibility: number;\n\n  private get collection() {\n    return this._db.collection<MessageSchema>(this._name);\n  }\n\n  constructor(db: Db, name: string, options: { visibility?: number } = {}) {\n    if (!db) {\n      throw new Error('Please provide a mongodb.MongoClient.db');\n    }\n    if (!name) {\n      throw new Error('Please provide a queue name');\n    }\n\n    this._db = db;\n    this._name = name;\n    this._visibility = options.visibility || 30;\n  }\n\n  async createIndexes() {\n    await this.collection.createIndex({ deleted: 1, visible: 1 });\n    await this.collection.createIndex(\n      { ack: 1 },\n      { unique: true, sparse: true },\n    );\n  }\n\n  async add<T>(\n    payload: NonNullable<T>,\n    options?: AddOptions<T>,\n  ): Promise<string> {\n    const hashKey = options?.hashKey;\n    const delay = options?.delay ?? 0;\n    const now = Date.now();\n\n    const insertFields = {\n      createdAt: new Date(now),\n      visible: new Date(now + delay * 1000),\n      payload,\n      tries: 0,\n    };\n\n    if (hashKey === undefined) {\n      const result = await this.collection.insertOne({\n        ...insertFields,\n        occurrences: 1,\n      });\n\n      return result.insertedId.toHexString();\n    }\n\n    let filter: Filter<MessageSchema> = {\n      payload: { $eq: hashKey },\n    };\n\n    if (typeof payload === 'object') {\n      filter = {\n        [`payload.${String(hashKey)}`]: payload[hashKey as keyof T],\n      };\n    }\n\n    const message = await this.collection.findOneAndUpdate(\n      filter,\n      {\n        $inc: { occurrences: 1 },\n        $set: {\n          updatedAt: new Date(),\n        },\n        $setOnInsert: insertFields,\n      },\n      { upsert: true, returnDocument: 'after', includeResultMetadata: true },\n    );\n\n    if (!message.value) {\n      throw new Error(`Queue.add(): Failed add message`);\n    }\n\n    return message.value._id.toHexString();\n  }\n\n  async get<T>(\n    options: { visibility?: number } = {},\n  ): Promise<Message<T> | undefined> {\n    const visibility = options.visibility || this._visibility;\n\n    const now = Date.now();\n\n    const query = {\n      deleted: null,\n      visible: { $lte: new Date(now) },\n    };\n    const update: UpdateFilter<MessageSchema> = {\n      $inc: { tries: 1 },\n      $set: {\n        updatedAt: new Date(now),\n        ack: id(),\n        visible: new Date(now + visibility * 1000),\n      },\n    };\n\n    const result = await this.collection.findOneAndUpdate(query, update, {\n      sort: { _id: 1 },\n      returnDocument: 'after',\n      includeResultMetadata: true,\n    });\n\n    const message = result.value;\n\n    // nothing in the queue\n    if (!message) return;\n\n    if (!message.ack || !message.updatedAt) {\n      throw new Error(`Queue.get(): Failed to update message`);\n    }\n\n    // convert to an external representation\n    return {\n      id: message._id.toHexString(),\n      ack: message.ack,\n      createdAt: message.createdAt,\n      updatedAt: message.updatedAt,\n      payload: message.payload as T,\n      tries: message.tries,\n      occurrences: message.occurrences ?? 1,\n    };\n  }\n\n  async ping(\n    ack: string,\n    options: { visibility?: number } = {},\n  ): Promise<string> {\n    const now = Date.now();\n\n    const visibility = options.visibility || this._visibility;\n    const query = {\n      ack: ack,\n      visible: { $gt: new Date(now) },\n      deleted: null,\n    };\n    const update = {\n      $set: {\n        visible: new Date(now + visibility * 1000),\n      },\n    };\n\n    const message = await this.collection.findOneAndUpdate(query, update, {\n      returnDocument: 'after',\n      includeResultMetadata: true,\n    });\n\n    if (!message.value) {\n      throw new Error(`Queue.ping(): Unidentified ack : ${ack}`);\n    }\n\n    return message.value._id.toHexString();\n  }\n\n  async ack(ack: string): Promise<string> {\n    const now = Date.now();\n    const query = {\n      ack: ack,\n      visible: { $gt: new Date(now) },\n      deleted: null,\n    };\n    const update = {\n      $set: {\n        deleted: new Date(now),\n      },\n    };\n\n    const message = await this.collection.findOneAndUpdate(query, update, {\n      returnDocument: 'after',\n      includeResultMetadata: true,\n    });\n\n    if (!message.value) {\n      throw new Error(`Queue.ack(): Unidentified ack : ${ack}`);\n    }\n\n    return message.value._id.toHexString();\n  }\n\n  async total() {\n    return await this.collection.countDocuments();\n  }\n\n  async size() {\n    const query = {\n      deleted: null,\n      visible: { $lte: new Date() },\n    };\n\n    return await this.collection.countDocuments(query);\n  }\n\n  async inFlight() {\n    const query = {\n      ack: { $exists: true },\n      visible: { $gt: new Date() },\n      deleted: null,\n    };\n\n    return await this.collection.countDocuments(query);\n  }\n\n  async done() {\n    const query = {\n      deleted: { $exists: true },\n    };\n\n    return await this.collection.countDocuments(query);\n  }\n}\n\n/**\n * Creates a new MongoDB-backed message queue.\n *\n * @typeParam T - The type of message payloads in this queue\n * @param db - A MongoDB database instance\n * @param name - The name of the queue (used as the collection name)\n * @param options - Optional queue configuration\n * @param options.visibility - Default visibility timeout in seconds\n *   (default: 30)\n * @returns A new queue instance\n *\n * @example\n * ```ts\n * const queue = mongoDbQueue<MyPayload>(db, 'my-queue', { visibility: 60 });\n * ```\n */\nexport default function mongoDbQueue<T = unknown>(\n  db: Db,\n  name: string,\n  options: { visibility?: number } = {},\n): MongoDbQueue<T> {\n  return new MongoDbQueueImpl(db, name, options);\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAyCA,SAAS,KAAK;AACZ,QAAOA,oBAAO,YAAY,GAAG,CAAC,SAAS,MAAM;;AAuH/C,IAAM,mBAAN,MAA+C;CAC7C,AAAQ;CACR,AAAQ;CACR,AAAQ;CAER,IAAY,aAAa;AACvB,SAAO,KAAK,IAAI,WAA0B,KAAK,MAAM;;CAGvD,YAAY,IAAQ,MAAc,UAAmC,EAAE,EAAE;AACvE,MAAI,CAAC,GACH,OAAM,IAAI,MAAM,0CAA0C;AAE5D,MAAI,CAAC,KACH,OAAM,IAAI,MAAM,8BAA8B;AAGhD,OAAK,MAAM;AACX,OAAK,QAAQ;AACb,OAAK,cAAc,QAAQ,cAAc;;CAG3C,MAAM,gBAAgB;AACpB,QAAM,KAAK,WAAW,YAAY;GAAE,SAAS;GAAG,SAAS;GAAG,CAAC;AAC7D,QAAM,KAAK,WAAW,YACpB,EAAE,KAAK,GAAG,EACV;GAAE,QAAQ;GAAM,QAAQ;GAAM,CAC/B;;CAGH,MAAM,IACJ,SACA,SACiB;EACjB,MAAM,UAAU,SAAS;EACzB,MAAM,QAAQ,SAAS,SAAS;EAChC,MAAM,MAAM,KAAK,KAAK;EAEtB,MAAM,eAAe;GACnB,WAAW,IAAI,KAAK,IAAI;GACxB,SAAS,IAAI,KAAK,MAAM,QAAQ,IAAK;GACrC;GACA,OAAO;GACR;AAED,MAAI,YAAY,OAMd,SALe,MAAM,KAAK,WAAW,UAAU;GAC7C,GAAG;GACH,aAAa;GACd,CAAC,EAEY,WAAW,aAAa;EAGxC,IAAI,SAAgC,EAClC,SAAS,EAAE,KAAK,SAAS,EAC1B;AAED,MAAI,OAAO,YAAY,SACrB,UAAS,GACN,WAAW,OAAO,QAAQ,KAAK,QAAQ,UACzC;EAGH,MAAM,UAAU,MAAM,KAAK,WAAW,iBACpC,QACA;GACE,MAAM,EAAE,aAAa,GAAG;GACxB,MAAM,EACJ,2BAAW,IAAI,MAAM,EACtB;GACD,cAAc;GACf,EACD;GAAE,QAAQ;GAAM,gBAAgB;GAAS,uBAAuB;GAAM,CACvE;AAED,MAAI,CAAC,QAAQ,MACX,OAAM,IAAI,MAAM,kCAAkC;AAGpD,SAAO,QAAQ,MAAM,IAAI,aAAa;;CAGxC,MAAM,IACJ,UAAmC,EAAE,EACJ;EACjC,MAAM,aAAa,QAAQ,cAAc,KAAK;EAE9C,MAAM,MAAM,KAAK,KAAK;EAEtB,MAAM,QAAQ;GACZ,SAAS;GACT,SAAS,EAAE,MAAM,IAAI,KAAK,IAAI,EAAE;GACjC;EACD,MAAM,SAAsC;GAC1C,MAAM,EAAE,OAAO,GAAG;GAClB,MAAM;IACJ,WAAW,IAAI,KAAK,IAAI;IACxB,KAAK,IAAI;IACT,SAAS,IAAI,KAAK,MAAM,aAAa,IAAK;IAC3C;GACF;EAQD,MAAM,WANS,MAAM,KAAK,WAAW,iBAAiB,OAAO,QAAQ;GACnE,MAAM,EAAE,KAAK,GAAG;GAChB,gBAAgB;GAChB,uBAAuB;GACxB,CAAC,EAEqB;AAGvB,MAAI,CAAC,QAAS;AAEd,MAAI,CAAC,QAAQ,OAAO,CAAC,QAAQ,UAC3B,OAAM,IAAI,MAAM,wCAAwC;AAI1D,SAAO;GACL,IAAI,QAAQ,IAAI,aAAa;GAC7B,KAAK,QAAQ;GACb,WAAW,QAAQ;GACnB,WAAW,QAAQ;GACnB,SAAS,QAAQ;GACjB,OAAO,QAAQ;GACf,aAAa,QAAQ,eAAe;GACrC;;CAGH,MAAM,KACJ,KACA,UAAmC,EAAE,EACpB;EACjB,MAAM,MAAM,KAAK,KAAK;EAEtB,MAAM,aAAa,QAAQ,cAAc,KAAK;EAC9C,MAAM,QAAQ;GACP;GACL,SAAS,EAAE,KAAK,IAAI,KAAK,IAAI,EAAE;GAC/B,SAAS;GACV;EACD,MAAM,SAAS,EACb,MAAM,EACJ,SAAS,IAAI,KAAK,MAAM,aAAa,IAAK,EAC3C,EACF;EAED,MAAM,UAAU,MAAM,KAAK,WAAW,iBAAiB,OAAO,QAAQ;GACpE,gBAAgB;GAChB,uBAAuB;GACxB,CAAC;AAEF,MAAI,CAAC,QAAQ,MACX,OAAM,IAAI,MAAM,oCAAoC,MAAM;AAG5D,SAAO,QAAQ,MAAM,IAAI,aAAa;;CAGxC,MAAM,IAAI,KAA8B;EACtC,MAAM,MAAM,KAAK,KAAK;EACtB,MAAM,QAAQ;GACP;GACL,SAAS,EAAE,KAAK,IAAI,KAAK,IAAI,EAAE;GAC/B,SAAS;GACV;EACD,MAAM,SAAS,EACb,MAAM,EACJ,SAAS,IAAI,KAAK,IAAI,EACvB,EACF;EAED,MAAM,UAAU,MAAM,KAAK,WAAW,iBAAiB,OAAO,QAAQ;GACpE,gBAAgB;GAChB,uBAAuB;GACxB,CAAC;AAEF,MAAI,CAAC,QAAQ,MACX,OAAM,IAAI,MAAM,mCAAmC,MAAM;AAG3D,SAAO,QAAQ,MAAM,IAAI,aAAa;;CAGxC,MAAM,QAAQ;AACZ,SAAO,MAAM,KAAK,WAAW,gBAAgB;;CAG/C,MAAM,OAAO;EACX,MAAM,QAAQ;GACZ,SAAS;GACT,SAAS,EAAE,sBAAM,IAAI,MAAM,EAAE;GAC9B;AAED,SAAO,MAAM,KAAK,WAAW,eAAe,MAAM;;CAGpD,MAAM,WAAW;EACf,MAAM,QAAQ;GACZ,KAAK,EAAE,SAAS,MAAM;GACtB,SAAS,EAAE,qBAAK,IAAI,MAAM,EAAE;GAC5B,SAAS;GACV;AAED,SAAO,MAAM,KAAK,WAAW,eAAe,MAAM;;CAGpD,MAAM,OAAO;AAKX,SAAO,MAAM,KAAK,WAAW,eAJf,EACZ,SAAS,EAAE,SAAS,MAAM,EAC3B,CAEiD;;;;;;;;;;;;;;;;;;;AAoBtD,SAAwB,aACtB,IACA,MACA,UAAmC,EAAE,EACpB;AACjB,QAAO,IAAI,iBAAiB,IAAI,MAAM,QAAQ"}