import {Subject, Subscription, TeardownLogic} from 'rxjs';
import {Id, newId} from '../shared';
import {DataValue, JSONDataValue, JSONValue, Mapping} from './types';
import {ResourceField, ResourceInstance, ResourceSpecials} from './resource';
import {ResourceValue} from './client';
import {LayerProfile, LayerToken} from './access';
import {BooleanFormatType, FieldType, FormatType, NumberFormatType, StringFormatType} from '../modeling';

// NOTE(review): the generic parameter lists in this file were reconstructed from how the type
// variables are used in the bodies. `ResourceInstance` / `ResourceSpecials` are referenced
// without type arguments; if they are generic in './resource', add the arguments back.

/** Update emitted by `SystemStreamClient.getName`. */
export interface NameEvent {
  id: Id;
  old?: string;
  new?: string;
}

/** Update emitted by `SystemStreamClient.getData`. */
export interface DataEvent<T> {
  id: Id;
  old?: T;
  new?: T;
}

/** Update emitted by `SystemStreamClient.getSpecials`: ids that joined or left the set. */
export interface SpecialEvent {
  id: Id;
  added?: Id[];
  removed?: Id[];
}

/** Update emitted by `SystemStreamClient.getValue`. */
export interface ValueEvent<T> {
  id: Id;
  old?: T;
  new?: T;
}

/** Update emitted by the resource streams; `removed` is applied before `added`. */
export interface ResourceEvent {
  id: Id;
  type: 'update' | 'add' | 'remove';
  removed?: ResourceValue;
  added?: ResourceValue;
}

/** Options accepted by every stream method; unset fields are filled in by `defaultOptions`. */
export interface StreamOptions {
  payload?: boolean;
  /** When `true`, an update describing the current state should be delivered right away. */
  instant?: boolean;
  specials?: boolean;
  /** Defaults to 10; the unit is defined by the server — TODO confirm. */
  interval?: number;
}

/**
 * A `Subject` that is tied to a server-side subscription.
 *
 * The `subscribe` callback is invoked synchronously during construction. Its return value
 * (teardown logic) is currently not used.
 */
export class SystemSubject<T> extends Subject<T> {
  protected _subscriptionId?: Id;

  /**
   * @param subscribe Starts the subscription; receives this subject so it can push updates.
   * @param subscriptionId Id under which the subscription is known (used by `unsubscribe`).
   */
  constructor(subscribe: (subscriber: Subject<T>) => TeardownLogic, subscriptionId: Id) {
    super();
    this._subscriptionId = subscriptionId;
    try {
      subscribe(this);
    } catch (err) {
      // Surface start-up failures (e.g. 'not connected') to subscribers instead of losing them
      // in a detached promise.
      this.error(err);
    }
  }

  public get subscriptionId(): Id | undefined {
    return this._subscriptionId;
  }
}

/** A `SystemSubject` of events `T` that additionally caches the latest aggregated value `U`. */
export class SystemValueSubject<T, U> extends SystemSubject<T> {
  protected _value: U | undefined;

  constructor(subscribe: (subscriber: Subject<T>) => TeardownLogic, subscriptionId: Id) {
    super(subscribe, subscriptionId);
  }

  /** The most recently stored value, or `undefined` if none has been stored yet. */
  public get value(): U | undefined {
    return this._value;
  }

  /** @internal */
  public setValue(val: U | undefined) {
    this._value = val;
  }
}

/** Stream of `ResourceEvent`s for a single resource instance, caching the current value. */
export class ResourceStreamInstance<T extends ResourceValue> extends SystemSubject<ResourceEvent> {
  protected _value: T | undefined;
  private readonly _resource: ResourceInstance;

  constructor(
    resource: ResourceInstance,
    subscribe: (subscriber: Subject<ResourceEvent>) => TeardownLogic,
    subscriptionId: Id,
  ) {
    super(subscribe, subscriptionId);
    this._resource = resource;
  }

  public get resource(): ResourceInstance {
    return this._resource;
  }

  /** The most recently stored value, or `undefined` if none is stored. */
  public get value(): T | undefined {
    return this._value;
  }

  /** @internal */
  public setValue(val: T | undefined) {
    this._value = val;
  }
}

