// Package: com.lightningkite.lightningdb.live
// Generated by Khrysalis - this file will be overwritten.
import { MultiplexMessage } from '../db/MultiplexMessage'
import { ReifiedType, runOrNull, xCharSequenceIsBlank, xMutableMapGetOrPut, xStringSubstringBefore } from '@lightningkite/khrysalis-runtime'
import { HttpClient, JSON2, WebSocketFrame, WebSocketInterface, doOnSubscribe, isNonNull } from '@lightningkite/rxjs-plus'
import { NEVER, Observable, SubscriptionLike, filter, interval, merge, of, map as rMap } from 'rxjs'
import { delay, distinctUntilChanged, map, filter as oFilter, publishReplay, refCount, retryWhen, switchMap, take, tap, timeout } from 'rxjs/operators'
import { v4 as randomUuidV4 } from 'uuid'

//! Declares com.lightningkite.lightningdb.live.sharedSocketShouldBeActive
/**
 * Stream that gates every shared socket: `sharedSocket` only opens a
 * connection while the latest value is `true`. Defaults to always active.
 */
export let _sharedSocketShouldBeActive: Observable<boolean> = of(true);

/** Returns the current activity gate used by `sharedSocket`. */
export function getSharedSocketShouldBeActive(): Observable<boolean> {
    return _sharedSocketShouldBeActive;
}

/**
 * Replaces the activity gate. `sharedSocket` reads the gate when it first
 * builds the observable for a URL, so entries already in the cache keep the
 * gate they were created with.
 */
export function setSharedSocketShouldBeActive(value: Observable<boolean>) {
    _sharedSocketShouldBeActive = value;
}

// Reconnect bookkeeping shared by every socket created in this module.
// retryTime: delay in milliseconds before the next reconnect attempt.
let retryTime = 1000;
// lastRetry: Date.now() timestamp of the most recent established connection.
let lastRetry = 0;

//! Declares com.lightningkite.lightningdb.live._overrideWebSocketProvider
/**
 * Optional factory used instead of `HttpClient.INSTANCE.webSocket` when
 * non-null.
 */
export let __overrideWebSocketProvider: (((url: string) => Observable<WebSocketInterface>) | null) = null;

/** Returns the socket factory override, or `null` when none is installed. */
export function get_overrideWebSocketProvider(): (((url: string) => Observable<WebSocketInterface>) | null) {
    return __overrideWebSocketProvider;
}

/** Installs (or, with `null`, removes) the socket factory override. */
export function set_overrideWebSocketProvider(value: (((url: string) => Observable<WebSocketInterface>) | null)) {
    __overrideWebSocketProvider = value;
}

// One shared connection observable per full URL (query string included).
const sharedSocketCache = new Map<string, Observable<WebSocketInterface>>();
//! Declares com.lightningkite.lightningdb.live.sharedSocket
// Upper bound, in milliseconds, for the reconnect delay so that repeated
// doubling during a long outage cannot grow without limit.
const maxRetryTime = 60000;

/**
 * Emits `socket` once and then stays open to keep the connection healthy:
 * a single-space text frame is written every 30 seconds, and the stream
 * errors if nothing is read from the socket for 40 seconds.
 */
function keepSocketAlive(socket: WebSocketInterface): Observable<WebSocketInterface> {
    lastRetry = Date.now();

    // Only have this observable until it fails; it never emits downstream.
    const pingMessages: Observable<WebSocketInterface> = interval(30000).pipe(
        tap((): void => {
            socket.write.next({ text: " ", binary: null });
        }),
        switchMap((): Observable<WebSocketInterface> => NEVER),
    );

    const timeoutAfterSilence: Observable<WebSocketInterface> = socket.read.pipe(
        tap((): void => {
            // A frame arriving more than a minute after connecting resets
            // the reconnect delay back to its initial value.
            if (Date.now() > lastRetry + 60000) {
                retryTime = 1000;
            }
        }),
        timeout(40000),
        switchMap((): Observable<WebSocketInterface> => NEVER),
    );

    return merge(of(socket), pingMessages, timeoutAfterSilence);
}

