import { BehaviorSubject, filter, firstValueFrom, map, Subject, Subscription } from 'rxjs'; import { addRxPlugin } from 'nxdb-old/src/plugin'; import { rxStorageInstanceToReplicationHandler } from 'nxdb-old/src/replication-protocol'; import type { RxCollection, RxError, RxReplicationHandler, RxReplicationWriteToMasterRow, RxTypeError } from 'nxdb-old/src/types'; import { ensureNotFalsy, getFromMapOrThrow, randomCouchString } from 'nxdb-old/src/plugins/utils'; import { NxDBLeaderElectionPlugin } from 'nxdb-old/src/plugins/leader-election'; import { replicateRxCollection } from 'nxdb-old/src/plugins/replication'; import { isMasterInP2PReplication, sendMessageAndAwaitAnswer } from 'nxdb-old/src/plugins/replication-p2p/p2p-helper'; import type { P2PConnectionHandler, P2PPeer, P2PPeerState, P2PReplicationCheckpoint, P2PResponse, RxP2PReplicationState, SyncOptionsP2P } from 'nxdb-old/src/plugins/replication-p2p/p2p-types'; export async function replicateP2P( options: SyncOptionsP2P ): Promise> { const collection = options.collection; addRxPlugin(NxDBLeaderElectionPlugin); // fill defaults if (options.pull) { if (!options.pull.batchSize) { options.pull.batchSize = 20; } } if (options.push) { if (!options.push.batchSize) { options.push.batchSize = 20; } } if (collection.database.multiInstance) { await collection.database.waitForLeadership(); } // used to easier debug stuff let requestCounter = 0; const requestFlag = randomCouchString(10); function getRequestId() { const count = requestCounter++; return collection.database.token + '|' + requestFlag + '|' + count; } const storageToken = await collection.database.storageToken; const pool = new RxP2PReplicationPool( collection, options, options.connectionHandlerCreator(options) ); pool.subs.push( pool.connectionHandler.error$.subscribe(err => pool.error$.next(err)), pool.connectionHandler.disconnect$.subscribe(peer => pool.removePeer(peer)) ); /** * Answer if someone requests our storage token */ pool.subs.push( pool.connectionHandler.message$.pipe( filter(data => data.message.method === 'token') ).subscribe(data => { pool.connectionHandler.send(data.peer, { id: data.message.id, result: storageToken }); }) ); const connectSub = pool.connectionHandler.connect$ .pipe( filter(() => !pool.canceled) ) .subscribe(async (peer) => { /** * TODO ensure both know the correct secret */ const tokenResponse = await sendMessageAndAwaitAnswer( pool.connectionHandler, peer, { id: getRequestId(), method: 'token', params: [] } ); const peerToken: string = tokenResponse.result; const isMaster = isMasterInP2PReplication(collection.database.hashFunction, storageToken, peerToken); let replicationState: RxP2PReplicationState | undefined; if (isMaster) { const masterHandler = pool.masterReplicationHandler; const masterChangeStreamSub = masterHandler.masterChangeStream$.subscribe(ev => { const streamResponse: P2PResponse = { id: 'masterChangeStream$', result: ev }; pool.connectionHandler.send(peer, streamResponse); }); // clean up the subscription pool.subs.push( masterChangeStreamSub, pool.connectionHandler.disconnect$.pipe( filter(p => p.id === peer.id) ).subscribe(() => masterChangeStreamSub.unsubscribe()) ); const messageSub = pool.connectionHandler.message$ .pipe( filter(data => data.peer.id === peer.id), filter(data => data.message.method !== 'token') ) .subscribe(async (data) => { const { peer: msgPeer, message } = data; /** * If it is not a function, * it means that the client requested the masterChangeStream$ */ const method = (masterHandler as any)[message.method].bind(masterHandler); const result = await (method as any)(...message.params); const response: P2PResponse = { id: message.id, result }; pool.connectionHandler.send(msgPeer, response); }); pool.subs.push(messageSub); } else { replicationState = replicateRxCollection({ replicationIdentifier: [collection.name, options.topic, peerToken].join('||'), collection: collection, autoStart: true, deletedField: '_deleted', live: true, retryTime: options.retryTime, waitForLeadership: false, pull: options.pull ? Object.assign({}, options.pull, { async handler(lastPulledCheckpoint: P2PReplicationCheckpoint) { const answer = await sendMessageAndAwaitAnswer( pool.connectionHandler, peer, { method: 'masterChangesSince', params: [ lastPulledCheckpoint, ensureNotFalsy(options.pull).batchSize ], id: getRequestId() } ); return answer.result; }, stream$: pool.connectionHandler.response$.pipe( filter(m => m.response.id === 'masterChangeStream$'), map(m => m.response.result) ) }) : undefined, push: options.push ? Object.assign({}, options.push, { async handler(docs: RxReplicationWriteToMasterRow[]) { const answer = await sendMessageAndAwaitAnswer( pool.connectionHandler, peer, { method: 'masterWrite', params: [docs], id: getRequestId() } ); return answer.result; } }) : undefined }); } pool.addPeer(peer, replicationState); }); pool.subs.push(connectSub); return pool; } /** * Because the P2P replication runs between many instances, * we use a Pool instead of returning a single replication state. */ export class RxP2PReplicationPool { peerStates$: BehaviorSubject>> = new BehaviorSubject(new Map()); canceled: boolean = false; masterReplicationHandler: RxReplicationHandler; subs: Subscription[] = []; public error$ = new Subject(); constructor( public readonly collection: RxCollection, public readonly options: SyncOptionsP2P, public readonly connectionHandler: P2PConnectionHandler ) { this.collection.onDestroy.push(() => this.cancel()); this.masterReplicationHandler = rxStorageInstanceToReplicationHandler( collection.storageInstance, collection.conflictHandler, collection.database.token, ); } addPeer( peer: P2PPeer, replicationState?: RxP2PReplicationState ) { const peerState: P2PPeerState = { peer, replicationState, subs: [] }; this.peerStates$.next(this.peerStates$.getValue().set(peer, peerState)); if (replicationState) { peerState.subs.push( replicationState.error$.subscribe(ev => this.error$.next(ev)) ); } } removePeer(peer: P2PPeer) { const peerState = getFromMapOrThrow(this.peerStates$.getValue(), peer); this.peerStates$.getValue().delete(peer); this.peerStates$.next(this.peerStates$.getValue()); peerState.subs.forEach(sub => sub.unsubscribe()); if (peerState.replicationState) { peerState.replicationState.cancel(); } } // often used in unit tests awaitFirstPeer() { return firstValueFrom( this.peerStates$.pipe( filter(peerStates => peerStates.size > 0) ) ); } public async cancel() { if (this.canceled) { return; } this.canceled = true; this.subs.forEach(sub => sub.unsubscribe()); Array.from(this.peerStates$.getValue().keys()).forEach(peer => { this.removePeer(peer); }); await this.connectionHandler.destroy(); } } export * from 'nxdb-old/src/plugins/replication-p2p/p2p-helper'; export * from 'nxdb-old/src/plugins/replication-p2p/p2p-types'; // export * from './connection-handler-webtorrent'; // export * from './connection-handler-p2pcf'; export * from 'nxdb-old/src/plugins/replication-p2p/connection-handler-simple-peer';