/**
 * @license
 * Copyright 2022-2024 Matter.js Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import { ImplementationError } from "../MatterError.js";
import { Logger } from "../log/Logger.js";
import "../polyfills/disposable.js";
import { MaybePromise } from "./Promises.js";

const logger = Logger.get("Observable");

/**
 * A callback function for observables.
 *
 * The observer return value affects how an {@link Observable} emits:
 *
 *   - If an observer returns undefined the {@link Observable} invokes the next observer immediately.
 *
 *   - If an observer returns a {@link Promise}, the {@link Observable} awaits the return value then continues as
 *     described here.  The emitter must then await the {@link Promise} returned by {@link Observable.emit}.
 *
 *   - Any other return value is returned by {@link Observable.emit} and subsequent observers do not see emission.
 *
 * @param payload a list of arguments to be emitted
 */
export interface Observer<T extends any[] = any[], R = void> {
    (...payload: T): MaybePromise<R | undefined>;

    /**
     * Set to false to exclude this observer from {@link Observable.isObserved}.
     */
    [observant]?: boolean;
}

/**
 * A discrete event that may be monitored via callback.  Could call it "event" but that could be confused with Matter
 * cluster events and/or DOM events.
 *
 * @param T arguments, should be a named tuple
 */
export interface Observable<T extends any[] = any[], R = void> extends AsyncIterable<T[0]>, PromiseLike<T> {
    /**
     * Notify observers.
     */
    emit(...args: T): R | undefined;

    /**
     * Add an observer.
     */
    on(observer: Observer<T, R>): void;

    /**
     * Remove an observer.
     */
    off(observer: Observer<T, R>): void;

    /**
     * Add an observer that emits once then is unregistered.
     */
    once(observer: Observer<T, R>): void;

    /**
     * True if there is at least one observer registered.
     */
    isObserved: boolean;

    /**
     * Determine whether an observer is registered.
     */
    isObservedBy(observer: Observer<T, R>): boolean;

    /**
     * This flag indicates whether the observable is asynchronous.  Any observable that accepts promise returns may
     * be asynchronous but this information is not available at runtime unless you specify here, typically via
     * {@link AsyncObservable}.
     */
    isAsync?: boolean;

    /**
     * Observable supports standard "for await (const value of observable)".
     *
     * Using an observer in this manner limits your listener to the first parameter normally emitted and your observer
     * cannot return a value.
     */
    [Symbol.asyncIterator](): AsyncIterator<T[0]>;

    /**
     * Release resources associated with the observable.
     */
    [Symbol.dispose](): void;
}

/**
 * An observer may designate itself as "not observant" for the purposes of {@link Observable.isObserved} by returning
 * false from this field.
 */
export const observant = Symbol("consider-observed");

/**
 * An {@link Observable} that explicitly supports asynchronous observers.
 */
export interface AsyncObservable<T extends any[] = any[], R = void> extends Observable<T, MaybePromise<R>> {
    isAsync: true;
}

/**
 * Error handler used when none is supplied: rethrows so the error surfaces from {@link Observable.emit}.
 */
function defaultErrorHandler(error: Error) {
    throw error;
}

/**
 * Invoked when an observer throws synchronously.  Receives the error (non-Error throwables are wrapped in an
 * {@link Error}) and the observer that threw.
 */
export type ObserverErrorHandler = (error: Error, observer: Observer<any[], any>) => void;

/**
 * A concrete {@link Observable} implementation.
 */
export class BasicObservable<T extends any[] = any[], R = void> implements Observable<T, R> {
    #errorHandler: ObserverErrorHandler;
    #observers?: Set<Observer<T, R>>;
    #once?: Set<Observer<T, R>>;
    #isAsync?: boolean;

    // Shared state for async iteration.  These are only defined while at least one iterator is active.
    #joinIteration?: () => Promise<Next<T[0]>>;
    #removeIterator?: () => void;
    #stopIteration?: () => void;

    /**
     * @param errorHandler invoked when an observer throws; defaults to rethrowing
     * @param isAsync whether promises returned by observers are awaited by {@link emit}
     */
    constructor(errorHandler?: ObserverErrorHandler, isAsync?: boolean) {
        this.#errorHandler = errorHandler ?? defaultErrorHandler;
        this.#isAsync = isAsync;
    }

    /**
     * Drop all observers and terminate any active async iteration.
     */
    [Symbol.dispose]() {
        this.#observers = this.#once = undefined;
        this.#stopIteration?.();
    }

    get isAsync() {
        return this.#isAsync;
    }

    set isAsync(isAsync: boolean | undefined) {
        this.#isAsync = isAsync;
    }

    /**
     * True if any registered observer has not opted out via the {@link observant} field.
     */
    get isObserved() {
        if (this.#observers) {
            for (const observer of this.#observers) {
                if (observer[observant] !== false) {
                    return true;
                }
            }
        }

        if (this.#once) {
            for (const observer of this.#once) {
                if (observer[observant] !== false) {
                    return true;
                }
            }
        }

        return false;
    }