/**
 * Opens a socket to `url` (through the override provider when one is set)
 * and reconnects after every failure, doubling the wait each time up to
 * `maxRetryTime`. `shortUrl` is only used for log output.
 */
function openSocketWithRetry(url: string, shortUrl: string): Observable<WebSocketInterface> {
    const connection: Observable<WebSocketInterface> =
        runOrNull(get_overrideWebSocketProvider(), _ => _(url)) ?? HttpClient.INSTANCE.webSocket(url);

    return connection.pipe(
        switchMap((socket: WebSocketInterface): Observable<WebSocketInterface> => keepSocketAlive(socket)),
        tap({
            error: (it: unknown): void => {
                console.log(`Socket to ${shortUrl} FAILED with ${it}`);
            },
        }),
        // The retryWhen notifier itself runs only once per subscription, so the
        // delay has to be computed per error for the doubling to take effect.
        retryWhen((errors: Observable<unknown>): Observable<unknown> => errors.pipe(
            switchMap((error: unknown): Observable<unknown> => {
                const wait = retryTime;
                retryTime = Math.min(wait * 2, maxRetryTime);
                return of(error).pipe(delay(wait));
            }),
        )),
        tap({
            unsubscribe: (): void => {
                console.log(`Disconnecting socket to ${shortUrl}`);
            },
        }),
    );
}

/**
 * Returns the shared, reference-counted connection observable for `url`,
 * creating and caching it on first use.
 *
 * The connection is only open while the activity gate
 * (`getSharedSocketShouldBeActive()`) emits `true`; while it is `false` the
 * observable stays silent. The latest socket is replayed to late subscribers.
 *
 * @param url Full socket URL; also the cache key.
 * @returns An observable emitting the currently connected socket.
 */
export function sharedSocket(url: string): Observable<WebSocketInterface> {
    return xMutableMapGetOrPut<string, Observable<WebSocketInterface>>(sharedSocketCache, url, (): Observable<WebSocketInterface> => {
        // Logs use the URL without its query string, like every other log here.
        const shortUrl = xStringSubstringBefore(url, '?', undefined);
        return getSharedSocketShouldBeActive().pipe(
            distinctUntilChanged(),
            switchMap((active: boolean): Observable<WebSocketInterface> => {
                if (!active) {
                    return NEVER;
                }
                console.log(`Creating socket to ${shortUrl}`);
                return openSocketWithRetry(url, shortUrl);
            }),
            publishReplay(1),
            refCount(),
        );
    });
}

//! Declares com.lightningkite.lightningdb.live.WebSocketIsh
/**
 * Minimal typed socket facade: a stream of incoming `IN` messages plus a
 * function that sends an `OUT` message.
 */
export class WebSocketIsh<IN, OUT> {
    public constructor(public readonly messages: Observable<IN>, public readonly send: ((a: OUT) => void)) {
    }
}
//! Declares com.lightningkite.lightningdb.live.multiplexedSocket
/**
 * Variant of `multiplexedSocket` that takes the reified type descriptors
 * first; it simply forwards to `multiplexedSocket`.
 */
export function multiplexedSocketReified<IN, OUT>(IN: Array<any>, OUT: Array<any>, url: string, path: string, queryParams: Map<string, Array<string>> = new Map([])): Observable<WebSocketIsh<IN, OUT>> {
    return multiplexedSocket<IN, OUT>(url, path, queryParams, IN, OUT);
}

//! Declares com.lightningkite.lightningdb.live.multiplexedSocket
/**
 * Opens a channel on the shared socket for `url` and exposes it as a typed
 * `WebSocketIsh`: incoming text is parsed with `JSON2.parse` using `inType`
 * (null results are dropped) and outgoing values are sent as
 * `JSON.stringify` output.
 *
 * @param url Socket URL handed to `multiplexedSocketRaw`.
 * @param path Path sent in the channel's start message.
 * @param queryParams Query parameters sent in the channel's start message.
 * @param inType Reified type descriptor used to parse incoming messages.
 * @param outType Accepted for signature parity; not used by this function.
 */
