import { Observable, Subject } from 'rxjs'; import { getPrimaryFieldOfPrimaryKey } from 'nxdb-old/src/rx-schema-helper'; import type { BulkWriteRow, CategorizeBulkWriteRowsOutput, EventBulk, RxAttachmentWriteData, RxConflictResultionTask, RxConflictResultionTaskSolution, RxDocumentData, RxDocumentDataById, RxJsonSchema, RxStorageBulkWriteResponse, RxStorageChangeEvent, RxStorageCountResult, RxStorageDefaultCheckpoint, RxStorageInstance, RxStorageInstanceCreationParams, RxStorageQueryResult, StringKeys } from 'nxdb-old/src/types'; import type { FoundationDBDatabase, FoundationDBIndexMeta, FoundationDBPreparedQuery, FoundationDBStorageInternals, RxStorageFoundationDB, RxStorageFoundationDBInstanceCreationOptions, RxStorageFoundationDBSettings } from 'nxdb-old/src/plugins/storage-foundationdb/foundationdb-types'; // import { // open as foundationDBOpen, // directory as foundationDBDirectory, // encoders as foundationDBEncoders, // keySelector as foundationDBKeySelector, // StreamingMode as foundationDBStreamingMode // } from 'foundationdb'; import { categorizeBulkWriteRows } from 'nxdb-old/src/rx-storage-helper'; import { CLEANUP_INDEX, FOUNDATION_DB_WRITE_BATCH_SIZE, getFoundationDBIndexName } from 'nxdb-old/src/plugins/storage-foundationdb/foundationdb-helpers'; import { getIndexableStringMonad, getStartIndexStringFromLowerBound, getStartIndexStringFromUpperBound } from 'nxdb-old/src/custom-index'; import { appendToArray, batchArray, ensureNotFalsy, lastOfArray, now, PROMISE_RESOLVE_VOID, toArray } from 'nxdb-old/src/plugins/utils'; import { queryFoundationDB } from 'nxdb-old/src/plugins/storage-foundationdb/foundationdb-query'; import { INDEX_MAX } from 'nxdb-old/src/query-planner'; import { attachmentMapKey } from 'nxdb-old/src/plugins/storage-memory'; export class RxStorageInstanceFoundationDB implements RxStorageInstance< RxDocType, FoundationDBStorageInternals, RxStorageFoundationDBInstanceCreationOptions, RxStorageDefaultCheckpoint > { public readonly primaryPath: StringKeys>; public closed = false; private changes$: Subject>, RxStorageDefaultCheckpoint>> = new Subject(); constructor( public readonly storage: RxStorageFoundationDB, public readonly databaseName: string, public readonly collectionName: string, public readonly schema: Readonly>>, public readonly internals: FoundationDBStorageInternals, public readonly options: Readonly, public readonly settings: RxStorageFoundationDBSettings ) { this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey); } async bulkWrite( documentWrites: BulkWriteRow[], context: string ): Promise> { const dbs = await this.internals.dbsPromise; const ret: RxStorageBulkWriteResponse = { success: {}, error: {} }; /** * Doing too many write in a single transaction * will throw with a 'Transaction exceeds byte limit' * so we have to batch up the writes. */ const writeBatches = batchArray(documentWrites, FOUNDATION_DB_WRITE_BATCH_SIZE); await Promise.all( writeBatches.map(async (writeBatch) => { let categorized: CategorizeBulkWriteRowsOutput | undefined = null as any; await dbs.root.doTransaction(async (tx: any) => { const ids = writeBatch.map(row => (row.document as any)[this.primaryPath]); const mainTx = tx.at(dbs.main.subspace); const attachmentTx = tx.at(dbs.attachments.subspace); const docsInDB = new Map>(); /** * TODO this might be faster if fdb * any time adds a bulk-fetch-by-key method. */ await Promise.all( ids.map(async (id) => { const doc = await mainTx.get(id); docsInDB.set(id, doc); }) ); categorized = categorizeBulkWriteRows( this, this.primaryPath as any, docsInDB, writeBatch, context ); Object.keys(categorized.errors).forEach(errorKey => { ret.error[errorKey] = ensureNotFalsy(categorized).errors[errorKey]; }); // INSERTS categorized.bulkInsertDocs.forEach(writeRow => { const docId: string = writeRow.document[this.primaryPath] as any; ret.success[docId] = writeRow.document as any; // insert document data mainTx.set(docId, writeRow.document); // insert secondary indexes Object.values(dbs.indexes).forEach(indexMeta => { const indexString = indexMeta.getIndexableString(writeRow.document as any); const indexTx = tx.at(indexMeta.db.subspace); indexTx.set(indexString, docId); }); }); // UPDATES categorized.bulkUpdateDocs.forEach((writeRow: BulkWriteRow) => { const docId: string = writeRow.document[this.primaryPath] as any; // overwrite document data mainTx.set(docId, writeRow.document); // update secondary indexes Object.values(dbs.indexes).forEach(indexMeta => { const oldIndexString = indexMeta.getIndexableString(ensureNotFalsy(writeRow.previous)); const newIndexString = indexMeta.getIndexableString(writeRow.document as any); if (oldIndexString !== newIndexString) { const indexTx = tx.at(indexMeta.db.subspace); indexTx.delete(oldIndexString); indexTx.set(newIndexString, docId); } }); ret.success[docId] = writeRow.document as any; }); // attachments categorized.attachmentsAdd.forEach(attachment => { attachmentTx.set( attachmentMapKey(attachment.documentId, attachment.attachmentId), attachment.attachmentData ); }); categorized.attachmentsUpdate.forEach(attachment => { attachmentTx.set( attachmentMapKey(attachment.documentId, attachment.attachmentId), attachment.attachmentData ); }); categorized.attachmentsRemove.forEach(attachment => { attachmentTx.delete( attachmentMapKey(attachment.documentId, attachment.attachmentId) ); }); }); categorized = ensureNotFalsy(categorized); /** * The events must be emitted AFTER the transaction * has finished. * Otherwise an observable changestream might cause a read * to a document that does not already exist outside of the transaction. */ if (categorized.eventBulk.events.length > 0) { const lastState = ensureNotFalsy(categorized.newestRow).document; categorized.eventBulk.checkpoint = { id: lastState[this.primaryPath], lwt: lastState._meta.lwt }; this.changes$.next(categorized.eventBulk); } }) ); return ret; } async findDocumentsById(ids: string[], withDeleted: boolean): Promise> { const dbs = await this.internals.dbsPromise; return dbs.main.doTransaction(async (tx: any) => { const ret: RxDocumentDataById = {}; await Promise.all( ids.map(async (docId) => { const docInDb = await tx.get(docId); if ( docInDb && ( !docInDb._deleted || withDeleted ) ) { ret[docId] = docInDb; } }) ); return ret; }); } query(preparedQuery: FoundationDBPreparedQuery): Promise> { return queryFoundationDB(this, preparedQuery); } async count( preparedQuery: FoundationDBPreparedQuery ): Promise { /** * At this point in time (end 2022), FoundationDB does not support * range counts. So we have to run a normal query and use the result set length. * @link https://github.com/apple/foundationdb/issues/5981 */ const result = await this.query(preparedQuery); return { count: result.documents.length, mode: 'fast' }; } async getAttachmentData(documentId: string, attachmentId: string, _digest: string): Promise { const dbs = await this.internals.dbsPromise; const attachment = await dbs.attachments.get(attachmentMapKey(documentId, attachmentId)); return attachment.data; } async getChangedDocumentsSince(limit: number, checkpoint?: RxStorageDefaultCheckpoint): Promise<{ documents: RxDocumentData[]; checkpoint: RxStorageDefaultCheckpoint; }> { const { keySelector, StreamingMode } = require('foundationdb'); const dbs = await this.internals.dbsPromise; const index = [ '_meta.lwt', this.primaryPath as any ]; const indexName = getFoundationDBIndexName(index); const indexMeta = dbs.indexes[indexName]; let lowerBoundString = ''; if (checkpoint) { const checkpointPartialDoc: any = { [this.primaryPath]: checkpoint.id, _meta: { lwt: checkpoint.lwt } }; lowerBoundString = indexMeta.getIndexableString(checkpointPartialDoc); } const result: RxDocumentData[] = await dbs.root.doTransaction(async (tx: any) => { const innerResult: RxDocumentData[] = []; const indexTx = tx.at(indexMeta.db.subspace); const mainTx = tx.at(dbs.main.subspace); const range = await indexTx.getRangeAll( keySelector.firstGreaterThan(lowerBoundString), INDEX_MAX, { limit, streamingMode: StreamingMode.Exact } ); const docIds = range.map((row: string[]) => row[1]); const docsData: RxDocumentData[] = await Promise.all( docIds.map((docId: string) => mainTx.get(docId)) ); appendToArray(innerResult, docsData); return innerResult; }); const lastDoc = lastOfArray(result); return { documents: result, checkpoint: lastDoc ? { id: lastDoc[this.primaryPath] as any, lwt: lastDoc._meta.lwt } : checkpoint ? checkpoint : { id: '', lwt: 0 } }; } changeStream(): Observable, RxStorageDefaultCheckpoint>> { return this.changes$.asObservable(); } async remove(): Promise { const dbs = await this.internals.dbsPromise; await dbs.root.doTransaction((tx: any) => { tx.clearRange('', INDEX_MAX); return PROMISE_RESOLVE_VOID; }); return this.close(); } async cleanup(minimumDeletedTime: number): Promise { const { keySelector, StreamingMode } = require('foundationdb'); const maxDeletionTime = now() - minimumDeletedTime; const dbs = await this.internals.dbsPromise; const index = CLEANUP_INDEX; const indexName = getFoundationDBIndexName(index); const indexMeta = dbs.indexes[indexName]; const lowerBoundString = getStartIndexStringFromLowerBound( this.schema, index, [ true, /** * Do not use 0 here, * because 1 is the minimum value for _meta.lwt */ 1 ], false ); const upperBoundString = getStartIndexStringFromUpperBound( this.schema, index, [ true, maxDeletionTime ], true ); let noMoreUndeleted: boolean = true; await dbs.root.doTransaction(async (tx: any) => { const batchSize = ensureNotFalsy(this.settings.batchSize); const indexTx = tx.at(indexMeta.db.subspace); const mainTx = tx.at(dbs.main.subspace); const range = await indexTx.getRangeAll( keySelector.firstGreaterThan(lowerBoundString), upperBoundString, { limit: batchSize + 1, // get one more extra to detect what to return from cleanup() streamingMode: StreamingMode.Exact } ); if (range.length > batchSize) { noMoreUndeleted = false; range.pop(); } const docIds = range.map((row: string[]) => row[1]); const docsData: RxDocumentData[] = await Promise.all(docIds.map((docId: string) => mainTx.get(docId))); Object .values(dbs.indexes) .forEach(indexMetaInner => { const subIndexDB = tx.at(indexMetaInner.db.subspace); docsData.forEach(docData => { const indexString = indexMetaInner.getIndexableString(docData); subIndexDB.delete(indexString); }); }); docIds.forEach((id: string) => mainTx.delete(id)); }); return noMoreUndeleted; } conflictResultionTasks(): Observable> { return new Subject().asObservable(); } resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution): Promise { return PROMISE_RESOLVE_VOID; } async close() { if (this.closed) { return Promise.reject(new Error('already closed')); } this.closed = true; this.changes$.complete(); const dbs = await this.internals.dbsPromise; dbs.root.close(); // TODO shouldn't we close the index databases? // Object.values(dbs.indexes).forEach(db => db.close()); } } export function createFoundationDBStorageInstance( storage: RxStorageFoundationDB, params: RxStorageInstanceCreationParams, settings: RxStorageFoundationDBSettings ): Promise> { const primaryPath = getPrimaryFieldOfPrimaryKey(params.schema.primaryKey); const { open, directory, encoders } = require('foundationdb'); const connection = open(settings.clusterFile); const dbsPromise = (async () => { const dir = await directory.createOrOpen(connection, 'nxdb'); const root = connection .at(dir) .at(params.databaseName + '.') .at(params.collectionName + '.') .at(params.schema.version + '.'); const main: FoundationDBDatabase = root .at('main.') .withKeyEncoding(encoders.string) // automatically encode & decode keys using tuples .withValueEncoding(encoders.json) as any; // and values using JSON const events: FoundationDBDatabase>, RxStorageDefaultCheckpoint>> = root .at('events.') .withKeyEncoding(encoders.string) .withValueEncoding(encoders.json) as any; const attachments: FoundationDBDatabase = root .at('attachments.') .withKeyEncoding(encoders.string) .withValueEncoding(encoders.json) as any; const indexDBs: { [indexName: string]: FoundationDBIndexMeta; } = {}; const useIndexes = params.schema.indexes ? params.schema.indexes.slice(0) : []; useIndexes.push([primaryPath]); const useIndexesFinal = useIndexes.map(index => { const indexAr = toArray(index); indexAr.unshift('_deleted'); return indexAr; }); // used for `getChangedDocumentsSince()` useIndexesFinal.push([ '_meta.lwt', primaryPath ]); useIndexesFinal.push(CLEANUP_INDEX); useIndexesFinal.forEach(indexAr => { const indexName = getFoundationDBIndexName(indexAr); const indexDB = root.at(indexName + '.') .withKeyEncoding(encoders.string) .withValueEncoding(encoders.string); indexDBs[indexName] = { indexName, db: indexDB, getIndexableString: getIndexableStringMonad(params.schema, indexAr), index: indexAr }; }); return { root, main, events, attachments, indexes: indexDBs }; })(); const internals: FoundationDBStorageInternals = { connection, dbsPromise: dbsPromise }; const instance = new RxStorageInstanceFoundationDB( storage, params.databaseName, params.collectionName, params.schema, internals, params.options, settings ); return Promise.resolve(instance); }