/**
 * Helper functions for accessing the RxStorage instances.
 */

import { overwritable } from 'nxdb-old/src/overwritable';
import { newRxError } from 'nxdb-old/src/rx-error';
import {
    fillPrimaryKey,
    getPrimaryFieldOfPrimaryKey
} from 'nxdb-old/src/rx-schema-helper';
import type {
    BulkWriteRow,
    BulkWriteRowProcessed,
    ById,
    CategorizeBulkWriteRowsOutput,
    EventBulk,
    RxAttachmentData,
    RxAttachmentWriteData,
    RxChangeEvent,
    RxCollection,
    RxDatabase,
    RxDocumentData,
    RxDocumentWriteData,
    RxJsonSchema,
    RxStorageWriteError,
    RxStorageChangeEvent,
    RxStorageInstance,
    RxStorageInstanceCreationParams,
    StringKeys,
    RxStorageWriteErrorConflict,
    RxStorageWriteErrorAttachment
} from 'nxdb-old/src/types';
import {
    createRevision,
    defaultHashSha256,
    ensureNotFalsy,
    firstPropertyValueOfObject,
    flatClone,
    getDefaultRevision,
    getDefaultRxDocumentMeta,
    now,
    randomCouchString
} from 'nxdb-old/src/plugins/utils';

export const INTERNAL_STORAGE_NAME = '_nxdb_internal';
export const RX_DATABASE_LOCAL_DOCS_STORAGE_NAME = 'rxdatabase_storage_local';

/**
 * Fetches one document by its id via findDocumentsById(),
 * called with the second argument set to false.
 *
 * @param storageInstance The storage instance to read from.
 * @param documentId The id of the document to look up.
 * @returns The stored document data, or null when the result
 * contains no truthy entry for the given id.
 */
export async function getSingleDocument<RxDocType>(
    storageInstance: RxStorageInstance<RxDocType, any, any>,
    documentId: string
): Promise<RxDocumentData<RxDocType> | null> {
    const results = await storageInstance.findDocumentsById([documentId], false);
    const doc = results[documentId];
    return doc ? doc : null;
}

/**
 * Writes a single document by running a bulkWrite() with exactly one row.
 *
 * @param instance The storage instance to write to.
 * @param writeRow The row (new document state plus optional previous state).
 * @param context Passed through unchanged to bulkWrite().
 * @returns The first entry of the 'success' part of the write result.
 * @throws The first entry of the 'error' part of the write result
 * when the storage reported at least one error.
 */
export async function writeSingle<RxDocType>(
    instance: RxStorageInstance<RxDocType, any, any>,
    writeRow: BulkWriteRow<RxDocType>,
    context: string
): Promise<RxDocumentData<RxDocType>> {
    const writeResult = await instance.bulkWrite(
        [writeRow],
        context
    );
    if (Object.keys(writeResult.error).length > 0) {
        // Only one row was written, so the first error is the only error.
        const error = firstPropertyValueOfObject(writeResult.error);
        throw error;
    }
    return firstPropertyValueOfObject(writeResult.success);
}

/**
 * Checkpoints must be stackable over another.
* This is required for some RxStorage implementations
 * like the sharding plugin, where a checkpoint only represents
 * the document state from some, but not all shards.
 *
 * The checkpoints are merged with Object.assign() into a fresh object,
 * so properties of later checkpoints overwrite those of earlier ones.
 */
export function stackCheckpoints<CheckpointType>(
    checkpoints: CheckpointType[]
): CheckpointType {
    return Object.assign(
        {},
        ...checkpoints
    );
}

/**
 * Converts a change event as emitted by a RxStorage
 * into the RxChangeEvent shape.
 *
 * @param isLocal Copied as-is into the resulting event.
 * @param rxStorageChangeEvent The storage level event to convert.
 * @param rxCollection Optional collection; when given, its name is
 * set as collectionName, otherwise collectionName is undefined.
 * @returns A new RxChangeEvent. documentData and previousDocumentData
 * are passed through overwritable.deepFreezeWhenDevMode().
 */
