import { Optional } from 'ts-data-forge'; import { SyncChildObservableClass } from '../class/index.mjs'; import { type KeepInitialValueOperator, type Observable, type TakeUntilOperatorObservable, type UpdateToken, } from '../types/index.mjs'; /** * Emits values from the source until the notifier observable emits. * When the notifier emits, this observable completes. * * @template A - The type of values from the source * @param notifier - An observable that signals when to complete * @returns An operator that takes values until notifier emits * * @example * ```ts * // Timeline: * // * // num$ 1 2 stop 3 (ignored) * // stopNotifier X * // limited$ 1 2 |------- (completed) * // * // Explanation: * // - takeUntil completes the observable when the notifier emits * // - After stop() is called, no further values are emitted * // - Useful for cleanup and cancellation patterns * * const num$ = source(); * * const [stopNotifier, stop_] = createEventEmitter(); * * const limited$ = num$.pipe(takeUntil(stopNotifier)); * * const valueHistory: number[] = []; * * limited$.subscribe((x) => { * valueHistory.push(x); * }); * * num$.next(1); // logs: 1 * * assert.deepStrictEqual(valueHistory, [1]); * * num$.next(2); // logs: 2 * * assert.deepStrictEqual(valueHistory, [1, 2]); * * stop_(); * * num$.next(3); // nothing logged (completed) * * assert.deepStrictEqual(valueHistory, [1, 2]); * ``` */ export const takeUntil = ( notifier: Observable, ): KeepInitialValueOperator => // eslint-disable-next-line total-functions/no-unsafe-type-assertion ((parentObservable) => new TakeUntilObservableClass( parentObservable, notifier, )) as KeepInitialValueOperator; class TakeUntilObservableClass extends SyncChildObservableClass implements TakeUntilOperatorObservable { constructor(parentObservable: Observable, notifier: Observable) { super({ parents: [parentObservable], initialValue: parentObservable.getSnapshot(), }); notifier.subscribe( () => { this.complete(); }, () => { this.complete(); }, ); } override tryUpdate(updateToken: UpdateToken): void { const par = this.parents[0]; const sn = par.getSnapshot(); if (par.updateToken !== updateToken || Optional.isNone(sn)) { return; // skip update } this.setNext(sn.value, updateToken); } }