/**
* Stream is an extension of @most/core with additional
* fp-ts instances as well as additional combinators for interoperation with other data
* structures in @typed/fp and fp-ts.
*
* A large goal of @typed/fp is to expand the `fp-ts` ecosystem to include
* [@most/core](https://github.com/mostjs/core) for a Reactive programming style, including
* derivatives such as [ReaderStream](./ReaderStream.ts.md), [ReaderStreamEither](./ReaderStreamEither.ts.md),
* [StateReaderStreamEither](./StateReaderStreamEither.ts.md) and a few others. @most/core is the fastest
* push-based reactive library in JS, period. Its performance characteristics are due to its architecture of
* getting out of the way of the computations you need to perform. It's also the first experience I had with FP.
* For instance, Most utilizes `Functor` laws to remove unneeded machinery through function composition,
* improving runtime performance amongst other optimizations.
*
* See the [@most/core Documentation](https://mostcore.readthedocs.io/en/latest/) for the remaining API
* exposed by this module. Both @most/core and @most/types are re-exported from this module.
* @since 0.9.2
*/
import * as M from '@most/core'
import { disposeAll, disposeNone } from '@most/disposable'
import { asap, schedulerRelativeTo } from '@most/scheduler'
import * as types from '@most/types'
import * as Alt_ from 'fp-ts/Alt'
import { Alternative1 } from 'fp-ts/Alternative'
import * as App from 'fp-ts/Applicative'
import * as Ap from 'fp-ts/Apply'
import * as CH from 'fp-ts/Chain'
import { ChainRec1 } from 'fp-ts/ChainRec'
import { Compactable1 } from 'fp-ts/Compactable'
import { Either, isLeft, isRight, left, match, right } from 'fp-ts/Either'
import { Eq } from 'fp-ts/Eq'
import { Filterable1 } from 'fp-ts/Filterable'
import { FromIO1 } from 'fp-ts/FromIO'
import { FromTask1 } from 'fp-ts/FromTask'
import { constVoid, flow, pipe } from 'fp-ts/function'
import { bindTo as bindTo_, Functor1, tupled as tupled_ } from 'fp-ts/Functor'
import { Monad1 } from 'fp-ts/Monad'
import { Monoid } from 'fp-ts/Monoid'
import * as O from 'fp-ts/Option'
import { Pointed1 } from 'fp-ts/Pointed'
import { Predicate } from 'fp-ts/Predicate'
import * as RA from 'fp-ts/ReadonlyArray'
import * as RM from 'fp-ts/ReadonlyMap'
import { Separated } from 'fp-ts/Separated'
import { Task } from 'fp-ts/Task'
import { fst, snd } from 'fp-ts/Tuple2'
import { Adapter, create } from './Adapter'
import { disposeBoth, settable } from './Disposable'
import { deepEqualsEq } from './Eq'
import * as FRe from './FromResume'
import { Arity1 } from './function'
import * as R from './Resume'
import * as S from './struct'
/**
* Alias for the `Stream` type of `@most/types`, re-exposed under this module's name.
* @since 0.9.2
* @category Model
*/
export type Stream = types.Stream
/**
* Conditional type: resolves to `R` when `[A]` extends `[Stream]` and to `never` otherwise.
* Both sides are wrapped in one-element tuples before the comparison.
* NOTE(review): presumably extracts the value type carried by a Stream (it is applied per
* element in `combineAll`'s return type) — confirm against the declaration of `A` and `R`.
* @since 0.9.2
* @category Type-level
*/
export type ValueOf = [A] extends [Stream] ? R : never
/**
 * Wrap a callback in a Most.js Task.
 *
 * - `run` calls `cb` with the time it receives and registers the value `cb` returns with
 *   an internal settable disposable; it does nothing once that disposable is disposed.
 * - `error` disposes the internal disposable, then hands the error to `onError` when one
 *   was supplied and rethrows it otherwise.
 * - `dispose` is the internal disposable's own `dispose`.
 *
 * @param cb callback invoked when the task runs
 * @param onError optional handler for errors reported to the task
 * @since 0.9.2
 * @category Constructor
 */
