import {
    Observable,
    Subject
} from 'rxjs';
import {
    getStartIndexStringFromLowerBound,
    getStartIndexStringFromUpperBound
} from 'nxdb-old/src/custom-index';
import { getPrimaryFieldOfPrimaryKey } from 'nxdb-old/src/rx-schema-helper';
import { categorizeBulkWriteRows } from 'nxdb-old/src/rx-storage-helper';
import type {
    BulkWriteRow,
    ById,
    EventBulk,
    QueryMatcher,
    RxConflictResultionTask,
    RxConflictResultionTaskSolution,
    RxDocumentData,
    RxDocumentDataById,
    RxJsonSchema,
    RxStorageBulkWriteResponse,
    RxStorageChangeEvent,
    RxStorageCountResult,
    RxStorageDefaultCheckpoint,
    RxStorageInstance,
    RxStorageInstanceCreationParams,
    RxStorageQueryResult,
    RxStorageWriteError,
    StringKeys
} from 'nxdb-old/src/types';
import {
    ensureNotFalsy,
    getFromMapOrThrow,
    lastOfArray,
    now,
    PROMISE_RESOLVE_TRUE,
    PROMISE_RESOLVE_VOID,
    RX_META_LWT_MINIMUM
} from 'nxdb-old/src/plugins/utils';
import {
    boundGE,
    boundGT,
    boundLE,
    boundLT
} from 'nxdb-old/src/plugins/storage-memory/binary-search-bounds';
import {
    attachmentMapKey,
    compareDocsWithIndex,
    ensureNotRemoved,
    getMemoryCollectionKey,
    putWriteRowToState,
    removeDocFromState
} from 'nxdb-old/src/plugins/storage-memory/memory-helper';
import {
    addIndexesToInternalsState,
    getMemoryIndexName
} from 'nxdb-old/src/plugins/storage-memory/memory-indexes';
import type {
    MemoryPreparedQuery,
    MemoryStorageInternals,
    RxStorageMemory,
    RxStorageMemoryInstanceCreationOptions,
    RxStorageMemorySettings
} from 'nxdb-old/src/plugins/storage-memory/memory-types';
import {
    getQueryMatcher,
    getSortComparator
} from 'nxdb-old/src/rx-query-helper';

/**
 * Storage instance that keeps all documents, attachments and index arrays
 * in the `internals` object handed to the constructor.
 *
 * Several instances may share one `internals` object; the sharing is
 * tracked through `internals.refCount` (see `createMemoryStorageInstance`
 * and `close`).
 */
export class RxStorageInstanceMemory<RxDocType> implements RxStorageInstance<
    RxDocType,
    MemoryStorageInternals<RxDocType>,
    RxStorageMemoryInstanceCreationOptions,
    RxStorageDefaultCheckpoint
