import {
  applyScope,
  createScope,
  endEvent,
  Event,
  EventStream,
  EventStreamSeed,
  fromSubscribe,
  isEnd,
  isEventStream,
  isProperty,
  isValue,
  Observable,
  Property,
  PropertySeed,
  Scope,
  sequentially,
  StatefulEventStream,
  toEvent,
  valueEvent,
} from "."
import { map } from "./map"
import GlobalScheduler, { setScheduler } from "./scheduler"
import { globalScope } from "./scope"
import TickScheduler from "./tickscheduler"
import { nop } from "./util"

/**
 * Returns a promise that resolves once the globally installed scheduler
 * fires a timeout of `delay`.
 *
 * @param delay - Delay handed to the global scheduler's `setTimeout`.
 */
export function wait(delay: number): Promise<void> {
  return new Promise((resolve) =>
    GlobalScheduler.scheduler.setTimeout(resolve, delay)
  )
}

/** Scheduler used by these test utilities; installed globally on module load. */
export const sc = TickScheduler()
setScheduler(sc)

// Streams registered through `regSrc`; inspected and reset by `verifyCleanup`.
let seqs: StatefulEventStream<any>[] = []

/**
 * Asserts that no registered source stream still has observers.
 * The registry is cleared even when an assertion fails, so a failure in one
 * suite does not leak registered streams into the next one.
 */
const verifyCleanup = () => {
  try {
    for (const seq of seqs) {
      expect(hasObservers(seq)).toEqual(false)
    }
  } finally {
    seqs = []
  }
}

/**
 * Tells whether the observable's `dispatcher` currently reports observers.
 * Reaches into the `dispatcher` member through an `any` cast.
 */
export function hasObservers(o: Observable<any>) {
  return (o as any).dispatcher.hasObservers()
}

// Scope shared by the helpers below; replaced by `verifyStreamWith`.
let scope = createScope()

/** Returns the scope currently used by the test helpers. */
export function testScope(): Scope {
  return scope
}

/** Registers a source stream so that `verifyCleanup` checks it later. */
function regSrc<V>(source: EventStream<V>) {
  seqs.push(source as StatefulEventStream<V>)
  return source
}

/**
 * Creates a stream with `sequentially(interval, values, scope)` and registers
 * it for the observer clean-up check.
 *
 * @param interval - Interval passed to `sequentially`.
 * @param values - Values (or ready-made events) passed to `sequentially`.
 */
export function series<V>(
  interval: number,
  values: (V | Event<V>)[]
): EventStream<V> {
  return regSrc(sequentially(interval, values, scope))
}

/**
 * Declares a test suite that subscribes once to the stream produced by `src`
 * and checks the received values against `expectedEvents`.
 *
 * @param src - Factory for the stream (or stream seed) under test.
 * @param expectedEvents - Expected values or events, in order.
 */
export const expectStreamEvents = (
  src: () => EventStream<any> | EventStreamSeed<any>,
  expectedEvents: any[]
): void => {
  return verifySingleSubscriber(src, expectedEvents)
}

/**
 * Like `expectStreamEvents`, but every value is paired with the time elapsed
 * on `sc` since the source was created, rounded down with `Math.floor`.
 *
 * @param src - Factory for the stream (or stream seed) under test.
 * @param expectedEventsAndTimings - Expected `[elapsedTime, value]` pairs.
 */
export const expectStreamTimings = <V>(
  src: () => EventStream<V> | EventStreamSeed<V>,
  expectedEventsAndTimings: [number, V][]
) => {
  const srcWithRelativeTime = () => {
    const { now } = sc
    const t0 = now()
    const relativeTime = () => Math.floor(now() - t0)
    const withRelativeTime = (x: V) => [relativeTime(), x]
    return map(withRelativeTime)(src() as EventStreamSeed<V>)
  }
  return expectStreamEvents(srcWithRelativeTime, expectedEventsAndTimings)
}

/**
 * Runs `verifyStreamWith` using a collector that subscribes exactly once,
 * records every value and unsubscribes when the stream ends.
 */
