import { Observable, Subject } from 'rxjs';
import { getPrimaryFieldOfPrimaryKey } from '../../rx-schema-helper.ts';
import type {
    BulkWriteRow,
    CategorizeBulkWriteRowsOutput,
    EventBulk,
    PreparedQuery,
    RxDocumentData,
    RxJsonSchema,
    RxStorageBulkWriteResponse,
    RxStorageChangeEvent,
    RxStorageCountResult,
    RxStorageDefaultCheckpoint,
    RxStorageInstance,
    RxStorageInstanceCreationParams,
    RxStorageQueryResult,
    StringKeys
} from '../../types/index.d.ts';
import type {
    FoundationDBDatabase,
    FoundationDBIndexMeta,
    FoundationDBStorageInternals,
    RxStorageFoundationDB,
    RxStorageFoundationDBInstanceCreationOptions,
    RxStorageFoundationDBSettings
} from './foundationdb-types.ts';
// import {
//     open as foundationDBOpen,
//     directory as foundationDBDirectory,
//     encoders as foundationDBEncoders,
//     keySelector as foundationDBKeySelector,
//     StreamingMode as foundationDBStreamingMode
// } from 'foundationdb';
import { categorizeBulkWriteRows } from '../../rx-storage-helper.ts';
import {
    CLEANUP_INDEX,
    FOUNDATION_DB_WRITE_BATCH_SIZE,
    getFoundationDBIndexName
} from './foundationdb-helpers.ts';
import {
    getIndexableStringMonad,
    getStartIndexStringFromLowerBound,
    getStartIndexStringFromUpperBound
} from '../../custom-index.ts';
import {
    batchArray,
    ensureNotFalsy,
    now,
    PROMISE_RESOLVE_VOID,
    toArray
} from '../../plugins/utils/index.ts';
import { queryFoundationDB } from './foundationdb-query.ts';
import { INDEX_MAX } from '../../query-planner.ts';
import { attachmentMapKey } from '../storage-memory/index.ts';

/**
 * Reads the data of every Blob attachment in the given write rows into a Buffer.
 *
 * This runs BEFORE a FoundationDB transaction is opened so the transaction is
 * not held open while awaiting async Blob reads, which can otherwise run into
 * 'Transaction exceeds byte limit' or transaction timeout errors.
 *
 * @param rows the write rows of one batch
 * @param primaryPath the primary key field of the documents
 * @returns a map from attachmentMapKey(documentId, attachmentId) to the binary data
 */
async function preConvertAttachmentBlobs(
    rows: BulkWriteRow<any>[],
    primaryPath: string
): Promise<Map<string, Buffer>> {
    const buffers = new Map<string, Buffer>();
    await Promise.all(
        rows.flatMap(row => {
            const docId: string = (row.document as any)[primaryPath];
            const attachments: Record<string, any> = (row.document as any)._attachments || {};
            return Object.entries(attachments)
                .filter(([, att]) => att.data instanceof Blob)
                .map(async ([attId, att]) => {
                    const buffer = Buffer.from(await att.data.arrayBuffer());
                    buffers.set(attachmentMapKey(docId, attId), buffer);
                });
        })
    );
    return buffers;
}

export class RxStorageInstanceFoundationDB<RxDocType> implements RxStorageInstance<
    RxDocType,
    FoundationDBStorageInternals<RxDocType>,
    RxStorageFoundationDBInstanceCreationOptions,
    RxStorageDefaultCheckpoint
