import { Optional, SafeUint, asSafeUint, pipe } from 'ts-data-forge'; import { SyncChildObservableClass } from '../class/index.mjs'; import { type DropInitialValueOperator, type Observable, type SkipWhileOperatorObservable, type UpdateToken, } from '../types/index.mjs'; /** * Skips values from the source observable while the predicate returns true. * Once the predicate returns false, all subsequent values pass through. * * @template A - The type of values from the source * @param predicate - Function to test each value * @returns An operator that skips values while the predicate is true * * @example * ```ts * // Timeline: * // * // num$ 1 2 3 4 5 6 7 1 2 * // skipped$ 5 6 7 1 2 * // |-------- skip --------| * // * // Explanation: * // - skipWhile skips values while the predicate returns true * // - Once the predicate returns false, all subsequent values pass through * // - Unlike filter, the predicate is never checked again after the first false * * const num$ = source(); * * const skipped$ = num$.pipe(skipWhile((x) => x < 5)); * * const valueHistory: number[] = []; * * skipped$.subscribe((x) => { * valueHistory.push(x); * }); * * num$.next(1); // nothing logged * * num$.next(2); // nothing logged * * num$.next(3); // nothing logged * * num$.next(4); // nothing logged * * num$.next(5); // logs: 5 * * assert.deepStrictEqual(valueHistory, [5]); * * num$.next(6); // logs: 6 * * assert.deepStrictEqual(valueHistory, [5, 6]); * * num$.next(7); // logs: 7 * * assert.deepStrictEqual(valueHistory, [5, 6, 7]); * * num$.next(1); // logs: 1 * * num$.next(2); // logs: 2 * * assert.deepStrictEqual(valueHistory, [5, 6, 7, 1, 2]); * ``` */ export const skipWhile = ( predicate: (value: A, index: SafeUint | -1) => boolean, ): DropInitialValueOperator => (parentObservable) => new SkipWhileObservableClass(parentObservable, predicate); /* implementation */ class SkipWhileObservableClass extends SyncChildObservableClass implements SkipWhileOperatorObservable { readonly #predicate: (value: A, index: SafeUint | -1) => boolean; #mut_index: SafeUint | -1; #mut_skipping: boolean; constructor( parentObservable: Observable, predicate: (value: A, index: SafeUint | -1) => boolean, ) { super({ parents: [parentObservable], initialValue: pipe(parentObservable.getSnapshot()).map((sn) => Optional.isNone(sn) ? Optional.none : predicate(sn.value, -1) ? Optional.none : sn, ).value, }); this.#mut_index = -1; this.#predicate = predicate; this.#mut_skipping = true; } override tryUpdate(updateToken: UpdateToken): void { const par = this.parents[0]; const sn = par.getSnapshot(); if (par.updateToken !== updateToken || Optional.isNone(sn)) { return; // skip update } this.#mut_index = this.#mut_index === -1 ? asSafeUint(0) : SafeUint.add(1, this.#mut_index); if (!this.#predicate(sn.value, this.#mut_index)) { this.#mut_skipping = false; } if (!this.#mut_skipping) { this.setNext(sn.value, updateToken); } } }