export function storageChangeEventToRxChangeEvent<DocType>(
    isLocal: boolean,
    rxStorageChangeEvent: RxStorageChangeEvent<DocType>,
    rxCollection?: RxCollection,
): RxChangeEvent<DocType> {
    const documentData = rxStorageChangeEvent.documentData;
    const previousDocumentData = rxStorageChangeEvent.previousDocumentData;
    const ret: RxChangeEvent<DocType> = {
        eventId: rxStorageChangeEvent.eventId,
        documentId: rxStorageChangeEvent.documentId,
        collectionName: rxCollection ? rxCollection.name : undefined,
        startTime: rxStorageChangeEvent.startTime,
        endTime: rxStorageChangeEvent.endTime,
        isLocal,
        operation: rxStorageChangeEvent.operation,
        documentData: overwritable.deepFreezeWhenDevMode(documentData as any),
        previousDocumentData: overwritable.deepFreezeWhenDevMode(previousDocumentData as any)
    };
    return ret;
}

/**
 * Throws when the given write error is set; does nothing otherwise.
 *
 * @param collection The collection the write was run on (its name is added to the error).
 * @param documentId The id of the document that was written.
 * @param writeData The data that was attempted to be written.
 * @param error The write error of the document, or undefined if the write succeeded.
 * @throws RxError 'CONFLICT' when error.status is 409,
 * RxError 'VD2' when error.status is 422,
 * and the unchanged error object for every other status.
 */
export function throwIfIsStorageWriteError<RxDocType>(
    collection: RxCollection<RxDocType, any, any>,
    documentId: string,
    writeData: RxDocumentWriteData<RxDocType> | RxDocType,
    error: RxStorageWriteError<RxDocType> | undefined
) {
    if (!error) {
        return;
    }
    if (error.status === 409) {
        throw newRxError('CONFLICT', {
            collection: collection.name,
            id: documentId,
            writeError: error,
            data: writeData
        });
    } else if (error.status === 422) {
        throw newRxError('VD2', {
            collection: collection.name,
            id: documentId,
            writeError: error,
            data: writeData
        });
    } else {
        throw error;
    }
}

/**
 * Analyzes a list of BulkWriteRows and determines
 * which documents must be inserted, updated or deleted
 * and which events must be emitted and which documents cause a conflict
 * and must not be written.
 * Used as helper inside of some RxStorage implementations.
* @hotPath The performance of this function is critical
 *
 * @returns The rows to insert and to update, the write errors by document id,
 * the change events (as a bulk and by document id), the attachment
 * operations (add/remove/update) and the processed row with the highest _meta.lwt.
 */
