import { MetaverseExplorer } from './explorer' import { take, filter, switchMap } from 'rxjs/operators' import { BehaviorSubject, interval, combineLatest, of } from 'rxjs' import { MetaverseLightwalletDatabase } from './database/database' import { RxDumpDatabaseAny, CollectionsOfDatabase } from 'rxdb' import { merge } from 'lodash' import { Balances } from './interfaces/balance.interface' import { defaultBalances } from './defaults' import { calculateBalancesFromUtxo, calculateAddressesBalancesFromUtxo } from './helpers/utxo.helper' export class MetaverseLightwalletCore { height$ = new BehaviorSubject(0) // if set to false it does not try to sync active$ = new BehaviorSubject(true) syncing$ = new BehaviorSubject(false) heartbeat$ = interval(5000) // first sync for the active account has been done initialized$ = new BehaviorSubject(false) constructor( public db: MetaverseLightwalletDatabase, private defaults: { balances?: Balances } = {}, private explorer = new MetaverseExplorer(), ) { this.defaults.balances = this.defaults.balances ?? defaultBalances this.init(db) } utxos$ = (debounce?: number) => { return this.db.outputs.utxos$(this.addresses$(), debounce) } balances$ = (debounce = 2000, min_confirnations = 3) => combineLatest([ this.utxos$(debounce), this.addresses$(), this.height$, ]) .pipe( // debounceTime(debounce), switchMap(([utxos, addresses, currentHeight]: [any[], string[], number]) => { const defaultBalances = JSON.parse(JSON.stringify(this.defaults.balances)) return of(merge(defaultBalances, calculateBalancesFromUtxo(utxos, addresses, currentHeight, defaultBalances, min_confirnations))) }), ) addressBalances$ = (debounce = 2000, min_confirnations = 3) => combineLatest([ this.utxos$(debounce), this.addresses$(), this.height$, ]) .pipe( // debounceTime(debounce), switchMap(([utxos, addresses, currentHeight]: [any[], string[], number]) => { const defaultBalances = JSON.parse(JSON.stringify(this.defaults.balances)) return of(merge(defaultBalances, calculateAddressesBalancesFromUtxo(utxos, addresses, currentHeight, defaultBalances, min_confirnations))) }), ) addresses$() { return this.db.addresses.addresses$() } isSyncMaster() { return this.db.isLeader() } getName() { return this.db.name } transactionCollection() { return this.db.transactions } import(data: RxDumpDatabaseAny) { this.db.importDump(data) } export(): Promise> { return this.db.dump() } destroy() { return this.db.destroy() } async resetTransactions() { if (this.syncing$.value) { await this.syncing$.pipe(filter(status => status === false)).toPromise() } const tmpActive = this.active$.value if (tmpActive) { this.active$.next(false) } await this.db.transactions.remove() if (tmpActive) { this.active$.next(true) } } private async init(database: MetaverseLightwalletDatabase) { await database.waitForLeadership() console.info('taking the lead') let initialized = false database.addresses.addresses$() .subscribe(async addresses=>{ if(addresses.length===0 && initialized){ await this.resetTransactions() } if (addresses.length) { initialized=true await this.sync() this.syncInterval() } }) } private syncInterval() { this.heartbeat$ .subscribe(() => { if (!this.active$.value) { return } this.sync() }) } private async sync() { return Promise.all([ this.syncHeight(), this.syncTransactions() .catch(error => { console.log(error) this.syncing$.next(false) }), ]) } private async syncTransactions() { if(!this.isSyncMaster()){ return } if (this.syncing$.value) { return } this.syncing$.next(true) let lastHeight = (await this.db.transactions.latest())?.height || 0 const addresses = await this.addresses$().pipe(take(1)).toPromise() let transactions = await this.explorer.listAddressTransactions({ addresses, min_height: lastHeight + 1, }) while (this.active$.value && transactions.length) { lastHeight = transactions[0].height transactions = transactions.sort((a,b)=>a.height - b.height) for(const transaction of transactions){ await this.db.transactions.upsert(transaction) } transactions = await this.explorer.listAddressTransactions({ addresses, min_height: lastHeight + 1, }) } this.syncing$.next(false) if (!this.initialized$.value) { this.initialized$.next(true) } } private async syncHeight() { const height = await this.explorer.getHeight() if (height && height > this.height$.value) { this.height$.next(height) } } }