import { abortable } from "@wopjs/disposable"; import { type AdaptiveSet, add, remove, size } from "adaptive-set"; import { ANY_EVENT, ERROR_EVENT } from "./constants"; import { type RemitterListenerInternal, type AllRemitterEventNames, type AnyRemitterListener, type ErrorRemitterListener, type Fn, type RemitterDatalessEventName, type RemitterDisposer, type RemitterEventNames, type RemitterListener, } from "./interface"; import { isPromise } from "./utils"; export type EventReceiver = Omit< Remitter, "emit" | "remit" | "remitAny" >; interface RelayListener { readonly eventName_: AllRemitterEventNames; disposer_?: | null | Promise | RemitterDisposer | undefined; readonly start_: (remitter: Remitter) => RemitterDisposer; } export class Remitter { /** * @internal */ private _listeners_?: Map, AdaptiveSet>; /** * @internal */ private _onceListeners_?: WeakMap< RemitterListenerInternal, RemitterListenerInternal >; /** * @internal */ private _relayListeners_?: AdaptiveSet>; /** * Remove all listeners from the eventName or all events. * @param eventName Optional eventName to clear. */ public clear>( eventName?: TEventName ): void; /** * @internal */ public clear>( eventName?: TEventName ): void; public clear>( eventName?: TEventName ): void { if (this._listeners_) { if (eventName) { this._listeners_.delete(eventName); } else { this._listeners_ = undefined; } this._tryStopAllRelay_(); } } /** * Remove all listeners from `ANY_EVENT`. */ public clearAny(): void { this.clear(ANY_EVENT); } /** * Remove all listeners from `ERROR_EVENT`. */ public clearError(): void { return this.clear(ERROR_EVENT); } public dispose(): void { this.clear(); this._relayListeners_ = undefined; } /** * Emit an event to `eventName` listeners. */ public emit>( eventName: TEventName ): void; /** * Emit an event with payload to `eventName` listeners. */ public emit>( eventName: TEventName, eventData: TConfig[TEventName] ): void; /** * Emit an event with payload to `eventName` listeners. */ public emit>( event: TEventName, data?: TConfig[TEventName] ): void { this._emit_(event, data); if (event !== ANY_EVENT) { this._emit_(ANY_EVENT, { data, event }); } } /** * If the eventName has any listener. * @param eventName Optional eventName to check. * @returns `true` if the eventName has any listener, `false` otherwise. If no eventName is provided, returns `true` if the Remitter has any listener. */ public has>( eventName?: TEventName ): boolean; /** * @internal */ public has>( eventName?: TEventName ): boolean; public has>( eventName?: TEventName ): boolean { return eventName ? !!this._listeners_?.get(eventName) : (this._listeners_?.size as number) > 0; } /** * If the `ANY_EVENT` has any listener. * @returns `true` if the `ANY_EVENT` has any listener, `false` otherwise. */ public hasAny(): boolean { return this.has(ANY_EVENT); } /** * If the `ERROR_EVENT` has any listener. * @returns `true` if the `ERROR_EVENT` has any listener, `false` otherwise. */ public hasError(): boolean { return this.has(ERROR_EVENT); } /** * Remove a listener from the eventName. */ public off>( eventName: TEventName, listener: Fn ): void; /** * @internal */ public off>( eventName: TEventName, listener: Fn ): void; public off>( eventName: TEventName, listener: Fn ): void { let listeners = this._listeners_?.get(eventName); if (listeners) { listeners = remove(listeners, listener); if (listeners) { const onceListener = this._onceListeners_?.get(listener); if (onceListener) { listeners = remove(listeners, onceListener); } } if (size(listeners)) { this._listeners_!.set(eventName, listeners); } else { this._listeners_!.delete(eventName); this._tryStopAllRelay_(); } } } /** * Remove a listener from `ANY_EVENT`. */ public offAny(listener: AnyRemitterListener): void { this.off(ANY_EVENT, listener); } /** * Remove a listener from `ERROR_EVENT`. */ public offError(listener: ErrorRemitterListener): void { this.off(ERROR_EVENT, listener); } /** * Add an `ANY_EVENT` listener to receive all events. * @internal */ public on( eventName: ANY_EVENT, listener: AnyRemitterListener ): RemitterDisposer; /** * Add an `ERROR_EVENT` listener to receive unhandled subscriber errors. * @internal */ public on( eventName: ERROR_EVENT, listener: ErrorRemitterListener ): RemitterDisposer; /** * Add a listener to the eventName. */ public on>( eventName: TEventName, listener: RemitterListener ): RemitterDisposer; /** * Add a listener to the eventName. * @internal */ public on>( eventName: TEventName | ANY_EVENT, listener: RemitterListenerInternal ): RemitterDisposer; /** * Add a listener to the eventName. */ public on>( eventName: TEventName | ANY_EVENT, listener: RemitterListenerInternal ): RemitterDisposer { const listeners = (this._listeners_ ||= new Map< AllRemitterEventNames, AdaptiveSet >()).get(eventName); const oldSize = size(listeners); this._listeners_.set(eventName, add(listeners, listener)); if (!oldSize && this._relayListeners_) { for (const listener of this._relayListeners_) { if ( !listener.disposer_ && (listener.eventName_ === ANY_EVENT || this.has(listener.eventName_) || this.has(ANY_EVENT)) ) { this._startRelay_(listener); } } } return () => { this.off(eventName, listener); }; } /** * Add an `ANY_EVENT` listener to receive all events. */ public onAny(listener: AnyRemitterListener): RemitterDisposer { return this.on(ANY_EVENT, listener); } /** * Add a one-time listener to `ANY_EVENT` to receive all events. * @internal */ public once( eventName: ANY_EVENT, listener: AnyRemitterListener ): RemitterDisposer; /** * Add a one-time listener to `ERROR_EVENT` to receive unhandled subscriber errors. * @internal */ public once( eventName: ERROR_EVENT, listener: ErrorRemitterListener ): RemitterDisposer; /** * Add a one-time listener to the eventName. */ public once>( eventName: TEventName, listener: RemitterListener ): RemitterDisposer; /** * Add a one-time listener to the eventName. */ public once>( eventName: TEventName | ANY_EVENT, listener: RemitterListenerInternal ): RemitterDisposer { const off = abortable(() => this.off(eventName, onceListener)); const onceListener = (eventData => ( off(), listener(eventData) )) as RemitterListenerInternal; (this._onceListeners_ ||= new WeakMap()).set(listener, onceListener); this.on(eventName, onceListener); return off; } /** * Add a one-time listener to `ANY_EVENT` to receive all events. */ public onceAny(listener: AnyRemitterListener): RemitterDisposer { return this.once(ANY_EVENT, listener); } /** * Add a one-time listener to `ERROR_EVENT` to receive unhandled subscriber errors. */ public onceError(listener: ErrorRemitterListener): RemitterDisposer { return this.once(ERROR_EVENT, listener); } /** * Add an `ERROR_EVENT` listener to receive unhandled subscriber errors. */ public onError(listener: ErrorRemitterListener): RemitterDisposer { return this.on(ERROR_EVENT, listener); } /** * Start a side effect when the eventName has a first listener. * Dispose the side effect when the eventName has no listeners. * Useful for tapping into other events. * * @param eventName * @param start A function that is called when listener count of `eventName` grows from 0 to 1. * Returns a disposer when listener count of `eventName` drops from 1 to 0. */ public remit>( eventName: TEventName, start: (remitter: Remitter) => RemitterDisposer ): RemitterDisposer; /** * @internal */ public remit>( eventName: TEventName, start: (remitter: Remitter) => RemitterDisposer ): RemitterDisposer; public remit>( eventName: TEventName, start: (remitter: Remitter) => RemitterDisposer ): RemitterDisposer { const relayListener: RelayListener = { eventName_: eventName, start_: start, }; this._relayListeners_ = add(this._relayListeners_, relayListener); if ( eventName === ANY_EVENT ? this.has() : this.has(eventName) || this.has(ANY_EVENT) ) { this._startRelay_(relayListener); } return () => { this._relayListeners_ = remove(this._relayListeners_, relayListener); this._stopRelay_(relayListener); }; } /** * Start a side effect when the first listener. * Dispose the side effect when the eventName has no listeners. * Useful for tapping into other events. * * @param start A function that is called when all listener count grows from 0 to 1. * Returns a disposer when all listener count drops from 1 to 0. */ public remitAny( start: (remitter: Remitter) => RemitterDisposer ): RemitterDisposer { return this.remit(ANY_EVENT, start); } /** * @internal */ private _emit_>( event: TEventName, data: any ): void { const listeners = this._listeners_?.get(event); if (listeners) { for (const listener of listeners) { this._tryCall_(listener, data); } } } /** * @internal */ private _handleError_ = (e: unknown) => { if (this.has(ERROR_EVENT)) { this._emit_(ERROR_EVENT, e); } else { console.error(e); } }; /** * @internal */ private async _startRelay_(listener: RelayListener) { listener.disposer_ = this._tryCall_(listener.start_, this) || Promise.resolve(); } /** * @internal */ private async _stopRelay_(listener: RelayListener): Promise { const pDisposer = listener.disposer_; if (pDisposer) { listener.disposer_ = null; const disposer = isPromise(pDisposer) ? await pDisposer : pDisposer; if (disposer) { this._tryCall_(disposer); } } } /** * @internal */ private _tryCall_( fn: () => TReturn ): Promise; /** * @internal */ private _tryCall_( fn: (arg: TArg) => TReturn, arg: TArg ): Promise; /** * @internal */ private _tryCall_( fn: (arg?: TArg) => Promise | TReturn, arg?: TArg ): Promise | TReturn | undefined { try { const p = fn(arg); return isPromise(p) ? p.catch(this._handleError_) : p; } catch (e) { this._handleError_(e); } } /** * @internal */ private _tryStopAllRelay_() { if (this._relayListeners_) { for (const listener of this._relayListeners_) { if ( listener.disposer_ && (listener.eventName_ === ANY_EVENT ? !this.has() : !this.has(ANY_EVENT) && !this.has(listener.eventName_)) ) { this._stopRelay_(listener); } } } } }