import { mergeArray } from '@most/core'
import { newDefaultScheduler } from '@most/scheduler'
import { deepStrictEqual } from 'assert'

import { pipe } from './function'
import * as R from './Resume'
import * as S from './Stream'
import { runEffects, tap } from './Stream'

/**
 * Test suite for the Stream helpers `fromResume` and `exhaustLatest`.
 *
 * Assertions placed inside stream callbacks (`tap`, `concatMap`) only run when
 * an event actually flows through them, so each test that relies on such
 * callbacks also counts the observed events and checks the count after the
 * stream has finished. Without that, a stream that never emits would make the
 * test pass without verifying anything.
 */
export const test = describe(`Stream`, () => {
  describe(S.fromResume.name, () => {
    describe(`given a Resume`, () => {
      it(`returns a Stream of that value`, async () => {
        const value = 1
        // Number of events seen by `tap`; guards against a vacuous pass.
        let emitted = 0

        const stream = pipe(
          value,
          R.of,
          S.fromResume,
          tap((x) => {
            emitted += 1
            deepStrictEqual(x, value)
          }),
        )

        await runEffects(stream, newDefaultScheduler())

        // The Resume holds a single value, so exactly one event is expected.
        deepStrictEqual(emitted, 1)
      })
    })
  })

  describe(S.exhaustLatest.name, () => {
    // Shared timing for both tests: the outer stream emits inner stream `i`
    // at time `i * increment`, and every inner stream emits `i + 1` after
    // `delay`. Since `delay` equals `increment * numberOfItems`, all inner
    // streams have been emitted by the outer stream before the first inner
    // stream produces its value.
    const increment = 1
    const numberOfItems = 10
    const delay = increment * numberOfItems

    // Runs `exhaustLatest` over a stream of streams and resolves with the
    // collected events.
    const collect = (stream) =>
      pipe(stream, S.exhaustLatest, S.collectEvents(newDefaultScheduler()))

    describe(`given, Stream>`, () => {
      it(`subscribes to only one stream at a time`, async () => {
        // Incremented when an inner stream emits and decremented right after
        // in `concatMap`; must never exceed 1 if inner streams do not overlap.
        let subscriptions = 0
        // Number of inner events seen; guards against a vacuous pass.
        let innerEvents = 0

        const toInnerStream = (_, i) =>
          pipe(
            S.at(
              i * increment,
              pipe(
                S.at(delay, i + 1),
                S.tap(() => {
                  innerEvents += 1
                  deepStrictEqual(++subscriptions, 1)
                }),
                S.concatMap(() => {
                  deepStrictEqual(--subscriptions, 0)

                  return S.empty()
                }),
              ),
            ),
          )

        const stream = mergeArray(Array.from({ length: numberOfItems }, toInnerStream))

        await collect(stream)

        // At least one inner stream must have run for the checks above to
        // have verified anything.
        deepStrictEqual(innerEvents > 0, true)
      })

      it(`resamples when ongoing stream completes`, async () => {
        const toInnerStream = (_, i) => S.at(i * increment, S.at(delay, i + 1))
        const stream = mergeArray(Array.from({ length: numberOfItems }, toInnerStream))

        const actual = await collect(stream)

        // Expected: the value of the first inner stream, followed by the value
        // of the last inner stream emitted while the first was still running.
        deepStrictEqual(actual, [1, numberOfItems])
      })
    })
  })
})