    isObservedBy(observer: Observer<T, R>) {
        return !!this.#observers?.has(observer);
    }

    /**
     * Invoke observers in registration order until one produces a result other than undefined.
     *
     * @param payload the arguments passed to each observer
     * @returns the first defined observer result, or undefined if there is none.  If {@link isAsync} is set and an
     * observer returns a promise, the return value is a promise for the result of the remaining emission.
     */
    emit(...payload: T): R | undefined {
        if (!this.#observers) {
            return;
        }

        // Iterate over a clone of observers so we do not trigger new observers added during observation
        const iterator = [...this.#observers][Symbol.iterator]();

        const emitNext = (previousEmitResult?: R): R | undefined => {
            if (previousEmitResult !== undefined) {
                return previousEmitResult;
            }

            for (let iteration = iterator.next(); !iteration.done; iteration = iterator.next()) {
                let result;

                const observer = iteration.value;
                try {
                    result = observer(...payload);
                } catch (e) {
                    // The handler decides whether emission continues; the default handler rethrows
                    if (e instanceof Error) {
                        this.#errorHandler(e, observer);
                    } else {
                        this.#errorHandler(new Error(`${e}`), observer);
                    }
                }

                // One-shot observers are unregistered once they have been invoked
                if (this.#once?.has(observer)) {
                    this.#once.delete(observer);
                    this.#observers?.delete(observer);
                }

                if (result === undefined) {
                    continue;
                }

                if (MaybePromise.is(result)) {
                    if (!this.isAsync) {
                        // A synchronous observable cannot await; log rejections so they are not left unhandled
                        let identity: string;
                        if (observer.name) {
                            identity = ` "${observer.name}"`;
                        } else {
                            identity = "";
                        }

                        result.then(undefined, error =>
                            logger.error(`Unhandled error in async observer${identity}:`, error),
                        );

                        continue;
                    }

                    // Resume with the remaining observers once the promise settles with undefined
                    return result.then(result => {
                        if (result === undefined) {
                            return emitNext();
                        }
                        return result;
                    }) as R;
                }

                return result;
            }
        };

        return emitNext();
    }

    on(observer: Observer<T, R>) {
        if (!this.#observers) {
            this.#observers = new Set();
        }
        this.#observers.add(observer);
    }

    off(observer: Observer<T, R>) {
        this.#observers?.delete(observer);

        // Also forget one-shot registration; otherwise isObserved keeps reporting a removed observer
        this.#once?.delete(observer);
    }

    once(observer: Observer<T, R>) {
        this.on(observer);
        if (!this.#once) {
            this.#once = new Set();
        }
        this.#once.add(observer);
    }

    /**
     * Makes the observable awaitable: resolves with the payload of the next emission.  The internal promise never
     * rejects.
     */
    then<TResult1 = T, TResult2 = never>(
        onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null,
        onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null,
    ): PromiseLike<TResult1 | TResult2> {
        return new Promise<T>(resolve => {
            this.once((...payload): undefined => {
                resolve(payload);
            });
        }).then(onfulfilled, onrejected);
    }

    /**
     * Yields the first argument of each emission until the consumer exits or the observable is disposed.
     */
    async *[Symbol.asyncIterator](): AsyncIterator<T[0]> {
        let promise = this.#addIterator();

        try {
            for (;;) {
                const next = await promise;

                // Iteration was stopped; without this exit we would re-await the same resolved promise forever
                if (next === undefined) {
                    break;
                }

                promise = next.promise;
                yield next.value;
            }
        } finally {
            this.#removeIterator?.();
        }
    }

    /**
     * Register an async iterator.  All iterators share one internal observer that publishes emissions as a linked
     * chain of promises; the returned promise is the current head of that chain.
     */
    #addIterator(): Promise<Next<T[0]>> {
        if (this.#joinIteration) {
            return this.#joinIteration();
        }

        let resolve: (next: Next<T[0]>) => void;
        let iteratorCount = 1;

        function newPromise() {
            return new Promise<Next<T[0]>>(r => (resolve = r));
        }

        let promise = newPromise();

        function observer(...args: T): undefined {
            // Install the follow-up promise before resolving so consumers always have a next link to await
            const oldResolve = resolve;
            promise = newPromise();
            oldResolve({ value: args[0], promise });
        }

        this.on(observer);

        this.#joinIteration = () => {
            iteratorCount++;
            return promise;
        };

        this.#removeIterator = () => {
            // Pre-decrement: the internal observer is released when the last iterator leaves
            if (!--iteratorCount) {
                this.#stopIteration?.();
            }
        };

        this.#stopIteration = () => {
            this.off(observer);

            // Resolving with undefined tells any waiting iterators to finish
            resolve(undefined);

            // Clear all shared state so a later iteration starts a fresh chain
            this.#stopIteration = undefined;
            this.#removeIterator = undefined;
            this.#joinIteration = undefined;
        };

        return promise;
    }
}

/**
 * A link in the iteration chain: the emitted value plus a promise for the following link.  Undefined signals the end
 * of iteration.
 */
type Next<T> = undefined | { value: T; promise: Promise<Next<T>> };

