// Import for testing
import { log } from 'console'; // This is so that we can see `console.log`s when we run `jest`

// Import Mongo
import MongoDB from 'mongodb';

// Import UUID
import { v4 as genUniqueId } from 'uuid';

// Import state
import dbState from '../dbState';

// Import types
import CollectionInterface from '../types/CollectionInterface';
import CollectionProcedure from '../coconut-types/CollectionProcedure';
import CollectionOpts from '../types/CollectionOpts';
import Id from '../types/Id';
import Query from '../types/Query';
import PaginatedResponse from '../types/PaginatedResponse';

// Import helpers
import initCollection from '../helpers/initCollection';
import processFindResult from '../helpers/processFindResult';
import getLockCollectionName from '../helpers/getLockCollectionName';
import getLockCollectionOpts from '../helpers/getLockCollectionOpts';
import pollResource, { PollingError } from '../helpers/pollResource';

// Import constants
import LOCK_POLL_INTERVAL_MS from '../constants/LOCK_POLL_INTERVAL_MS';
import DEFAULT_LOCK_TTL_MS from '../constants/DEFAULT_LOCK_TTL_MS';

/*------------------------------------------------------------------------*/
/*                               Collection                               */
/*------------------------------------------------------------------------*/

/**
 * Wrapper around a single mongo collection. Every operation first awaits the
 * initialization promise created in the constructor, so instances can be
 * used immediately after construction. Optionally supports cross-server
 * atomic procedures backed by a companion "lock" collection.
 *
 * NOTE(review): the generic type arguments in this file (class type
 * parameter, mongo collection types, promise result types) were
 * reconstructed from how the values are used — confirm against the project
 * type declarations.
 */
class Collection<DocumentType> implements CollectionInterface<DocumentType> {
  // Name of this collection
  private collectionName: string;

  // Name of the unique key for the unique index (if one exists)
  private uniqueKey?: string; // can only be 'id' for concurrent

  // Promise that resolves with the mongo collection
  private collection: Promise<MongoDB.Collection<MongoDB.Document>>;

  // Whether the collection supports concurrency
  private supportConcurrency = false;

  // The collection of locks (only created when concurrency is supported)
  private lockCollection?: Promise<MongoDB.Collection<MongoDB.Document>>;

  // The maximum time that a lock can be in place. Initialized to the default
  // so the field is never undefined, even without concurrency support
  private lockTimeToLiveMS: number = DEFAULT_LOCK_TTL_MS;

  // The id of the 'server' using this Collection, for use with locks
  private serverId?: string; // uuid

  // If true, this collection uses TTL
  private usesTTL = false;

  /**
   * Create a new Collection
   * @author Gabe Abrams
   * @param collectionName the collection name
   * @param options the options for the collection (used when
   *   creating it)
   * @param [options.uniqueIndexKey] the name of the unique index
   *   (only created if included)
   * @param [options.expireAfterSeconds=no expiry] if unique index is
   *   created, this is the number of seconds before items on each key expire
   * @param [options.indexKeys] the names of keys to build
   *   secondary indexes on
   * @param [options.supportConcurrency] if true, a lock collection is
   *   initialized so that runAtomicProcedure can be used. Requires
   *   options.uniqueIndexKey
   * @param [options.atomicUpdateTimeoutMs=DEFAULT_LOCK_TTL_MS] maximum time
   *   a lock can be in place, in ms (only used with supportConcurrency)
   * @throws Error if supportConcurrency is requested without a
   *   uniqueIndexKey
   */
  public constructor(
    collectionName: string,
    options: CollectionOpts = {},
  ) {
    // Save collection name
    this.collectionName = collectionName;

    // Remember unique index
    this.uniqueKey = options.uniqueIndexKey;

    // Make sure init has been called first
    if (
      !dbState.schemaVersionTag
      || !dbState.initDB
    ) {
      // Init hasn't been called yet
      // eslint-disable-next-line no-console
      log('DCE-MANGO: Collection class instance created before initMango was called. Fatal error. Exiting.');
      process.exit(1);
    }

    // Validate the concurrency configuration before starting any
    // initialization work, so an invalid configuration has no side effects
    if (options.supportConcurrency && !this.uniqueKey) {
      throw new Error('A collection without a `uniqueIndexKey` cannot support concurrency.');
    }

    // Remember if using TTL
    this.usesTTL = Boolean(options.expireAfterSeconds);

    // Initialize and save the promise so other functions can wait for the
    // initialization to complete
    this.collection = initCollection(collectionName, options);

    // Set up the lock collection if concurrency is supported
    if (options.supportConcurrency) {
      this.supportConcurrency = true;

      const lockCollectionName = getLockCollectionName(collectionName);
      const lockCollectionOpts = getLockCollectionOpts(
        options.atomicUpdateTimeoutMs,
      );

      this.serverId = genUniqueId();
      this.lockCollection = initCollection(
        lockCollectionName,
        lockCollectionOpts,
      );
      this.lockTimeToLiveMS = (
        options.atomicUpdateTimeoutMs
        ?? DEFAULT_LOCK_TTL_MS
      );
    }
  }