export function categorizeBulkWriteRows<RxDocType>(
    storageInstance: RxStorageInstance<any, any, any>,
    primaryPath: StringKeys<RxDocType>,
    /**
     * Current state of the documents
     * inside of the storage. Used to determine
     * which writes cause conflicts.
     * This can be a Map for better performance
     * but it can also be an object because some storages
     * need to work with something that is JSON-stringify-able
     * and we do not want to transform a big object into a Map
     * each time we use it.
     */
    docsInDb: Map<RxDocumentData<RxDocType>[StringKeys<RxDocType>] | string, RxDocumentData<RxDocType>> | ById<RxDocumentData<RxDocType>>,
    /**
     * The write rows that are passed to
     * RxStorageInstance().bulkWrite().
     */
    bulkWriteRows: BulkWriteRow<RxDocType>[],
    context: string
): CategorizeBulkWriteRowsOutput<RxDocType> {
    const hasAttachments = !!storageInstance.schema.attachments;
    const bulkInsertDocs: BulkWriteRowProcessed<RxDocType>[] = [];
    const bulkUpdateDocs: BulkWriteRowProcessed<RxDocType>[] = [];
    const errors: ById<RxStorageWriteError<RxDocType>> = {};
    const changeByDocId = new Map<string, RxStorageChangeEvent<RxDocumentData<RxDocType>>>();
    const eventBulkId = randomCouchString(10);
    const eventBulk: EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any> = {
        id: eventBulkId,
        events: [],
        checkpoint: null,
        context
    };
    const eventBulkEvents = eventBulk.events;

    const attachmentsAdd: {
        documentId: string;
        attachmentId: string;
        attachmentData: RxAttachmentWriteData;
        digest: string;
    }[] = [];
    const attachmentsRemove: {
        documentId: string;
        attachmentId: string;
        digest: string;
    }[] = [];
    const attachmentsUpdate: {
        documentId: string;
        attachmentId: string;
        attachmentData: RxAttachmentWriteData;
        digest: string;
    }[] = [];

    const startTime = now();

    // docsInDb is either a Map or a plain object, detect which one once up front.
    const docsByIdIsMap = typeof docsInDb.get === 'function';
    const hasDocsInDb = docsByIdIsMap
        ? (docsInDb as Map<any, any>).size > 0
        : Object.keys(docsInDb).length > 0;

    let newestRow: BulkWriteRowProcessed<RxDocType> | undefined;

    const rowAmount = bulkWriteRows.length;
    for (let rowId = 0; rowId < rowAmount; rowId++) {
        const writeRow = bulkWriteRows[rowId];
        const docId = writeRow.document[primaryPath] as string;

        let documentInDb: RxDocumentData<RxDocType> | false = false;
        if (hasDocsInDb) {
            documentInDb = docsByIdIsMap
                ? (docsInDb as any).get(docId)
                : (docsInDb as any)[docId];
        }

        let attachmentError: RxStorageWriteErrorAttachment<RxDocType> | undefined;

        if (!documentInDb) {
            /**
             * It is possible to insert already deleted documents,
             * this can happen on replication.
             */
            const insertedIsDeleted = writeRow.document._deleted ? true : false;

            if (hasAttachments) {
                Object
                    .entries(writeRow.document._attachments)
                    .forEach(([attachmentId, attachmentData]) => {
                        if (!(attachmentData as RxAttachmentWriteData).data) {
                            // A new document must bring the data of all of its attachments.
                            attachmentError = {
                                documentId: docId,
                                isError: true,
                                status: 510,
                                writeRow,
                                attachmentId
                            };
                            errors[docId] = attachmentError;
                        } else {
                            attachmentsAdd.push({
                                documentId: docId,
                                attachmentId,
                                attachmentData: attachmentData as any,
                                digest: defaultHashSha256((attachmentData as RxAttachmentWriteData).data)
                            });
                        }
                    });
            }

            if (!attachmentError) {
                if (hasAttachments) {
                    bulkInsertDocs.push(stripAttachmentsDataFromRow(writeRow));
                } else {
                    bulkInsertDocs.push(writeRow as any);
                }
                if (
                    !newestRow ||
                    newestRow.document._meta.lwt < writeRow.document._meta.lwt
                ) {
                    newestRow = writeRow as any;
                }
            }

            /**
             * NOTE(review): the event is also emitted when the row was
             * rejected with an attachment error above. This keeps the
             * previous behavior; confirm whether that is intended.
             */
            if (!insertedIsDeleted) {
                const event = {
                    eventId: getUniqueDeterministicEventKey(
                        eventBulkId,
                        rowId,
                        docId,
                        writeRow
                    ),
                    documentId: docId,
                    operation: 'INSERT' as const,
                    documentData: hasAttachments
                        ? stripAttachmentsDataFromDocument(writeRow.document)
                        : writeRow.document as any,
                    previousDocumentData: hasAttachments && writeRow.previous
                        ? stripAttachmentsDataFromDocument(writeRow.previous)
                        : writeRow.previous as any,
                    startTime,
                    endTime: now()
                };
                changeByDocId.set(docId, event);
                eventBulkEvents.push(event);
            }
        } else {
            // update existing document
            const revInDb: string = documentInDb._rev;

            /**
             * Check for conflict:
             * either no previous state was given although the document exists,
             * or the given previous state is not the one stored in the storage.
             */
            if (
                !writeRow.previous ||
                revInDb !== writeRow.previous._rev
            ) {
                // is conflict error
                const err: RxStorageWriteError<RxDocType> = {
                    isError: true,
                    status: 409,
                    documentId: docId,
                    writeRow: writeRow,
                    documentInDb
                };
                errors[docId] = err;
                continue;
            }

            // handle attachments data
            const updatedRow: BulkWriteRowProcessed<RxDocType> = hasAttachments
                ? stripAttachmentsDataFromRow(writeRow)
                : writeRow as any;

            if (hasAttachments) {
                if (writeRow.document._deleted) {
                    /**
                     * Deleted documents must have cleared all their attachments.
                     */
                    if (writeRow.previous) {
                        Object
                            .keys(writeRow.previous._attachments)
                            .forEach(attachmentId => {
                                attachmentsRemove.push({
                                    documentId: docId,
                                    attachmentId,
                                    digest: ensureNotFalsy(writeRow.previous)._attachments[attachmentId].digest
                                });
                            });
                    }
                } else {
                    /**
                     * First check ALL attachments for errors.
                     * The callback returns true only for an invalid attachment,
                     * so the iteration stops at the first error instead of
                     * stopping unconditionally after the first attachment.
                     */
                    Object
                        .entries(writeRow.document._attachments)
                        .some(([attachmentId, attachmentData]) => {
                            const previousAttachmentData = writeRow.previous
                                ? writeRow.previous._attachments[attachmentId]
                                : undefined;
                            if (
                                !previousAttachmentData &&
                                !(attachmentData as RxAttachmentWriteData).data
                            ) {
                                // New attachment without data cannot be stored.
                                attachmentError = {
                                    documentId: docId,
                                    documentInDb: documentInDb as any,
                                    isError: true,
                                    status: 510,
                                    writeRow,
                                    attachmentId
                                };
                                return true;
                            }
                            return false;
                        });

                    if (!attachmentError) {
                        Object
                            .entries(writeRow.document._attachments)
                            .forEach(([attachmentId, attachmentData]) => {
                                const previousAttachmentData = writeRow.previous
                                    ? writeRow.previous._attachments[attachmentId]
                                    : undefined;
                                if (!previousAttachmentData) {
                                    attachmentsAdd.push({
                                        documentId: docId,
                                        attachmentId,
                                        attachmentData: attachmentData as any,
                                        digest: defaultHashSha256((attachmentData as RxAttachmentWriteData).data)
                                    });
                                } else {
                                    const newDigest = updatedRow.document._attachments[attachmentId].digest;
                                    if (
                                        (attachmentData as RxAttachmentWriteData).data &&
                                        /**
                                         * Performance shortcut,
                                         * do not update the attachment data if it did not change.
                                         */
                                        previousAttachmentData.digest !== newDigest
                                    ) {
                                        attachmentsUpdate.push({
                                            documentId: docId,
                                            attachmentId,
                                            attachmentData: attachmentData as RxAttachmentWriteData,
                                            digest: defaultHashSha256((attachmentData as RxAttachmentWriteData).data)
                                        });
                                    }
                                }
                            });
                    }
                }
            }

            if (attachmentError) {
                errors[docId] = attachmentError;
            } else {
                bulkUpdateDocs.push(updatedRow);
                if (
                    !newestRow ||
                    newestRow.document._meta.lwt < updatedRow.document._meta.lwt
                ) {
                    newestRow = updatedRow as any;
                }
            }

            /**
             * Derive the event operation from the deleted-flags
             * of the previous and the new document state.
             * NOTE(review): like on inserts, the event is emitted even if
             * the row ended up in 'errors' because of an attachment error.
             */
            const writeDoc = writeRow.document;
            let eventDocumentData: RxDocumentData<RxDocType> | undefined = null as any;
            let previousEventDocumentData: RxDocumentData<RxDocType> | undefined = null as any;
            let operation: 'INSERT' | 'UPDATE' | 'DELETE' = null as any;

            if (writeRow.previous && writeRow.previous._deleted && !writeDoc._deleted) {
                // previous state was deleted, new state is not -> re-insert
                operation = 'INSERT';
                eventDocumentData = hasAttachments
                    ? stripAttachmentsDataFromDocument(writeDoc)
                    : writeDoc as any;
            } else if (writeRow.previous && !writeRow.previous._deleted && !writeDoc._deleted) {
                operation = 'UPDATE';
                eventDocumentData = hasAttachments
                    ? stripAttachmentsDataFromDocument(writeDoc)
                    : writeDoc as any;
                previousEventDocumentData = writeRow.previous;
            } else if (writeDoc._deleted) {
                operation = 'DELETE';
                eventDocumentData = ensureNotFalsy(writeRow.document) as any;
                previousEventDocumentData = writeRow.previous;
            } else {
                throw newRxError('SNH', { args: { writeRow } });
            }

            const event = {
                eventId: getUniqueDeterministicEventKey(
                    eventBulkId,
                    rowId,
                    docId,
                    writeRow
                ),
                documentId: docId,
                documentData: eventDocumentData as RxDocumentData<RxDocType>,
                previousDocumentData: previousEventDocumentData,
                operation: operation,
                startTime,
                endTime: now()
            };
            changeByDocId.set(docId, event);
            eventBulkEvents.push(event);
        }
    }

    return {
        bulkInsertDocs,
        bulkUpdateDocs,
        newestRow,
        errors,
        changeByDocId,
        eventBulk,
        attachmentsAdd,
        attachmentsRemove,
        attachmentsUpdate
    };
}

