import {
  Desc,
  endEvent,
  Event,
  EventStream,
  EventStreamSeed,
  isProperty,
  isPropertySource,
  isValue,
  Observable,
  ObservableSeed,
  Observer,
  Property,
  PropertySeed,
  Scope,
  Subscribe,
  Unsub,
  valueEvent,
} from "./abstractions"
import { applyScopeMaybe } from "./applyscope"
import { PropertySeedImpl } from "./property"
import { EventStreamSeedImpl } from "./eventstream"
import { nop, remove } from "./util"

/**
 * Options shared by the flatMap family of operators.
 *
 * When `latest` is truthy, every new value from the root observable first
 * unsubscribes all currently tracked children, so only the most recently
 * spawned child stays subscribed.
 */
export type FlatMapOptions = {
  latest?: boolean
}

/** Function that maps a root value of type `A` to a spawned observable `O`. */
export type Spawner<A, O> = (value: A) => O

/** Unscoped flatMap operator: maps a stream (or seed) of `A` to a stream seed of `B`. */
export interface FlatMapOp<A, B> {
  (s: EventStream<A> | EventStreamSeed<A>): EventStreamSeed<B>
}

/** Scoped flatMap operator: maps a stream (or seed) of `A` to a stream of `B`. */
export interface FlatMapOpScoped<A, B> {
  (s: EventStream<A> | EventStreamSeed<A>): EventStream<B>
}

/**
 * Creates an operator that spawns a child stream for every value of the source
 * stream using `fn` and forwards the values of all spawned children.
 *
 * @param fn Spawner invoked with each source value.
 * @param scope Optional scope handed to `applyScopeMaybe` together with the
 *   resulting seed. Per the overload signatures, the operator yields an
 *   `EventStream` when a scope is given and an `EventStreamSeed` otherwise.
 * @returns An operator function to be applied to the source stream.
 */
export function flatMap<A, B>(
  fn: Spawner<A, EventStream<B> | EventStreamSeed<B>>
): FlatMapOp<A, B>
export function flatMap<A, B>(
  fn: Spawner<A, EventStream<B> | EventStreamSeed<B>>,
  scope: Scope
): FlatMapOpScoped<A, B>
export function flatMap<A, B>(
  fn: Spawner<A, EventStream<B> | EventStreamSeed<B>>,
  scope?: Scope
): any {
  return (s: EventStream<A> | EventStreamSeed<A>) =>
    applyScopeMaybe(
      new FlatMapStreamSeed([s, "flatMap", [fn]], s, fn, {}),
      scope
    )
}

/**
 * Bookkeeping record for one spawned child observable.
 *
 * `unsub` stays undefined until the `subscribe` call on the child has
 * returned, i.e. while the child subscription is still being established.
 */
export type FlatMapChild<B extends Observable<any>> = {
  observable: B
  unsub?: Unsub
}

/** Stream seed whose subscription flat-maps the values of `s` through `fn`. */
export class FlatMapStreamSeed<A, B> extends EventStreamSeedImpl<B> {
  /**
   * @param desc Description passed on to the base seed.
   * @param s Source seed; it is consumed by this constructor.
   * @param fn Spawner invoked with each source value.
   * @param options See {@link FlatMapOptions}.
   */
  constructor(
    desc: Desc,
    s: EventStreamSeed<A>,
    fn: Spawner<A, EventStream<B> | EventStreamSeed<B>>,
    options: FlatMapOptions = {}
  ) {
    // Bind `subscribe` to the object it was read from (the consumed source),
    // not to the seed `s`; the two are only the same when consume() returns `s`.
    const source = s.consume()
    // The first tuple element (latest-child getter) is only needed for Properties.
    const [, subscribe] = flatMapSubscribe(
      source.subscribe.bind(source),
      fn,
      options
    )
    super(desc, subscribe)
  }
}

/**
 * Property seed whose current value is the current value of the most recently
 * spawned child Property.
 */
