import { Arr, Optional, expectType } from 'ts-data-forge';
import { type NonEmptyArray } from 'ts-type-forge';
import { SyncChildObservableClass } from '../class/index.mjs';
import { source } from '../create/index.mjs';
import { withInitialValue } from '../operators/index.mjs';
import {
  type CombineObservable,
  type CombineObservableRefined,
  type InitializedObservable,
  type InitializedSyncChildObservable,
  type NonEmptyUnknownList,
  type Observable,
  type SyncChildObservable,
  type UpdateToken,
  type Wrap,
} from '../types/index.mjs';

/**
 * Combines multiple observables into a single observable that emits a tuple
 * of their latest values.
 *
 * The combined observable emits whenever any parent emits, but only once every
 * parent holds a value. If every parent already holds a value when `combine`
 * is called, the combined observable starts with an initial value as well.
 *
 * @template OS - Tuple type of source observables
 * @param parents - Non-empty array of observables to combine
 * @returns A combined observable emitting tuples of the parents' latest values
 *
 * @example
 * ```ts
 * // Timeline:
 * //
 * // name$  "Alice"               "Bob"
 * // age$             25                       30
 * // user$            ["Alice",25] ["Bob",25]  ["Bob",30]
 * //
 * // Explanation:
 * // - combine waits for all sources to emit at least once
 * // - Then emits the latest value from all sources whenever any source emits
 * // - Always emits an array with the latest values from each source
 *
 * const name$ = source<string>();
 *
 * const age$ = source<number>();
 *
 * const user$ = combine([name$, age$]);
 *
 * const userHistory: (readonly [string, number])[] = [];
 *
 * user$.subscribe(([name_, age]) => {
 *   userHistory.push([name_, age]);
 * });
 *
 * name$.next('Alice'); // nothing emitted (age$ hasn't emitted yet)
 *
 * assert.deepStrictEqual(userHistory, []);
 *
 * age$.next(25); // emits ['Alice', 25]
 *
 * assert.deepStrictEqual(userHistory, [['Alice', 25]]);
 *
 * name$.next('Bob'); // emits ['Bob', 25]
 *
 * assert.deepStrictEqual(userHistory, [
 *   ['Alice', 25],
 *   ['Bob', 25],
 * ]);
 *
 * age$.next(30); // emits ['Bob', 30]
 *
 * assert.deepStrictEqual(userHistory, [
 *   ['Alice', 25],
 *   ['Bob', 25],
 *   ['Bob', 30],
 * ]);
 * ```
 */
export const combine = <const OS extends NonEmptyArray<Observable<unknown>>>(
  parents: OS,
): CombineObservableRefined<OS> =>
  // The class is typed over the tuple of values, while the public signature is
  // typed over the tuple of observables, so the result is re-typed here.
  // eslint-disable-next-line total-functions/no-unsafe-type-assertion
  new CombineObservableClass(
    parents,
  ) as unknown as CombineObservableRefined<OS>;

/**
 * Alias for `combine`.
 * @see combine
 */
export const combineLatest = combine;

/**
 * Observable whose value is the tuple of its parents' latest values.
 *
 * NOTE(review): the base-class type arguments `<A, A>` were reconstructed;
 * confirm them against the declaration of `SyncChildObservableClass`.
 *
 * @template A - Tuple of the value types carried by the parents
 */
class CombineObservableClass<A extends NonEmptyUnknownList>
  extends SyncChildObservableClass<A, A>
  implements CombineObservable<A>
{
  /**
   * Reads every parent's current snapshot; the initial value is the tuple of
   * those values when all parents hold one, and `Optional.none` otherwise.
   *
   * @param parents - The observables to combine, one per tuple position
   */
  constructor(parents: Wrap<A>) {
    const parentsValues = parents.map((p) => p.getSnapshot());

    super({
      parents,
      initialValue: parentsValues.every(Optional.isSome)
        ? Optional.some(
            // eslint-disable-next-line total-functions/no-unsafe-type-assertion
            Arr.map(parentsValues, (c) => c.value) as A,
          )
        : Optional.none,
    });
  }

  /**
   * Recomputes the combined tuple for the update identified by `updateToken`.
   *
   * Does nothing when no parent carries `updateToken`, and does not emit
   * while at least one parent still has no value.
   *
   * @param updateToken - Token identifying the update being processed
   */
  override tryUpdate(updateToken: UpdateToken): void {
    if (this.parents.every((o) => o.updateToken !== updateToken)) return; // all parents are skipped

    const parentValues = this.parents.map((a) => a.getSnapshot());

    if (parentValues.every(Optional.isSome)) {
      const nextValue =
        // eslint-disable-next-line total-functions/no-unsafe-type-assertion
        Arr.map(parentValues, (a) => a.value) as A;

      this.setNext(nextValue, updateToken);
    }
  }
}

// Compile-time type tests. The enclosing block keeps the bindings local.
// NOTE(review): the type arguments passed to `expectType` (and to `source` for
// `r1`/`r2`) were reconstructed from the variable annotations and the
// '=' / '<=' relations; confirm them against the declared observable types.
{
  {
    const s1: Observable<1> = source<1>();
    const s2: Observable<2> = source<2>();

    const _d = combine([s1, s2]);

    expectType<typeof _d, CombineObservable<readonly [1, 2]>>('=');
    expectType<typeof _d, Observable<readonly [1, 2]>>('<=');
  }

  {
    const s1: InitializedObservable<1> = source<1>().pipe(withInitialValue(1));
    const s2: Observable<2> = source<2>();

    const _d = combine([s1, s2]);

    expectType<typeof _d, CombineObservable<readonly [1, 2]>>('=');
    expectType<typeof _d, Observable<readonly [1, 2]>>('<=');
  }

  {
    const s1: InitializedObservable<1> = source<1>().pipe(withInitialValue(1));
    const s2: InitializedObservable<2> = source<2>().pipe(withInitialValue(2));

    const _d = combine([s1, s2]);

    // Returns InitializedObservable if all OS are InitializedObservable
    expectType<typeof _d, InitializedObservable<readonly [1, 2]>>('<=');
  }

  const r1 = source<number>(1);
  const r2 = source<string>('a');

  const _c = combine([r1, r2]);

  const _ci = combine([
    r1.pipe(withInitialValue(0)),
    r2.pipe(withInitialValue(0)),
  ]);

  expectType<typeof _c, SyncChildObservable<readonly [number, string]>>('<=');
  expectType<
    typeof _ci,
    InitializedSyncChildObservable<readonly [number, number | string]>
  >('<=');
}