/**
 * Returns a new write row whose document has the attachment data stripped
 * (see stripAttachmentsDataFromDocument). The 'previous' state is passed
 * through unchanged.
 */
export function stripAttachmentsDataFromRow<RxDocType>(
    writeRow: BulkWriteRow<RxDocType>
): BulkWriteRowProcessed<RxDocType> {
    return {
        previous: writeRow.previous,
        document: stripAttachmentsDataFromDocument(writeRow.document)
    };
}

/**
 * Returns the length of the base64-decoded attachment string.
 */
export function getAttachmentSize(
    attachmentBase64String: string
): number {
    return atob(attachmentBase64String).length;
}

/**
 * Used in custom RxStorage implementations.
 * Converts attachment write data into the stored meta data shape
 * (digest, length, type). Input that has no truthy 'data' property
 * is returned as-is.
 */
export function attachmentWriteDataToNormalData(
    writeData: RxAttachmentData | RxAttachmentWriteData
): RxAttachmentData {
    const data = (writeData as RxAttachmentWriteData).data;
    if (!data) {
        return writeData as any;
    }
    const ret: RxAttachmentData = {
        digest: defaultHashSha256(data),
        length: getAttachmentSize(data),
        type: writeData.type
    };
    return ret;
}

/**
 * Returns a flat clone of the document where each entry of _attachments
 * was converted with attachmentWriteDataToNormalData().
 * The given document is not mutated.
 */
