import {EventEmitter} from 'events';
import {
  ChangePacket,
  ChangePacketId,
  ChangePlant,
  ChangePlantBlueprint,
  ChangePlantProcessingResultWithClock,
  GeneralChange,
  IChangePlantBlueprintGenericParams,
  IContext,
  IRPCDefinition,
  ISyncable,
  ISyncableAdapter,
  ISyncableObject,
  IViewQuery,
  NumericTimestamp,
  RPCError,
  RefDictToSyncableObjectDict,
  ResolvedViewQuery,
  SyncableContainer,
  SyncableRef,
  ViewQueryFilter,
  ViewQueryUpdateObject,
  generateUniqueId,
  getNonCreationRefsFromRefDict,
  getRefsFromRefDict,
  getSyncableKey,
} from '@syncable/core';
import _ from 'lodash';
import {Dict, OmitValueOfKey} from 'tslang';

import {filterReadableSyncables} from '../@utils';
import {Connection} from '../connection';

import {BroadcastChangeResult, IServerAdapter} from './server-adapter';

// NOTE(review): several generic type-argument lists look like they were
// stripped from this copy of the file (for example the bare `Set`, `Promise`
// and `Map>` below). Confirm against the upstream source before compiling.

/**
 * Options accepted by `Server#load`.
 */
export interface LoadOptions {
  /** Context to load with; `Server#load` falls back to the server context. */
  context?: IContext;
  /**
   * When true only requisite dependencies are followed. `Server#load`
   * defaults this to `true`.
   */
  loadRequisiteDependencyOnly?: boolean;
}

/**
 * Options accepted by `Server#loadSyncablesByRefs`.
 */
export interface LoadSyncablesByRefsOptions {
  /** Keys considered already loaded; refs with these keys are not fetched. */
  loadedKeySet?: Set;
  /** Passed to `resolveRequisiteDependencyRefs` of each loaded object. */
  changeType?: string;
  /** Forwarded as `requisiteOnly` to `Server#loadDependentSyncables`. */
  loadRequisiteDependencyOnly?: boolean;
  /** When true, results are not passed through `filterReadableSyncables`. */
  skipReadableFilter?: boolean;
}

/**
 * Options accepted by `Server#loadDependentSyncables`.
 */
export interface LoadDependentSyncablesOptions {
  /** Keys considered already loaded; refs with these keys are not fetched. */
  loadedKeySet?: Set;
  /** Passed to `resolveRequisiteDependencyRefs` of each instantiated object. */
  changeType?: string;
  /** When truthy, `resolveDependencyRefs()` results are not followed. */
  requisiteOnly?: boolean;
  /** When truthy, results are not passed through `filterReadableSyncables`. */
  skipReadableFilter?: boolean;
}

/**
 * Generic parameter bundle for `Server`, extending the change plant
 * blueprint parameters.
 */
export interface IServerGenericParams extends IChangePlantBlueprintGenericParams {
  syncableObject: ISyncableObject;
  viewQueryDict: object;
  customClientRPCDefinition: IRPCDefinition;
}

/**
 * Maps each syncable `ref.type` of `TSyncableObject` to an optional array of
 * the union members carrying that type.
 */
export type SyncableTypeToSyncableObjectsDict<
  TSyncableObject extends ISyncableObject
> = {
  [TKey in TSyncableObject['ref']['type']]?: Extract<
    TSyncableObject,
    {ref: {type: TKey}}
  >[];
};

/**
 * A view query paired with the filter obtained for it from the syncable
 * adapter (see `Server#_query`).
 */
export interface ViewQueryInfo {
  filter: ViewQueryFilter;
  query: IViewQuery;
}

/**
 * Result of `Server#applyChangePacket`: the processing result without its
 * `changes` property, plus promises for the follow-up changes.
 */
export interface ServerApplyChangeResult extends OmitValueOfKey {
  /**
   * One promise per subsequent change produced while processing; `undefined`
   * when there were none.
   */
  subsequent?: Promise[];
}