export function createCallbackTask(
  cb: Arity1,
  onError?: (error: Error) => void,
): types.Task {
  const tracker = settable()
  return {
    run(time) {
      // Already disposed: the callback must not be started.
      if (tracker.isDisposed()) {
        return
      }
      tracker.addDisposable(cb(time))
    },
    error(_time, err) {
      tracker.dispose()
      // Without a handler the error is propagated to whoever runs the task.
      if (!onError) {
        throw err
      }
      return onError(err)
    },
    dispose: tracker.dispose,
  }
}
/**
* String identifier used as the key for `Stream` in fp-ts's `URItoKind` registry (see the
* module augmentation below).
* @since 0.9.2
* @category URI
*/
export const URI = '@most/core/Stream' as const
/**
* Literal type of the `URI` constant.
* @since 0.9.2
* @category URI
*/
export type URI = typeof URI
// Module augmentation: adds an entry keyed by `URI` to the `URItoKind` interface of 'fp-ts/HKT'.
declare module 'fp-ts/HKT' {
export interface URItoKind {
[URI]: types.Stream
}
}
/**
* Create a Stream monoid where concat is a parallel merge (`M.merge`) and the empty
* element is `M.empty()`.
* @since 0.9.2
* @category Typeclass Constructor
*/
export const getMonoid = (): Monoid> => {
return {
concat: M.merge,
empty: M.empty(),
}
}
/**
* Filter Options from within a Stream: only `Some` events pass `M.filter(O.isSome, ...)`,
* and each one is unwrapped to its `value`.
* @since 0.9.2
* @category Combinator
*/
export const compact = (stream: types.Stream>): types.Stream =>
M.map((s: O.Some) => s.value, M.filter(O.isSome, stream))
/**
* Separate left and right values of a Stream of Eithers into two Streams.
* Returns `{ left, right }`, where `left` carries the unwrapped `Left` payloads and
* `right` the unwrapped `Right` payloads.
* @since 0.9.2
* @category Combinator
*/
export const separate = (
stream: types.Stream>,
): Separated, types.Stream> => {
// Both branches are built from the same multicast stream `s`.
const s = M.multicast(stream)
// NOTE: the locals `left` / `right` shadow the `left` / `right` imported from fp-ts/Either
// inside this function.
const left = pipe(
s,
M.filter(isLeft),
M.map((l) => l.left),
)
const right = pipe(
s,
M.filter(isRight),
M.map((r) => r.right),
)
return { left, right }
}
/**
* Map every event through `f` and split the resulting Eithers with `separate`.
* @since 0.9.2
* @category Combinator
*/
export const partitionMap =
(f: (a: A) => Either) =>
(fa: types.Stream) =>
separate(M.map(f, fa))
/**
* Split a Stream by a predicate, implemented with `partitionMap`: values for which
* `predicate` returns true are sent to the `right` side, all others to the `left` side.
* @since 0.9.2
* @category Combinator
*/
export const partition = (predicate: Predicate) =>
partitionMap((a: A) => (predicate(a) ? right(a) : left(a)))
/**
* Map every event through `f` and keep only the `Some` results, unwrapped (via `compact`).
* @since 0.9.2
* @category Combinator
*/
export const filterMap =
(f: (a: A) => O.Option) =>
(fa: types.Stream) =>
compact(M.map(f, fa))
/**
* fp-ts `Functor` instance for Stream; `map` is `M.map` from @most/core.
* @since 0.9.2
* @category Instance
*/
export const Functor: Functor1 = {
map: M.map,
}
/**
* fp-ts `Pointed` instance for Stream; `of` is `M.now` from @most/core.
* @since 0.9.2
* @category Instance
*/
export const Pointed: Pointed1 = {
of: M.now,
}
/**
* Lift a value into a Stream (the `of` of the `Pointed` instance, i.e. `M.now`).
* @since 0.9.2
* @category Constructor
*/
export const of = Pointed.of
/**
* fp-ts `Apply` instance for Stream: the `Functor` instance plus `ap` from `M.ap`.
* @since 0.9.2
* @category Instance
*/
export const Apply: Ap.Apply1 = {
...Functor,
ap: M.ap,
}
/**
* `apFirst` derived from the `Apply` instance by fp-ts/Apply.
* @since 0.9.2
* @category Combinator
*/
export const apFirst = Ap.apFirst(Apply)
/**
* `apS` derived from the `Apply` instance by fp-ts/Apply.
* @since 0.9.2
* @category Combinator
*/
export const apS = Ap.apS(Apply)
/**
* `apSecond` derived from the `Apply` instance by fp-ts/Apply.
* @since 0.9.2
* @category Combinator
*/
export const apSecond = Ap.apSecond(Apply)
/**
* `apT` derived from the `Apply` instance by fp-ts/Apply.
* @since 0.9.2
* @category Combinator
*/
export const apT = Ap.apT(Apply)
/**
* Semigroup constructor derived from the `Apply` instance by fp-ts/Apply.
* @since 0.9.2
* @category Typeclass Constructor
*/
export const getApplySemigroup = Ap.getApplySemigroup(Apply)
/**
* fp-ts `Applicative` instance for Stream, assembled from the `Apply` and `Pointed` instances.
* @since 0.9.2
* @category Instance
*/
export const Applicative: App.Applicative1 = {
...Apply,
...Pointed,
}
/**
* Monoid constructor derived from the `Applicative` instance by fp-ts/Applicative.
* @since 0.9.2
* @category Typeclass Constructor
*/
export const getApplicativeMonoid = App.getApplicativeMonoid(Applicative)
/**
* fp-ts `Chain` instance for Stream: the `Functor` instance plus `chain` from `M.chain`.
* @since 0.9.2
* @category Instance
*/
export const Chain: CH.Chain1 = {
...Functor,
chain: M.chain,
}
/**
* `chainFirst` derived from the `Chain` instance by fp-ts/Chain.
* @since 0.9.2
* @category Combinator
*/
export const chainFirst = CH.chainFirst(Chain)
/**
* `bind` (do-notation helper) derived from the `Chain` instance by fp-ts/Chain.
* @since 0.9.2
* @category Combinator
*/
export const bind = CH.bind(Chain)
/**
* fp-ts `Monad` instance for Stream, assembled from the `Chain` and `Pointed` instances.
* @since 0.9.2
* @category Instance
*/
export const Monad: Monad1 = {
...Chain,
...Pointed,
}
/**
* Recursive chain: applies `f` to `value`, passes the resulting Stream through `M.delay(0)`,
* then chains over each Either it emits — a `Left` re-enters `chainRec(f)` with its payload,
* a `Right` is emitted through `M.now`.
* @since 0.9.2
* @category Combinator
*/
export const chainRec =
(f: (value: A) => types.Stream>) =>
(value: A): types.Stream =>
pipe(value, f, M.delay(0), M.chain(match(flow(chainRec(f)), M.now)))
/**
* fp-ts `ChainRec` instance for Stream backed by `chainRec` (i.e. `M.chain`).
* @since 0.9.2
* @category Instance
*/
export const ChainRec: ChainRec1 = {
chainRec,
}
/**
* Recursive variant built on `M.switchLatest`: applies `f` to `value`, maps each Either to a
* Stream (a `Left` re-enters `switchRec(f)` with its payload, a `Right` becomes `M.now`), and
* flattens the result with `M.switchLatest`.
* @since 0.9.2
* @category Combinator
*/
export const switchRec =
(f: (value: A) => types.Stream>) =>
(value: A): types.Stream =>
pipe(value, f, M.map(match(switchRec(f), M.now)), M.switchLatest)
/**
* `ChainRec` instance for Stream whose `chainRec` is `switchRec`.
* @since 0.9.2
* @category Instance
*/
export const SwitchRec: ChainRec1 = {
chainRec: switchRec,
}
/**
* Recursive variant built on `M.mergeConcurrently`: applies `f` to `value`, maps each Either
* to a Stream (a `Left` re-enters the recursion with the same `concurrency` and `f`, a
* `Right` becomes `M.now`), and flattens with `M.mergeConcurrently(concurrency)`.
* @since 0.9.2
* @category Combinator
*/
export const mergeConcurrentlyRec =
(concurrency: number) =>
(f: (value: A) => types.Stream>) =>
(value: A): types.Stream =>
pipe(
value,
f,
M.map(match(mergeConcurrentlyRec(concurrency)(f), M.now)),
M.mergeConcurrently(concurrency),
)
/**
* Build a `ChainRec` instance whose `chainRec` is `mergeConcurrentlyRec(concurrency)`.
* @since 0.9.2
* @category Typeclass Constructor
*/
export const getConcurrentChainRec = (concurrency: number): ChainRec1 => ({
chainRec: mergeConcurrentlyRec(concurrency),
})
/**
* fp-ts `FromIO` instance for Stream: `fromIO` maps `f` over `M.now(undefined)`, so `f` is
* invoked through `Functor.map` rather than at construction time.
* @since 0.9.2
* @category Instance
*/
export const FromIO: FromIO1 = {
fromIO: (f) => Functor.map(f)(M.now(undefined)),
}
/**
* Create a Stream from an IO (the `fromIO` of the `FromIO` instance).
* @since 0.9.2
* @category Constructor
*/
export const fromIO = FromIO.fromIO
// Lifts `task` into a Stream with `M.now` and applies it (via `M.ap`) to a Stream holding
// `undefined`, producing a Stream of whatever calling the task returns.
const applyTask = (task: Task): types.Stream> => M.ap(M.now(task), M.now(void 0))
/**
* fp-ts `FromTask` instance for Stream: extends `FromIO`; `fromTask` runs `applyTask` and
* pipes the result through `M.awaitPromises`.
* @since 0.9.2
* @category Instance
*/
export const FromTask: FromTask1 = {
...FromIO,
fromTask: flow(applyTask, M.awaitPromises),
}
/**
* Create a Stream from a Task (the `fromTask` of the `FromTask` instance).
* @since 0.9.2
* @category Constructor
*/
export const fromTask = FromTask.fromTask
/**
* `FromResume` instance for Stream. `fromResume` builds a Stream with `M.newStream` that,
* when run, schedules (via `asap`) a callback task which starts the Resume with `R.start`,
* emits the produced value as a single event and then ends the sink.
* @since 0.9.2
* @category Instance
*/
export const FromResume: FRe.FromResume1 = {
fromResume: (resume) =>
M.newStream((sink, scheduler) => {
// Starts the resume; the value returned by `pipe(...)` is what `createCallbackTask`
// registers as a disposable.
const run = () =>
pipe(
resume,
R.start((a) => {
sink.event(scheduler.currentTime(), a)
sink.end(scheduler.currentTime())
}),
)
// Errors reported to the scheduled task are forwarded to the sink.
const onError = (error: Error) => sink.error(scheduler.currentTime(), error)
return asap(createCallbackTask(run, onError), scheduler)
}),
}
/**
* Create a Stream from a Resume (the `fromResume` of the `FromResume` instance).
* @since 0.9.2
* @category Constructor
*/
export const fromResume = FromResume.fromResume
/**
* `chainFirstResumeK` derived from the `FromResume` and `Chain` instances by ./FromResume.
* @since 0.9.2
* @category Combinator
*/
export const chainFirstResumeK = FRe.chainFirstResumeK(FromResume, Chain)
/**
* `chainResumeK` derived from the `FromResume` and `Chain` instances by ./FromResume.
* @since 0.9.2
* @category Combinator
*/
export const chainResumeK = FRe.chainResumeK(FromResume, Chain)
/**
* `fromResumeK` derived from the `FromResume` instance by ./FromResume.
* @since 0.9.2
* @category Constructor
*/
export const fromResumeK = FRe.fromResumeK(FromResume)
/**
* fp-ts `Alt` instance for Stream. `alt` merges `fa` with the lazily created alternative
* `f()` and keeps only the first event of the merged Stream (`M.take(1, ...)`), so the two
* Streams race for a single value.
* @since 0.9.2
* @category Instance
*/
export const Alt: Alt_.Alt1 = {
...Functor,
alt: (f) => (fa) => M.take(1, M.merge(fa, f())), // race the 2 streams
}
/**
* Race two Streams for their first event (alias of `Alt.alt`).
* @since 0.9.2
* @category Combinator
*/
export const race = Alt.alt
/**
* fp-ts `Alternative` instance for Stream: the `Alt` instance plus `zero` from `M.empty`.
* @since 0.9.2
* @category Instance
*/
export const Alternative: Alternative1 = {
...Alt,
zero: M.empty,
}
/**
* The `zero` of the `Alternative` instance (`M.empty`).
* @since 0.9.2
* @category Constructor
*/
export const zero = Alternative.zero
/**
* fp-ts `Compactable` instance for Stream, using this module's `compact` and `separate`.
* @since 0.9.2
* @category Instance
*/
export const Compactable: Compactable1 = {
compact,
separate,
}
/**
* fp-ts `Filterable` instance for Stream, using this module's `partitionMap`, `partition`
* and `filterMap`, with `filter` taken from `M.filter`.
* @since 0.9.2
* @category Instance
*/
export const Filterable: Filterable1 = {
partitionMap,
partition,
filterMap,
filter: M.filter,
}
/**
* Starting point for do-notation: a Stream built from `M.now(null)` mapped through
* `Object.create`, i.e. each event is the result of `Object.create(null)`.
* @since 0.9.2
* @category Constructor
*/
export const Do: types.Stream<{}> = pipe(null, M.now, M.map(Object.create))
/**
* `bindTo` derived from the `Functor` instance by fp-ts/Functor.
* @since 0.9.2
* @category Combinator
*/
export const bindTo = bindTo_(Functor)
/**
* `tupled` derived from the `Functor` instance by fp-ts/Functor.
* @since 0.9.2
* @category Combinator
*/
export const tupled = tupled_(Functor)
// Sink whose `event`, `error` and `end` handlers all do nothing; used as the defaults for
// `createSink`.
const emptySink: types.Sink = {
event: constVoid,
error: constVoid,
end: constVoid,
}
/**
* Build a Sink from a partial one: any handler not supplied in `sink` falls back to the
* no-op handler of `emptySink`. Called with no argument it returns an all-no-op Sink.
* @since 0.9.2
* @category Constructor
*/
export const createSink = (sink: Partial> = {}): types.Sink => ({
...emptySink,
...sink,
})
/**
* Run `stream` on `scheduler` with `M.runEffects`, recording every event in an array.
* Returns a Promise that resolves with the recorded events (as a readonly array) once the
* `runEffects` Promise resolves.
* @since 0.9.2
* @category Combinator
*/
export const collectEvents =
(scheduler: types.Scheduler) =>
(stream: types.Stream) => {
// Filled as a side effect of `M.tap` while the stream runs.
const events: A[] = []
return M.runEffects(
M.tap((a) => events.push(a), stream),
scheduler,
).then(() => events as readonly A[])
}
/**
* Attach `disposable` to a Stream: the returned Stream runs `stream` unchanged and returns
* `disposeBoth(<stream's disposable>, disposable)` as its own disposable.
* @since 0.9.2
* @category Combinator
*/
export const onDispose =
(disposable: types.Disposable) =>
(stream: types.Stream): types.Stream =>
M.newStream((sink, scheduler) => disposeBoth(stream.run(sink, scheduler), disposable))
/**
 * Combine any number of Streams into a single Stream of tuples holding the latest value of
 * each input, in argument order (implemented with `M.combineArray`).
 *
 * The combining function is an explicit rest-collector rather than the `Array` constructor:
 * `Array(x)` called with a single numeric argument creates an empty array of length `x`
 * (or throws a RangeError), which would corrupt the result when exactly one Stream of
 * numbers is combined.
 * @since 0.9.2
 * @category Combinator
 */