const verifySingleSubscriber = (
  srcF: () => EventStream<any> | EventStreamSeed<any>,
  expectedEvents: any[]
): void => {
  verifyStreamWith(
    "(single subscriber)",
    srcF,
    expectedEvents,
    (src, events, done) => {
      // Declared before subscribing: the end callback may run while
      // `subscribe` is still executing, and reading a `const` that is being
      // initialised would throw a ReferenceError.
      let unsub: (() => void) | undefined
      let ended = false
      unsub = src.subscribe(
        (value: any) => {
          events.push(value)
        },
        () => {
          ended = true
          if (unsub) unsub()
          done()
        }
      )
      // The end arrived synchronously, before the unsubscribe handle existed.
      if (ended && unsub) unsub()
    }
  )
}

/**
 * Declares a `describe` block that builds the stream, collects its events
 * with `collectF` and then checks values, exhaustion and observer clean-up.
 *
 * @param description - Title of the `describe` block.
 * @param srcF - Factory for the stream (or stream seed) under test.
 * @param expectedEvents - Expected values or events, in order.
 * @param collectF - Subscribes to the source, pushes what it receives into
 *   `events` and calls `done` when collection is complete.
 */
const verifyStreamWith = (
  description: string,
  srcF: () => EventStream<any> | EventStreamSeed<any>,
  expectedEvents: Event<any>[],
  collectF: (
    src: Observable<any>,
    events: Event<any>[],
    done: () => void
  ) => any
) =>
  describe(description, () => {
    let src: EventStream<any>
    const receivedEvents: Event<any>[] = []
    // A fresh scope is installed while the suite is being declared.
    scope = createScope()
    beforeAll(() => {
      scope.start()
      const seed = srcF()
      src = isEventStream(seed) ? seed : applyScope(scope)(seed)
      expect(isEventStream(src)).toEqual(true)
    })
    beforeAll((done) => collectF(src, receivedEvents, done))
    it("outputs expected values in order", () => {
      expect(toValues(receivedEvents)).toEqual(toValues(expectedEvents))
    })
    it("the stream is exhausted", () => verifyExhausted(src))
    it("cleans up observers", () => {
      scope.end()
      verifyCleanup()
    })
  })

/**
 * Declares test suites for a property: a single-subscriber run checked
 * against `expectedEvents`, plus a test that subscribing yields no values
 * synchronously.
 *
 * @param srcF - Factory for the property (or property seed) under test.
 * @param expectedEvents - Expected values or events, in order; must be non-empty.
 * @param param - Options bag; only `extraCheck` is read, and it is invoked
 *   inside the `describe` block when it is not null/undefined.
 */
export const expectPropertyEvents = (
  srcF: () => Property<any> | PropertySeed<any>,
  expectedEvents: any[],
  param: any = {}
) => {
  const { extraCheck } = param
  expect(expectedEvents.length > 0).toEqual(true)
  verifyPropertyWith(
    "(single subscriber)",
    srcF,
    expectedEvents,
    (
      src: Property<any>,
      events: Event<any>[],
      done: (err: Error | void) => any
    ) => {
      src.subscribe(
        (value: any) => {
          events.push(value)
          // The property's current value must match what was just delivered.
          expect(src.get()).toEqual(value)
        },
        () => {
          done(undefined)
        }
      )
    },
    extraCheck
  )
  it("Can be subscribed to before scope begins", () => {
    const seed = srcF()
    // Properties bound to the global scope are re-scoped to the test scope;
    // seeds are materialised with the test scope.
    const src = isProperty(seed)
      ? seed.getScope() === globalScope
        ? seed.applyScope(scope)
        : seed
      : (applyScope(scope)(seed) as Property<any>)
    const values: any[] = []
    const unsub = src.subscribe((v: any) => values.push(v))
    expect(values).toEqual([])
    unsub()
  })
}

