/** * These files contain the replication protocol. * It can be used to replicated RxStorageInstances or RxCollections * or even to do a client(s)-server replication. */ import { BehaviorSubject, combineLatest, filter, firstValueFrom, map, Subject } from 'rxjs'; import { getPrimaryFieldOfPrimaryKey } from 'nxdb-old/src/rx-schema-helper'; import type { BulkWriteRow, ById, DocumentsWithCheckpoint, RxConflictHandler, RxReplicationHandler, RxReplicationWriteToMasterRow, RxStorageInstance, RxStorageInstanceReplicationInput, RxStorageInstanceReplicationState, WithDeleted } from 'nxdb-old/src/types'; import { ensureNotFalsy, PROMISE_RESOLVE_VOID } from 'nxdb-old/src/plugins/utils'; import { getCheckpointKey } from 'nxdb-old/src/replication-protocol/checkpoint'; import { startReplicationDownstream } from 'nxdb-old/src/replication-protocol/downstream'; import { docStateToWriteDoc, writeDocToDocState } from 'nxdb-old/src/replication-protocol/helper'; import { startReplicationUpstream } from 'nxdb-old/src/replication-protocol/upstream'; export * from 'nxdb-old/src/replication-protocol/checkpoint'; export * from 'nxdb-old/src/replication-protocol/downstream'; export * from 'nxdb-old/src/replication-protocol/upstream'; export * from 'nxdb-old/src/replication-protocol/meta-instance'; export * from 'nxdb-old/src/replication-protocol/conflicts'; export * from 'nxdb-old/src/replication-protocol/helper'; export function replicateRxStorageInstance( input: RxStorageInstanceReplicationInput ): RxStorageInstanceReplicationState { const checkpointKey = getCheckpointKey(input); const state: RxStorageInstanceReplicationState = { primaryPath: getPrimaryFieldOfPrimaryKey(input.forkInstance.schema.primaryKey), input, checkpointKey, downstreamBulkWriteFlag: 'replication-downstream-' + checkpointKey, events: { canceled: new BehaviorSubject(false), active: { down: new BehaviorSubject(true), up: new BehaviorSubject(true) }, processed: { down: new Subject(), up: new Subject() }, resolvedConflicts: new Subject(), error: new Subject() }, stats: { down: { addNewTask: 0, downstreamProcessChanges: 0, downstreamResyncOnce: 0, masterChangeStreamEmit: 0, persistFromMaster: 0 }, up: { forkChangeStreamEmit: 0, persistToMaster: 0, persistToMasterConflictWrites: 0, persistToMasterHadConflicts: 0, processTasks: 0, upstreamInitialSync: 0 } }, firstSyncDone: { down: new BehaviorSubject(false), up: new BehaviorSubject(false) }, streamQueue: { down: PROMISE_RESOLVE_VOID, up: PROMISE_RESOLVE_VOID }, checkpointQueue: PROMISE_RESOLVE_VOID, lastCheckpointDoc: {} }; startReplicationDownstream(state); startReplicationUpstream(state); return state; } export function awaitRxStorageReplicationFirstInSync( state: RxStorageInstanceReplicationState ): Promise { return firstValueFrom( combineLatest([ state.firstSyncDone.down.pipe( filter(v => !!v) ), state.firstSyncDone.up.pipe( filter(v => !!v) ) ]) ).then(() => { }); } export function awaitRxStorageReplicationInSync( replicationState: RxStorageInstanceReplicationState ) { return Promise.all([ replicationState.streamQueue.up, replicationState.streamQueue.down, replicationState.checkpointQueue ]); } export async function awaitRxStorageReplicationIdle( state: RxStorageInstanceReplicationState ) { await awaitRxStorageReplicationFirstInSync(state); while (true) { const { down, up } = state.streamQueue; await Promise.all([ up, down ]); /** * If the Promises have not been reassigned * after awaiting them, we know that the replication * is in idle state at this point in time. */ if ( down === state.streamQueue.down && up === state.streamQueue.up ) { return; } } } export function rxStorageInstanceToReplicationHandler( instance: RxStorageInstance, conflictHandler: RxConflictHandler, databaseInstanceToken: string ): RxReplicationHandler { const primaryPath = getPrimaryFieldOfPrimaryKey(instance.schema.primaryKey); const replicationHandler: RxReplicationHandler = { masterChangeStream$: instance.changeStream().pipe( map(eventBulk => { const ret: DocumentsWithCheckpoint = { checkpoint: eventBulk.checkpoint, documents: eventBulk.events.map(event => { return writeDocToDocState(ensureNotFalsy(event.documentData) as any); }) }; return ret; }) ), masterChangesSince( checkpoint, batchSize ) { return instance.getChangedDocumentsSince( batchSize, checkpoint ).then(result => { return { checkpoint: result.documents.length > 0 ? result.checkpoint : checkpoint, documents: result.documents.map(d => writeDocToDocState(d)) }; }); }, async masterWrite( rows ) { const rowById: ById> = {}; rows.forEach(row => { const docId: string = (row.newDocumentState as any)[primaryPath]; rowById[docId] = row; }); const ids = Object.keys(rowById); const masterDocsState = await instance.findDocumentsById( ids, true ); const conflicts: WithDeleted[] = []; const writeRows: BulkWriteRow[] = []; await Promise.all( Object.entries(rowById) .map(async ([id, row]) => { const masterState = masterDocsState[id]; if (!masterState) { writeRows.push({ document: docStateToWriteDoc(databaseInstanceToken, row.newDocumentState) }); } else if ( masterState && !row.assumedMasterState ) { conflicts.push(writeDocToDocState(masterState)); } else if ( (await conflictHandler({ realMasterState: writeDocToDocState(masterState), newDocumentState: ensureNotFalsy(row.assumedMasterState) }, 'rxStorageInstanceToReplicationHandler-masterWrite')).isEqual === true ) { writeRows.push({ previous: masterState, document: docStateToWriteDoc(databaseInstanceToken, row.newDocumentState, masterState) }); } else { conflicts.push(writeDocToDocState(masterState)); } }) ); if (writeRows.length > 0) { const result = await instance.bulkWrite( writeRows, 'replication-master-write' ); Object .values(result.error) .forEach(err => { if (err.status !== 409) { throw new Error('non conflict error'); } else { conflicts.push( writeDocToDocState(ensureNotFalsy(err.documentInDb)) ); } }); } return conflicts; } }; return replicationHandler; } export function cancelRxStorageReplication( replicationState: RxStorageInstanceReplicationState ) { replicationState.events.canceled.next(true); replicationState.events.active.up.complete(); replicationState.events.active.down.complete(); replicationState.events.processed.up.complete(); replicationState.events.processed.down.complete(); replicationState.events.resolvedConflicts.complete(); replicationState.events.canceled.complete(); }