import { BehaviorSubject, Observable, Subject, filter, firstValueFrom } from 'rxjs'; import { getPrimaryFieldOfPrimaryKey } from 'nxdb-old/src/rx-schema-helper'; import type { BulkWriteRow, ById, EventBulk, RxConflictResultionTask, RxConflictResultionTaskSolution, RxDocumentData, RxDocumentDataById, RxJsonSchema, RxStorageBulkWriteResponse, RxStorageChangeEvent, RxStorageCountResult, RxStorageDefaultCheckpoint, RxStorageInstance, RxStorageInstanceCreationParams, RxStorageQueryResult, RxStorageWriteErrorConflict, StringKeys } from 'nxdb-old/src/types'; import { ensureNotFalsy, getFromMapOrThrow, getFromObjectOrThrow, isMaybeReadonlyArray, lastOfArray, now, requestIdlePromise, RX_META_LWT_MINIMUM } from 'nxdb-old/src/plugins/utils'; import { MongoDBPreparedQuery, MongoDBStorageInternals, MongoQuerySelector, RxStorageMongoDBInstanceCreationOptions, RxStorageMongoDBSettings } from 'nxdb-old/src/plugins/storage-mongodb/mongodb-types'; import { RxStorageMongoDB } from 'nxdb-old/src/plugins/storage-mongodb/rx-storage-mongodb'; import { Db as MongoDatabase, Collection as MongoCollection, MongoClient, ObjectId, ClientSession } from 'mongodb'; import { categorizeBulkWriteRows } from 'nxdb-old/src/rx-storage-helper'; import { MONGO_ID_SUBSTITUTE_FIELDNAME, getMongoDBIndexName, swapMongoToRxDoc, swapRxDocToMongo } from 'nxdb-old/src/plugins/storage-mongodb/mongodb-helper'; export class RxStorageInstanceMongoDB implements RxStorageInstance< RxDocType, MongoDBStorageInternals, RxStorageMongoDBInstanceCreationOptions, RxStorageDefaultCheckpoint > { public readonly primaryPath: StringKeys>; public readonly inMongoPrimaryPath: string; public closed = false; private readonly changes$: Subject>, RxStorageDefaultCheckpoint>> = new Subject(); public readonly mongoClient: MongoClient; public readonly mongoDatabase: MongoDatabase; public readonly mongoCollectionPromise: Promise | any>>; // public mongoChangeStream?: MongoChangeStream>; /** * Closing the connection must not happen when * an operation is running, otherwise we get an error. * So we store all running operations here so that * they can be awaited. */ public readonly runningOperations = new BehaviorSubject(0); public readonly runningWrites = new BehaviorSubject(0); /** * We use this to be able to still fetch * the objectId after transforming the document from mongo-style (with _id) * to NxDB */ public readonly mongoObjectIdCache = new WeakMap, ObjectId>(); constructor( public readonly storage: RxStorageMongoDB, public readonly databaseName: string, public readonly collectionName: string, public readonly schema: Readonly>>, public readonly internals: MongoDBStorageInternals, public readonly options: Readonly, public readonly settings: RxStorageMongoDBSettings ) { if (this.schema.attachments) { throw new Error('attachments not supported in mongodb storage, make a PR if you need that'); } this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey); this.inMongoPrimaryPath = this.primaryPath === '_id' ? MONGO_ID_SUBSTITUTE_FIELDNAME : this.primaryPath; this.mongoClient = new MongoClient(storage.databaseSettings.connection); this.mongoDatabase = this.mongoClient.db(databaseName + '-v' + this.schema.version); const indexes = (this.schema.indexes ? this.schema.indexes.slice() : []).map(index => { const arIndex = isMaybeReadonlyArray(index) ? index.slice(0) : [index]; return arIndex; }); indexes.push([this.inMongoPrimaryPath]); this.mongoCollectionPromise = this.mongoDatabase.createCollection(collectionName) .then(async (mongoCollection) => { await mongoCollection.createIndexes( indexes.map(index => { const mongoIndex: any = {}; index.forEach(field => mongoIndex[field] = 1); return { name: getMongoDBIndexName(index), key: mongoIndex }; }) ); /** * TODO in a setup where multiple servers run node.js * processes that use the mongodb storage, we should propagate * events by listening to the mongodb changestream. * This maybe should be a premium feature. */ // this.mongoChangeStream = mongoCollection.watch( // undefined, { // batchSize: 100 // } // ).on('change', change => { // const eventBulkId = randomCouchString(10); // const newDocData: RxDocumentData = (change as any).fullDocument; // const documentId = newDocData[this.primaryPath] as any; // const eventBulk: EventBulk>, RxStorageDefaultCheckpoint> = { // checkpoint: { // id: newDocData[this.primaryPath] as any, // lwt: newDocData._meta.lwt // }, // context: 'mongodb-write', // id: eventBulkId, // events: [{ // documentData: newDocData, // documentId, // eventId: randomCouchString(10), // operation: 'INSERT', // previousDocumentData: undefined, // startTime: now(), // endTime: now() // }] // }; // this.changes$.next(eventBulk); // }); return mongoCollection; }); } /** * Bulk writes on the mongodb storage. * Notice that MongoDB does not support cross-document transactions * so we have to do a update-if-previous-is-correct like operations. * (Similar to what NxDB does with the revision system) */ async bulkWrite( documentWrites: BulkWriteRow[], context: string ): Promise> { this.runningOperations.next(this.runningOperations.getValue() + 1); await firstValueFrom(this.runningWrites.pipe(filter(c => c === 0))); this.runningWrites.next(this.runningWrites.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; if (this.closed) { return Promise.reject(new Error('already closed')); } const primaryPath = this.primaryPath; const ret: RxStorageBulkWriteResponse = { success: {}, error: {} }; const docIds = documentWrites.map(d => (d.document as any)[primaryPath]); const documentStates = await this.findDocumentsById( docIds, true ); const categorized = categorizeBulkWriteRows( this, primaryPath as any, documentStates, documentWrites, context ); ret.error = categorized.errors; /** * Reset the event bulk because * conflicts can still appear after the categorization */ const eventBulk = categorized.eventBulk; eventBulk.events = []; await Promise.all([ /** * Inserts * @link https://sparkbyexamples.com/mongodb/mongodb-insert-if-not-exists/ */ Promise.all( categorized.bulkInsertDocs.map(async (writeRow) => { const docId: string = writeRow.document[primaryPath] as any; const writeResult = await mongoCollection.findOneAndUpdate( { [this.inMongoPrimaryPath]: docId }, { $setOnInsert: swapRxDocToMongo(writeRow.document) }, { upsert: true, includeResultMetadata: true } ); if (writeResult.value) { // had insert conflict const conflictError: RxStorageWriteErrorConflict = { status: 409, documentId: docId, writeRow, documentInDb: swapMongoToRxDoc(writeResult.value), isError: true }; ret.error[docId] = conflictError; } else { const event = categorized.changeByDocId.get(docId); if (event) { eventBulk.events.push(event); } ret.success[docId as any] = writeRow.document; } }) ), /** * Updates */ Promise.all( categorized.bulkUpdateDocs.map(async (writeRow) => { const docId = writeRow.document[primaryPath] as string; const writeResult = await mongoCollection.findOneAndReplace( { [this.inMongoPrimaryPath]: docId, _rev: ensureNotFalsy(writeRow.previous)._rev }, swapRxDocToMongo(writeRow.document), { includeResultMetadata: true, upsert: false } ); if (!writeResult.value) { const currentDocState = await this.findDocumentsById([docId], true); const currentDoc = getFromObjectOrThrow(currentDocState, docId); // had insert conflict const conflictError: RxStorageWriteErrorConflict = { status: 409, documentId: docId, writeRow, documentInDb: currentDoc, isError: true }; ret.error[docId] = conflictError; } else { const event = getFromMapOrThrow(categorized.changeByDocId, docId); eventBulk.events.push(event); ret.success[docId as any] = writeRow.document; } }) ) ]); if (categorized.eventBulk.events.length > 0) { const lastState = ensureNotFalsy(categorized.newestRow).document; categorized.eventBulk.checkpoint = { id: lastState[primaryPath], lwt: lastState._meta.lwt }; this.changes$.next(categorized.eventBulk); } this.runningWrites.next(this.runningWrites.getValue() - 1); this.runningOperations.next(this.runningOperations.getValue() - 1); return ret; } async findDocumentsById( docIds: string[], withDeleted: boolean, session?: ClientSession ): Promise> { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; const primaryPath = this.primaryPath; const plainQuery: MongoQuerySelector = { [primaryPath]: { $in: docIds } }; if (!withDeleted) { plainQuery._deleted = false; } const result: ById> = {}; const queryResult = await mongoCollection.find( plainQuery, { session } ).toArray(); queryResult.forEach(row => { result[(row as any)[primaryPath]] = swapMongoToRxDoc( row as any ); }); this.runningOperations.next(this.runningOperations.getValue() - 1); return result; } async query( preparedQuery: MongoDBPreparedQuery ): Promise> { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; let query = mongoCollection.find(preparedQuery.mongoSelector); if (preparedQuery.query.skip) { query = query.skip(preparedQuery.query.skip); } if (preparedQuery.query.limit) { query = query.limit(preparedQuery.query.limit); } if (preparedQuery.query.sort) { query = query.sort(preparedQuery.mongoSort); } const resultDocs = await query.toArray(); this.runningOperations.next(this.runningOperations.getValue() - 1); return { documents: resultDocs.map(d => swapMongoToRxDoc(d)) }; } async count( preparedQuery: MongoDBPreparedQuery ): Promise { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; const count = await mongoCollection.countDocuments(preparedQuery.mongoSelector); this.runningOperations.next(this.runningOperations.getValue() - 1); return { count, mode: 'fast' }; } async getChangedDocumentsSince( limit: number, checkpoint?: RxStorageDefaultCheckpoint ): Promise<{ documents: RxDocumentData[]; checkpoint: RxStorageDefaultCheckpoint; }> { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; const sinceLwt = checkpoint ? checkpoint.lwt : RX_META_LWT_MINIMUM; const plainQuery = { $or: [ { '_meta.lwt': { $gt: sinceLwt } }, { '_meta.lwt': { $eq: sinceLwt }, [this.inMongoPrimaryPath]: { $gt: checkpoint ? checkpoint.id : '' } } ] }; const query = mongoCollection.find(plainQuery) .sort({ '_meta.lwt': 1, [this.inMongoPrimaryPath]: 1 }) .limit(limit); const documents = await query.toArray(); const lastDoc = lastOfArray(documents); this.runningOperations.next(this.runningOperations.getValue() - 1); return { documents: documents.map(d => swapMongoToRxDoc(d)), checkpoint: lastDoc ? { id: lastDoc[this.primaryPath], lwt: lastDoc._meta.lwt } : checkpoint ? checkpoint : { id: '', lwt: 0 } }; } async cleanup(minimumDeletedTime: number): Promise { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; const maxDeletionTime = now() - minimumDeletedTime; await mongoCollection.deleteMany({ _deleted: true, '_meta.lwt': { $lt: maxDeletionTime } }); this.runningOperations.next(this.runningOperations.getValue() - 1); return true; } async getAttachmentData( _documentId: string, _attachmentId: string, _digest: string ): Promise { await this.mongoCollectionPromise; throw new Error('attachments not implemented, make a PR'); } changeStream(): Observable>, RxStorageDefaultCheckpoint>> { return this.changes$; } async remove(): Promise { this.runningOperations.next(this.runningOperations.getValue() + 1); const mongoCollection = await this.mongoCollectionPromise; await mongoCollection.drop(); this.runningOperations.next(this.runningOperations.getValue() - 1); await this.close(); } async close(): Promise { // TODO without this next-tick we have random fails in the tests await requestIdlePromise(200); if (this.closed) { return Promise.reject(new Error('already closed')); } this.closed = true; await this.mongoCollectionPromise; await firstValueFrom(this.runningOperations.pipe(filter(c => c === 0))); // await ensureNotFalsy(this.mongoChangeStream).close(); await this.mongoClient.close(); } conflictResultionTasks(): Observable> { return new Subject(); } async resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution): Promise { } } export function createMongoDBStorageInstance( storage: RxStorageMongoDB, params: RxStorageInstanceCreationParams, settings: RxStorageMongoDBSettings ): Promise> { const instance = new RxStorageInstanceMongoDB( storage, params.databaseName, params.collectionName, params.schema, {}, params.options, settings ); return Promise.resolve(instance); }