export class FlatMapPropertySeed<A, B> extends PropertySeedImpl<B> {
  /**
   * @param desc Description passed on to the base seed.
   * @param src Source Property, or a PropertySeed which is consumed here.
   * @param fn Spawner invoked with each source value; must return a Property
   *   (or property source), otherwise reading the value throws.
   * @param options See {@link FlatMapOptions}.
   */
  constructor(
    desc: Desc,
    src: Property<A> | PropertySeed<A>,
    fn: Spawner<A, PropertySeed<B> | Property<B>>,
    options: FlatMapOptions = {}
  ) {
    const source = isProperty(src) ? src : src.consume()
    // Flag used to prevent the initial value from leaking to the external
    // subscriber. Yes, this is a hack.
    let initializing = true
    const subscribeWithInitial = (
      onValue: Observer<A>,
      onEnd: Observer<void> = nop
    ) => {
      const unsub = source.onChange(onValue, onEnd)
      // Feed the current value through so that a child is spawned for it.
      onValue(source.get())
      initializing = false
      return unsub
    }
    const [getLatestChild, subscribe] = flatMapSubscribe(
      subscribeWithInitial,
      fn,
      options
    )
    // Reads the current value from the most recently spawned child.
    // Throws if no child has been spawned yet or if the child is not a Property.
    const get = () => {
      const child = getLatestChild()
      if (!child) {
        throw Error("Assertion failed: No child created")
      }
      const observable = child.observable
      if (isProperty(observable) || isPropertySource(observable)) {
        return (observable as Property<B>).get()
      } else {
        throw Error(
          "Observable returned by the spawner function if flatMapLatest for Properties must return a Property. This one is not a Property: " +
            observable
        )
      }
    }
    super(desc, get, (onValue, onEnd = nop) =>
      subscribe(
        (value) => {
          // Swallow whatever is emitted while the initial child is being set up.
          if (!initializing) onValue(value)
        },
        () => {
          if (!initializing) onEnd()
        }
      )
    )
  }
}

/**
 * Shared subscription logic for the flatMap operators.
 *
 * @param subscribe Subscribe function of the root observable.
 * @param fn Spawner producing a seed for each root value; the seed is consumed
 *   and subscribed to immediately.
 * @param options See {@link FlatMapOptions}.
 * @returns A tuple of (1) a getter for the most recently spawned child, or
 *   `null` if none has been spawned, and (2) the subscribe function of the
 *   flat-mapped result. The result ends once the root has ended and no
 *   children remain.
 */
function flatMapSubscribe<A, B>(
  subscribe: Subscribe<A>,
  fn: Spawner<A, ObservableSeed<B, Observable<B>, any>>,
  options: FlatMapOptions
): [() => FlatMapChild<Observable<B>> | null, Subscribe<B>] {
  // NOTE: child bookkeeping lives outside the returned subscribe function, so
  // it is shared by every subscription made through it.
  const children: FlatMapChild<Observable<B>>[] = []
  let latestChild: FlatMapChild<Observable<B>> | null = null
  const getLatestChild = () => latestChild
  return [
    getLatestChild,
    (onValue: Observer<B>, onEnd: Observer<void> = nop) => {
      let rootEnded = false
      const unsubThis = subscribe(
        (rootEvent) => {
          if (options.latest) {
            // Drop all previous children before spawning the next one.
            // The assertion holds for every child whose subscribe call has
            // already returned; it would fail only if the root emitted
            // re-entrantly while a child was still being subscribed.
            for (const child of children) {
              child.unsub!()
            }
            children.splice(0)
          }
          const child = {
            observable: fn(rootEvent).consume(),
          } as FlatMapChild<Observable<B>>
          latestChild = child
          children.push(child)
          // Set when the child ends before `subscribe` has returned its unsub.
          let ended = false
          child.unsub = child.observable.subscribe(onValue, () => {
            remove(children, child)
            if (child.unsub) {
              child.unsub()
            } else {
              // Ended synchronously during subscribe: unsub is not available
              // yet, so defer the cleanup until right after subscribe returns.
              ended = true
            }
            if (children.length === 0 && rootEnded) {
              onEnd()
            }
          })
          if (ended) {
            child.unsub()
          }
        },
        () => {
          rootEnded = true
          // With children still active, the end is emitted when the last one ends.
          if (children.length === 0) {
            onEnd()
          }
        }
      )
      return () => {
        unsubThis()
        children.forEach((child) => child.unsub && child.unsub())
      }
    },
  ]
}