export function stripAttachmentsDataFromDocument<RxDocType>(
    doc: RxDocumentWriteData<RxDocType>
): RxDocumentData<RxDocType> {
    const useDoc: RxDocumentData<RxDocType> = flatClone(doc) as any;
    useDoc._attachments = {};
    Object
        .entries(doc._attachments)
        .forEach(([attachmentId, attachmentData]) => {
            useDoc._attachments[attachmentId] = attachmentWriteDataToNormalData(attachmentData);
        });
    return useDoc;
}

/**
 * Flat clone the document data
 * and also the _meta field.
* Used many times when we want to change the meta
 * during replication etc.
 */
export function flatCloneDocWithMeta<RxDocType>(
    doc: RxDocumentData<RxDocType>
): RxDocumentData<RxDocType> {
    const ret = flatClone(doc);
    ret._meta = flatClone(doc._meta);
    return ret;
}

/**
 * Each event is labeled with the id
 * to make it easy to filter out duplicates
 * even on flattened eventBulks
 *
 * @returns The given parts and the _rev of the written document, joined by '|'.
 */
export function getUniqueDeterministicEventKey(
    eventBulkId: string,
    rowId: number,
    docId: string,
    writeRow: BulkWriteRow<any>
): string {
    return eventBulkId + '|' + rowId + '|' + docId + '|' + writeRow.document._rev;
}

/**
 * A storage instance as returned by getWrappedStorageInstance(),
 * which additionally exposes the unwrapped instance.
 */
export type WrappedRxStorageInstance<RxDocumentType, Internals, InstanceCreationOptions> = RxStorageInstance<RxDocumentType, any, InstanceCreationOptions> & {
    originalStorageInstance: RxStorageInstance<RxDocumentType, Internals, InstanceCreationOptions>;
};

/**
 * Wraps the normal storageInstance of a RxCollection
 * to ensure that all access is properly using the hooks
 * and other data transformations and also ensure that database.lockedRun()
 * is used properly.
 *
 * The wrapper is added to database.storageInstances on creation
 * and removed from it again on remove() and close().
 */
export function getWrappedStorageInstance<
    RxDocType,
    Internals,
    InstanceCreationOptions,
    CheckpointType