  /*------------------------------------------------------------------------*/
  /*                            Getter Functions                            */
  /*------------------------------------------------------------------------*/

  /**
   * Get whether the collection supports concurrency or not.
   * @author Benedikt Arnarsson
   * @returns boolean indicating whether the collection supports concurrency.
   */
  public getSupportConcurrency(): boolean {
    return this.supportConcurrency;
  }

  /*------------------------------------------------------------------------*/
  /*                            Public Functions                            */
  /*------------------------------------------------------------------------*/

  /**
   * Run a query
   * @author Gabe Abrams
   * @param query the query to run
   * @param [includeMongoTimestamp] if true, include the timestamp
   *   in the mongo objects
   * @returns documents
   */
  public async find(
    query: Query,
    includeMongoTimestamp?: boolean,
  ): Promise<DocumentType[]> {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Get the list of matching items
    const items = await collection.find(query).toArray();

    // Filter out internal ids and add timestamps
    const processedItems: unknown[] = items.map((item) => {
      return processFindResult(item, includeMongoTimestamp);
    });

    return processedItems as DocumentType[];
  }

  /**
   * Run a query with pagination
   * @author Yuen Ler Chow
   * @param opts object containing all args
   * @param opts.query the query to run
   * @param [opts.perPage=10] the number of items per page
   * @param [opts.pageNumber=1] the page number to return, 1-indexed
   * @param [opts.includeMongoTimestamp] if true, include the timestamp
   *   in the mongo objects
   * @param [opts.sortKey=_id] the key to sort by, or _id if not provided
   * @param [opts.sortDescending] if true, sort descending
   * @returns the documents on the requested page, plus paging info
   * @throws Error if pageNumber or perPage is less than 1
   */
  public async findPaged(
    opts: {
      query: Query,
      perPage?: number,
      pageNumber?: number,
      includeMongoTimestamp?: boolean,
      sortKey?: string,
      sortDescending?: boolean,
    },
  ): Promise<PaginatedResponse<DocumentType>> {
    const {
      query,
      perPage = 10,
      pageNumber = 1,
      includeMongoTimestamp,
      sortKey = '_id',
      sortDescending,
    } = opts;

    if (pageNumber < 1) {
      throw new Error('Page number must be at least 1');
    }

    if (perPage < 1) {
      throw new Error('Per page must be at least 1');
    }

    // Wait for initialization to complete
    const collection = await this.collection;

    // Get the list of matching items for the specified page
    const items = await (
      collection
        .find(query)
        .sort({ [sortKey]: sortDescending ? -1 : 1 })
        .skip(perPage * (pageNumber - 1))
        // Get 1 extra item to check if there is another page
        .limit(perPage + 1)
        .toArray()
    );

    const hasAnotherPage = items.length > perPage;

    // Remove the extra item if there is another page
    if (hasAnotherPage) {
      items.pop();
    }

    // Filter out internal ids and add timestamps
    const processedItems: unknown[] = items.map((item) => {
      return processFindResult(item, includeMongoTimestamp);
    });

    return {
      items: processedItems as DocumentType[],
      currentPageNumber: pageNumber,
      perPage,
      hasAnotherPage,
    };
  }