> {
    public readonly primaryPath: StringKeys<RxDocumentData<RxDocType>>;

    /** Set on the first call to close(); holds the pending/finished close operation. */
    public closed?: Promise<void>;
    private changes$: Subject<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>> = new Subject();

    constructor(
        public readonly storage: RxStorageFoundationDB,
        public readonly databaseName: string,
        public readonly collectionName: string,
        public readonly schema: Readonly<RxJsonSchema<RxDocumentData<RxDocType>>>,
        public readonly internals: FoundationDBStorageInternals<RxDocType>,
        public readonly options: Readonly<RxStorageFoundationDBInstanceCreationOptions>,
        public readonly settings: RxStorageFoundationDBSettings
    ) {
        this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey);
    }

    /**
     * Writes the given rows. Rows are split into batches; each batch is
     * written in its own FoundationDB transaction, and the batches run concurrently.
     * Change events of a batch are emitted after its transaction has committed.
     */
    async bulkWrite(
        documentWrites: BulkWriteRow<RxDocType>[],
        context: string
    ): Promise<RxStorageBulkWriteResponse<RxDocType>> {
        const dbs = await this.internals.dbsPromise;
        const ret: RxStorageBulkWriteResponse<RxDocType> = {
            error: []
        };

        /**
         * Doing too many writes in a single transaction
         * will throw with a 'Transaction exceeds byte limit'
         * so we have to batch up the writes.
         */
        const writeBatches = batchArray(documentWrites, FOUNDATION_DB_WRITE_BATCH_SIZE);
        await Promise.all(
            writeBatches.map(async (writeBatch) => {
                const preConvertedBuffers = await preConvertAttachmentBlobs(writeBatch, this.primaryPath as any);

                /**
                 * doTransaction() may run the callback multiple times
                 * (it retries on conflicts), so the callback must not mutate
                 * any outer state. It returns the categorization of the attempt
                 * that committed, and only that result is processed below.
                 */
                const categorized: CategorizeBulkWriteRowsOutput<RxDocType> = await dbs.root.doTransaction(async (tx: any) => {
                    const ids: string[] = writeBatch.map(row => (row.document as any)[this.primaryPath]);
                    const mainTx = tx.at(dbs.main.subspace);
                    const attachmentTx = tx.at(dbs.attachments.subspace);
                    // one scoped transaction per secondary index, reused for all rows
                    const indexTxs = Object.values(dbs.indexes).map(indexMeta => ({
                        indexMeta,
                        indexTx: tx.at(indexMeta.db.subspace)
                    }));

                    const docsInDB = new Map<string, RxDocumentData<RxDocType>>();
                    /**
                     * This might be faster if fdb
                     * any time adds a bulk-fetch-by-key method.
                     */
                    await Promise.all(
                        ids.map(async (id) => {
                            const doc = await mainTx.get(id);
                            docsInDB.set(id, doc);
                        })
                    );

                    const attempt = categorizeBulkWriteRows(
                        this,
                        this.primaryPath as any,
                        docsInDB,
                        writeBatch,
                        context
                    );

                    // INSERTS
                    attempt.bulkInsertDocs.forEach(writeRow => {
                        const docId: string = writeRow.document[this.primaryPath] as any;
                        // insert document data
                        mainTx.set(docId, writeRow.document);
                        // insert secondary indexes
                        indexTxs.forEach(({ indexMeta, indexTx }) => {
                            indexTx.set(indexMeta.getIndexableString(writeRow.document as any), docId);
                        });
                    });

                    // UPDATES
                    attempt.bulkUpdateDocs.forEach(writeRow => {
                        const docId: string = writeRow.document[this.primaryPath] as any;
                        // overwrite document data
                        mainTx.set(docId, writeRow.document);
                        // update secondary indexes, only touching entries whose key changed
                        indexTxs.forEach(({ indexMeta, indexTx }) => {
                            const oldIndexString = indexMeta.getIndexableString(ensureNotFalsy(writeRow.previous));
                            const newIndexString = indexMeta.getIndexableString(writeRow.document as any);
                            if (oldIndexString !== newIndexString) {
                                indexTx.delete(oldIndexString);
                                indexTx.set(newIndexString, docId);
                            }
                        });
                    });

                    /**
                     * ATTACHMENTS
                     * Attachment data is stored as raw binary Buffers which
                     * were converted from Blobs before the transaction started.
                     */
                    [...attempt.attachmentsAdd, ...attempt.attachmentsUpdate].forEach(attachment => {
                        const key = attachmentMapKey(attachment.documentId, attachment.attachmentId);
                        const buffer = preConvertedBuffers.get(key);
                        if (buffer === undefined) {
                            // Fail loudly instead of writing undefined into the attachment store.
                            throw new Error('FoundationDB bulkWrite: attachment data is not a Blob: ' + key);
                        }
                        attachmentTx.set(key, buffer);
                    });
                    attempt.attachmentsRemove.forEach(attachment => {
                        attachmentTx.delete(
                            attachmentMapKey(attachment.documentId, attachment.attachmentId)
                        );
                    });

                    return attempt;
                });

                ret.error = ret.error.concat(categorized.errors);

                /**
                 * The events must be emitted AFTER the transaction
                 * has finished.
                 * Otherwise an observable changestream might cause a read
                 * to a document that does not already exist outside of the transaction.
                 */
                if (categorized.eventBulk.events.length > 0) {
                    const lastState = ensureNotFalsy(categorized.newestRow).document;
                    categorized.eventBulk.checkpoint = {
                        id: lastState[this.primaryPath] as any,
                        lwt: lastState._meta.lwt
                    };
                    this.changes$.next(categorized.eventBulk);
                }
            })
        );

        return ret;
    }

    /**
     * Fetches the documents with the given ids. Missing ids are skipped, and
     * deleted documents are only returned when withDeleted is true.
     * The order of the result is not guaranteed to match the order of ids.
     */
    async findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentData<RxDocType>[]> {
        const dbs = await this.internals.dbsPromise;
        return dbs.main.doTransaction(async (tx: any) => {
            const ret: RxDocumentData<RxDocType>[] = [];
            await Promise.all(
                ids.map(async (docId) => {
                    const docInDb = await tx.get(docId);
                    if (
                        docInDb &&
                        (
                            !docInDb._deleted ||
                            withDeleted
                        )
                    ) {
                        ret.push(docInDb);
                    }
                })
            );
            return ret;
        });
    }

    query(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>> {
        return queryFoundationDB(this, preparedQuery);
    }

    async count(
        preparedQuery: PreparedQuery<RxDocType>
    ): Promise<RxStorageCountResult> {
        /**
         * At this point in time (end 2022), FoundationDB does not support
         * range counts. So we have to run a normal query and use the result set length.
         * @link https://github.com/apple/foundationdb/issues/5981
         */
        const result = await this.query(preparedQuery);
        return {
            count: result.documents.length,
            mode: 'fast'
        };
    }

    /**
     * Returns the stored binary data of an attachment as a Blob.
     * @throws if no attachment is stored under the given document/attachment id.
     */
    async getAttachmentData(documentId: string, attachmentId: string, _digest: string): Promise<Blob> {
        const dbs = await this.internals.dbsPromise;
        const key = attachmentMapKey(documentId, attachmentId);
        const buffer = await dbs.attachments.get(key);
        if (!buffer) {
            throw new Error('attachment does not exist: ' + key);
        }
        return new Blob([buffer]);
    }

    changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>> {
        return this.changes$.asObservable();
    }

    /**
     * Clears every key below the root subspace of this storage instance
     * (documents, attachments and indexes), then closes the instance.
     */
    async remove(): Promise<void> {
        const dbs = await this.internals.dbsPromise;
        await dbs.root.doTransaction((tx: any) => {
            tx.clearRange('', INDEX_MAX);
            return PROMISE_RESOLVE_VOID;
        });
        return this.close();
    }

    /**
     * Permanently removes up to settings.batchSize documents that were deleted
     * longer than minimumDeletedTime ago, including their secondary index entries.
     * @returns true when no further deleted documents are left to clean up
     * in the queried range, false when cleanup() should be called again.
     */
    async cleanup(minimumDeletedTime: number): Promise<boolean> {
        const {
            keySelector,
            StreamingMode
        } = require('foundationdb');
        const maxDeletionTime = now() - minimumDeletedTime;
        const dbs = await this.internals.dbsPromise;
        const index = CLEANUP_INDEX;
        const indexName = getFoundationDBIndexName(index);
        const indexMeta = dbs.indexes[indexName];
        const lowerBoundString = getStartIndexStringFromLowerBound(
            this.schema,
            index,
            [
                true,
                /**
                 * Do not use 0 here,
                 * because 1 is the minimum value for _meta.lwt
                 */
                1
            ]
        );
        const upperBoundString = getStartIndexStringFromUpperBound(
            this.schema,
            index,
            [
                true,
                maxDeletionTime
            ]
        );

        /**
         * The result is returned from the transaction callback instead of
         * being assigned to an outer variable, because doTransaction()
         * may run the callback multiple times on conflicts.
         */
        const noMoreUndeleted: boolean = await dbs.root.doTransaction(async (tx: any) => {
            const batchSize = ensureNotFalsy(this.settings.batchSize);
            const indexTx = tx.at(indexMeta.db.subspace);
            const mainTx = tx.at(dbs.main.subspace);
            const range = await indexTx.getRangeAll(
                keySelector.firstGreaterThan(lowerBoundString),
                upperBoundString,
                {
                    limit: batchSize + 1, // get one more extra to detect what to return from cleanup()
                    streamingMode: StreamingMode.Exact
                }
            );
            const hasMore = range.length > batchSize;
            if (hasMore) {
                range.pop();
            }
            const docIds: string[] = range.map((row: string[]) => row[1]);
            const docsData: RxDocumentData<RxDocType>[] = await Promise.all(docIds.map((docId: string) => mainTx.get(docId)));

            Object
                .values(dbs.indexes)
                .forEach(indexMetaInner => {
                    const subIndexDB = tx.at(indexMetaInner.db.subspace);
                    docsData.forEach(docData => {
                        const indexString = indexMetaInner.getIndexableString(docData);
                        subIndexDB.delete(indexString);
                    });
                });
            docIds.forEach((id: string) => mainTx.delete(id));
            return !hasMore;
        });
        return noMoreUndeleted;
    }

    /**
     * Completes the change stream and closes the root database.
     * Repeated calls return the same promise.
     */
    async close(): Promise<void> {
        if (this.closed) {
            return this.closed;
        }
        this.closed = (async () => {
            this.changes$.complete();
            const dbs = await this.internals.dbsPromise;
            await dbs.root.close();

            // TODO shouldn't we close the index databases?
            // Object.values(dbs.indexes).forEach(db => db.close());
        })();
        return this.closed;
    }
}


