import { Observer, Subscribable, Unsubscribable } from 'dexie'; import * as React from 'react'; import { usePromise } from './usePromise'; const observableCache = new Map>(); const promiseCache = new WeakMap, Promise>(); const valueCache = new WeakMap, any>(); const CLEANUP_DELAY = 3000; // Time to wait before cleaning up unused observables /** * Subscribes to an observable and returns the latest value. * Suspends until the first value is received. * * Calls with the same cache key will reuse the same observable. * Cache key must be globally unique. */ export function useSuspendingObservable( getObservable: (() => Subscribable) | Subscribable, cacheKey: React.DependencyList ): T { let observable: Subscribable | undefined; // Try to find an existing observable for this cache key for (const [key, value] of observableCache) { if ( key.length === cacheKey.length && key.every((k, i) => Object.is(k, cacheKey[i])) ) { observable = value; break; } } // If no observable was found, create a new one if (!observable) { // Create a multicast observable which subscribes to source at most once. const source = typeof getObservable === 'function' ? getObservable() : getObservable; let subscription: Unsubscribable | undefined; const observers = new Set>(); let timeout: ReturnType | undefined; const newObservable: Subscribable = { subscribe: (observer) => { observers.add(observer); // Cancel the cleanup timer if it's running if (timeout != null) { clearTimeout(timeout); timeout = undefined; } // If this is the first subscriber, subscribe to the source observable if (!subscription) { subscription = source.subscribe({ next: (val) => { valueCache.set(newObservable, val); // Clone observers in case the list changes during emission for (const obs of new Set(observers)) obs.next?.(val); }, error: (err) => { const lastObservers = new Set(observers); handleFinalize(); for (const obs of lastObservers) obs.error?.(err); }, complete: () => { const lastObservers = new Set(observers); handleFinalize(); for (const obs of lastObservers) obs.complete?.(); }, }); } // Otherwise, emit the current value to the new subscriber if any else if (valueCache.has(newObservable)) { observer.next?.(valueCache.get(newObservable)!); } // Return the unsubscriber return { unsubscribe: () => { if (!observers.has(observer)) return; observers.delete(observer); // If this was the last subscriber, schedule cleanup if (observers.size === 0) scheduleCleanup(); }, }; function handleFinalize() { // Reset this observable to the initial state subscription = undefined; observers.clear(); valueCache.delete(newObservable); promiseCache.delete(newObservable); // Schedule cleanup in case nobody subscribes again scheduleCleanup(); } function scheduleCleanup() { if (timeout != null) return; // Cleanup already scheduled timeout = setTimeout(() => { // Unsubscribe source if any subscription?.unsubscribe(); subscription = undefined; // Remove this observable from cache for (const [key, value] of observableCache) { if (value === newObservable) { observableCache.delete(key); break; } } }, CLEANUP_DELAY); } }, }; observable = newObservable; observableCache.set(cacheKey, newObservable); } // Get or initialize promise for first value let promise = promiseCache.get(observable); if (!promise) { promise = new Promise((resolve, reject) => { const subscription = observable.subscribe({ next: (val) => { resolve(val); // Unsubscribe in next tick because subscription might not be assigned yet queueMicrotask(() => subscription.unsubscribe()); }, error: (err) => reject(err), }); }); promiseCache.set(observable, promise); } const initialValue = usePromise(promise); const value = React.useRef(initialValue); const [error, setError] = React.useState(); const rerender = React.useReducer((x) => x + 1, 0)[1]; // Set the value immediately on every render. // This avoids waiting for effect to run. value.current = valueCache.has(observable) ? valueCache.get(observable)! : initialValue; // Subscribe to live updates until the source observable changes. React.useEffect(() => { const subscription = observable.subscribe({ next: (val) => { if (!Object.is(val, value.current)) { value.current = val; rerender(); } }, error: (err) => setError(err), }); return () => subscription.unsubscribe(); }, [observable]); if (error) throw error; return value.current; }