  /**
   * Find elements then only return the values for one specific property from
   *   each item
   * @author Gabe Abrams
   * @param query the query to run
   * @param prop the name of the property to extract
   * @param [excludeFalsy] if true, exclude falsy values
   * @returns array of values of the property
   */
  public async findAndExtractProp(
    query: Query,
    prop: string,
    excludeFalsy?: boolean,
  ): Promise<any[]> {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Get the list of matching items (only fetch the requested property)
    const items = await collection.find(
      query,
      {
        projection: { [prop]: 1 },
      },
    ).toArray();

    // Only return the value of the prop, filter falsy values if requested
    return (
      items
        .map((item) => {
          return item[prop];
        })
        .filter((item) => {
          return (!excludeFalsy || item);
        })
    );
  }

  /**
   * Count the number of matching elements
   * @author Gabe Abrams
   * @param query the query to run
   * @returns number of documents that match
   */
  public async count(query: Query): Promise<number> {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Count
    const count = await collection.countDocuments(query);
    return count;
  }

  /**
   * List distinct values for a property in a collection
   * @author Gabe Abrams
   * @param prop the property to list distinct values for
   * @param [query] the query to run. If excluded, all distinct
   *   values are included
   * @returns array of distinct values
   */
  public async distinct(
    prop: string,
    query?: Query,
  ): Promise<any[]> {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Get distinct values
    const distinctValues = await collection.distinct(
      prop,
      query ?? {},
    );
    return distinctValues;
  }

