import { identity, map, merge, Observable, Subject, Subscription } from 'rxjs'; import { Controller } from './controller'; import { EffectError, EffectNotification, EffectResult, EffectState, } from './effectState'; import { createStore, InternalStoreOptions } from './store'; const GLOBAL_EFFECT_UNHANDLED_ERROR_SUBJECT = new Subject< EffectError >(); export const GLOBAL_EFFECT_UNHANDLED_ERROR$ = GLOBAL_EFFECT_UNHANDLED_ERROR_SUBJECT.asObservable(); function emitGlobalUnhandledError( effectError: EffectError, ): void { if (GLOBAL_EFFECT_UNHANDLED_ERROR_SUBJECT.observed) { GLOBAL_EFFECT_UNHANDLED_ERROR_SUBJECT.next(effectError); } else { console.error('Uncaught error in Effect', effectError); } } export type EffectController = Controller<{ state: EffectState; start: () => void; next: (result: EffectResult) => void; complete: () => void; error: (error: EffectError) => void; }>; const increaseCount = (count: number): number => count + 1; const decreaseCount = (count: number): number => (count > 0 ? count - 1 : 0); export function createEffectController< Event, Result, ErrorType = Error, >(): EffectController { const subscriptions = new Subscription(); const event$: Subject = new Subject(); const done$: Subject> = new Subject(); const error$: Subject> = new Subject(); const pendingCount = createStore(0, { internal: true, } as InternalStoreOptions); subscriptions.add(() => { event$.complete(); done$.complete(); error$.complete(); pendingCount.destroy(); }); const notifications$: Observable< EffectNotification > = merge( done$.pipe( map< EffectResult, EffectNotification >((entry) => ({ type: 'result', ...entry })), ), error$.pipe( map< EffectError, EffectNotification >((entry) => ({ type: 'error', ...entry })), ), ); return { state: { done$: done$.asObservable(), result$: done$.pipe(map(({ result }) => result)), error$: error$.asObservable(), final$: notifications$, pending: pendingCount.query((count) => count > 0), pendingCount: pendingCount.query(identity), }, start: () => pendingCount.update(increaseCount), next: (result) => done$.next(result), complete: () => pendingCount.update(decreaseCount), error: (effectError) => { if (effectError.origin === 'handler') { pendingCount.update(decreaseCount); } if (error$.observed) { error$.next(effectError); } else { emitGlobalUnhandledError(effectError); } }, destroy: () => { subscriptions.unsubscribe(); }, }; }