function constructObservable(errorHandler?: ObserverErrorHandler) {
    return new BasicObservable(errorHandler);
}

/**
 * Create an {@link Observable}.
 */
export const Observable = constructObservable as unknown as {
    new <T extends any[], R = void>(errorHandler?: ObserverErrorHandler): Observable<T, R>;
    <T extends any[], R = void>(errorHandler?: ObserverErrorHandler): Observable<T, R>;
};

function constructAsyncObservable(errorHandler?: ObserverErrorHandler) {
    return new BasicObservable(errorHandler, true);
}

/**
 * Create an {@link AsyncObservable} that explicitly supports asynchronous results
 */
export const AsyncObservable = constructAsyncObservable as unknown as {
    new <T extends any[], R = void>(errorHandler?: ObserverErrorHandler): AsyncObservable<T, R>;
    <T extends any[], R = void>(errorHandler?: ObserverErrorHandler): AsyncObservable<T, R>;
};

/**
 * Look up the observable stored on an emitter under the given property name.
 *
 * @throws ImplementationError if the property is not an object with an "on" function
 */
function event<E, N extends string>(emitter: E, name: N) {
    const observer = (emitter as any)[name];
    if (typeof observer?.on !== "function") {
        throw new ImplementationError(`Invalid event name ${name}`);
    }
    return observer as Observable;
}

/**
 * A set of observables.  You can bind events using individual observables or the methods emulating a subset of Node's
 * EventEmitter.
 *
 * To maintain type safety, implementers define events as observable child properties.
 */
export class EventEmitter {
    /**
     * Emit the named event.  Any value returned by observers is discarded.
     */
    emit<This, N extends EventEmitter.NamesOf<This>>(this: This, name: N, ...payload: EventEmitter.PayloadOf<This, N>) {
        event(this, name).emit(...payload);
    }

    addListener<This, N extends EventEmitter.NamesOf<This>>(
        this: This,
        name: N,
        handler: EventEmitter.ObserverOf<This, N>,
    ) {
        event(this, name).on(handler as any);
    }

    removeListener<This, N extends EventEmitter.NamesOf<This>>(
        this: This,
        name: N,
        handler: EventEmitter.ObserverOf<This, N>,
    ) {
        event(this, name).off(handler as any);
    }

    /**
     * Names of own enumerable properties that look like observables (they expose an "on" function).
     */
    get eventNames() {
        return Object.keys(this).filter(k => typeof (this as any)[k]?.on === "function");
    }

    /**
     * Dispose every observable property that supports disposal.
     */
    [Symbol.dispose]() {
        for (const name of this.eventNames) {
            (this as unknown as Record<string, Observable>)[name][Symbol.dispose]?.();
        }
    }
}

export namespace EventEmitter {
    /**
     * Legal event names.  If there are no events defined, assume this is an
     * untyped instance and allow any argument.
     */
    export type NamesOf<This> = [EventNames<This>] extends [never] ? string : EventNames<This>;

    export type EventNames<This> = string &
        keyof {
            [K in keyof This as This[K] extends Observable<any[], any> ? K : never]: true;
        };

    /**
     * Arguments for an event.  If there are no events defined, assume this is
     * an untyped emitter and allow any argument.
     */
    export type PayloadOf<This, E extends string> = [EventPayload<This, E>] extends [never]
        ? any[]
        : EventPayload<This, E>;

    export type EventPayload<This, E extends string> = This extends { [K in E]: Observable<infer T extends any[]> }
        ? T
        : never;

    // NOTE(review): this types listener handlers yet aliases Observable rather than Observer -- confirm intent
    export type ObserverOf<This, E extends string> = Observable<PayloadOf<This, E>>;
}

/**
 * An {@link Observable} that proxies to another {@link Observable}.
 *
 * Events emitted here are instead emitted on the target {@link Observable}.  Events emitted on the target emit
 * locally via a listener installed by the proxy.
 *
 * This is useful for managing a subset of {@link Observer}s for an {@link Observable}.
 *
 * Note that this "proxy" acts as a proxy but is not a JS {@link Proxy}.
 */
export class ObservableProxy extends BasicObservable {
    #target: Observable;

    // Listener installed on the target; relays target emissions to observers registered on this proxy
    #emitter = super.emit.bind(this);

    constructor(target: Observable) {
        super();

        // The relay only counts as an observer of the target while this proxy has observers of its own.  This must
        // read the base-class value: the overridden isObserved delegates to the target, which would consult this
        // getter again.
        Object.defineProperty(this.#emitter, observant, {
            get: () => super.isObserved,
        });

        this.#target = target;
        this.#target.on(this.#emitter);
        this.emit = this.#target.emit.bind(this.#target);
    }

    /**
     * Detach from the target and drop local observers.
     */
    override [Symbol.dispose]() {
        this.#target.off(this.#emitter);
        super[Symbol.dispose]();
    }

    override get isAsync() {
        return this.#target.isAsync;
    }

    override get isObserved(): boolean {
        return this.#target.isObserved;
    }

    override emit: (...payload: any) => any | undefined;
}