import {
Desc,
endEvent,
Event,
EventStream,
EventStreamSeed,
isProperty,
isPropertySource,
isValue,
Observable,
ObservableSeed,
Observer,
Property,
PropertySeed,
Scope,
Subscribe,
Unsub,
valueEvent,
} from "./abstractions"
import { applyScopeMaybe } from "./applyscope"
import { PropertySeedImpl } from "./property"
import { EventStreamSeedImpl } from "./eventstream"
import { nop, remove } from "./util"
export type FlatMapOptions = {
latest?: boolean
}
export type Spawner = (value: A) => O
export interface FlatMapOp {
(s: EventStream | EventStreamSeed): EventStreamSeed
}
export interface FlatMapOpScoped {
(s: EventStream | EventStreamSeed): EventStream
}
export function flatMap(
fn: Spawner | EventStreamSeed>
): FlatMapOp
export function flatMap(
fn: Spawner | EventStreamSeed>,
scope: Scope
): FlatMapOpScoped
export function flatMap(
fn: Spawner | EventStreamSeed>,
scope?: Scope
): any {
return (s: EventStream | EventStreamSeed) =>
applyScopeMaybe(
new FlatMapStreamSeed([s, "flatMap", [fn]], s, fn, {}),
scope
)
}
export type FlatMapChild> = {
observable: B
unsub?: Unsub
}
export class FlatMapStreamSeed extends EventStreamSeedImpl {
constructor(
desc: Desc,
s: EventStreamSeed,
fn: Spawner | EventStreamSeed>,
options: FlatMapOptions = {}
) {
const [children, subscribe] = flatMapSubscribe(
s.consume().subscribe.bind(s),
fn,
options
)
super(desc, subscribe)
}
}
export class FlatMapPropertySeed extends PropertySeedImpl {
constructor(
desc: Desc,
src: Property | PropertySeed,
fn: Spawner | Property>,
options: FlatMapOptions = {}
) {
const source = isProperty(src) ? src : src.consume()
let initializing = true // Flag used to prevent the initial value from leaking to the external subscriber. Yes, this is hack.
const subscribeWithInitial = (
onValue: Observer,
onEnd: Observer = nop
) => {
const unsub = source.onChange(onValue, onEnd)
onValue(source.get()) // To spawn property for initial value
initializing = false
return unsub
}
const [getLatestChild, subscribe] = flatMapSubscribe(
subscribeWithInitial,
fn,
options
)
const get = () => {
const child = getLatestChild()
if (!child) {
throw Error("Assertion failed: No child created")
}
const observable = child.observable
if (isProperty(observable) || isPropertySource(observable)) {
return (observable as Property).get()
} else {
throw Error(
"Observable returned by the spawner function if flatMapLatest for Properties must return a Property. This one is not a Property: " +
observable
)
}
}
super(desc, get, (onValue, onEnd = nop) =>
subscribe(
(value) => {
if (!initializing) onValue(value)
},
() => {
if (!initializing) onEnd()
}
)
)
}
}
function flatMapSubscribe(
subscribe: Subscribe,
fn: Spawner, any>>,
options: FlatMapOptions
): [() => FlatMapChild> | null, Subscribe] {
const children: FlatMapChild>[] = []
let latestChild: FlatMapChild> | null = null
const getLatestChild = () => latestChild
return [
getLatestChild,
(onValue: Observer, onEnd: Observer = nop) => {
let rootEnded = false
const unsubThis = subscribe(
(rootEvent) => {
if (options.latest) {
for (let child of children) {
child.unsub!()
}
children.splice(0)
}
const child = { observable: fn(rootEvent).consume() } as FlatMapChild<
Observable
>
latestChild = child
children.push(child)
let ended = false
child.unsub = child.observable.subscribe(onValue, () => {
remove(children, child)
if (child.unsub) {
child.unsub()
} else {
ended = true
}
if (children.length === 0 && rootEnded) {
onEnd()
}
})
if (ended) {
child.unsub()
}
},
() => {
rootEnded = true
if (children.length === 0) {
onEnd()
}
}
)
return () => {
unsubThis()
children.forEach((child) => child.unsub && child.unsub())
}
},
]
}