/**
 * Stream of `ResourceEvent`s for a list of resource values.
 *
 * Values are kept in an array; `_valueIndexes` maps an id to its array position and
 * `_valueIds` maps a value object back to its id so positions can be refreshed after
 * sorting or removal.
 */
export class ResourceStreamSpecials<T extends ResourceValue> extends SystemSubject<ResourceEvent> {
  private _values: T[] = [];
  private _valueIds = new WeakMap<T, Id>();
  private _valueIndexes = new Map<Id, number>();
  private readonly _resource: ResourceSpecials;

  constructor(
    resource: ResourceSpecials,
    subscribe: (subscriber: Subject<ResourceEvent>) => TeardownLogic,
    subscriptionId: Id,
  ) {
    super(subscribe, subscriptionId);
    this._resource = resource;
  }

  public get resource(): ResourceSpecials {
    return this._resource;
  }

  /**
   * Sorts the values in place according to `resource.order` and refreshes the id → index map.
   * Does nothing when the resource has no order or there are no values.
   */
  public reorder() {
    const order = this.resource.order;
    if (!order || this.values.length === 0) {
      return;
    }

    // Apply the entries from least to most significant. `Array.prototype.sort` is stable, so
    // ties under a more significant entry keep the order produced by the less significant ones.
    // This only works if the comparator reports equal values as 0.
    for (let i = order.entries.length - 1; i >= 0; i--) {
      const entry = order.entries[i];
      this._values.sort((a: T, b: T): number => {
        const va = a[entry.name];
        const vb = b[entry.name];
        // Only genuinely missing values are incomparable; 0, '' and false are real values.
        if (va == null || vb == null) {
          return 0;
        }
        const cmp = va < vb ? -1 : va > vb ? 1 : 0;
        return entry.ascending ? cmp : -cmp;
      });
    }

    for (let i = 0; i < this._values.length; i++) {
      const id = this._valueIds.get(this._values[i]);
      if (!id) {
        continue;
      }
      this._valueIndexes.set(id, i);
    }
  }

  public get values(): T[] {
    return this._values;
  }

  /** Returns the array position of the value with the given id, or -1 if it is unknown. */
  public getIndex(id: Id): number {
    return this._valueIndexes.get(id) ?? -1;
  }

  /** Returns the value with the given id, or `undefined` if it is unknown. */
  public getValue(id: Id): T | undefined {
    const idx = this.getIndex(id);
    if (idx === -1) {
      return undefined;
    }
    return this._values[idx];
  }

  /**
   * Returns the value stored for `id`, appending a new empty object first if there is none.
   * @internal
   */
  public putValue(id: Id): T {
    let inst = this.getValue(id);
    if (!inst) {
      inst = {} as T;
      this._valueIndexes.set(id, this._values.length);
      this._valueIds.set(inst, id);
      this._values.push(inst);
    }
    return inst;
  }

  /**
   * Removes the value stored for `id` (no-op if unknown) and shifts the recorded index of
   * every value behind it down by one.
   * @internal
   */
  public removeValue(id: Id) {
    const idx = this.getIndex(id);
    if (idx === -1) {
      return;
    }
    this._valueIds.delete(this._values[idx]);
    for (let i = idx + 1; i < this._values.length; i++) {
      const postId = this._valueIds.get(this._values[i]);
      if (!postId) {
        continue;
      }
      this._valueIndexes.set(postId, i - 1);
    }
    this._values.splice(idx, 1);
    this._valueIndexes.delete(id);
  }
}

/** Fills in the defaults for every unset stream option. */
function defaultOptions(options?: StreamOptions): {
  payload: boolean;
  instant: boolean;
  specials: boolean;
  interval: number;
} {
  return {
    payload: options?.payload ?? true,
    instant: options?.instant ?? false,
    specials: options?.specials ?? false,
    interval: options?.interval ?? 10,
  };
}

/**
 * Converts an `http://` / `https://` URL into its `ws://` / `wss://` counterpart and leaves
 * every other URL untouched, so that HTTP-style base URLs can be handed to `WebSocket`.
 */
