import {
    ensureNotFalsy,
    errorToPlainJson
} from 'nxdb-old/src/plugins/utils';
import { NxDBLeaderElectionPlugin } from 'nxdb-old/src/plugins/leader-election';
import type {
    RxCollection,
    ReplicationPullOptions,
    ReplicationPushOptions,
    RxReplicationWriteToMasterRow,
    RxReplicationPullStreamItem
} from 'nxdb-old/src/types';
import {
    RxReplicationState,
    startReplicationOnLeaderShip
} from 'nxdb-old/src/plugins/replication';
import {
    addRxPlugin,
    newRxError,
    WithDeleted
} from 'nxdb-old/src';
import { Subject } from 'rxjs';
import type {
    NatsCheckpointType,
    NatsSyncOptions
} from 'nxdb-old/src/plugins/replication-nats/nats-types';
import { connect, DeliverPolicy, JSONCodec, ReplayPolicy } from 'nats';
import {
    NATS_REPLICATION_PLUGIN_IDENTITY_PREFIX,
    getNatsServerDocumentState
} from 'nxdb-old/src/plugins/replication-nats/nats-helper';
import { awaitRetry } from 'nxdb-old/src/plugins/replication/replication-helper';

export * from 'nxdb-old/src/plugins/replication-nats/nats-types';
export * from 'nxdb-old/src/plugins/replication-nats/nats-helper';

/**
 * Returns true when `err` is an object carrying a string `message`
 * that contains `needle`.
 *
 * Thrown values are not guaranteed to be `Error` instances, so reading
 * `err.message.includes(...)` directly could itself throw and hide the
 * original failure.
 */
function errorMessageIncludes(err: unknown, needle: string): boolean {
    if (typeof err !== 'object' || err === null) {
        return false;
    }
    const message = (err as { message?: unknown }).message;
    return typeof message === 'string' && message.includes(needle);
}

/**
 * Replication state returned by `replicateNats()`.
 *
 * It forwards all constructor arguments to `RxReplicationState` and passes
 * the literal `'_deleted'` as the third base-constructor argument
 * (presumably the name of the deleted-flag field - confirm against the
 * base class).
 */
export class RxNatsReplicationState<RxDocType> extends RxReplicationState<RxDocType, NatsCheckpointType> {
    constructor(
        public readonly replicationIdentifierHash: string,
        public readonly collection: RxCollection<RxDocType>,
        public readonly pull?: ReplicationPullOptions<RxDocType, NatsCheckpointType>,
        public readonly push?: ReplicationPushOptions<RxDocType>,
        public readonly live: boolean = true,
        public retryTime: number = 1000 * 5,
        public autoStart: boolean = true
    ) {
        super(
            replicationIdentifierHash,
            collection,
            '_deleted',
            pull,
            push,
            live,
            retryTime,
            autoStart
        );
    }
}

/**
 * Sets up a replication between an RxCollection and a NATS JetStream stream.
 *
 * Each document is published on the subject `<subjectPrefix>.<primaryKey>`.
 * The stream `options.streamName` is added (with the subject filter
 * `<subjectPrefix>.*`) as soon as the connection is established.
 *
 * Side effects visible in this function:
 * - `options.live` and `options.waitForLeadership` are defaulted to `true`
 *   on the passed-in object when they are `undefined`.
 * - The leader election plugin is registered via `addRxPlugin()`.
 * - A NATS connection is opened immediately, independent of `autoStart`.
 *
 * @param options - Collection, NATS connection settings, stream/subject
 *                  names and the optional pull/push configuration.
 * @returns The replication state; it is handed to
 *          `startReplicationOnLeaderShip()` before being returned.
 */