/**
 * Declares a `describe` block that builds the property, collects its events
 * with `collectF` and checks type, values, final state and observer clean-up.
 *
 * @param description - Title of the `describe` block.
 * @param srcF - Factory for the property (or property seed) under test.
 * @param expectedEvents - Expected values or events, in order.
 * @param collectF - Subscribes to the property, pushes what it receives into
 *   `events` and calls `done` when collection is complete.
 * @param extraCheck - Optional callback invoked while the suite is declared.
 */
const verifyPropertyWith = (
  description: string,
  srcF: () => Property<any> | PropertySeed<any>,
  expectedEvents: Event<any>[],
  collectF: (
    src: Property<any>,
    events: Event<any>[],
    done: () => void
  ) => void,
  extraCheck: (() => void) | undefined = undefined
) => {
  describe(description, () => {
    let src: Property<any>
    const events: Event<any>[] = []
    beforeAll(() => {
      scope.start()
      const seed = srcF()
      src = isProperty(seed) ? seed : (applyScope(scope)(seed) as Property<any>)
    })
    beforeAll((done) => collectF(src, events, done))
    it("is a Property", () => expect(isProperty(src)).toEqual(true))
    it("outputs expected events in order", () =>
      expect(toValues(events)).toEqual(toValues(expectedEvents)))
    if (expectedEvents.length > 0) {
      it("has correct final state", () =>
        verifyFinalState(src, expectedEvents[expectedEvents.length - 1]))
    }
    it("cleans up observers", () => {
      scope.end()
      verifyCleanup()
    })
    if (extraCheck != undefined) {
      extraCheck()
    }
  })
}

/**
 * Asserts that the property is exhausted and that `get()` returns the value
 * carried by the last expected event.
 */
const verifyFinalState = (property: Property<any>, value: any) => {
  verifyExhausted(property)
  expect(property.get()).toEqual(toValue(value))
}

/**
 * Subscribes to `src` and asserts that the first thing delivered
 * synchronously is the end event.
 *
 * @throws Error when nothing is delivered during `subscribe`.
 */
const verifyExhausted = (src: Observable<any>) => {
  const events: Event<any>[] = []
  src.subscribe(
    (value: any) => {
      events.push(valueEvent(value))
    },
    () => {
      events.push(endEvent)
    }
  )
  if (events.length === 0) {
    throw new Error("got zero events")
  }
  expect(isEnd(events[0])).toEqual(true)
}

/** Maps each entry through `toValue`. */
const toValues = (xs: Event<any>[]) => xs.map(toValue)

/**
 * Converts `x` with `toEvent` and returns the carried value for value events;
 * any other event yields the empty string.
 */
const toValue = (x: Event<any> | any) => {
  const event = toEvent(x)
  if (isValue(event)) return event.value
  return ""
}

/**
 * Builds a stream seed that emits each value at its given time, measured on
 * `sc` relative to the moment this function was called, and ends after the
 * last value. An empty schedule ends the stream without emitting anything.
 *
 * @param timesAndValues - `[time, value]` pairs, emitted in array order.
 */
export function atGivenTimes<V>(
  timesAndValues: [number, V][]
): EventStreamSeed<V> {
  const startTime = sc.now()
  return fromSubscribe((onValue, onEnd = nop) => {
    let shouldStop = false
    const schedule = (index: number) => {
      const entry = timesAndValues[index]
      const scheduledTime = entry[0]
      // Time remaining until `startTime + scheduledTime` on the scheduler clock.
      const delay = scheduledTime - sc.now() + startTime
      const push = () => {
        if (shouldStop) {
          return
        }
        const value = entry[1]
        onValue(value)
        if (!shouldStop && index + 1 < timesAndValues.length) {
          return schedule(index + 1)
        } else {
          return onEnd()
        }
      }
      return sc.setTimeout(push, delay)
    }
    if (timesAndValues.length === 0) {
      // Nothing to emit: end through the scheduler instead of indexing into
      // an empty array, which would throw.
      sc.setTimeout(() => {
        if (!shouldStop) onEnd()
      }, 0)
    } else {
      schedule(0)
    }
    return () => (shouldStop = true)
  })
}