import { Arr, type Some } from 'ts-data-forge';
import { type MutableSet } from 'ts-type-forge';
import {
  isChildObservable,
  isManagerObservable,
  type AsyncChildObservable,
  type ChildObservable,
  type InitializedObservable,
  type InitializedSyncChildObservable,
  type KeepInitialValueOperator,
  type NonEmptyUnknownList,
  type Observable,
  type ObservableId,
  type Operator,
  type SyncChildObservable,
  type WithInitialValueOperator,
  type Wrap,
} from '../types/index.mjs';
import { binarySearch, issueUpdateToken, maxDepth } from '../utils/index.mjs';
import { ObservableBaseClass } from './observable-base-class.mjs';

// NOTE(review): the generic type arguments in this file (`<ObservableId>`,
// `<unknown>`, `<A, P>`, `<A, B>`, ...) were reconstructed after a text
// extraction stripped every `<Identifier...>` span. Confirm them against the
// declarations in '../types/index.mjs' and './observable-base-class.mjs'.

/**
 * Depth-first walk from `node` up through its ancestors (via `parents`),
 * reporting whether the walk reaches a node that is already on the current
 * path.
 *
 * @param node - The observable to start walking from.
 * @param mut_visited - Ids of nodes whose ancestors were already explored;
 *   such nodes are not explored again. Mutated by this function.
 * @param mut_inPath - Ids of the nodes on the current walk path. Mutated by
 *   this function; a node is removed again once its ancestors are explored
 *   without finding a cycle.
 * @returns `true` if a node on the current path is reached again.
 */
const hasCircularDependencyFrom = (
  node: Observable<unknown>,
  mut_visited: MutableSet<ObservableId>,
  mut_inPath: MutableSet<ObservableId>,
): boolean => {
  if (mut_inPath.has(node.id)) return true;

  // Already fully explored from an earlier branch; no cycle through here.
  if (mut_visited.has(node.id)) return false;

  mut_visited.add(node.id);
  mut_inPath.add(node.id);

  // Only child observables have parents to follow.
  if (isChildObservable(node)) {
    for (const parent of node.parents) {
      if (hasCircularDependencyFrom(parent, mut_visited, mut_inPath)) {
        return true;
      }
    }
  }

  mut_inPath.delete(node.id);

  return false;
};

/**
 * Detects circular dependencies by walking the full ancestor chain of the
 * given parents and checking whether `child` already appears among them.
 *
 * @param child - The observable about to be registered.
 * @param parents - The observables `child` is going to depend on.
 * @throws {Error} if a circular dependency is detected
 */
const detectCircularDependency = (
  child: ChildObservable<unknown>,
  parents: readonly Observable<unknown>[],
): void => {
  // The visited set is shared across all parents so that common ancestors are
  // explored only once.
  const mut_visited = new Set<ObservableId>();

  // Seeding the path with the child makes "reaching the child" count as a cycle.
  const mut_inPath = new Set<ObservableId>([child.id]);

  for (const parent of parents) {
    if (hasCircularDependencyFrom(parent, mut_visited, mut_inPath)) {
      throw new Error(
        'Circular dependency detected in observable graph: a child observable cannot be its own ancestor.',
      );
    }
  }
};

/**
 * Wires `child` into the dependency graph: checks for cycles, adds `child` to
 * each direct parent, and adds it as a descendant of every manager observable
 * reachable by following `parents` links upward.
 *
 * @param child - The observable being registered.
 * @param parents - The direct parents of `child`.
 * @throws {Error} if registering `child` would create a circular dependency
 */
const registerChild = (
  child: ChildObservable<unknown>,
  parents: ChildObservable<unknown>['parents'],
): void => {
  detectCircularDependency(child, parents);

  for (const p of parents) {
    p.addChild(child);
  }

  // register child to all reachable ManagerObservables
  const mut_rest = Array.from(parents);

  while (Arr.isNonEmpty(mut_rest)) {
    const p = mut_rest.pop();

    if (p === undefined) break;

    if (isManagerObservable(p)) {
      // The upward walk stops at the first manager found on each branch.
      p.addDescendant(child);
    } else {
      // trace back dependency graph
      mut_rest.push(...p.parents);
    }
  }
};

/**
 * Shared completion logic for child observables.
 *
 * - If every parent is completed, calls `complete` and returns.
 * - Otherwise, if there is neither a subscriber nor an active child, calls
 *   `complete`.
 * - In that "otherwise" case, `tryComplete` is then called on every parent,
 *   whether or not `complete` was called.
 *
 * @param hasSubscriber - Whether the observable currently has a subscriber.
 * @param hasActiveChild - Whether the observable currently has an active child.
 * @param parents - The observable's direct parents.
 * @param complete - Callback that completes the observable.
 */
const tryComplete = ({
  hasSubscriber,
  hasActiveChild,
  parents,
  complete,
}: Readonly<{
  hasSubscriber: boolean;
  hasActiveChild: boolean;
  parents: ChildObservable<unknown>['parents'];
  complete: () => void;
}>): void => {
  // If there is no working parent node
  if (parents.every((r) => r.isCompleted)) {
    complete();

    return;
  }

  // If there is no active child node
  if (!hasSubscriber && !hasActiveChild) {
    complete();
  }

  // propagate to parents
  for (const par of parents) {
    par.tryComplete();
  }
};

/**
 * Child observable of kind `'async child'`. It starts its own update
 * propagation (`startUpdate`) and keeps a list of descendants to notify.
 *
 * NOTE(review): the type arguments passed to `ObservableBaseClass` are
 * reconstructed — confirm against its declaration.
 */