export function replicateNats<RxDocType>(
    options: NatsSyncOptions<RxDocType>
): RxNatsReplicationState<RxDocType> {
    options.live = typeof options.live === 'undefined' ? true : options.live;
    options.waitForLeadership = typeof options.waitForLeadership === 'undefined' ? true : options.waitForLeadership;

    const collection = options.collection;
    const primaryPath = collection.schema.primaryPath;
    addRxPlugin(NxDBLeaderElectionPlugin);

    const jc = JSONCodec();

    // Connect once and share the resulting handles between all handlers.
    const connectionStatePromise = (async () => {
        const nc = await connect(options.connection);
        const jetstreamClient = nc.jetstream();
        const jsm = await nc.jetstreamManager();
        await jsm.streams.add({
            name: options.streamName,
            subjects: [
                options.subjectPrefix + '.*'
            ]
        });
        const natsStream = await jetstreamClient.streams.get(options.streamName);
        return {
            nc,
            jetstreamClient,
            jsm,
            natsStream
        };
    })();

    const pullStream$: Subject<RxReplicationPullStreamItem<RxDocType, NatsCheckpointType>> = new Subject();

    let replicationPrimitivesPull: ReplicationPullOptions<RxDocType, NatsCheckpointType> | undefined;
    if (options.pull) {
        replicationPrimitivesPull = {
            /**
             * Fetches up to `batchSize` messages starting at the sequence
             * stored in the last checkpoint (or 0 when there is none) and
             * returns them together with the advanced checkpoint.
             */
            async handler(
                lastPulledCheckpoint: NatsCheckpointType | undefined,
                batchSize: number
            ) {
                const cn = await connectionStatePromise;
                const startSequence = lastPulledCheckpoint ? lastPulledCheckpoint.sequence : 0;
                const newCheckpoint: NatsCheckpointType = {
                    sequence: startSequence
                };
                const consumer = await cn.natsStream.getConsumer({
                    opt_start_seq: startSequence,
                    deliver_policy: DeliverPolicy.LastPerSubject,
                    replay_policy: ReplayPolicy.Instant
                });

                const fetchedMessages = await consumer.fetch({
                    max_messages: batchSize
                });
                /**
                 * NOTE(review): `signal` is not part of the typed API (hence the cast).
                 * It presumably settles once the fetch has completed, and the
                 * iteration below relies on already-received messages still being
                 * yielded after close() - confirm against the nats.js consumer API.
                 */
                await (fetchedMessages as any).signal;
                await fetchedMessages.close();

                const useMessages: WithDeleted<RxDocType>[] = [];
                for await (const m of fetchedMessages) {
                    useMessages.push(m.json());
                    newCheckpoint.sequence = m.seq;
                    m.ack();
                }
                return {
                    documents: useMessages,
                    checkpoint: newCheckpoint
                };
            },
            batchSize: ensureNotFalsy(options.pull).batchSize,
            modifier: ensureNotFalsy(options.pull).modifier,
            stream$: pullStream$.asObservable()
        };
    }

    let replicationPrimitivesPush: ReplicationPushOptions<RxDocType> | undefined;
    if (options.push) {
        replicationPrimitivesPush = {
            /**
             * Writes each row to the stream and resolves with the server
             * states of all rows that ran into a conflict.
             */
            async handler(
                rows: RxReplicationWriteToMasterRow<RxDocType>[]
            ) {
                const cn = await connectionStatePromise;
                const conflicts: WithDeleted<RxDocType>[] = [];
                await Promise.all(
                    rows.map(async (writeRow) => {
                        const docId = (writeRow.newDocumentState as any)[primaryPath];

                        /**
                         * First get the current state of the document from the server
                         * so that we have the sequence number for conflict detection.
                         */
                        let remoteDocState;
                        try {
                            remoteDocState = await getNatsServerDocumentState(
                                cn.natsStream,
                                options.subjectPrefix,
                                docId
                            );
                        } catch (err: unknown) {
                            // A missing message only means the document is new on the server.
                            if (!errorMessageIncludes(err, 'no message found')) {
                                throw err;
                            }
                        }

                        if (
                            remoteDocState &&
                            (
                                !writeRow.assumedMasterState ||
                                (await collection.conflictHandler({
                                    newDocumentState: remoteDocState.json(),
                                    realMasterState: writeRow.assumedMasterState
                                }, 'replication-nats-push')).isEqual === false
                            )
                        ) {
                            // conflict
                            conflicts.push(remoteDocState.json());
                        } else {
                            // no conflict (yet)
                            let pushDone = false;
                            while (!pushDone) {
                                try {
                                    await cn.jetstreamClient.publish(
                                        options.subjectPrefix + '.' + docId,
                                        jc.encode(writeRow.newDocumentState),
                                        {
                                            // Only publish when nobody wrote to this subject in between.
                                            expect: remoteDocState ? {
                                                streamName: options.streamName,
                                                lastSubjectSequence: remoteDocState.seq
                                            } : undefined
                                        }
                                    );
                                    pushDone = true;
                                } catch (err: unknown) {
                                    if (errorMessageIncludes(err, 'wrong last sequence')) {
                                        // A write happened while we are doing our write -> handle conflict
                                        const newServerState = await getNatsServerDocumentState(
                                            cn.natsStream,
                                            options.subjectPrefix,
                                            docId
                                        );
                                        conflicts.push(ensureNotFalsy(newServerState).json());
                                        pushDone = true;
                                    } else {
                                        replicationState.subjects.error.next(
                                            newRxError('RC_STREAM', {
                                                document: writeRow.newDocumentState,
                                                error: errorToPlainJson(err as Error)
                                            })
                                        );

                                        // -> retry after wait
                                        await awaitRetry(
                                            collection,
                                            replicationState.retryTime
                                        );
                                    }
                                }
                            }
                        }
                    })
                );
                return conflicts;
            },
            batchSize: options.push.batchSize,
            modifier: options.push.modifier
        };
    }

    const replicationState = new RxNatsReplicationState<RxDocType>(
        NATS_REPLICATION_PLUGIN_IDENTITY_PREFIX + options.collection.database.hashFunction(options.replicationIdentifier),
        collection,
        replicationPrimitivesPull,
        replicationPrimitivesPush,
        options.live,
        options.retryTime,
        options.autoStart
    );

    /**
     * Use long polling to get live changes for the pull.stream$
     */
    if (options.live && options.pull) {
        const startBefore = replicationState.start.bind(replicationState);
        const cancelBefore = replicationState.cancel.bind(replicationState);
        replicationState.start = async () => {
            const cn = await connectionStatePromise;

            /**
             * First get the last sequence so that we can
             * later only fetch 'newer' messages.
             */
            let lastSeq = 0;
            try {
                const lastDocState = await cn.natsStream.getMessage({
                    last_by_subj: options.subjectPrefix + '.*'
                });
                lastSeq = lastDocState.seq;
            } catch (err: unknown) {
                // An empty stream is fine, we then start at sequence 0.
                if (!errorMessageIncludes(err, 'no message found')) {
                    throw err;
                }
            }

            const consumer = await cn.natsStream.getConsumer({
                opt_start_seq: lastSeq
            });
            const newMessages = await consumer.consume();

            /**
             * Forward every new message to the pull stream in the background.
             * Failures are reported on the error subject so that they do not
             * end up as unhandled promise rejections.
             */
            void (async () => {
                for await (const m of newMessages) {
                    const docData: WithDeleted<RxDocType> = m.json();
                    pullStream$.next({
                        documents: [docData],
                        checkpoint: {
                            sequence: m.seq
                        }
                    });
                    m.ack();
                }
            })().catch((err: unknown) => {
                replicationState.subjects.error.next(
                    newRxError('RC_STREAM', {
                        error: errorToPlainJson(err as Error)
                    })
                );
            });

            // Stop consuming live messages when the replication is canceled.
            replicationState.cancel = () => {
                newMessages.close();
                return cancelBefore();
            };
            return startBefore();
        };
    }

    startReplicationOnLeaderShip(options.waitForLeadership, replicationState);

    return replicationState;
}