export const combineAll = []>(
  ...streams: A
): types.Stream<{ readonly [K in keyof A]: ValueOf }> =>
  pipe(streams as any, M.combineArray((...values: unknown[]) => values)) as any
/**
* Combine a struct of Streams into a Stream of structs. Each entry's Stream is mapped to
* single-key objects via `S.make(k, v)`, the Streams are combined with `combineAll`, and
* the resulting objects are merged into one with `Object.assign`.
* @since 0.11.0
* @category Combinator
*/
export const combineStruct = (streams: { readonly [K in keyof A]: types.Stream }) =>
pipe(
combineAll(
...pipe(
Object.entries>(streams),
RA.map(([k, stream]) =>
pipe(
stream,
M.map((v) => S.make(k, v)),
),
),
),
),
// `o` is the array of single-key objects produced above; merge them into a fresh object.
M.map((o) => Object.assign({}, ...o) as A),
)
/**
* Flatten a Stream of Streams with the `ExhaustLatest` strategy: while an inner Stream is
* running, newly arriving inner Streams are not started; only the most recent one is
* remembered and started once the running one ends.
* @since 0.9.2
* @category Combinator
*/
export const exhaustLatest = (stream: types.Stream>): types.Stream =>
new ExhaustLatest(stream)
/**
* Map every event to a Stream with `f` and flatten the result with `exhaustLatest`.
* @since 0.9.2
* @category Combinator
*/
export const exhaustMapLatest =
(f: (value: A) => types.Stream) =>
(stream: types.Stream) =>
pipe(stream, M.map(f), exhaustLatest)
/**
* Stream implementation behind `exhaustLatest`. Running it subscribes an
* `ExhaustLatestSink` to the outer stream.
*/
class ExhaustLatest implements types.Stream {
constructor(readonly stream: types.Stream>) {}
run(sink: types.Sink, scheduler: types.Scheduler) {
const s = new ExhaustLatestSink(sink, scheduler)
// Disposing the result disposes both the outer subscription and the sink (which in turn
// disposes the currently running inner stream).
return disposeBoth(this.stream.run(s, scheduler), s)
}
}
/**
 * Sink for the outer stream of `exhaustLatest`.
 *
 * State:
 * - `latest`: the most recent inner stream received from the outer stream.
 * - `sampling`: true while an inner stream is running.
 * - `shouldResample`: true when an inner stream arrived while another one was running.
 * - `finished`: true once the outer stream has ended.
 *
 * The downstream sink is ended as soon as the outer stream has ended and no inner stream is
 * running or waiting to be resampled.
 */
