// Package: com.lightningkite.lightningdb.live // Generated by Khrysalis - this file will be overwritten. import { ObserveModelApi } from '../ObserveModelApi' import { HasId } from '../db/HasId' import { ListChange } from '../db/ListChange' import { Query } from '../db/Query' import { xListComparatorGet } from '../db/SortPart' import { WebSocketIsh, multiplexedSocketReified } from './sockets' import { Comparable, Comparator, EqualOverrideMap, compareBy, listRemoveAll, runOrNull, safeEq, xMutableMapGetOrPut } from '@lightningkite/khrysalis-runtime' import { Observable } from 'rxjs' import { delay, map, publishReplay, refCount, retryWhen, switchMap, tap } from 'rxjs/operators' //! Declares com.lightningkite.lightningdb.live.LiveObserveModelApi export class LiveObserveModelApi> extends ObserveModelApi { public constructor(public readonly openSocket: ((query: Query) => Observable>)) { super(); this.alreadyOpen = new EqualOverrideMap, Observable>>(); } public readonly alreadyOpen: Map, Observable>>; public observe(query: Query): Observable> { //multiplexedSocket, Query>("$multiplexUrl?jwt=$token", path) return xMutableMapGetOrPut, Observable>>(this.alreadyOpen, query, (): Observable> => (this.openSocket(query) .pipe(tap({ unsubscribe: (): void => { this.alreadyOpen.delete(query); } })) .pipe(publishReplay(1)) .pipe(refCount()))); } } export namespace LiveObserveModelApi { //! Declares com.lightningkite.lightningdb.live.LiveObserveModelApi.Companion export class Companion { private constructor() { } public static INSTANCE = new Companion(); public create>(Model: Array, multiplexUrl: string, token: string, headers: Map, path: string): LiveObserveModelApi { return new LiveObserveModelApi((query: Query): Observable> => (xObservableFilter(multiplexedSocketReified, Query>([ListChange, Model], [Query, Model], multiplexUrl, path, new Map([["jwt", [token]]])), query))); } } } //! Declares com.lightningkite.lightningdb.live.toListObservable>io.reactivex.rxjava3.core.Observablecom.lightningkite.lightningdb.ListChangecom.lightningkite.lightningdb.live.toListObservable.T export function xObservableToListObservable, ID extends Comparable>(this_: Observable>, ordering: Comparator): Observable> { const localList = ([] as Array); return this_.pipe(map((it: ListChange): Array => { const it_8 = it.wholeList; if (it_8 !== null) { localList.length = 0; localList.push(...it_8.slice().sort(ordering)); } const it_10 = it._new; if (it_10 !== null) { listRemoveAll(localList, (o: T): boolean => (safeEq(it_10._id, o._id))); let index = localList.findIndex((inList: T): boolean => (ordering(it_10, inList) < 0)); if (index === (-1)) { index = localList.length } localList.splice(index, 0, it_10); } else { const it_17 = it.old; if (it_17 !== null) { listRemoveAll(localList, (o: T): boolean => (safeEq(it_17._id, o._id))); } } return localList; })); } //! Declares com.lightningkite.lightningdb.live.filter>io.reactivex.rxjava3.core.Observablecom.lightningkite.lightningdb.live.WebSocketIshcom.lightningkite.lightningdb.ListChangecom.lightningkite.lightningdb.live.filter.T, com.lightningkite.lightningdb.Querycom.lightningkite.lightningdb.live.filter.T export function xObservableFilter, ID extends Comparable>(this_: Observable, Query>>, query: Query): Observable> { return xObservableToListObservable(this_ .pipe(tap((it: WebSocketIsh, Query>): void => { it.send(query); })) .pipe(switchMap((it: WebSocketIsh, Query>): Observable> => (it.messages))) .pipe(retryWhen( (it: Observable): Observable => (it.pipe(delay(5000))))), xListComparatorGet(query.orderBy) ?? compareBy((it: T): (Comparable<(any | null)> | null) => (it._id))); }