/**
 * Server-side entry point that loads syncables, resolves view queries and
 * applies change packets through a `ChangePlant`, delegating storage and
 * transport to the supplied adapters.
 *
 * Diagnostic records are emitted as `'log'` events (see `log`).
 */
export class Server<
  TGenericParams extends IServerGenericParams
> extends EventEmitter {
  // Live connections, looked up by `connection.group`.
  private groupToConnectionSetMap = new Map>();

  private changePlant: ChangePlant;

  constructor(
    /**
     * Non-user context for server-side initiated changes.
     */
    context: TGenericParams['context'],
    serverAdapter: IServerAdapter,
    syncableAdapter: ISyncableAdapter,
    blueprint: ChangePlantBlueprint,
  );
  /**
   * Stores the context and adapters, subscribes to the server adapter's
   * `connection$` and `broadcast$` streams, and creates the change plant
   * from `blueprint`.
   *
   * @throws Error with message `'Invalid context'` unless both
   * `context.type` and `context.environment` equal `'server'`.
   */
  constructor(
    private context: IContext,
    private serverAdapter: IServerAdapter,
    private syncableAdapter: ISyncableAdapter,
    blueprint: ChangePlantBlueprint,
  ) {
    super();

    if (context.type !== 'server' || context.environment !== 'server') {
      throw new Error('Invalid context');
    }

    // The handlers are arrow-function properties, so `this` stays bound when
    // they are passed by reference.
    serverAdapter.connection$.subscribe(this.onConnection);
    serverAdapter.broadcast$.subscribe(this.onBroadcast);

    this.changePlant = new ChangePlant(blueprint);
  }

  /**
   * Delegates to `serverAdapter.resolveQueryToContextDependencyRefsDict`.
   *
   * @internal
   */
  async resolveQueryToContextDependencyRefsDict(
    context: IContext,
  ): Promise> {
    return this.serverAdapter.resolveQueryToContextDependencyRefsDict(context);
  }

  /**
   * Asks the server adapter to preload metadata for a view query, logs it
   * and stores it on `context` via `setQueryMetadata`.
   *
   * @internal
   */
  async preloadQueryMetadata(
    group: string,
    context: IContext,
    viewQueryName: string,
  ): Promise {
    let queryMetadata = await this.serverAdapter.preloadQueryMetadata(
      group,
      context,
      viewQueryName,
    );

    this.log('preloaded-query-metadata', {
      group,
      context,
      viewQueryName,
      queryMetadata,
    });

    context.setQueryMetadata(viewQueryName, queryMetadata);
  }

  /**
   * Loads the syncables matching the resolved view queries, keeps the ones
   * readable by `context`, then loads their dependencies (requisite and
   * non-requisite).
   *
   * @param loadedKeySet Keys already loaded. It is copied before use, so the
   * caller's set is not mutated by this method.
   * @returns Direct syncables followed by dependent syncables.
   *
   * @internal
   */
  async loadSyncablesByQuery(
    group: string,
    context: IContext,
    resolvedViewQueryDict: object,
    loadedKeySet: Set,
  ): Promise {
    this.log('load-syncables-by-query', {
      group,
      context,
      resolvedViewQueryDict,
    });

    let serverAdapter = this.serverAdapter;
    let syncableAdapter = this.syncableAdapter;

    // Work on a private copy so additions below stay local.
    loadedKeySet = new Set(loadedKeySet);

    let directSyncables = await serverAdapter.loadSyncablesByQuery(
      group,
      context,
      resolvedViewQueryDict,
      loadedKeySet,
    );

    directSyncables = filterReadableSyncables(
      context,
      syncableAdapter,
      directSyncables,
    );

    for (let syncable of directSyncables) {
      loadedKeySet.add(getSyncableKey(syncable));
    }

    let dependentSyncables = await this.loadDependentSyncables(
      group,
      context,
      directSyncables,
      {
        loadedKeySet,
        requisiteOnly: false,
      },
    );

    this.log('loaded-syncables-by-query', {
      group,
      context,
      resolvedViewQueryDict,
      directSyncablesCount: directSyncables.length,
      dependentSyncablesCount: dependentSyncables.length,
    });

    return [...directSyncables, ...dependentSyncables];
  }

  /**
   * Loads the syncables for `refs` (skipping keys in `loadedKeySet`),
   * optionally filters them for readability, then loads their dependencies.
   *
   * @returns Direct syncables followed by dependent syncables.
   *
   * @internal
   */
  async loadSyncablesByRefs(
    group: string,
    context: IContext,
    refs: SyncableRef[],
    {
      loadedKeySet,
      changeType,
      loadRequisiteDependencyOnly = false,
      skipReadableFilter = false,
    }: LoadSyncablesByRefsOptions = {},
  ): Promise {
    this.log('load-syncables-by-refs', {
      group,
      context,
      refs,
      changeType,
    });

    let serverAdapter = this.serverAdapter;
    let syncableAdapter = this.syncableAdapter;

    // Copy (or create) the set so the caller's set is never mutated.
    loadedKeySet = new Set(loadedKeySet || []);

    // NOTE(review): `loadedKeySet` was just assigned a new Set, so this
    // condition is always true and the filter always runs.
    if (loadedKeySet) {
      refs = refs.filter(ref => !loadedKeySet!.has(getSyncableKey(ref)));
    }

    let directSyncables = await serverAdapter.loadSyncablesByRefs(group, refs);

    if (!skipReadableFilter) {
      directSyncables = filterReadableSyncables(
        context,
        syncableAdapter,
        directSyncables,
      );
    }

    // `loadDependentSyncables` adds the direct syncables' keys to its own
    // copy of the set before resolving dependencies.
    let dependentSyncables = await this.loadDependentSyncables(
      group,
      context,
      directSyncables,
      {
        loadedKeySet,
        changeType,
        skipReadableFilter,
        requisiteOnly: loadRequisiteDependencyOnly,
      },
    );

    this.log('loaded-syncables-by-refs', {
      group,
      context,
      refs,
      changeType,
      directSyncablesCount: directSyncables.length,
      dependentSyncablesCount: dependentSyncables.length,
    });

    return [...directSyncables, ...dependentSyncables];
  }

  /**
   * Repeatedly resolves and loads the dependencies of `syncables`, level by
   * level, until a round yields no refs that are not already loaded.
   *
   * @returns Only the newly loaded dependent syncables; the input
   * `syncables` are not included.
   *
   * @internal
   */
  async loadDependentSyncables(
    group: string,
    context: IContext,
    syncables: ISyncable[],
    {
      loadedKeySet,
      changeType,
      requisiteOnly,
      skipReadableFilter,
    }: LoadDependentSyncablesOptions,
  ): Promise {
    let serverAdapter = this.serverAdapter;
    let syncableAdapter = this.syncableAdapter;

    // Private copy of the loaded keys, seeded with the input syncables.
    loadedKeySet = new Set(loadedKeySet || []);

    for (let syncable of syncables) {
      loadedKeySet.add(getSyncableKey(syncable));
    }

    let loadedSyncables: ISyncable[] = [];

    let pendingResolvingSyncables = syncables;

    while (true) {
      // Refs required by the current level, de-duplicated by key and minus
      // everything already loaded.
      let refs = _.uniqBy(
        _.flatMap(pendingResolvingSyncables, syncable => {
          let object = syncableAdapter.instantiateBySyncable(syncable);

          return [
            ...object.resolveRequisiteDependencyRefs(changeType),
            ...(requisiteOnly ? [] : object.resolveDependencyRefs()),
          ];
        }),
        ref => getSyncableKey(ref),
      ).filter(ref => !loadedKeySet!.has(getSyncableKey(ref)));

      if (!refs.length) {
        break;
      }

      let dependentSyncables = await serverAdapter.loadSyncablesByRefs(
        group,
        refs,
      );

      if (!skipReadableFilter) {
        dependentSyncables = filterReadableSyncables(
          context,
          syncableAdapter,
          dependentSyncables,
        );
      }

      // Only syncables that survived the filter are marked as loaded.
      for (let syncable of dependentSyncables) {
        loadedKeySet.add(getSyncableKey(syncable));
      }

      loadedSyncables.push(...dependentSyncables);

      // The next round resolves dependencies of what was just loaded.
      pendingResolvingSyncables = dependentSyncables;
    }

    return loadedSyncables;
  }

  /**
   * Loads the syncables referenced by `refDict` together with their
   * dependencies and returns the syncable object dict built from them.
   *
   * @param group Group to load from.
   * @param refDict Dict of refs to load.
   * @param options `context` defaults to the server context and
   * `loadRequisiteDependencyOnly` defaults to `true`.
   */
  async load(
    group: string,
    refDict: TRefDict,
    {
      context = this.context,
      loadRequisiteDependencyOnly = true,
    }: LoadOptions = {},
  ): Promise> {
    this.log('load', {
      group,
      refDict,
    });

    let container = new SyncableContainer(this.syncableAdapter);

    let refs = getRefsFromRefDict(refDict as Dict);

    let syncables = await this.loadSyncablesByRefs(group, context, refs, {
      loadRequisiteDependencyOnly,
    });

    for (let syncable of syncables) {
      container.addSyncable(syncable);
    }

    return container.buildSyncableObjectDict(refDict);
  }

  async query(
    group: string,
    update: ViewQueryUpdateObject,
  ): Promise<
    SyncableTypeToSyncableObjectsDict
  >;
  /**
   * Runs the view queries in `update` with the server context and returns
   * the resulting syncable objects grouped by `ref.type`.
   */
  async query(
    group: string,
    update: ViewQueryUpdateObject,
  ): Promise<
    SyncableTypeToSyncableObjectsDict
  > {
    let container = new SyncableContainer(this.syncableAdapter);

    // Starts from an empty loaded-key set; `_query` may also add the
    // syncables referenced by the queries to `container`.
    let {syncables} = await this._query(
      group,
      update,
      new Set(),
      container,
      this.context,
    );

    for (let syncable of syncables) {
      container.addSyncable(syncable);
    }

    return _.groupBy(
      container.getSyncableObjects(),
      syncableObject => syncableObject.ref.type,
    ) as object;
  }

  /**
   * Wraps `change` in a packet with a generated id and the current time,
   * then applies it via `applyChangePacket`.
   *
   * @param context Defaults to the server context.
   */
  async applyChange(
    group: string,
    change: TGenericParams['change'],
    context = this.context,
  ): Promise {
    // NOTE(review): `change` is spread last, so an `id` or `createdAt`
    // property on it would override the generated values.
    let packet: ChangePacket = {
      id: generateUniqueId(),
      createdAt: Date.now() as NumericTimestamp,
      ...(change as GeneralChange),
    };

    return this.applyChangePacket(group, packet, context);
  }

  /**
   * Applies a change packet inside `serverAdapter.queueChange`: loads the
   * involved syncables, processes the packet with the change plant, saves
   * the outcome, broadcasts it and hands notifications to the adapter.
   *
   * @returns The processing result without `changes`, plus `subsequent`
   * promises for each follow-up change.
   * @throws RPCError `'CHANGE_NOT_APPLIED'` when the queued callback did not
   * produce a result; errors raised while queueing are logged and rethrown.
   *
   * @internal
   */
  async applyChangePacket(
    group: string,
    packet: ChangePacket,
    context: TGenericParams['context'],
  ): Promise {
    this.log('apply-change-packet', {
      group,
      packet,
      context,
    });

    let serverAdapter = this.serverAdapter;
    let syncableAdapter = this.syncableAdapter;
    let changePlant = this.changePlant;

    // Assigned inside the queued callback; stays undefined if the callback
    // never reaches `changePlant.process`.
    let result: ChangePlantProcessingResultWithClock | undefined;

    try {
      await serverAdapter.queueChange(group, packet.id, async clock => {
        let refs = getNonCreationRefsFromRefDict(packet.refs);

        // The readable filter is skipped for both loads below.
        let syncables = await this.loadSyncablesByRefs(group, context, refs, {
          changeType: packet.type,
          loadRequisiteDependencyOnly: true,
          skipReadableFilter: true,
        });

        // Additional refs reported by the change plant for this packet.
        let relatedRefs = changePlant.resolve(packet, syncables);

        let relatedSyncables = relatedRefs.length
          ? await this.loadSyncablesByRefs(group, context, relatedRefs, {
              changeType: packet.type,
              loadRequisiteDependencyOnly: true,
              skipReadableFilter: true,
            })
          : [];

        let container = new SyncableContainer(syncableAdapter);

        for (let syncable of [...syncables, ...relatedSyncables]) {
          container.addSyncable(syncable);
        }

        result = changePlant.process(packet, context, container, clock);

        let {
          id,
          updates: updateItems,
          creations: createdSyncables,
          removals: removedSyncableRefs,
          notifications,
        } = result;

        let updatedSyncables = updateItems.map(item => item.snapshot);

        // Save first, then broadcast, then handle notifications.
        await serverAdapter.saveSyncables(
          group,
          createdSyncables,
          updatedSyncables,
          removedSyncableRefs,
        );

        let broadcastResult: BroadcastChangeResult = {
          group,
          id,
          clock,
          creations: createdSyncables,
          updates: updateItems,
          removals: removedSyncableRefs,
        };

        await serverAdapter.broadcast(broadcastResult);

        await serverAdapter.handleNotifications(group, notifications, id);
      });
    } catch (error) {
      this.log('apply-change-packet-failed', {
        group,
        packet,
        context,
        error,
      });

      throw error;
    }

    if (!result) {
      throw new RPCError('CHANGE_NOT_APPLIED');
    }

    this.log('applied-change', {
      group,
      packet,
      context,
      result,
    });

    let {changes: subsequentChanges, ...rest} = result;

    // NOTE(review): the subsequent changes are started here but not awaited
    // or caught in this method; handling rejections is left to whoever
    // consumes `subsequent` - confirm callers do so.
    return {
      subsequent: subsequentChanges.length
        ? subsequentChanges.map(change =>
            this.applyChange(group, change, context),
          )
        : undefined,
      ...rest,
    };
  }

  /**
   * Resolves a view query update: loads the syncables the queries refer to,
   * builds a filter per added query, and loads the syncables matching the
   * resolved queries.
   *
   * @param update Query name to query; a falsy value marks the name for
   * removal.
   * @param loadedKeySet Keys already loaded, forwarded to
   * `loadSyncablesByQuery`.
   * @param container Receives the syncables referenced by the queries.
   * @returns The loaded syncables, the view queries to add (by name) and the
   * names of the view queries to remove.
   *
   * @internal
   */
  async _query(
    group: string,
    update: ViewQueryUpdateObject,
    loadedKeySet: Set,
    container: SyncableContainer,
    context: TGenericParams['context'],
  ): Promise<{
    syncables: ISyncable[];
    nameToViewQueryMapToAdd: Map;
    viewQueryNamesToRemove: string[];
  }> {
    let syncableAdapter = this.syncableAdapter;

    let queryEntries = Object.entries(update);

    // Refs named by the non-falsy queries, unique by key and not yet present
    // in the container.
    let refs = _.uniqBy(
      _.flatMapDeep(queryEntries, ([, query]) =>
        query ? Object.values(query.refs) : [],
      ),
      ref => getSyncableKey(ref),
    ).filter(ref => !container.existsSyncable(ref));

    if (refs.length) {
      let syncables = await this.loadSyncablesByRefs(group, context, refs, {
        loadRequisiteDependencyOnly: true,
      });

      for (let syncable of syncables) {
        container.addSyncable(syncable);
      }
    }

    let nameToViewQueryMapToAdd = new Map();
    let viewQueryNamesToRemove = [];

    let resolvedViewQueryDict: Dict = {};

    // Metadata is preloaded one query at a time, in entry order.
    for (let [name, query] of queryEntries) {
      if (query) {
        await this.preloadQueryMetadata(group, context, name);

        let {refs: refDict, options} = query;

        let syncableDict = container.buildSyncableDict(refDict);

        let resolvedViewQuery = {
          syncables: syncableDict,
          options,
        };

        let filter = syncableAdapter.getViewQueryFilter(
          context,
          name,
          resolvedViewQuery,
        );

        nameToViewQueryMapToAdd.set(name, {
          filter,
          query,
        });

        resolvedViewQueryDict[name] = resolvedViewQuery;
      } else {
        viewQueryNamesToRemove.push(name);
      }
    }

    let syncables = await this.loadSyncablesByQuery(
      group,
      context,
      resolvedViewQueryDict,
      loadedKeySet,
    );

    return {
      syncables,
      nameToViewQueryMapToAdd,
      viewQueryNamesToRemove,
    };
  }

  /**
   * Emits a `'log'` event whose payload is `data` plus the `event` name.
   * `data` is spread last, so an `event` key in `data` would take
   * precedence.
   */
  protected log(event: string, data: object): void {
    this.emit('log', {
      event,
      ...data,
    });
  }

  // Handler for `serverAdapter.connection$`; failures are reported to
  // `console.error`.
  private onConnection = (connection: Connection): void => {
    this.addConnection(connection).catch(console.error);
  };

  // Handler for `serverAdapter.broadcast$`.
  private onBroadcast = (result: BroadcastChangeResult): void => {
    this.broadcastChangeResult(result);
  };

  /**
   * Registers a connection under its group and arranges for it to be removed
   * when its `close$` stream errors or completes. The first connection of a
   * group also triggers `serverAdapter.subscribe(group)`.
   */
  private async addConnection(connection: Connection): Promise {
    let group = connection.group;

    let groupToConnectionSetMap = this.groupToConnectionSetMap;

    let connectionSet = groupToConnectionSetMap.get(group);

    // NOTE(review): this subscription is made before the connection is
    // registered below; if `close$` can notify synchronously on subscribe,
    // removal would run first - confirm against `Connection`.
    connection.close$.subscribe({
      error: error => {
        console.error(error);
        this.removeConnection(connection).catch(console.error);
      },
      complete: () => {
        this.removeConnection(connection).catch(console.error);
      },
    });

    if (connectionSet) {
      connectionSet.add(connection);
    } else {
      connectionSet = new Set([connection]);
      groupToConnectionSetMap.set(group, connectionSet);

      // NOTE(review): the set is stored before subscribing, so it remains
      // registered if `subscribe` rejects.
      await this.serverAdapter.subscribe(group);
    }
  }

  /**
   * Unregisters a connection and disposes it. When the group's set becomes
   * empty, the group entry is deleted and `serverAdapter.unsubscribe(group)`
   * is awaited.
   */
  private async removeConnection(connection: Connection): Promise {
    let group = connection.group;

    let groupToConnectionSetMap = this.groupToConnectionSetMap;

    let connectionSet = groupToConnectionSetMap.get(group);

    // NOTE(review): returning here skips `connection.dispose()` - confirm
    // that is intended for connections whose group has no set.
    if (!connectionSet) {
      return;
    }

    connectionSet.delete(connection);

    if (!connectionSet.size) {
      groupToConnectionSetMap.delete(group);

      // NOTE(review): if this rejects, `dispose()` below is not reached.
      await this.serverAdapter.unsubscribe(group);
    }

    connection.dispose();
  }

  /**
   * Passes a broadcast change result to every connection registered for
   * `result.group`; does nothing when the group has no connections.
   */
  private broadcastChangeResult(result: BroadcastChangeResult): void {
    let {group} = result;

    let connectionSet = this.groupToConnectionSetMap.get(group);

    if (!connectionSet) {
      return;
    }

    // NOTE(review): a synchronous throw from one connection would stop
    // delivery to the remaining ones.
    for (let connection of connectionSet) {
      connection.handleBroadcastChangeResult(result);
    }
  }
}