class ExhaustLatestSink implements types.Sink>, types.Disposable {
  protected latest: O.Option> = O.none
  protected disposable: types.Disposable = disposeNone()
  protected sampling = false
  protected shouldResample = false
  protected finished = false
  protected innerSink: types.Sink
  constructor(readonly sink: types.Sink, readonly scheduler: types.Scheduler) {
    this.innerSink = {
      // Inner events go straight to the downstream sink.
      event: (t, a) => sink.event(t, a),
      error: (t, e) => this.error(t, e),
      end: (t) => {
        this.sampling = false
        // An inner stream arrived while this one was running: run the latest one now.
        if (this.shouldResample && O.isSome(this.latest)) {
          this.event(scheduler.currentTime(), this.latest.value)
          return
        }
        // Nothing left to run and the outer stream is done: end downstream.
        if (this.finished) {
          this.dispose()
          sink.end(t)
        }
      },
    }
  }
  event = (t: types.Time, stream: types.Stream) => {
    this.latest = O.some(stream)
    // Busy: only remember that a newer inner stream is waiting.
    if (this.sampling) {
      this.shouldResample = true
      return
    }
    this.sampling = true
    this.shouldResample = false
    this.disposable = stream.run(this.innerSink, schedulerRelativeTo(t, this.scheduler))
  }
  error = (t: types.Time, e: Error) => {
    this.dispose()
    this.sink.error(t, e)
  }
  end = (t: types.Time) => {
    this.finished = true
    // When no inner stream is running there is no inner `end` left to propagate
    // completion, so the downstream sink has to be ended here. Otherwise the running inner
    // stream's `end` handler takes care of it.
    if (!this.sampling) {
      this.sink.end(t)
    }
  }
  dispose = () => {
    this.disposable.dispose()
    this.sampling = false
    this.shouldResample = false
  }
}
/**
* Using the provided Eq, mergeMapWhen conditionally applies a Kleisli arrow
* to the values within an Array when they are added, and any values removed
* from the array will be disposed of immediately. `Eq` defaults to `deepEqualsEq`.
* @since 0.9.2
* @category Combinator
*/
export const mergeMapWhen =
(Eq: Eq = deepEqualsEq) =>
(f: (value: V) => types.Stream) =>
(values: Stream>): Stream> =>
new MergeMapWhen(Eq, f, values)
/**
* Stream implementation behind `mergeMapWhen`. Running it subscribes a `MergeMapWhenSink`
* to the `values` stream.
*/
class MergeMapWhen implements types.Stream> {
constructor(
readonly Eq: Eq,
readonly f: (getValue: V) => types.Stream,
readonly values: Stream>,
) {}
run(sink: types.Sink>, scheduler: types.Scheduler): types.Disposable {
const s = new MergeMapWhenSink(sink, scheduler, this)
// Disposing the result disposes both the outer subscription and every inner stream
// tracked by the sink.
return disposeBoth(this.values.run(s, scheduler), s)
}
}
/**
 * Sink for the outer stream of `mergeMapWhen`.
 *
 * For every array received it starts an inner stream for each newly added value, disposes
 * the inner stream of each removed value, and emits the latest inner values (in the order of
 * the current array) once every current value has produced at least one event.
 *
 * Bookkeeping:
 * - `disposables` / `values` are keyed by the value object that was first added; all lookups
 *   and removals compare keys with `source.Eq`, never by reference.
 * - `referenceCount` is the number of inner streams that have neither ended nor been
 *   disposed. The output ends once the outer stream has ended and this reaches zero.
 */
