import { BehaviorSubject, empty, Observable, of, Subscription } from "rxjs";
import { finalize, mergeMap } from "rxjs/operators";

/**
 * Operator that caches the latest value of a source observable and refreshes
 * it when it expires. Refreshing occurs on subscription (as opposed to on a
 * timer): each new subscriber triggers a re-subscription to the source if the
 * cache is empty or `isExpired` reports the cached value as stale, unless a
 * fetch is already in flight.
 *
 * Only `next` notifications of the source are observed here; the source's
 * errors and completion are not forwarded to subscribers of the result.
 *
 * @param isExpired Returns `true` when the cached value must be re-fetched.
 * @param timeout Flush the cache after having no subscribers for this duration
 * (milliseconds). When omitted, the cache is never flushed by inactivity.
 * @returns An operator function producing the cached, refreshing observable.
 *
 * Example:
 *
 * ```ts
 * apiService.fetchExpiringUrl.pipe(refreshing(url => url.expires < new Date()))
 * ```
 */
export const refreshing = <T>(isExpired: (value: T) => boolean, timeout?: number) => (
  source: Observable<T>
): Observable<T> => {
  // A value that indicates the subject is 'empty'.
  const sentinel = Symbol();
  const subject = new BehaviorSubject<T | typeof sentinel>(sentinel);
  const observable = subject.pipe(
    // Filter out the sentinel so that it's not exposed to the subscriber.
    mergeMap(x => (x !== sentinel ? of(x as T) : empty()))
  );

  // Subscription to the source while a fetch is in flight, otherwise undefined.
  let fetching: Subscription | undefined;
  // `ReturnType<typeof setTimeout>` works for both Node and browser typings.
  let expireTimer: ReturnType<typeof setTimeout> | undefined;
  let subscribers = 0;

  return new Observable<T>(subscriber => {
    // A new subscriber cancels any pending inactivity flush.
    if (expireTimer !== undefined) {
      clearTimeout(expireTimer);
      expireTimer = undefined;
    }
    subscribers += 1;

    const cached = subject.value;
    if ((cached === sentinel || isExpired(cached as T)) && fetching === undefined) {
      // Drop the stale value so that new subscribers wait for the fresh one.
      subject.next(sentinel);
      const inFlight = source
        .pipe(
          finalize(() => {
            fetching = undefined;
          })
        )
        .subscribe(value => {
          subject.next(value);
        });
      // A source that completes synchronously has already run `finalize` by
      // the time `subscribe` returns. Recording its closed subscription would
      // make every later subscriber believe a fetch is still in flight and
      // never refresh again.
      fetching = inFlight.closed ? undefined : inFlight;
    }

    const subscription = observable.subscribe(subscriber);

    return () => {
      subscribers -= 1;
      if (timeout !== undefined && subscribers === 0) {
        // Last subscriber left: schedule the cache flush.
        expireTimer = setTimeout(() => {
          subject.next(sentinel);
        }, timeout);
      }
      subscription.unsubscribe();
    };
  });
};

/**
 * Operator that emits values from the source until one satisfies `predicate`.
 * It acts like `takeWhile`, except that it **includes** the value matching
 * the predicate rather than rejecting it; the output completes right after
 * that value.
 *
 * @param predicate Returns `true` for the value that should end the stream.
 * @returns An operator function producing the truncated observable.
 *
 * Example:
 *
 * ```ts
 * from([1, 2, 3, 4, 5]).pipe(takeUntilFound(num => num === 3))
 * // 1, 2, 3
 * ```
 */
export const takeUntilFound = <T>(predicate: (value: T) => boolean) => (
  source: Observable<T>
): Observable<T> => {
  return new Observable<T>(subscriber => {
    let found = false;
    const subscription = source.subscribe(
      value => {
        // A synchronous source keeps pushing values after we complete, because
        // the teardown below is not registered until `subscribe` returns.
        // Ignore those values so `predicate` is not evaluated again.
        if (found || subscriber.closed) {
          return;
        }
        subscriber.next(value);
        if (predicate(value)) {
          found = true;
          subscriber.complete();
        }
      },
      error => {
        subscriber.error(error);
      },
      () => subscriber.complete()
    );
    return () => subscription.unsubscribe();
  });
};