export function multiplexedSocket<IN, OUT>(url: string, path: string, queryParams: Map<string, Array<string>> = new Map([]), inType: ReifiedType, outType: ReifiedType): Observable<WebSocketIsh<IN, OUT>> {
    return multiplexedSocketRaw(url, path, queryParams).pipe(
        map((raw: WebSocketIsh<string, string>): WebSocketIsh<IN, OUT> => {
            const typedMessages = raw.messages.pipe(
                rMap((text: string): (IN | null) => (JSON2.parse(text, inType))),
                filter(isNonNull),
            );
            const sendTyped = (m: OUT): void => {
                raw.send(JSON.stringify(m));
            };
            return new WebSocketIsh<IN, OUT>(typedMessages, sendTyped);
        }),
    );
}

//! Declares com.lightningkite.lightningdb.live.multiplexedSocketRaw
/**
 * Opens a string-based channel on the shared socket for `url`.
 *
 * A random channel id is generated once per call. Each time the shared
 * socket emits a connection, a start message carrying `path` and
 * `queryParams` is written on subscribe, and a `WebSocketIsh` is emitted
 * once the first message for this channel with `start` set is read.
 * Unsubscribing writes a closing message to the most recent socket.
 *
 * @param url Socket URL handed to `sharedSocket`.
 * @param path Path sent in the start and closing messages.
 * @param queryParams Query parameters sent in the start message.
 */
export function multiplexedSocketRaw(url: string, path: string, queryParams: Map<string, Array<string>> = new Map([])): Observable<WebSocketIsh<string, string>> {
    const shortUrl = xStringSubstringBefore(url, '?', undefined);
    const channel = randomUuidV4();
    // Remembered so the unsubscribe handler can tell the server the channel closed.
    let lastSocket: (WebSocketInterface | null) = null;

    return sharedSocket(url).pipe(
        switchMap((socket: WebSocketInterface): Observable<WebSocketIsh<string, string>> => {
            console.log(`Setting up socket to ${shortUrl} with ${path}`);
            lastSocket = socket;

            // Every parseable multiplex message on the socket, for any channel.
            // Frames with null or blank text (such as the keep-alive ping) are skipped.
            const multiplexedIn = socket.read.pipe(
                rMap((frame: WebSocketFrame): (MultiplexMessage | null) => {
                    const text = frame.text;
                    if (text === null || xCharSequenceIsBlank(text)) {
                        return null;
                    }
                    return JSON2.parse(text, [MultiplexMessage]);
                }),
                filter(isNonNull),
            );

            const sendOnChannel = (message: string): void => {
                // Log the target URL rather than the socket object, which has no useful string form.
                console.log(`Sending ${message} to ${shortUrl}`);
                socket.write.next({ text: JSON.stringify(new MultiplexMessage(channel, undefined, undefined, undefined, undefined, message, undefined)), binary: null });
            };

            return multiplexedIn.pipe(
                oFilter((message: MultiplexMessage): boolean => (message.channel === channel && message.start)),
                take(1),
                map((_0: MultiplexMessage): WebSocketIsh<string, string> => {
                    console.log(`Connected to channel ${channel}`);
                    const channelData = multiplexedIn.pipe(
                        rMap((message: MultiplexMessage): (string | null) => (message.channel === channel ? message.data : null)),
                        filter(isNonNull),
                    );
                    return new WebSocketIsh<string, string>(channelData, sendOnChannel);
                }),
                // Ask the server to open the channel as soon as someone subscribes.
                doOnSubscribe((_0: SubscriptionLike): void => {
                    socket.write.next({ text: JSON.stringify(new MultiplexMessage(channel, path, queryParams, true, undefined, undefined, undefined)), binary: null });
                }),
            );
        }),
        tap({
            unsubscribe: (): void => {
                console.log(`Disconnecting channel on socket to ${shortUrl} with ${path}`);
                const writer = (lastSocket?.write ?? null);
                if (writer !== null && writer !== undefined) {
                    writer.next({ text: JSON.stringify(new MultiplexMessage(channel, path, undefined, undefined, true, undefined, undefined)), binary: null });
                }
            },
        }),
    );
}