class MergeMapWhenSink implements types.Sink> {
  readonly disposables = new Map()
  readonly values = new Map()
  readonly findDifference: (second: readonly V[]) => (first: readonly V[]) => readonly V[]
  readonly findDisposable: (k: V) => O.Option
  readonly findValue: (k: V) => O.Option
  current: ReadonlyArray = []
  finished = false
  referenceCount = 0
  disposable: types.Disposable = disposeNone()
  constructor(
    readonly sink: types.Sink>,
    readonly scheduler: types.Scheduler,
    readonly source: MergeMapWhen,
  ) {
    this.findDisposable = (k: V) => RM.lookup(source.Eq)(k)(this.disposables)
    this.findValue = (k: V) => RM.lookup(source.Eq)(k)(this.values)
    this.findDifference = RA.difference(source.Eq)
  }
  event = (t: types.Time, values: ReadonlyArray) => {
    const removed = pipe(this.current, this.findDifference(values))
    const added = pipe(values, this.findDifference(this.current))
    this.current = values
    // Clean up all the removed keys
    removed.forEach((r) => this.forget(r))
    // Subscribe to all the newly added values
    added.forEach((a) => this.disposables.set(a, this.runInner(t, a)))
    this.emitIfReady()
  }
  error = (t: types.Time, error: Error) => {
    this.dispose()
    this.sink.error(t, error)
  }
  end = (t: types.Time) => {
    this.finished = true
    this.endIfReady(t)
  }
  dispose = () => {
    this.finished = true
    this.disposables.forEach((d) => d.dispose())
    this.disposables.clear()
    this.values.clear()
    this.current = []
  }
  /**
   * Dispose and drop every tracked entry whose key equals `k` according to `source.Eq`.
   * `Map.delete(k)` alone is not enough: `k` comes from the most recent array and may be a
   * different object than the one stored as the key, which would leave a stale disposable
   * and a stale value behind for later `Eq`-based lookups to find.
   */
  forget = (k: V) => {
    this.disposables.forEach((d, key) => {
      if (this.source.Eq.equals(key, k)) {
        d.dispose()
        this.disposables.delete(key)
      }
    })
    this.values.forEach((_, key) => {
      if (this.source.Eq.equals(key, k)) {
        this.values.delete(key)
      }
    })
  }
  /**
   * Start the inner stream for `v`. The returned disposable releases the stream's slot in
   * `referenceCount` exactly once — either here on disposal or when the stream ends —
   * so a stream that is disposed before it ends cannot keep the output open forever.
   */
  runInner = (t: types.Time, v: V): types.Disposable => {
    let live = true
    const release = () => {
      if (live) {
        live = false
        this.referenceCount--
      }
    }
    this.referenceCount++
    const inner = this.source
      .f(v)
      .run(this.innerSink(v, release), schedulerRelativeTo(t, this.scheduler))
    return {
      dispose: () => {
        release()
        inner.dispose()
      },
    }
  }
  /**
   * Sink for the inner stream of `v`; `release` is called when that stream ends.
   */
  innerSink = (v: V, release: () => void): types.Sink => {
    return {
      event: (_, a) => {
        this.values.set(v, a)
        this.emitIfReady()
      },
      error: (t, e) => this.error(t, e),
      end: (t) => {
        release()
        this.endIfReady(t)
      },
    }
  }
  emitIfReady = () => {
    const values = pipe(this.current, RA.map(this.findValue), RA.compact)
    // Emit only when every current key has a value.
    if (values.length === this.current.length) {
      this.sink.event(this.scheduler.currentTime(), values)
    }
  }
  endIfReady = (t: types.Time) => {
    if (this.finished && this.referenceCount === 0) {
      this.sink.end(t)
      this.dispose()
    }
  }
}
/**
* Turn a Stream of arrays into a Stream of arrays of Streams, one inner Stream per array
* element, where elements are matched across arrays using `Eq` (see `KeyedSink`).
* @since 0.9.2
* @category Combinator
*/
export const keyed =
(Eq: Eq) =>
(stream: Stream>): Stream>> =>
new Keyed(Eq, stream)
/**
* Stream implementation behind `keyed`. Running it subscribes a `KeyedSink` to the source
* stream.
*/
class Keyed implements Stream>> {
constructor(readonly Eq: Eq, readonly stream: Stream>) {}
run(sink: types.Sink>>, scheduler: types.Scheduler): types.Disposable {
const s = new KeyedSink(this.Eq, sink, scheduler)
// Disposing the result disposes both the sink and the source subscription.
return disposeBoth(s, this.stream.run(s, scheduler))
}
}
/**
* Sink for the source stream of `keyed`.
*
* For every value seen it keeps a pair of adapters `[values, endSignal]`, each adapter being
* a `[send, stream]` pair created with `create` from ./Adapter. On each incoming array it
* signals the end of, and forgets, the adapters of values no longer present, emits the
* value streams of the current values, and then pushes each current value into its own
* adapter.
*/
class KeyedSink implements types.Sink>, types.Disposable {
// Adapter pairs, keyed by the value object for which they were created.
adapters = new Map, endSignal: Adapter]>()
// The array most recently received from the source.
current: readonly A[] = []
findDifference: (second: readonly A[]) => (first: readonly A[]) => readonly A[]
findAdapter: (
k: A,
) => O.Option<
readonly [
values: readonly [(event: A) => void, types.Stream],
endSignal: readonly [(event: null) => void, types.Stream],
]
>
constructor(
readonly Eq: Eq,
readonly sink: types.Sink>>,
readonly scheduler: types.Scheduler,
) {
this.findDifference = RA.difference(Eq)
// Adapters are looked up with `Eq`, not by reference.
this.findAdapter = (k: A) => RM.lookup(Eq)(k)(this.adapters)
}
event = (t: types.Time, values: readonly A[]) => {
// Clean up after any removed values
const removed = pipe(this.current, this.findDifference(values))
removed.forEach((r) => {
// Send the end signal
this.getAdapter(r)[1][0](null)
// Delete
// NOTE(review): `Map.delete` matches by reference while `findAdapter` matches with `Eq`;
// if `r` is an `Eq`-equal but different object than the stored key, the entry is not
// removed — confirm whether that can happen with the Eq instances used by callers.
this.adapters.delete(r)
})
this.current = values
// Emit our latest set of streams
this.sink.event(
t,
values.map((a) => pipe(a, this.getAdapter, fst, snd)),
)
// Push each current value into the `send` function of its own values adapter.
values.forEach((a) => pipe(a, this.getAdapter, fst, fst)(a))
}
// Returns the adapter pair registered for `k`, creating and registering one when the
// `Eq`-based lookup finds none.
getAdapter = (k: A) => {
return pipe(
k,
this.findAdapter,
O.getOrElseW(() => {
const endSignal = create()
// The values stream starts with `k`, runs until the end signal fires, and is multicast.
const values = create(flow(M.startWith(k), M.until(endSignal[1]), M.multicast))
const adapter = [values, endSignal] as const
this.adapters.set(k, adapter)
return adapter
}),
)
}
error = (t: types.Time, err: Error) => {
this.dispose()
this.sink.error(t, err)
}
end = (t: types.Time) => {
this.dispose()
this.sink.end(t)
}
// Forgets all adapters; no end signal is sent to them here.
dispose = () => {
this.current = []
this.adapters.clear()
}
}
/**
* For every value `a` of `first`, switch (via `M.switchLatest`) to a Stream that starts
* with `a` and re-emits `a` each time `second` emits. The values of `second` are replaced
* by `a` (`M.constant(a)`).
* @since 0.9.2
* @category Combinator
*/
export const switchFirst =
(second: Stream) =>
(first: Stream): Stream =>
pipe(
first,
M.map((a) => pipe(second, M.constant(a), M.startWith(a))),
M.switchLatest,
)
/**
* Merge `a` and `b` but only emit the values of `b`: events of `a` are replaced by
* `O.none`, events of `b` are wrapped in `O.some`, and `compact` drops the `None`s after
* the merge.
* @since 0.9.2
* @category Combinator
*/
export function mergeFirst(a: Stream) {
return (b: Stream): Stream =>
pipe(pipe(a, M.constant(O.none)), M.merge(pipe(b, M.map(O.some))), compact)
}
export * from '@most/core'
export * from '@most/types'