export class AsyncChildObservableClass<A, const P extends NonEmptyUnknownList>
  extends ObservableBaseClass<A, 'async child', number>
  implements AsyncChildObservable<A, P>
{
  readonly parents;

  /** Descendants to notify on update, in the order they are notified. */
  #mut_propagationOrder: readonly ChildObservable<unknown>[];

  /** Ids of the descendants already present in the propagation order. */
  protected readonly descendantsIdSet: MutableSet<ObservableId>;

  /**
   * @param parents - The direct parents of this observable.
   * @param depth - Defaults to `1 + maxDepth(parents)`.
   * @param initialValue - Forwarded to the base class.
   * @throws {Error} if registering this observable would create a circular
   *   dependency
   */
  constructor({
    parents,
    depth = 1 + maxDepth(parents),
    initialValue,
  }: Readonly<{
    parents: Wrap<P>;
    depth?: number;
    initialValue: ReturnType<AsyncChildObservable<A, P>['getSnapshot']>;
  }>) {
    super({
      kind: 'async child',
      depth,
      initialValue,
    });

    this.parents = parents;
    this.#mut_propagationOrder = [];
    this.descendantsIdSet = new Set<ObservableId>();

    registerChild(this, parents);
  }

  /**
   * Adds `child` to the propagation order unless it is already registered.
   *
   * @param child - The descendant to notify on future updates.
   */
  addDescendant(child: ChildObservable<unknown>): void {
    if (this.descendantsIdSet.has(child.id)) return;

    this.descendantsIdSet.add(child.id);

    // Presumably `binarySearch` returns the index that keeps the list ordered
    // by depth — confirm against '../utils/index.mjs'.
    const insertPos = binarySearch(
      this.#mut_propagationOrder.map((a) => a.depth),
      child.depth,
    );

    this.#mut_propagationOrder = Arr.toInserted(
      this.#mut_propagationOrder,
      insertPos,
      child,
    );
  }

  /**
   * Sets the next value under a freshly issued update token, then asks every
   * registered descendant, in propagation order, to update with that token.
   *
   * @param nextValue - The value to set.
   */
  startUpdate(nextValue: A): void {
    const updateToken = issueUpdateToken();

    this.setNext(nextValue, updateToken);

    for (const p of this.#mut_propagationOrder) {
      p.tryUpdate(updateToken);
    }
  }

  /** Completes this observable, then lets each parent try to complete. */
  override complete(): void {
    super.complete();

    // propagate to parents
    for (const par of this.parents) {
      par.tryComplete();
    }
  }

  /** Runs the shared completion logic with this observable's current state. */
  override tryComplete(): void {
    tryComplete({
      complete: () => {
        this.complete();
      },
      hasActiveChild: this.hasActiveChild(),
      hasSubscriber: this.hasSubscriber,
      parents: this.parents,
    });
  }
}

/**
 * Child observable of kind `'sync child'`.
 *
 * NOTE(review): the type arguments passed to `ObservableBaseClass` are
 * reconstructed — confirm against its declaration.
 */
export class SyncChildObservableClass<A, const P extends NonEmptyUnknownList>
  extends ObservableBaseClass<A, 'sync child', number>
  implements SyncChildObservable<A, P>
{
  readonly parents;

  /**
   * @param parents - The direct parents of this observable.
   * @param depth - Defaults to `1 + maxDepth(parents)`.
   * @param initialValue - Forwarded to the base class.
   * @throws {Error} if registering this observable would create a circular
   *   dependency
   */
  constructor({
    parents,
    depth = 1 + maxDepth(parents),
    initialValue,
  }: Readonly<{
    parents: Wrap<P>;
    depth?: number;
    initialValue: ReturnType<SyncChildObservable<A, P>['getSnapshot']>;
  }>) {
    super({
      kind: 'sync child',
      depth,
      initialValue,
    });

    this.parents = parents;

    registerChild(this, parents);
  }

  /** Completes this observable, then lets each parent try to complete. */
  override complete(): void {
    super.complete();

    for (const par of this.parents) {
      par.tryComplete();
    }
  }

  /** Runs the shared completion logic with this observable's current state. */
  override tryComplete(): void {
    tryComplete({
      complete: () => {
        this.complete();
      },
      hasActiveChild: this.hasActiveChild(),
      hasSubscriber: this.hasSubscriber,
      parents: this.parents,
    });
  }
}

/**
 * Sync child observable whose snapshot is typed as `Some`, i.e. it is treated
 * as always holding a value.
 */
export class InitializedSyncChildObservableClass<
    A,
    const P extends NonEmptyUnknownList,
  >
  extends SyncChildObservableClass<A, P>
  implements InitializedSyncChildObservable<A, P>
{
  /**
   * @param parents - The direct parents of this observable.
   * @param depth - Defaults to `1 + maxDepth(parents)`.
   * @param initialValue - Forwarded to the base class.
   */
  constructor({
    parents,
    depth = 1 + maxDepth(parents),
    initialValue,
  }: Readonly<{
    parents: Wrap<P>;
    depth?: number;
    initialValue: ReturnType<
      InitializedSyncChildObservable<A, P>['getSnapshot']
    >;
  }>) {
    super({ parents, depth, initialValue });
  }

  /** Returns the current value, asserted (not checked at runtime) to be `Some`. */
  override getSnapshot(): Some<A> {
    // eslint-disable-next-line total-functions/no-unsafe-type-assertion
    return super.getCurrentValue() as Some<A>;
  }

  // Operators that keep or supply an initial value yield an
  // InitializedObservable; any other operator yields a plain Observable.
  override pipe<B>(
    operator: KeepInitialValueOperator<A, B> | WithInitialValueOperator<A, B>,
  ): InitializedObservable<B>;

  override pipe<B>(operator: Operator<A, B>): Observable<B>;

  override pipe<B>(operator: Operator<A, B>): Observable<B> {
    return operator(this);
  }
}