import { Arr, Optional } from 'ts-data-forge'; import { type MutableMap } from 'ts-type-forge'; import { type ChildObservable, type InitializedObservable, type Observable, type ObservableBase, type Operator, type Subscriber, type SubscriberId, type Subscription, type UpdateToken, type WithInitialValueOperator, } from '../types/index.mjs'; import { issueObservableId, issueSubscriberId, issueUpdateToken, toSubscriber, } from '../utils/index.mjs'; export class ObservableBaseClass< A, Kind extends ObservableBase['kind'], Depth extends ObservableBase['depth'], > implements ObservableBase { readonly id; readonly kind: Kind; readonly depth: Depth; #mut_children: readonly ChildObservable[]; readonly #subscribers: MutableMap>; #mut_currentValue: ReturnType['getSnapshot']>; #mut_isCompleted: ObservableBase['isCompleted']; #mut_updateToken: ObservableBase['updateToken']; constructor({ kind, depth, initialValue, }: Readonly<{ kind: Kind; depth: Depth; initialValue: ReturnType['getSnapshot']>; }>) { this.kind = kind; this.depth = depth; this.id = issueObservableId(); this.#mut_currentValue = initialValue; this.#mut_children = []; this.#subscribers = new Map>(); this.#mut_isCompleted = false; this.#mut_updateToken = issueUpdateToken(); } addChild(child: ChildObservable): void { this.#mut_children = Arr.toPushed( this.#mut_children, child as ChildObservable, ); } getSnapshot(): ReturnType['getSnapshot']> { return this.#mut_currentValue; } protected getCurrentValue(): ReturnType['getSnapshot']> { return this.#mut_currentValue; } get isCompleted(): boolean { return this.#mut_isCompleted; } get updateToken(): UpdateToken { return this.#mut_updateToken; } get hasSubscriber(): boolean { return this.#subscribers.size > 0; } get hasChild(): boolean { return Arr.isNonEmpty(this.#mut_children); } hasActiveChild(): boolean { return this.#mut_children.some((c) => !c.isCompleted); } protected setNext(nextValue: A, updateToken: UpdateToken): void { this.#mut_updateToken = updateToken; this.#mut_currentValue = Optional.some(nextValue); for (const s of this.#subscribers.values()) { s.onNext(nextValue); } } // eslint-disable-next-line @typescript-eslint/class-methods-use-this tryUpdate(_updateToken: UpdateToken): void { throw new Error('not implemented'); } tryComplete(): void { if (!this.hasSubscriber && !this.hasActiveChild()) { this.complete(); } } complete(): void { if (this.isCompleted) return; // terminate only once // change state this.#mut_isCompleted = true; // run subscribers for the current value for (const s of this.#subscribers.values()) { s.onComplete(); } // remove all subscribers this.#subscribers.clear(); // propagate to children for (const o of this.#mut_children) { o.tryComplete(); } } pipe(operator: WithInitialValueOperator): InitializedObservable; pipe(operator: Operator): Observable; pipe(operator: Operator): Observable { return operator( // eslint-disable-next-line total-functions/no-unsafe-type-assertion this as unknown as InitializedObservable, ); } subscribe(onNext: (v: A) => void, onComplete?: () => void): Subscription { // first emit const curr = this.getSnapshot(); if (Optional.isSome(curr)) { onNext(curr.value); } if (this.isCompleted) { if (onComplete !== undefined) { onComplete(); } return { unsubscribe: () => {} }; } const id: SubscriberId = this.#addSubscriber( toSubscriber(onNext, onComplete), ); return { unsubscribe: () => { this.#removeSubscriber(id); }, }; } #addSubscriber(s: Subscriber): SubscriberId { // return the id of added subscriber const id = issueSubscriberId(); this.#subscribers.set(id, s); return id; } #removeSubscriber(id: SubscriberId): void { this.#subscribers.delete(id); } }