  /**
   * Increment value of an integer for an object (doesn't reset TTL)
   * @author Gabe Abrams
   * @param id id of the object to increment
   * @param prop property to increment
   */
  public async increment(id: Id, prop: string) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Increment
    return collection.findOneAndUpdate(
      {
        id,
      },
      {
        $inc: {
          [prop]: 1,
        },
      },
    );
  }

  /**
   * Increment value of an integer for an object, found by a query (doesn't
   *   reset TTL)
   * @author Gabe Abrams
   * @param query query to find the object to increment
   * @param prop property to increment
   */
  public async incrementByQuery(
    query: { [k: string]: any },
    prop: string,
  ) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Increment
    return collection.findOneAndUpdate(
      query,
      {
        $inc: {
          [prop]: 1,
        },
      },
    );
  }

  /**
   * Add/update object values in an entry in the collection. The entry must
   *   already exist (doesn't reset TTL)
   * @author Gabe Abrams
   * @param query query to apply to find the object to update
   * @param updates map of updates: { prop => value } where prop is
   *   the potentially nested name of the property
   *   (e.g. "age" or "profile.age")
   */
  public async updatePropValues(
    query: Query,
    updates: Query,
  ) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Perform update
    return collection.findOneAndUpdate(
      query,
      {
        $set: updates,
      },
    );
  }

  /**
   * Add an object to an array in an object (doesn't reset TTL)
   * @author Gabe Abrams
   * @param id the id of the object to modify
   * @param arrayProp the name of the array to insert into
   * @param obj the object to insert into the array
   */
  public async push(
    id: Id,
    arrayProp: string,
    obj: any,
  ) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Push
    return collection.findOneAndUpdate(
      {
        id,
      },
      {
        $push: {
          [arrayProp]: obj,
        } as any,
      },
    );
  }

  /**
   * Filter an array of objects or primitives in an entry in the collection
   *   (doesn't reset TTL)
   * @author Gabe Abrams
   * @param opts object containing all args
   * @param opts.id the id of the object to modify
   * @param opts.arrayProp the name of the array to filter
   * @param [opts.compareProp] the name of the array entry prop to compare.
   *   If excluded, array entries themselves are compared to compareValue
   * @param opts.compareValue the value of the array entry prop to filter
   *   out
   */
  public async filterOut(
    opts: {
      id: Id,
      arrayProp: string,
      compareProp?: string,
      compareValue: any,
    },
  ) {
    const {
      id,
      arrayProp,
      compareProp,
      compareValue,
    } = opts;

    // Wait for initialization to complete
    const collection = await this.collection;

    // Perform update
    if (!compareProp) {
      // No prop to compare: pull entries that equal the value itself
      return collection.findOneAndUpdate(
        {
          id,
        },
        {
          $pull: {
            [arrayProp]: compareValue,
          } as any, // Cast is required because mongodb lib uses an improper type
        },
      );
    }

    // Pull entries whose compareProp matches the value
    return collection.findOneAndUpdate(
      {
        id,
      },
      {
        $pull: {
          [arrayProp]: {
            [compareProp]: compareValue,
          },
        } as any, // Cast is required because mongodb lib uses an improper type
      },
    );
  }

  /**
   * Write a record to the collection (DOES reset TTL). If the collection has
   *   a unique key, an existing record with the same unique key value is
   *   updated in place (upsert); otherwise the record is simply inserted.
   *   The object passed in is not modified.
   * @author Gabe Abrams
   * @param obj the object to insert
   */
  public async insert(obj: DocumentType) {
    // Work on a shallow copy so that the caller's object is never mutated
    const updatedObj: any = { ...(obj as any) };

    // Remove stuff from object
    delete updatedObj.mongoTimestamp;

    // If using TTL, add last updated field
    if (this.usesTTL) {
      updatedObj.lastUpdatedForTTL = (new Date()).toISOString();
    }

    // Wait for initialization to complete
    const collection = await this.collection;

    // Check if we are inserting uniquely
    if (this.uniqueKey) {
      // Unique! Use replacement
      const query = {
        [this.uniqueKey]: updatedObj[this.uniqueKey],
      };
      await collection.updateOne(
        query, // Query to find the option to replace
        { $set: updatedObj }, // Type of update is a "set" operation
        { upsert: true }, // Choose to replace
      );
    } else {
      // Not unique. Just insert
      await collection.insertOne(updatedObj);
    }
  }

  /**
   * Delete the first document that matches the query in the collection
   * @author Gabe Abrams
   * @param query the query that will match the item
   */
  public async delete(query: Query) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Delete the object
    await collection.deleteOne(query);
  }

  /**
   * Delete all documents that match the query in the collection
   * @author Gabe Abrams
   * @param query the query that will match the items to delete
   */
  public async deleteAll(query: Query) {
    // Wait for initialization to complete
    const collection = await this.collection;

    // Delete all matches
    await collection.deleteMany(query);
  }

  /*------------------------------------------------------------------------*/
  /*                           Concurrent Methods                           */
  /*------------------------------------------------------------------------*/

  /**
   * Creates a lock in the lock collection, to make sure modifications aren't
   *   made to the document with id=id. Polls until the lock is acquired or
   *   the polling helper gives up.
   * @author Benedikt Arnarsson
   * @param id the 'id' of the document we want to lock.
   * @throws Error if the collection does not support concurrency
   */
  private async lock(id: Id) {
    if (!this.supportConcurrency || !this.lockCollection) {
      throw new Error('Cannot call lock on a Collection without concurrency support!');
    }

    const lockCollection = await this.lockCollection;
    const { serverId } = this;

    // Creating the lock with the document Id and serverId
    const lock = {
      id,
      serverId,
    };

    /**
     * Process of acquiring the lock from the lock-collection, used by
     *   pollResource.
     * @author Benedikt Arnarsson
     * @returns the result of the upsert attempt
     */
    const callResource = async () => {
      // Using updateOne w/ setOnInsert & upsert will make it only insert
      // when there is no lock with same document Id
      return lockCollection.updateOne( // TODO: add time of insertion
        { id },
        { $setOnInsert: lock },
        { upsert: true },
      );
    };

    /**
     * Process of validating that we acquired the lock from the
     *   lock-collection, used by pollResource.
     * @author Benedikt Arnarsson
     * @param res the output of acquiring the resource, in this case the lock.
     * @returns boolean indicating whether we successfully acquired the lock.
     */
    const validateResult = (res: MongoDB.UpdateResult) => {
      // A lock document was inserted only if the upsert created it
      return res.upsertedCount > 0;
    };

    // Initiate pollResource
    // NOTE(review): presumably throws PollingError once timeOut elapses
    // without a valid result — confirm against helpers/pollResource
    const result = await pollResource({
      callResource,
      validateResult,
      waitForMS: LOCK_POLL_INTERVAL_MS,
      // FIXME: when multiple servers are working
      // Add some randomness here so other servers can acquire lock
      timeOut: 2 * this.lockTimeToLiveMS,
    });

    // Logging
    if ((process.env.MANGO_LOG_LEVEL ?? '').toLowerCase() === 'info') {
      log(`Locking on ${serverId} (${this.collectionName}):\n\t- upserted: ${result.upsertedCount}\n\t- matched: ${result.matchedCount}\n\t- modified: ${result.modifiedCount}`);
    }
  }

  /**
   * Inverse of lock operation. Unlock a document which you have locked.
   * @author Benedikt Arnarsson
   * @param id the 'id' of the document that is being unlocked.
   * @throws Error if the collection does not support concurrency
   */
  private async unlock(id: Id) {
    if (!this.supportConcurrency || !this.lockCollection) {
      throw new Error('Cannot call unlock on a Collection without concurrency support!');
    }

    const lockCollection = await this.lockCollection;
    const { serverId } = this;

    // Only deletes a lock with the same id *and* serverId
    await lockCollection.deleteOne({ id, serverId });

    // Logging
    if ((process.env.MANGO_LOG_LEVEL ?? '').toLowerCase() === 'info') {
      log(`Unlocked document ${id}, with ${serverId} (${this.collectionName})`);
    }
  }

  /**
   * Given a function representing a set of operations on the collection and
   *   a set of ids to lock, will run the function while holding a lock on
   *   each of those ids, so other callers that use the same locks cannot
   *   run their procedures on those documents at the same time.
   *   For use in situations with multiple concurrent servers using the same
   *   database. Only ids that match existing documents are locked.
   * @author Benedikt Arnarsson
   * @param opts object containing all parameters
   * @param opts.idOrIdsToLock the one Id or list of Ids to lock for the
   *   procedure provided.
   * @param opts.procedure the procedure that is being wrapped in the
   *   lock-unlock calls
   * @returns the result of opts.procedure
   * @throws Error if the collection does not support concurrency
   * @throws PollingError if a lock could not be acquired in time
   */
  public async runAtomicProcedure<Result>(
    opts: {
      idOrIdsToLock: Id | Id[],
      procedure: CollectionProcedure<DocumentType, Result>,
    },
  ): Promise<Result> {
    const { uniqueKey } = this;
    if (!this.supportConcurrency || !uniqueKey) {
      throw new Error('Cannot call runAtomicProcedure on a Collection without concurrency support!');
    }

    // Destructure
    const {
      idOrIdsToLock,
      procedure,
    } = opts;

    const collection = await this.collection;

    const ids = (
      Array.isArray(idOrIdsToLock)
        ? idOrIdsToLock
        : [idOrIdsToLock]
    );

    // Get the list of matching items
    const items = await collection
      .find({ [uniqueKey]: { $in: ids } })
      .toArray();

    // Extract the unique index value of each existing item
    const idsToLock: Id[] = items.map((item) => {
      return item[uniqueKey] as Id;
    });

    // Ids whose lock was actually acquired by *this* call. Only these are
    // released afterwards: all calls on this instance share one serverId, so
    // unlocking an id we never locked would release a lock that is held by
    // another procedure running on this same instance
    let lockedIds: Id[] = [];

    let result: Result;
    try {
      // Lock all items. Wait for every attempt to settle (instead of failing
      // fast) so that no lock can be acquired after cleanup has already run
      const lockOutcomes = await Promise.allSettled(
        idsToLock.map((idToLock) => {
          return this.lock(idToLock);
        }),
      );
      lockedIds = idsToLock.filter((_idToLock, i) => {
        return lockOutcomes[i]?.status === 'fulfilled';
      });

      // If any lock could not be acquired, abort with the first failure
      const failedLock = lockOutcomes.find(
        (outcome): outcome is PromiseRejectedResult => {
          return outcome.status === 'rejected';
        },
      );
      if (failedLock) {
        throw failedLock.reason;
      }

      // Execute the procedure
      result = await procedure(this);
    } catch (err) {
      if (err instanceof PollingError) {
        throw new PollingError('Exceeded timeout when attempting to lock for atomic procedure.');
      }

      const errMsg = 'Failed to complete atomic procedure:';

      // Thrown objects (including Errors): prefix the message and rethrow
      // the same object so callers can still identify it
      if (typeof err === 'object' && err !== null) {
        const errWithMessage = err as { message?: unknown };
        errWithMessage.message = `${errMsg} ${String(errWithMessage.message)}`;
        throw err;
      }

      // Thrown primitives cannot carry a message, so wrap them
      throw new Error(`${errMsg} ${String(err)}`);
    } finally {
      // Unlock the items that this call locked
      await Promise.all(
        lockedIds.map(async (lockedId) => {
          await this.unlock(lockedId);
        }),
      );
    }

    return result;
  }
}

export default Collection;