function toWebSocketUrl(url: string): string {
  return url.replace(/^http(s?):\/\//i, 'ws$1://');
}

/**
 * Applies the `added` part of a resource update to `value` (mutating it).
 *
 * Non-relation and 'primitive'-mapped fields are overwritten (or deleted when the new value
 * is `undefined`); all other fields are treated as lists and the new entries are appended.
 */
function updateNewValue(fields: ResourceField[], value: ResourceValue, newValue: ResourceValue | undefined) {
  if (!newValue) {
    return;
  }
  for (const field of fields) {
    if (!Object.prototype.hasOwnProperty.call(newValue, field.name)) {
      continue;
    }
    const newFieldValue = newValue[field.name];
    if (!field.relation || field.mapping === 'primitive') {
      // Instance object field
      if (typeof newFieldValue !== 'undefined') {
        value[field.name] = newFieldValue as JSONDataValue;
      } else {
        delete value[field.name];
      }
    } else {
      // Instance relation field (list)
      if (!Array.isArray(newFieldValue)) {
        continue;
      }
      let values = value[field.name] as JSONDataValue[] | undefined;
      if (!values) {
        values = Array.from(newFieldValue);
      } else {
        values.push(...newFieldValue);
      }
      value[field.name] = values;
    }
  }
}

/**
 * Applies the `removed` part of a resource update to `value` (mutating it).
 *
 * Only list-valued relation fields are touched: each removed entry is deleted from the
 * current list (first occurrence, compared with `indexOf`).
 */
function updateOldValue(fields: ResourceField[], value: ResourceValue, oldValue: ResourceValue | undefined) {
  if (!oldValue) {
    return;
  }
  for (const field of fields) {
    if (!Object.prototype.hasOwnProperty.call(oldValue, field.name)) {
      continue;
    }
    if (!field.relation || field.mapping === 'primitive') {
      continue;
    }
    // Instance relation field (list)
    const oldFieldValue = oldValue[field.name];
    if (!Array.isArray(oldFieldValue)) {
      continue;
    }
    const values = value[field.name] as JSONDataValue[] | undefined;
    if (!values) {
      continue;
    }
    for (const remVal of oldFieldValue) {
      const idx = values.indexOf(remVal);
      if (idx !== -1) {
        values.splice(idx, 1);
      }
    }
    value[field.name] = values;
  }
}

/**
 * The stream version of `SystemClient`.
 *
 * Unlike `SystemClient`, this client maintains an active session with the Vyze System and keeps
 * its state on the server side. This includes space tokens and the currently used space.
 *
 * This client also introduces subscriptions: most methods provided by `SystemClient` have an
 * observable counterpart here, which makes it possible to react to changes in the Vyze System.
 *
 * When a method of this client is called with the `instant` option set to `true`, an update
 * should be delivered immediately whose `new` value is identical to the current response of the
 * analogous `SystemClient` method.
 *
 * ```
 * const name1 = await systemClient.getName('id...');
 * const name2 = await firstValueFrom(systemStreamClient.getName('id...', {instant: true}));
 * // name1 and name2.new should have the same value.
 * ```
 */
export class SystemStreamClient {
  private ws?: WebSocket;
  /** Routes incoming messages: `referenceId` of a message → subject of the request it answers. */
  private subscriptions = new Map<Id, Subject<any>>();

  constructor(private url = 'wss://api.vyze.io/api') {
  }

  /**
   * Opens the WebSocket at `<url>/v1/stream` (an `http(s)` base URL is mapped to `ws(s)`).
   *
   * Resolves once the socket is open and rejects if the socket reports an error before that.
   * Incoming string messages are parsed as JSON and their `payload` is dispatched to the
   * subject registered for their `referenceId`; everything else is ignored.
   */
  public async connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const url = `${toWebSocketUrl(this.url)}/v1/stream`;
      const ws = new WebSocket(url);
      this.ws = ws;
      ws.onopen = () => {
        resolve();
      };
      ws.onerror = () => {
        // No effect once the promise has been resolved by `onopen`.
        reject(new Error(`failed to connect to ${url}`));
      };
      ws.onmessage = (evt: { data: unknown }) => {
        if (typeof evt.data !== 'string') {
          return;
        }
        const msg = JSON.parse(evt.data);
        if (msg === null || typeof msg !== 'object') {
          return;
        }
        const subs = this.subscriptions.get(msg['referenceId']);
        if (subs) {
          subs.next(msg.payload);
        }
      };
    });
  }

  /**
   * Closes the WebSocket and resolves once it is closed. Resolves immediately when there is no
   * socket or it is already closed.
   */
  public async disconnect(): Promise<void> {
    const ws = this.ws;
    if (!ws || ws.readyState === WebSocket.CLOSED) {
      // No close event will fire in these cases, so waiting for one would never finish.
      this.ws = undefined;
      return;
    }
    return new Promise<void>(resolve => {
      ws.onclose = () => {
        if (this.ws === ws) {
          this.ws = undefined;
        }
        resolve();
      };
      ws.close();
    });
  }

  // Promises

  /**
   * Sends the `info` command and resolves with the payload of the reply.
   * Rejects with 'not connected' when there is no socket.
   */
  public getInfo(): Promise<any> {
    return this.request('info');
  }

  /**
   * Sends the given layer token to the server. Resolves as soon as the command has been handed
   * to the socket; no reply is awaited. Rejects with 'not connected' when there is no socket.
   */
  public async registerLayerToken(token: LayerToken): Promise<void> {
    this.send('registerLayerTokens', {tokens: [token.token]});
  }

  /**
   * Sends every token of every access group of `profile` to the server. Resolves as soon as the
   * command has been handed to the socket; no reply is awaited.
   */
  public async registerLayerProfile(profile: LayerProfile): Promise<void> {
    const tokens: string[] = [];
    for (const ag of profile.accessGroups) {
      for (const tk of ag.tokens) {
        tokens.push(tk.token);
      }
    }
    this.send('registerLayerTokens', {tokens: tokens});
  }

  /**
   * Sends the `getLayerTokens` command and resolves with the `tokens` field of the reply.
   *
   * NOTE(review): the element type is assumed to be `string` (matching what
   * `registerLayerToken` sends); the reply is not validated — confirm against the server API.
   */
  public getLayerTokens(): Promise<string[]> {
    return this.request('getLayerTokens').then((msg: any) => msg['tokens']);
  }

  // Streams

  /**
   * Subscribes to name changes of an object. The returned subject's `value` tracks the `new`
   * name of the latest update.
   */
  public getName(objectId: Id, options?: StreamOptions): SystemValueSubject<NameEvent, string> {
    const opts = defaultOptions(options);
    const messageId = newId();
    const obs = new SystemValueSubject<NameEvent, string>(subscriber => {
      this.startSubscription(subscriber, messageId, 'name', {
        object: objectId,
        ...opts,
        params: {},
      });
    }, messageId);

    obs.subscribe((update) => {
      obs.setValue(update.new);
    });
    return obs;
  }

  getData(objectId: Id, formatType: StringFormatType, options?: StreamOptions): SystemValueSubject<DataEvent<string>, string>
  getData(objectId: Id, formatType: NumberFormatType, options?: StreamOptions): SystemValueSubject<DataEvent<number>, number>
  getData(objectId: Id, formatType: BooleanFormatType, options?: StreamOptions): SystemValueSubject<DataEvent<boolean>, boolean>
  getData(objectId: Id, formatType: 'raw', options?: StreamOptions): SystemValueSubject<DataEvent<ArrayBuffer>, ArrayBuffer>
  /**
   * Subscribes to data changes of an object, requested in the given format. The returned
   * subject's `value` tracks the `new` data of the latest update.
   */
  public getData<T extends DataValue>(objectId: Id, formatType: FormatType, options?: StreamOptions): SystemValueSubject<DataEvent<T>, T> {
    const opts = defaultOptions(options);
    const messageId = newId();
    const obs = new SystemValueSubject<DataEvent<T>, T>(subscriber => {
      this.startSubscription(subscriber, messageId, 'data', {
        object: objectId,
        ...opts,
        params: {
          'format': formatType,
        },
      });
    }, messageId);

    obs.subscribe((update) => {
      obs.setValue(update.new);
    });
    return obs;
  }

  /**
   * Subscribes to the specials of an object. The returned subject's `value` is the id list
   * obtained by applying every update's `removed` and then `added` ids.
   *
   * @throws Error if both the `specials` option and `includeSelf` are `true`.
   */
  public getSpecials(objectId: Id, includeSelf = false, includeDirect = true, includeIndirect = true, options?: StreamOptions): SystemValueSubject<SpecialEvent, Id[]> {
    const opts = defaultOptions(options);
    if (opts.specials && includeSelf) {
      throw new Error('specials and self cannot both be true');
    }

    const messageId = newId();
    const obs = new SystemValueSubject<SpecialEvent, Id[]>(subject => {
      this.startSubscription(subject, messageId, 'specials', {
        object: objectId,
        ...opts,
        params: {
          'self': includeSelf,
          'direct': includeDirect,
          'transitive': includeIndirect,
        },
      });
    }, messageId);

    obs.subscribe((update) => {
      let values = obs.value;
      if (values) {
        for (const ov of update.removed ?? []) {
          const idx = values.indexOf(ov);
          if (idx !== -1) {
            values.splice(idx, 1);
          }
        }
      }
      for (const nv of update.added ?? []) {
        if (!values) {
          values = [nv];
        } else {
          values.push(nv);
        }
      }
      obs.setValue(values);
    });
    return obs;
  }

  /**
   * Subscribes to the value(s) reachable from `originId` via `relationId`.
   *
   * For a 'primitive' mapping the subject's `value` is the `new` value of the latest update.
   * For a 'list' mapping, `old` entries are removed from and `new` entries appended to the
   * cached list. Other mapping types leave `value` untouched.
   */
  public getValue<T>(originId: Id, relationId: Id, fieldType: FieldType, formatType: FormatType, mapping: Mapping, options?: StreamOptions): SystemValueSubject<ValueEvent<T>, T> {
    const opts = defaultOptions(options);
    const messageId = newId();
    const obs = new SystemValueSubject<ValueEvent<T>, T>(subscriber => {
      const params: { [_: string]: JSONValue } = {
        'relation': relationId,
        'field': fieldType,
        'format': formatType,
        'mapping': mapping.valueType,
        'mappingParams': mapping.parameters,
      };
      this.startSubscription(subscriber, messageId, 'value', {
        object: originId,
        ...opts,
        params,
      });
    }, messageId);

    obs.subscribe((update) => {
      if (mapping.valueType === 'primitive') {
        obs.setValue(update.new);
      } else if (mapping.valueType === 'list') {
        if (update.old && Array.isArray(update.old) && obs.value) {
          const values = obs.value as JSONDataValue[];
          for (const remVal of update.old) {
            const idx = values.indexOf(remVal);
            if (idx !== -1) {
              values.splice(idx, 1);
            }
          }
          obs.setValue(values as T);
        }
        if (update.new && Array.isArray(update.new)) {
          let values = obs.value as JSONDataValue[] | undefined;
          if (!values) {
            values = Array.from(update.new);
          } else {
            values.push(...update.new);
          }
          obs.setValue(values as T);
        }
      }
    });
    return obs;
  }

  /**
   * Subscribes to a single resource instance. A 'remove' update clears the cached value; any
   * other update is merged into it (removed parts first, then added parts).
   */
  public getResourceInstance<T extends ResourceValue>(res: ResourceInstance, options?: StreamOptions): ResourceStreamInstance<T> {
    const opts = defaultOptions(options);
    const messageId = newId();
    const obs = new ResourceStreamInstance<T>(res, subscriber => {
      this.startSubscription(subscriber, messageId, 'resource', {
        object: res.objectId,
        params: res.toGetRequest(),
        ...opts,
      });
    }, messageId);

    obs.subscribe((update) => {
      if (update.type === 'remove') {
        obs.setValue(undefined);
      } else {
        const value = (obs.value ?? {}) as T;
        updateOldValue(res.schema.fields, value, update.removed);
        updateNewValue(res.schema.fields, value, update.added);
        obs.setValue(value);
      }
    });
    return obs;
  }

  /**
   * Subscribes to a list of resource values. A 'remove' update drops the value with the
   * update's id; any other update is merged into that value (creating it if necessary), after
   * which the list is re-sorted.
   */
  public getResourceSpecials<T extends ResourceValue>(res: ResourceSpecials, options?: StreamOptions): ResourceStreamSpecials<T> {
    const opts = defaultOptions(options);
    const messageId = newId();
    const obs = new ResourceStreamSpecials<T>(res, subscriber => {
      this.startSubscription(subscriber, messageId, 'resource', {
        object: res.objectId,
        params: res.toGetRequest(),
        ...opts,
      });
    }, messageId);

    if (res.order) {
      obs.reorder();
    }

    obs.subscribe((update) => {
      if (update.type === 'remove') {
        obs.removeValue(update.id);
        return;
      }
      const value = obs.putValue(update.id);
      updateOldValue(res.schema.fields, value, update.removed);
      updateNewValue(res.schema.fields, value, update.added);
      obs.reorder();
    });
    return obs;
  }

  /**
   * Cancels the server-side subscription behind `observable` and stops routing messages to it.
   * Does nothing when the observable has no subscription id. Resolves as soon as the command
   * has been handed to the socket; rejects with 'not connected' when there is no socket.
   */
  public async unsubscribe(observable: SystemSubject<any>): Promise<void> {
    const subscriptionId = observable.subscriptionId;
    if (!subscriptionId) {
      return;
    }
    const ws = this.requireSocket();
    const messageId = newId();
    ws.send(JSON.stringify({
      command: 'unsubscribe',
      messageId,
      referenceId: subscriptionId,
    }));

    // Drop the local route as well; otherwise every stream ever opened stays in the map.
    const routed = this.subscriptions.get(subscriptionId);
    if (routed) {
      this.subscriptions.delete(subscriptionId);
      routed.complete();
    }
  }

  // Private

  /** Returns the socket, or throws 'not connected' when there is none. */
  private requireSocket(): WebSocket {
    if (!this.ws) {
      throw new Error('not connected');
    }
    return this.ws;
  }

  /** Sends a command with a fresh message id and returns that id. */
  private send(command: string, payload?: unknown): Id {
    const ws = this.requireSocket();
    const messageId = newId();
    // `JSON.stringify` omits `payload` when it is undefined.
    ws.send(JSON.stringify({command, messageId, payload}));
    return messageId;
  }

  /** Sends a command and resolves with the payload of the first message that references it. */
  private request(command: string, payload?: unknown): Promise<any> {
    return new Promise(resolve => {
      const messageId = this.send(command, payload);
      const sub: Subscription = this.subscribeMessage(messageId).subscribe((reply: any) => {
        this.unsubscribeMessage(messageId, sub);
        resolve(reply);
      });
    });
  }

  /**
   * Sends a `subscribe` command for `event` and forwards every message that references
   * `messageId` to `subscriber`. The route is only registered after the send succeeded.
   */
  private startSubscription(subscriber: Subject<any>, messageId: Id, event: string, params: object): void {
    const ws = this.requireSocket();
    ws.send(JSON.stringify({
      command: 'subscribe',
      messageId,
      payload: {
        event,
        params,
      },
    }));
    this.subscribeMessage(messageId).subscribe((msg) => {
      subscriber.next(msg);
    });
  }

  /** Returns the routing subject for `refId`, creating and registering it if necessary. */
  private subscribeMessage(refId: Id): Subject<any> {
    let sub = this.subscriptions.get(refId);
    if (!sub) {
      sub = new Subject<any>();
      this.subscriptions.set(refId, sub);
    }
    return sub;
  }

  /** Ends `subs` and unregisters the routing subject for `refId` once nobody observes it. */
  private unsubscribeMessage(refId: Id, subs: Subscription) {
    const subj = this.subscriptions.get(refId);
    if (subj) {
      subs.unsubscribe();
      if (!subj.observed) {
        this.subscriptions.delete(refId);
      }
    }
  }
}

/**
 * Creates a `SystemStreamClient` for the given base URL.
 *
 * NOTE(review): the default here ('https://…/system') differs from the constructor default
 * ('wss://…/api'). `connect()` maps the scheme to `wss`, but the path difference is left as is —
 * confirm which endpoint is intended.
 */
export function newSystemStreamClient(url = 'https://api.vyze.io/system'): SystemStreamClient {
  return new SystemStreamClient(url);
}