>(
    database: RxDatabase<{}, Internals, InstanceCreationOptions>,
    storageInstance: RxStorageInstance<RxDocType, Internals, InstanceCreationOptions, CheckpointType>,
    /**
     * The original RxJsonSchema
     * before it was mutated by hooks.
     */
    rxJsonSchema: RxJsonSchema<RxDocumentData<RxDocType>>
): WrappedRxStorageInstance<RxDocType, Internals, InstanceCreationOptions> {
    overwritable.deepFreezeWhenDevMode(rxJsonSchema);
    const primaryPath = getPrimaryFieldOfPrimaryKey(rxJsonSchema.primaryKey);

    /**
     * Prepares a write row for the storage:
     * clones the document and its _meta, sets _meta.lwt to the current time
     * and assigns a new revision based on the previous document state.
     */
    function transformDocumentDataFromNxDBToRxStorage(
        writeRow: BulkWriteRow<RxDocType>
    ) {
        let data = flatClone(writeRow.document);
        data._meta = flatClone(data._meta);

        /**
         * Do some checks in dev-mode
         * that would be too performance expensive
         * in production.
         */
        if (overwritable.isDevMode()) {
            // ensure that the primary key has not been changed
            data = fillPrimaryKey(
                primaryPath,
                rxJsonSchema,
                data as any
            );

            /**
             * TODO run this in the dev-mode plugin:
             * Ensure that the new revision is higher
             * than the previous one.
             */
            // if (writeRow.previous) {
            //     const prev = parseRevision(writeRow.previous._rev);
            //     const current = parseRevision(writeRow.document._rev);
            //     if (current.height <= prev.height) {
            //         throw newRxError('SNH', {
            //             dataBefore: writeRow.previous,
            //             dataAfter: writeRow.document,
            //             args: {
            //                 prev,
            //                 current
            //             }
            //         });
            //     }
            // }

            /**
             * Ensure that _meta fields have been merged
             * and not replaced.
             * This is important so that when one plugin A
             * sets a _meta field and another plugin B does a write
             * to the document, it must be ensured that the
             * field of plugin A was not removed.
             */
            if (writeRow.previous) {
                Object.keys(writeRow.previous._meta)
                    .forEach(metaFieldName => {
                        if (!Object.prototype.hasOwnProperty.call(writeRow.document._meta, metaFieldName)) {
                            throw newRxError('SNH', {
                                dataBefore: writeRow.previous,
                                dataAfter: writeRow.document
                            });
                        }
                    });
            }
        }

        data._meta.lwt = now();

        /**
         * Yes we really want to set the revision here.
         * If you make a plugin that relies on having its own revision
         * stored into the storage, use this.originalStorageInstance.bulkWrite() instead.
         */
        data._rev = createRevision(
            database.token,
            writeRow.previous
        );

        return {
            document: data,
            previous: writeRow.previous
        };
    }

    const ret: WrappedRxStorageInstance<RxDocType, Internals, InstanceCreationOptions> = {
        originalStorageInstance: storageInstance,
        schema: storageInstance.schema,
        internals: storageInstance.internals,
        collectionName: storageInstance.collectionName,
        databaseName: storageInstance.databaseName,
        options: storageInstance.options,
        bulkWrite(
            rows: BulkWriteRow<RxDocType>[],
            context: string
        ) {
            const toStorageWriteRows: BulkWriteRow<RxDocType>[] = rows
                .map(row => transformDocumentDataFromNxDBToRxStorage(row));

            return database.lockedRun(
                () => storageInstance.bulkWrite(
                    toStorageWriteRows,
                    context
                )
            )
                /**
                 * The RxStorageInstance MUST NOT allow to insert already _deleted documents,
                 * without sending the previous document version.
                 * But for better developer experience, NxDB does allow to re-insert deleted documents.
                 * We do this by automatically fixing the conflict errors for that case
                 * by running another bulkWrite() and merging the results.
                 * @link https://github.com/nxpkg/nxdb/pull/3839
                 */
                .then(writeResult => {
                    // Conflicts where a non-deleted insert hit a deleted document in the storage.
                    const reInsertErrors: RxStorageWriteErrorConflict<RxDocType>[] = Object
                        .values(writeResult.error)
                        .filter((error) => {
                            if (
                                error.status === 409 &&
                                !error.writeRow.previous &&
                                !error.writeRow.document._deleted &&
                                ensureNotFalsy(error.documentInDb)._deleted
                            ) {
                                return true;
                            }
                            return false;
                        }) as any;

                    if (reInsertErrors.length > 0) {
                        // Work on clones so the original write result is not mutated.
                        const useWriteResult: typeof writeResult = {
                            error: flatClone(writeResult.error),
                            success: flatClone(writeResult.success)
                        };
                        const reInserts: BulkWriteRow<RxDocType>[] = reInsertErrors
                            .map((error) => {
                                // The conflict gets retried, so drop it from the errors.
                                delete useWriteResult.error[error.documentId];
                                return {
                                    previous: error.documentInDb,
                                    document: Object.assign(
                                        {},
                                        error.writeRow.document,
                                        {
                                            _rev: createRevision(
                                                database.token,
                                                error.documentInDb
                                            )
                                        }
                                    )
                                };
                            });

                        return database.lockedRun(
                            () => storageInstance.bulkWrite(
                                reInserts,
                                context
                            )
                        ).then(subResult => {
                            useWriteResult.error = Object.assign(
                                useWriteResult.error,
                                subResult.error
                            );
                            useWriteResult.success = Object.assign(
                                useWriteResult.success,
                                subResult.success
                            );
                            return useWriteResult;
                        });
                    }

                    return writeResult;
                });
        },
        query(preparedQuery) {
            return database.lockedRun(
                () => storageInstance.query(preparedQuery)
            );
        },
        count(preparedQuery) {
            return database.lockedRun(
                () => storageInstance.count(preparedQuery)
            );
        },
        findDocumentsById(ids, deleted) {
            return database.lockedRun(
                () => storageInstance.findDocumentsById(ids, deleted)
            );
        },
        getAttachmentData(
            documentId: string,
            attachmentId: string,
            digest: string
        ) {
            return database.lockedRun(
                () => storageInstance.getAttachmentData(documentId, attachmentId, digest)
            );
        },
        getChangedDocumentsSince(limit: number, checkpoint?: any) {
            return database.lockedRun(
                () => storageInstance.getChangedDocumentsSince(ensureNotFalsy(limit), checkpoint)
            );
        },
        cleanup(minDeletedTime: number) {
            return database.lockedRun(
                () => storageInstance.cleanup(minDeletedTime)
            );
        },
        remove() {
            database.storageInstances.delete(ret);
            return database.lockedRun(
                () => storageInstance.remove()
            );
        },
        close() {
            database.storageInstances.delete(ret);
            return database.lockedRun(
                () => storageInstance.close()
            );
        },
        changeStream() {
            return storageInstance.changeStream();
        },
        conflictResultionTasks() {
            return storageInstance.conflictResultionTasks();
        },
        resolveConflictResultionTask(taskSolution) {
            if (taskSolution.output.isEqual) {
                return storageInstance.resolveConflictResultionTask(taskSolution);
            }

            const doc = Object.assign(
                {},
                taskSolution.output.documentData,
                {
                    _meta: getDefaultRxDocumentMeta(),
                    _rev: getDefaultRevision(),
                    _attachments: {}
                }
            );

            // The storage gets the document data without _meta, _rev and _attachments.
            const documentData = flatClone(doc);
            delete (documentData as any)._meta;
            delete (documentData as any)._rev;
            delete (documentData as any)._attachments;

            return storageInstance.resolveConflictResultionTask({
                id: taskSolution.id,
                output: {
                    isEqual: false,
                    documentData
                }
            });
        }
    };

    database.storageInstances.add(ret);
    return ret;
}

/**
 * Each RxStorage implementation should
 * run this method at the first step of createStorageInstance()
 * to ensure that the configuration is correct.
 *
 * @throws RxError 'UT5' when the schema has keyCompression set,
 * 'UT6' when hasEncryption() is true for the schema,
 * 'UT7' when the schema has attachments.compression set.
 */
export function ensureRxStorageInstanceParamsAreCorrect(
    params: RxStorageInstanceCreationParams<any, any>
) {
    if (params.schema.keyCompression) {
        throw newRxError('UT5', { args: { params } });
    }
    if (hasEncryption(params.schema)) {
        throw newRxError('UT6', { args: { params } });
    }
    if (
        params.schema.attachments &&
        params.schema.attachments.compression
    ) {
        throw newRxError('UT7', { args: { params } });
    }
}

/**
 * Returns true when the schema has at least one entry in 'encrypted'
 * or when 'attachments.encrypted' is truthy.
 */
export function hasEncryption(jsonSchema: RxJsonSchema<any>): boolean {
    return !!(
        (!!jsonSchema.encrypted && jsonSchema.encrypted.length > 0) ||
        (jsonSchema.attachments && jsonSchema.attachments.encrypted)
    );
}