/**
 * Opens the FoundationDB connection and lazily creates the subspaces
 * (main documents, events, attachments, one per index) for the given collection.
 * The subspaces are created asynchronously; the returned instance awaits
 * them through internals.dbsPromise.
 */
export function createFoundationDBStorageInstance<RxDocType>(
    storage: RxStorageFoundationDB,
    params: RxStorageInstanceCreationParams<RxDocType, RxStorageFoundationDBInstanceCreationOptions>,
    settings: RxStorageFoundationDBSettings
): Promise<RxStorageInstanceFoundationDB<RxDocType>> {
    const primaryPath = getPrimaryFieldOfPrimaryKey(params.schema.primaryKey);

    // loaded lazily so the native module is only required when this storage is used
    const {
        open,
        directory,
        encoders
    } = require('foundationdb');

    const connection = open(settings.clusterFile);
    const dbsPromise = (async () => {
        const dir = await directory.createOrOpen(connection, 'rxdb');

        const root = connection
            .at(dir)
            .at(params.databaseName + '.')
            .at(params.collectionName + '.')
            .at(params.schema.version + '.');

        const main: FoundationDBDatabase<RxDocType> = root
            .at('main.')
            .withKeyEncoding(encoders.string) // keys are the primary key strings
            .withValueEncoding(encoders.json) as any; // values are stored as JSON

        const events: FoundationDBDatabase<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>> = root
            .at('events.')
            .withKeyEncoding(encoders.string)
            .withValueEncoding(encoders.json) as any;

        const attachments: FoundationDBDatabase<any> = root
            .at('attachments.')
            .withKeyEncoding(encoders.string)
            .withValueEncoding(encoders.buffer) as any;

        const indexDBs: {
            [indexName: string]: FoundationDBIndexMeta<RxDocType>;
        } = {};
        const useIndexes = params.schema.indexes ? params.schema.indexes.slice(0) : [];
        useIndexes.push([primaryPath]);
        const useIndexesFinal = useIndexes.map(index => {
            const indexAr = toArray(index);
            return indexAr;
        });
        // used for `getChangedDocumentsSince()`
        useIndexesFinal.push([
            '_meta.lwt',
            primaryPath
        ]);
        useIndexesFinal.push(CLEANUP_INDEX);
        useIndexesFinal.forEach(indexAr => {
            const indexName = getFoundationDBIndexName(indexAr);
            const indexDB = root.at(indexName + '.')
                .withKeyEncoding(encoders.string)
                .withValueEncoding(encoders.string);
            indexDBs[indexName] = {
                indexName,
                db: indexDB,
                getIndexableString: getIndexableStringMonad(params.schema, indexAr),
                index: indexAr
            };
        });

        return {
            root,
            main,
            events,
            attachments,
            indexes: indexDBs
        };
    })();

    const internals: FoundationDBStorageInternals<RxDocType> = {
        connection,
        dbsPromise: dbsPromise
    };

    const instance = new RxStorageInstanceFoundationDB(
        storage,
        params.databaseName,
        params.collectionName,
        params.schema,
        internals,
        params.options,
        settings
    );
    return Promise.resolve(instance);
}