> {

    /** Name of the primary key field, derived from `schema.primaryKey`. */
    public readonly primaryPath: StringKeys<RxDocumentData<RxDocType>>;
    /** Set to `true` by `close()`; a second `close()` call is rejected. */
    public closed = false;

    constructor(
        public readonly storage: RxStorageMemory,
        public readonly databaseName: string,
        public readonly collectionName: string,
        public readonly schema: Readonly<RxJsonSchema<RxDocumentData<RxDocType>>>,
        public readonly internals: MemoryStorageInternals<RxDocType>,
        public readonly options: Readonly<RxStorageMemoryInstanceCreationOptions>,
        public readonly settings: RxStorageMemorySettings
    ) {
        this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey);
    }

    /**
     * Writes a batch of rows into the in-memory state.
     *
     * The rows are first split by `categorizeBulkWriteRows` into inserts,
     * updates, attachment changes and errors. Inserts and updates are applied
     * to the state, attachment changes are applied to the attachments map
     * (only when the schema declares attachments), and the resulting event
     * bulk is emitted on `internals.changes$` when it contains events.
     *
     * @param documentWrites rows to write
     * @param context passed through to `categorizeBulkWriteRows`
     * @returns the written documents by id (`success`) and the errors
     *          reported by the categorization step (`error`)
     */
    bulkWrite(
        documentWrites: BulkWriteRow<RxDocType>[],
        context: string
    ): Promise<RxStorageBulkWriteResponse<RxDocType>> {
        ensureNotRemoved(this);
        const internals = this.internals;
        const documentsById = this.internals.documents;
        const primaryPath = this.primaryPath;

        const success: RxDocumentDataById<RxDocType> = {};

        const categorized = categorizeBulkWriteRows(
            this,
            primaryPath as any,
            documentsById,
            documentWrites,
            context
        );
        const error: ById<RxStorageWriteError<RxDocType>> = categorized.errors;

        /**
         * Do inserts/updates
         */
        const stateByIndex = Object.values(this.internals.byIndex);

        const bulkInsertDocs = categorized.bulkInsertDocs;
        for (let i = 0; i < bulkInsertDocs.length; ++i) {
            const writeRow = bulkInsertDocs[i];
            const docId = writeRow.document[primaryPath];
            // Inserts have no previous document state.
            putWriteRowToState(
                docId as any,
                internals,
                stateByIndex,
                writeRow,
                undefined
            );
            success[docId as any] = writeRow.document;
        }

        const bulkUpdateDocs = categorized.bulkUpdateDocs;
        for (let i = 0; i < bulkUpdateDocs.length; ++i) {
            const writeRow = bulkUpdateDocs[i];
            const docId = writeRow.document[primaryPath];
            // Updates hand over the currently stored document as previous state.
            putWriteRowToState(
                docId as any,
                internals,
                stateByIndex,
                writeRow,
                documentsById.get(docId as any)
            );
            success[docId as any] = writeRow.document;
        }

        /**
         * Handle attachments
         */
        if (this.schema.attachments) {
            const attachmentsMap = internals.attachments;
            categorized.attachmentsAdd.forEach(attachment => {
                attachmentsMap.set(
                    attachmentMapKey(attachment.documentId, attachment.attachmentId),
                    {
                        writeData: attachment.attachmentData,
                        digest: attachment.digest
                    }
                );
            });
            categorized.attachmentsUpdate.forEach(attachment => {
                attachmentsMap.set(
                    attachmentMapKey(attachment.documentId, attachment.attachmentId),
                    {
                        writeData: attachment.attachmentData,
                        digest: attachment.digest
                    }
                );
            });
            categorized.attachmentsRemove.forEach(attachment => {
                attachmentsMap.delete(
                    attachmentMapKey(attachment.documentId, attachment.attachmentId)
                );
            });
        }

        if (categorized.eventBulk.events.length > 0) {
            // The checkpoint of the emitted bulk points at the newest written row.
            const lastState = ensureNotFalsy(categorized.newestRow).document;
            categorized.eventBulk.checkpoint = {
                id: lastState[primaryPath],
                lwt: lastState._meta.lwt
            };
            internals.changes$.next(categorized.eventBulk);
        }

        return Promise.resolve({ success, error });
    }

    /**
     * Looks up documents by primary key.
     *
     * @param docIds ids to look up
     * @param withDeleted when `false`, documents flagged `_deleted` are omitted
     * @returns the found documents keyed by id; unknown ids are simply absent
     */
    findDocumentsById(
        docIds: string[],
        withDeleted: boolean
    ): Promise<RxDocumentDataById<RxDocType>> {
        const documentsById = this.internals.documents;
        const ret: RxDocumentDataById<RxDocType> = {};
        for (let i = 0; i < docIds.length; ++i) {
            const docId = docIds[i];
            const docInDb = documentsById.get(docId);
            if (
                docInDb &&
                (
                    !docInDb._deleted ||
                    withDeleted
                )
            ) {
                ret[docId] = docInDb;
            }
        }
        return Promise.resolve(ret);
    }

    /**
     * Runs a prepared query against the index chosen by its query plan.
     *
     * The index used is the plan's index prefixed with `_deleted`, bounded
     * with `_deleted = false` on both ends, so only non-deleted documents
     * are visited. Documents are filtered with a query matcher unless the
     * plan states the selector is already satisfied by the index, sorted
     * manually when the plan's sort fields differ from the index fields, and
     * finally cut down to the `skip`/`limit` window.
     *
     * @param preparedQuery query plus query plan
     * @returns the matching documents
     */
    query(
        preparedQuery: MemoryPreparedQuery<RxDocType>
    ): Promise<RxStorageQueryResult<RxDocType>> {
        const queryPlan = preparedQuery.queryPlan;
        const query = preparedQuery.query;
        const skip = query.skip ? query.skip : 0;
        const limit = query.limit ? query.limit : Infinity;
        const skipPlusLimit = skip + limit;

        let queryMatcher: QueryMatcher<RxDocumentData<RxDocType>> | false = false;
        if (!queryPlan.selectorSatisfiedByIndex) {
            queryMatcher = getQueryMatcher(
                this.schema,
                preparedQuery.query
            );
        }

        const queryPlanFields: string[] = queryPlan.index;
        const mustManuallyResort = !queryPlan.sortFieldsSameAsIndexFields;
        const index: string[] | undefined = ['_deleted'].concat(queryPlanFields);
        let lowerBound: any[] = queryPlan.startKeys;
        lowerBound = [false].concat(lowerBound);
        const lowerBoundString = getStartIndexStringFromLowerBound(
            this.schema,
            index,
            lowerBound,
            queryPlan.inclusiveStart
        );

        let upperBound: any[] = queryPlan.endKeys;
        upperBound = [false].concat(upperBound);
        const upperBoundString = getStartIndexStringFromUpperBound(
            this.schema,
            index,
            upperBound,
            queryPlan.inclusiveEnd
        );
        const indexName = getMemoryIndexName(index);
        const docsWithIndex = this.internals.byIndex[indexName].docsWithIndex;

        let indexOfLower = (queryPlan.inclusiveStart ? boundGE : boundGT)(
            docsWithIndex,
            {
                indexString: lowerBoundString
            } as any,
            compareDocsWithIndex
        );
        const indexOfUpper = (queryPlan.inclusiveEnd ? boundLE : boundLT)(
            docsWithIndex,
            {
                indexString: upperBoundString
            } as any,
            compareDocsWithIndex
        );

        let rows: RxDocumentData<RxDocType>[] = [];
        let done = false;
        while (!done) {
            const currentRow = docsWithIndex[indexOfLower];
            if (
                !currentRow ||
                indexOfLower > indexOfUpper
            ) {
                break;
            }
            const currentDoc = currentRow.doc;

            if (!queryMatcher || queryMatcher(currentDoc)) {
                rows.push(currentDoc);
            }

            // When a manual re-sort is needed the whole range must be
            // collected first, so the early exit only applies otherwise.
            if (
                (rows.length >= skipPlusLimit && !mustManuallyResort) ||
                indexOfLower >= docsWithIndex.length
            ) {
                done = true;
            }

            indexOfLower++;
        }

        if (mustManuallyResort) {
            const sortComparator = getSortComparator(this.schema, preparedQuery.query);
            rows = rows.sort(sortComparator);
        }

        // apply skip and limit boundaries.
        rows = rows.slice(skip, skipPlusLimit);
        return Promise.resolve({
            documents: rows
        });
    }

    /**
     * Counts the documents matching a prepared query by running the query
     * and returning the length of its result.
     */
    async count(
        preparedQuery: MemoryPreparedQuery<RxDocType>
    ): Promise<RxStorageCountResult> {
        const result = await this.query(preparedQuery);
        return {
            count: result.documents.length,
            mode: 'fast'
        };
    }

    /**
     * Returns up to `limit` documents that come after the given checkpoint
     * in the `[_meta.lwt, primaryKey]` index.
     *
     * @param limit maximum number of documents to return
     * @param checkpoint position to continue from; when omitted the scan
     *                   starts at `RX_META_LWT_MINIMUM` with an empty id
     * @returns the documents plus the checkpoint of the last returned
     *          document, or the input checkpoint (or `{ id: '', lwt: 0 }`)
     *          when nothing was returned
     */
    getChangedDocumentsSince(
        limit: number,
        checkpoint?: RxStorageDefaultCheckpoint
    ): Promise<{
        documents: RxDocumentData<RxDocType>[];
        checkpoint: RxStorageDefaultCheckpoint;
    }> {
        const sinceLwt = checkpoint ? checkpoint.lwt : RX_META_LWT_MINIMUM;
        const sinceId = checkpoint ? checkpoint.id : '';

        const index = ['_meta.lwt', this.primaryPath as any];
        const indexName = getMemoryIndexName(index);

        const lowerBoundString = getStartIndexStringFromLowerBound(
            this.schema,
            index,
            [
                sinceLwt,
                sinceId
            ],
            false
        );

        const docsWithIndex = this.internals.byIndex[indexName].docsWithIndex;
        let indexOfLower = boundGT(
            docsWithIndex,
            {
                indexString: lowerBoundString
            } as any,
            compareDocsWithIndex
        );

        // TODO use array.slice() so we do not have to iterate here
        const rows: RxDocumentData<RxDocType>[] = [];
        while (rows.length < limit && indexOfLower < docsWithIndex.length) {
            const currentDoc = docsWithIndex[indexOfLower];
            rows.push(currentDoc.doc);
            indexOfLower++;
        }

        const lastDoc = lastOfArray(rows);
        return Promise.resolve({
            documents: rows,
            checkpoint: lastDoc ? {
                id: lastDoc[this.primaryPath] as any,
                lwt: lastDoc._meta.lwt
            } : checkpoint ? checkpoint : {
                id: '',
                lwt: 0
            }
        });
    }

    /**
     * Removes deleted documents whose `_meta.lwt` is not newer than
     * `now() - minimumDeletedTime` from the in-memory state.
     *
     * The scan walks the `[_deleted, _meta.lwt, primaryKey]` index starting
     * right after the `[true, 0, '']` lower bound and stops at the first
     * entry that is too recent (or at the end of the index).
     *
     * @param minimumDeletedTime minimum age (same unit as `now()`) a deleted
     *                           document must have to be purged
     * @returns always resolves to `true`
     */
    cleanup(minimumDeletedTime: number): Promise<boolean> {
        const maxDeletionTime = now() - minimumDeletedTime;
        const index = ['_deleted', '_meta.lwt', this.primaryPath as any];
        const indexName = getMemoryIndexName(index);
        const docsWithIndex = this.internals.byIndex[indexName].docsWithIndex;

        const lowerBoundString = getStartIndexStringFromLowerBound(
            this.schema,
            index,
            [
                true,
                0,
                ''
            ],
            false
        );

        let indexOfLower = boundGT(
            docsWithIndex,
            {
                indexString: lowerBoundString
            } as any,
            compareDocsWithIndex
        );

        let done = false;
        while (!done) {
            const currentDoc = docsWithIndex[indexOfLower];
            if (!currentDoc || currentDoc.doc._meta.lwt > maxDeletionTime) {
                done = true;
            } else {
                removeDocFromState(
                    this.primaryPath as any,
                    this.schema,
                    this.internals,
                    currentDoc.doc
                );
                /**
                 * NOTE(review): removeDocFromState presumably removes the entry
                 * from the index arrays (confirm in memory-helper). If it does,
                 * the next candidate has already moved into the current slot and
                 * advancing the cursor would skip it. Only step forward when the
                 * slot still holds the entry we just processed.
                 */
                if (docsWithIndex[indexOfLower] === currentDoc) {
                    indexOfLower++;
                }
            }
        }
        return PROMISE_RESOLVE_TRUE;
    }

    /**
     * Returns the stored data of one attachment.
     *
     * @param documentId id of the owning document
     * @param attachmentId id of the attachment
     * @param digest expected digest of the stored attachment
     * @returns the attachment's `writeData.data`
     * @throws Error when `digest` is empty or does not match the stored
     *         digest; the map lookup is done with `getFromMapOrThrow`
     */
    getAttachmentData(
        documentId: string,
        attachmentId: string,
        digest: string
    ): Promise<string> {
        ensureNotRemoved(this);
        const data = getFromMapOrThrow(
            this.internals.attachments,
            attachmentMapKey(documentId, attachmentId)
        );

        if (
            !digest ||
            data.digest !== digest
        ) {
            throw new Error('attachment does not exist');
        }

        return Promise.resolve(data.writeData.data);
    }

    /**
     * Stream of event bulks emitted by `bulkWrite` through
     * `internals.changes$`.
     */
    changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>> {
        ensureNotRemoved(this);
        return this.internals.changes$.asObservable();
    }

    /**
     * Marks the shared internals as removed, drops them from the storage's
     * `collectionStates` map and closes this instance.
     */
    async remove(): Promise<void> {
        ensureNotRemoved(this);

        this.internals.removed = true;
        this.storage.collectionStates.delete(
            getMemoryCollectionKey(
                this.databaseName,
                this.collectionName,
                this.schema.version
            )
        );
        await this.close();
    }

    /**
     * Closes this instance and decrements the shared reference counter.
     *
     * @returns a rejected promise (`'already closed'`) when called twice
     */
    close(): Promise<void> {
        if (this.closed) {
            return Promise.reject(new Error('already closed'));
        }
        this.closed = true;

        this.internals.refCount = this.internals.refCount - 1;
        return PROMISE_RESOLVE_VOID;
    }

    /** Stream backed by `internals.conflictResultionTasks$`. */
    conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
        return this.internals.conflictResultionTasks$.asObservable();
    }

    /** No-op: the task solution is ignored and the call resolves immediately. */
    resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> {
        return PROMISE_RESOLVE_VOID;
    }
}

