import { EventStream, EventStreamSeed, Scope } from "./abstractions"
import {
  GenericTransformOp,
  Transformer,
  transform,
  GenericTransformOpScoped,
} from "./transform"
import { nop } from "./util"

/**
 * Builds a transform operation that passes source values through until
 * `stopper` fires for the first time, then signals end and detaches from
 * both the source and the stopper.
 *
 * @param stopper - Seed whose first `forEach` notification terminates the result.
 * @param scope - Optional scope, forwarded unchanged to `transform`.
 * @returns The operation produced by `transform` (scoped variant when a
 *   scope is supplied, per the overload signatures).
 */
export function takeUntil(stopper: EventStreamSeed): GenericTransformOp
export function takeUntil(
  stopper: EventStreamSeed,
  scope: Scope
): GenericTransformOpScoped
export function takeUntil(stopper: EventStreamSeed, scope?: Scope): any {
  return transform(
    ["takeUntil", [stopper]],
    takeUntilT(stopper),
    scope as Scope
  )
}

/**
 * Transformer behind `takeUntil`.
 *
 * `changes` wraps the source subscription: values are forwarded as-is, and
 * `onEnd` is invoked at most once — either when the source ends or when
 * `stopper` fires, whichever happens first. `init` returns its argument
 * unchanged.
 */
function takeUntilT(stopper: EventStreamSeed): Transformer {
  return {
    changes: (subscribe) => (onValue, onEnd = nop) => {
      let ended = false
      let unsubscribed = false
      // Both handles start out undefined because either subscription may
      // call back synchronously, before its unsubscribe function is returned.
      let unsubSrc: (() => void) | undefined
      let unsubStopper: (() => void) | undefined

      // Signals end downstream exactly once.
      const end = () => {
        if (!ended) {
          ended = true
          onEnd()
        }
      }

      // Detaches from the stopper at most once.
      const releaseStopper = () => {
        if (unsubStopper) {
          const release = unsubStopper
          unsubStopper = undefined
          release()
        }
      }

      // Declared before any subscription so that a synchronously firing
      // stopper can call it without hitting the temporal dead zone.
      const unsub = () => {
        if (!unsubscribed) {
          unsubscribed = true
          if (unsubSrc) {
            unsubSrc()
          }
          releaseStopper()
        }
      }

      unsubSrc = subscribe(onValue, () => {
        end()
        // The stopper has nothing left to stop once the source has ended.
        releaseStopper()
      })

      // Skip the stopper entirely if the source already ended while subscribing.
      if (!ended) {
        const stop = stopper.forEach(() => {
          end()
          unsub()
        })
        if (ended || unsubscribed) {
          // The stopper fired during forEach itself: unsub() ran before this
          // handle existed, so release the stopper subscription right away.
          stop()
        } else {
          unsubStopper = stop
        }
      }

      return unsub
    },
    // Identity: the initial value passes through untouched. Written as a
    // self-contained generic so it does not depend on an outer type parameter.
    init: <A>(value: A): A => value,
  }
}