/**
 * Creates a memory storage instance for the given collection.
 *
 * The internal state is looked up in `storage.collectionStates` under the
 * key built from database name, collection name and schema version. When no
 * state exists yet, a fresh one is created (with `refCount` 1, its indexes
 * added via `addIndexesToInternalsState`) and registered; otherwise the
 * existing state is reused and its `refCount` incremented.
 *
 * @param storage storage that owns the `collectionStates` map
 * @param params creation parameters (names, schema, options)
 * @param settings memory storage settings, handed to the instance
 * @returns the new instance, already resolved
 */
export function createMemoryStorageInstance<RxDocType>(
    storage: RxStorageMemory,
    params: RxStorageInstanceCreationParams<RxDocType, RxStorageMemoryInstanceCreationOptions>,
    settings: RxStorageMemorySettings
): Promise<RxStorageInstanceMemory<RxDocType>> {
    const collectionKey = getMemoryCollectionKey(
        params.databaseName,
        params.collectionName,
        params.schema.version
    );

    let internals = storage.collectionStates.get(collectionKey);
    if (!internals) {
        internals = {
            removed: false,
            refCount: 1,
            documents: new Map(),
            // The attachments map only exists when the schema declares attachments.
            attachments: params.schema.attachments ? new Map() : undefined as any,
            byIndex: {},
            conflictResultionTasks$: new Subject(),
            changes$: new Subject()
        };
        addIndexesToInternalsState(internals, params.schema);
        storage.collectionStates.set(collectionKey, internals);
    } else {
        internals.refCount = internals.refCount + 1;
    }

    const instance = new RxStorageInstanceMemory(
        storage,
        params.databaseName,
        params.collectionName,
        params.schema,
        internals,
        params.options,
        settings
    );
    return Promise.resolve(instance);
}