/**
 * Stream is an extension of @most/core with additional
 * fp-ts instances as well as additional combinators for interoperation with other data
 * structures in @typed/fp and fp-ts.
 *
 * A large goal of @typed/fp is to expand the `fp-ts` ecosystem to include
 * [@most/core](https://github.com/mostjs/core) for a Reactive programming style, including
 * derivatives such as [ReaderStream](./ReaderStream.ts.md), [ReaderStreamEither](./ReaderStreamEither.ts.md),
 * [StateReaderStreamEither](./StateReaderStreamEither.ts.md) and a few others. It's the fastest push-based
 * reactive library in JS period. The performance characteristics are due to its architecture of getting out of
 * the way of the computations you need to perform. It's also the first experience I had with FP. For instance,
 * Most utilizes `Functor` laws to remove unneeded machinery through function composition improving runtime
 * performance amongst other optimizations.
 *
 * See the [@most/core Documentation](https://mostcore.readthedocs.io/en/latest/) for the remaining API
 * exposed by this module. Both @most/core + @most/types are re-exported from this module
 * @since 0.9.2
 */
import * as M from '@most/core'
import { disposeAll, disposeNone } from '@most/disposable'
import { asap, schedulerRelativeTo } from '@most/scheduler'
import * as types from '@most/types'
import * as Alt_ from 'fp-ts/Alt'
import { Alternative1 } from 'fp-ts/Alternative'
import * as App from 'fp-ts/Applicative'
import * as Ap from 'fp-ts/Apply'
import * as CH from 'fp-ts/Chain'
import { ChainRec1 } from 'fp-ts/ChainRec'
import { Compactable1 } from 'fp-ts/Compactable'
import { Either, isLeft, isRight, left, match, right } from 'fp-ts/Either'
import { Eq } from 'fp-ts/Eq'
import { Filterable1 } from 'fp-ts/Filterable'
import { FromIO1 } from 'fp-ts/FromIO'
import { FromTask1 } from 'fp-ts/FromTask'
import { constVoid, flow, pipe } from 'fp-ts/function'
import { bindTo as bindTo_, Functor1, tupled as tupled_ } from 'fp-ts/Functor'
import { Monad1 } from 'fp-ts/Monad'
import { Monoid } from 'fp-ts/Monoid'
import * as O from 'fp-ts/Option'
import { Pointed1 } from 'fp-ts/Pointed'
import { Predicate } from 'fp-ts/Predicate'
import * as RA from 'fp-ts/ReadonlyArray'
import * as RM from 'fp-ts/ReadonlyMap'
import { Separated } from 'fp-ts/Separated'
import { Task } from 'fp-ts/Task'
import { fst, snd } from 'fp-ts/Tuple2'

import { Adapter, create } from './Adapter'
import { disposeBoth, settable } from './Disposable'
import { deepEqualsEq } from './Eq'
import * as FRe from './FromResume'
import { Arity1 } from './function'
import * as R from './Resume'
import * as S from './struct'

/**
 * Alias of the @most/types Stream.
 * @since 0.9.2
 * @category Model
 */
export type Stream<A> = types.Stream<A>

/**
 * Extract the value type carried by a Stream.
 * @since 0.9.2
 * @category Type-level
 */
export type ValueOf<A> = [A] extends [Stream<infer R>] ? R : never

/**
 * Convert an IO into a Most.js Task.
 *
 * `run` invokes `cb` (unless the task has already been disposed) and tracks the
 * Disposable it returns. `error` disposes of everything tracked so far, then forwards
 * the error to `onError` when one was provided and rethrows it otherwise.
 * @since 0.9.2
 * @category Constructor
 */
export function createCallbackTask(
  cb: Arity1<types.ScheduledTask, types.Disposable>,
  onError?: (error: Error) => void,
): types.Task {
  const disposable = settable()

  return {
    run(t) {
      if (!disposable.isDisposed()) {
        disposable.addDisposable(cb(t))
      }
    },
    error(_, e) {
      disposable.dispose()

      if (onError) {
        return onError(e)
      }

      throw e
    },
    dispose: disposable.dispose,
  }
}

/**
 * @since 0.9.2
 * @category URI
 */
export const URI = '@most/core/Stream' as const

/**
 * @since 0.9.2
 * @category URI
 */
export type URI = typeof URI

declare module 'fp-ts/HKT' {
  export interface URItoKind<A> {
    [URI]: types.Stream<A>
  }
}

/**
 * Create a Stream monoid where concat is a parallel merge.
 * @since 0.9.2
 * @category Typeclass Constructor
 */
export const getMonoid = <A>(): Monoid<types.Stream<A>> => {
  return {
    concat: M.merge,
    empty: M.empty(),
  }
}

/**
 * Filter Option's from within a Stream, emitting only the values held by a Some.
 * @since 0.9.2
 * @category Combinator
 */
export const compact = <A>(stream: types.Stream<O.Option<A>>): types.Stream<A> =>
  M.map((s: O.Some<A>) => s.value, M.filter(O.isSome, stream))

/**
 * Separate left and right values. The source is multicast so both output
 * streams share a single run of it.
 * @since 0.9.2
 * @category Combinator
 */
export const separate = <A, B>(
  stream: types.Stream<Either<A, B>>,
): Separated<types.Stream<A>, types.Stream<B>> => {
  const s = M.multicast(stream)
  const left = pipe(
    s,
    M.filter(isLeft),
    M.map((l) => l.left),
  )
  const right = pipe(
    s,
    M.filter(isRight),
    M.map((r) => r.right),
  )

  return { left, right }
}

/**
 * Map every value to an Either, then separate the lefts from the rights.
 * @since 0.9.2
 * @category Combinator
 */
export const partitionMap =
  <A, B, C>(f: (a: A) => Either<B, C>) =>
  (fa: types.Stream<A>) =>
    separate(M.map(f, fa))

/**
 * Split a Stream by a predicate: passing values go right, failing values go left.
 * @since 0.9.2
 * @category Combinator
 */
export const partition = <A>(predicate: Predicate<A>) =>
  partitionMap((a: A) => (predicate(a) ? right(a) : left(a)))

/**
 * Map every value to an Option and keep only the Some values.
 * @since 0.9.2
 * @category Combinator
 */
export const filterMap =
  <A, B>(f: (a: A) => O.Option<B>) =>
  (fa: types.Stream<A>) =>
    compact(M.map(f, fa))

/**
 * @since 0.9.2
 * @category Instance
 */
export const Functor: Functor1<URI> = {
  map: M.map,
}

/**
 * @since 0.9.2
 * @category Instance
 */
export const Pointed: Pointed1<URI> = {
  of: M.now,
}

/**
 * @since 0.9.2
 * @category Constructor
 */
export const of = Pointed.of

/**
 * @since 0.9.2
 * @category Instance
 */
export const Apply: Ap.Apply1<URI> = {
  ...Functor,
  ap: M.ap,
}

/**
 * @since 0.9.2
 * @category Combinator
 */
export const apFirst = Ap.apFirst(Apply)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const apS = Ap.apS(Apply)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const apSecond = Ap.apSecond(Apply)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const apT = Ap.apT(Apply)

/**
 * @since 0.9.2
 * @category Typeclass Constructor
 */
export const getApplySemigroup = Ap.getApplySemigroup(Apply)

/**
 * @since 0.9.2
 * @category Instance
 */
export const Applicative: App.Applicative1<URI> = {
  ...Apply,
  ...Pointed,
}

/**
 * @since 0.9.2
 * @category Typeclass Constructor
 */
export const getApplicativeMonoid = App.getApplicativeMonoid(Applicative)

/**
 * @since 0.9.2
 * @category Instance
 */
export const Chain: CH.Chain1<URI> = {
  ...Functor,
  chain: M.chain,
}

/**
 * @since 0.9.2
 * @category Combinator
 */
export const chainFirst = CH.chainFirst(Chain)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const bind = CH.bind(Chain)

/**
 * @since 0.9.2
 * @category Instance
 */
export const Monad: Monad1<URI> = {
  ...Chain,
  ...Pointed,
}

/**
 * Recursively chain `f` for as long as it emits Left values, emitting every Right.
 * Each step is passed through `M.delay(0)` before the recursion continues.
 * @since 0.9.2
 * @category Combinator
 */
export const chainRec =
  <A, B>(f: (value: A) => types.Stream<Either<A, B>>) =>
  (value: A): types.Stream<B> =>
    pipe(value, f, M.delay(0), M.chain(match(flow(chainRec(f)), M.now)))

/**
 * @since 0.9.2
 * @category Instance
 */
export const ChainRec: ChainRec1<URI> = {
  chainRec,
}

/**
 * Like chainRec, but flattens the recursion with `M.switchLatest`.
 * @since 0.9.2
 * @category Combinator
 */
export const switchRec =
  <A, B>(f: (value: A) => types.Stream<Either<A, B>>) =>
  (value: A): types.Stream<B> =>
    pipe(value, f, M.map(match(switchRec(f), M.now)), M.switchLatest)

/**
 * @since 0.9.2
 * @category Instance
 */
export const SwitchRec: ChainRec1<URI> = {
  chainRec: switchRec,
}

/**
 * Like chainRec, but flattens the recursion with `M.mergeConcurrently(concurrency)`.
 * @since 0.9.2
 * @category Combinator
 */
export const mergeConcurrentlyRec =
  (concurrency: number) =>
  <A, B>(f: (value: A) => types.Stream<Either<A, B>>) =>
  (value: A): types.Stream<B> =>
    pipe(
      value,
      f,
      M.map(match(mergeConcurrentlyRec(concurrency)(f), M.now)),
      M.mergeConcurrently(concurrency),
    )

/**
 * @since 0.9.2
 * @category Typeclass Constructor
 */
export const getConcurrentChainRec = (concurrency: number): ChainRec1<URI> => ({
  chainRec: mergeConcurrentlyRec(concurrency),
})

/**
 * @since 0.9.2
 * @category Instance
 */
export const FromIO: FromIO1<URI> = {
  fromIO: (f) => Functor.map(f)(M.now(undefined)),
}

/**
 * @since 0.9.2
 * @category Constructor
 */
export const fromIO = FromIO.fromIO

// Defers invoking the Task until the resulting Stream is run
const applyTask = <A>(task: Task<A>): types.Stream<Promise<A>> => M.ap(M.now(task), M.now(void 0))

/**
 * @since 0.9.2
 * @category Instance
 */
export const FromTask: FromTask1<URI> = {
  ...FromIO,
  fromTask: flow(applyTask, M.awaitPromises),
}

/**
 * @since 0.9.2
 * @category Constructor
 */
export const fromTask = FromTask.fromTask

/**
 * Starts the Resume from an `asap` scheduled task, emitting its value and then ending.
 * @since 0.9.2
 * @category Instance
 */
export const FromResume: FRe.FromResume1<URI> = {
  fromResume: (resume) =>
    M.newStream((sink, scheduler) => {
      const run = () =>
        pipe(
          resume,
          R.start((a) => {
            sink.event(scheduler.currentTime(), a)
            sink.end(scheduler.currentTime())
          }),
        )

      const onError = (error: Error) => sink.error(scheduler.currentTime(), error)

      return asap(createCallbackTask(run, onError), scheduler)
    }),
}

/**
 * @since 0.9.2
 * @category Constructor
 */
export const fromResume = FromResume.fromResume

/**
 * @since 0.9.2
 * @category Combinator
 */
export const chainFirstResumeK = FRe.chainFirstResumeK(FromResume, Chain)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const chainResumeK = FRe.chainResumeK(FromResume, Chain)

/**
 * @since 0.9.2
 * @category Constructor
 */
export const fromResumeK = FRe.fromResumeK(FromResume)

/**
 * @since 0.9.2
 * @category Instance
 */
export const Alt: Alt_.Alt1<URI> = {
  ...Functor,
  // race the 2 streams: only the first event from either one is emitted
  alt: (f) => (fa) => M.take(1, M.merge(fa, f())),
}

/**
 * @since 0.9.2
 * @category Combinator
 */
export const race = Alt.alt

/**
 * @since 0.9.2
 * @category Instance
 */
export const Alternative: Alternative1<URI> = {
  ...Alt,
  zero: M.empty,
}

/**
 * @since 0.9.2
 * @category Constructor
 */
export const zero = Alternative.zero

/**
 * @since 0.9.2
 * @category Instance
 */
export const Compactable: Compactable1<URI> = {
  compact,
  separate,
}

/**
 * @since 0.9.2
 * @category Instance
 */
export const Filterable: Filterable1<URI> = {
  partitionMap,
  partition,
  filterMap,
  filter: M.filter,
}

/**
 * A Stream of a single empty object (created with `Object.create(null)`), used to
 * begin do-notation.
 * @since 0.9.2
 * @category Constructor
 */
export const Do: types.Stream<{}> = pipe(null, M.now, M.map(Object.create))

/**
 * @since 0.9.2
 * @category Combinator
 */
export const bindTo = bindTo_(Functor)

/**
 * @since 0.9.2
 * @category Combinator
 */
export const tupled = tupled_(Functor)

const emptySink: types.Sink<any> = {
  event: constVoid,
  error: constVoid,
  end: constVoid,
}

/**
 * Build a Sink from a partial one; any handler left out is a no-op.
 * @since 0.9.2
 * @category Constructor
 */
export const createSink = <A>(sink: Partial<types.Sink<A>> = {}): types.Sink<A> => ({
  ...emptySink,
  ...sink,
})

/**
 * Run a Stream with the given scheduler, resolving with every event it emitted, in order.
 * @since 0.9.2
 * @category Combinator
 */
export const collectEvents =
  (scheduler: types.Scheduler) =>
  <A>(stream: types.Stream<A>) => {
    const events: A[] = []

    return M.runEffects(
      M.tap((a) => events.push(a), stream),
      scheduler,
    ).then(() => events as readonly A[])
  }

/**
 * Dispose of the provided Disposable together with the Stream's own.
 * @since 0.9.2
 * @category Combinator
 */
export const onDispose =
  (disposable: types.Disposable) =>
  <A>(stream: types.Stream<A>): types.Stream<A> =>
    M.newStream((sink, scheduler) => disposeBoth(stream.run(sink, scheduler), disposable))

/**
 * Combine Streams into a Stream of tuples holding the latest value of each.
 * @since 0.9.2
 * @category Combinator
 */
export const combineAll = <A extends readonly types.Stream<any>[]>(
  ...streams: A
): types.Stream<{ readonly [K in keyof A]: ValueOf<A[K]> }> =>
  pipe(
    streams as any,
    // Collect with rest parameters rather than the `Array` constructor: `Array(n)` called with a
    // single number builds an empty array of length n (or throws) instead of `[n]`.
    M.combineArray((...values: any[]) => values),
  ) as any

/**
 * Combine a struct of Streams into a Stream of structs holding the latest value of each key.
 * @since 0.11.0
 * @category Combinator
 */
export const combineStruct = <A>(streams: { readonly [K in keyof A]: types.Stream<A[K]> }) =>
  pipe(
    combineAll(
      ...pipe(
        Object.entries<types.Stream<any>>(streams),
        RA.map(([k, stream]) =>
          pipe(
            stream,
            M.map((v) => S.make(k, v)),
          ),
        ),
      ),
    ),
    M.map((o) => Object.assign({}, ...o) as A),
  )

/**
 * Flatten a Stream of Streams by running one inner Stream at a time. Inner Streams that arrive
 * while another is still running are not started; when the running one ends, only the most
 * recently received Stream is run.
 * @since 0.9.2
 * @category Combinator
 */
export const exhaustLatest = <A>(stream: types.Stream<types.Stream<A>>): types.Stream<A> =>
  new ExhaustLatest(stream)

/**
 * Map each value to a Stream and flatten the result with exhaustLatest.
 * @since 0.9.2
 * @category Combinator
 */
export const exhaustMapLatest =
  <A, B>(f: (value: A) => types.Stream<B>) =>
  (stream: types.Stream<A>) =>
    pipe(stream, M.map(f), exhaustLatest)

class ExhaustLatest<A> implements types.Stream<A> {
  constructor(readonly stream: types.Stream<types.Stream<A>>) {}

  run(sink: types.Sink<A>, scheduler: types.Scheduler) {
    const s = new ExhaustLatestSink(sink, scheduler)

    return disposeBoth(this.stream.run(s, scheduler), s)
  }
}

class ExhaustLatestSink<A> implements types.Sink<types.Stream<A>>, types.Disposable {
  // Most recently received inner stream
  protected latest: O.Option<types.Stream<A>> = O.none
  // Disposable of the inner stream currently running
  protected disposable: types.Disposable = disposeNone()
  // True while an inner stream is running
  protected sampling = false
  // True when a newer inner stream arrived while another was running
  protected shouldResample = false
  // True once the outer stream has ended
  protected finished = false
  protected innerSink: types.Sink<A>

  constructor(readonly sink: types.Sink<A>, readonly scheduler: types.Scheduler) {
    this.innerSink = {
      event: (t, a) => sink.event(t, a),
      error: (t, e) => this.error(t, e),
      end: (t) => {
        this.sampling = false

        // A newer stream arrived while this one was running: run it now
        if (this.shouldResample && O.isSome(this.latest)) {
          this.event(scheduler.currentTime(), this.latest.value)

          return
        }

        if (this.finished) {
          this.dispose()
          sink.end(t)
        }
      },
    }
  }

  event = (t: types.Time, stream: types.Stream<A>) => {
    this.latest = O.some(stream)

    if (this.sampling) {
      this.shouldResample = true

      return
    }

    this.sampling = true
    this.shouldResample = false
    this.disposable = stream.run(this.innerSink, schedulerRelativeTo(t, this.scheduler))
  }

  error = (t: types.Time, e: Error) => {
    this.dispose()
    this.sink.error(t, e)
  }

  end = (t: types.Time) => {
    this.finished = true

    // With no inner stream running there is no later inner `end` to propagate completion,
    // so end downstream right away instead of leaving it open forever.
    if (!this.sampling) {
      this.sink.end(t)
    }
  }

  dispose = () => {
    this.disposable.dispose()
    this.sampling = false
    this.shouldResample = false
  }
}

/**
 * Using the provided Eq mergeMapWhen conditionally applies a Kleisli arrow
 * to the values within an Array when they are added and any values removed
 * from the array will be disposed of immediately
 * @since 0.9.2
 * @category Combinator
 */
export const mergeMapWhen =
  <V>(Eq: Eq<V> = deepEqualsEq) =>
  <A>(f: (value: V) => types.Stream<A>) =>
  (values: Stream<ReadonlyArray<V>>): Stream<ReadonlyArray<A>> =>
    new MergeMapWhen(Eq, f, values)

class MergeMapWhen<V, A> implements types.Stream<ReadonlyArray<A>> {
  constructor(
    readonly Eq: Eq<V>,
    readonly f: (getValue: V) => types.Stream<A>,
    readonly values: Stream<ReadonlyArray<V>>,
  ) {}

  run(sink: types.Sink<ReadonlyArray<A>>, scheduler: types.Scheduler): types.Disposable {
    const s = new MergeMapWhenSink(sink, scheduler, this)

    return disposeBoth(this.values.run(s, scheduler), s)
  }
}

class MergeMapWhenSink<V, A> implements types.Sink<ReadonlyArray<V>> {
  // Disposable of the inner stream started for each key
  readonly disposables = new Map<V, types.Disposable>()
  // Latest value emitted by the inner stream of each key
  readonly values = new Map<V, A>()
  readonly findDifference: (second: readonly V[]) => (first: readonly V[]) => readonly V[]
  readonly findDisposable: (k: V) => O.Option<types.Disposable>
  readonly findValue: (k: V) => O.Option<A>

  current: ReadonlyArray<V> = []
  finished = false
  // Number of inner streams that have neither ended nor been disposed
  referenceCount = 0
  disposable: types.Disposable = disposeNone()

  constructor(
    readonly sink: types.Sink<ReadonlyArray<A>>,
    readonly scheduler: types.Scheduler,
    readonly source: MergeMapWhen<V, A>,
  ) {
    this.findDisposable = (k: V) => RM.lookup(source.Eq)(k)(this.disposables)
    this.findValue = (k: V) => RM.lookup(source.Eq)(k)(this.values)
    this.findDifference = RA.difference(source.Eq)
  }

  event = (t: types.Time, values: ReadonlyArray<V>) => {
    const removed = pipe(this.current, this.findDifference(values))
    const added = pipe(values, this.findDifference(this.current))

    this.current = values

    // Clean up all the removed keys
    const stale = pipe(removed, RA.map(this.findDisposable), RA.compact)

    disposeAll(stale).dispose()
    stale.forEach(this.forget)
    removed.forEach((r) => this.values.delete(r))

    // Subscribe to all the newly added values
    added.forEach((a) => this.disposables.set(a, this.runInner(t, a)))

    this.emitIfReady()
  }

  error = (t: types.Time, error: Error) => {
    this.dispose()
    this.sink.error(t, error)
  }

  end = (t: types.Time) => {
    this.finished = true
    this.endIfReady(t)
  }

  dispose = () => {
    this.finished = true
    this.disposables.forEach((d) => d.dispose())
    this.disposables.clear()
    this.values.clear()
    this.current = []
  }

  // Drops the bookkeeping of an inner stream. The stored key is located by disposable identity
  // because the Maps are keyed by reference while lookups go through the Eq instance, so the
  // removed value may be a different reference than the one the entry was stored under.
  forget = (disposable: types.Disposable) => {
    this.disposables.forEach((stored, key) => {
      if (stored === disposable) {
        this.disposables.delete(key)
        this.values.delete(key)
      }
    })
  }

  // Starts the inner stream for a key. The returned Disposable gives up the inner stream's
  // reference when disposed, so a removed key can no longer keep the result from ending.
  runInner = (t: types.Time, v: V): types.Disposable => {
    let counted = true

    this.referenceCount++

    // Idempotent: the reference is released by whichever comes first, `end` or disposal
    const release = () => {
      if (counted) {
        counted = false
        this.referenceCount--
      }
    }

    const inner: types.Sink<A> = {
      event: (_, a) => {
        this.values.set(v, a)
        this.emitIfReady()
      },
      error: (time, e) => this.error(time, e),
      end: (time) => {
        release()
        this.endIfReady(time)
      },
    }

    const running = this.source.f(v).run(inner, schedulerRelativeTo(t, this.scheduler))

    return {
      dispose: () => {
        release()
        running.dispose()
      },
    }
  }

  // Emits only once every current key has produced at least one value
  emitIfReady = () => {
    const values = pipe(this.current, RA.map(this.findValue), RA.compact)

    if (values.length === this.current.length) {
      this.sink.event(this.scheduler.currentTime(), values)
    }
  }

  // Ends once the outer stream has finished and no inner stream is still running
  endIfReady = (t: types.Time) => {
    if (this.finished && this.referenceCount === 0) {
      this.sink.end(t)
      this.dispose()
    }
  }
}

/**
 * Convert a Stream of Arrays into a Stream of Arrays of Streams, one Stream per distinct value
 * according to the provided Eq. A value's Stream is signalled to end once that value is no
 * longer present in the Array.
 * @since 0.9.2
 * @category Combinator
 */
export const keyed =
  <A>(Eq: Eq<A>) =>
  (stream: Stream<ReadonlyArray<A>>): Stream<ReadonlyArray<Stream<A>>> =>
    new Keyed(Eq, stream)

class Keyed<A> implements Stream<ReadonlyArray<Stream<A>>> {
  constructor(readonly Eq: Eq<A>, readonly stream: Stream<ReadonlyArray<A>>) {}

  run(sink: types.Sink<ReadonlyArray<Stream<A>>>, scheduler: types.Scheduler): types.Disposable {
    const s = new KeyedSink(this.Eq, sink, scheduler)

    return disposeBoth(s, this.stream.run(s, scheduler))
  }
}

class KeyedSink<A> implements types.Sink<ReadonlyArray<A>>, types.Disposable {
  adapters = new Map<A, readonly [values: Adapter<A>, endSignal: Adapter<null>]>()
  current: readonly A[] = []

  findDifference: (second: readonly A[]) => (first: readonly A[]) => readonly A[]
  findAdapter: (
    k: A,
  ) => O.Option<
    readonly [
      values: readonly [(event: A) => void, types.Stream<A>],
      endSignal: readonly [(event: null) => void, types.Stream<null>],
    ]
  >

  constructor(
    readonly Eq: Eq<A>,
    readonly sink: types.Sink<ReadonlyArray<Stream<A>>>,
    readonly scheduler: types.Scheduler,
  ) {
    this.findDifference = RA.difference(Eq)
    this.findAdapter = (k: A) => RM.lookup(Eq)(k)(this.adapters)
  }

  event = (t: types.Time, values: readonly A[]) => {
    // Clean up after any removed values
    const removed = pipe(this.current, this.findDifference(values))

    removed.forEach((r) => {
      const adapter = this.getAdapter(r)

      // Send the end signal
      adapter[1][0](null)
      // Delete
      this.forgetAdapter(adapter)
    })

    this.current = values

    // Emit our latest set of streams
    this.sink.event(
      t,
      values.map((a) => pipe(a, this.getAdapter, fst, snd)),
    )

    // Push each value into its own stream
    values.forEach((a) => pipe(a, this.getAdapter, fst, fst)(a))
  }

  // Finds the adapter for a key via the Eq instance, creating and storing one when missing
  getAdapter = (k: A) => {
    return pipe(
      k,
      this.findAdapter,
      O.getOrElseW(() => {
        const endSignal = create<null>()
        const values = create(flow(M.startWith(k), M.until(endSignal[1]), M.multicast))
        const adapter = [values, endSignal] as const

        this.adapters.set(k, adapter)

        return adapter
      }),
    )
  }

  // Deletes an adapter by identity: the Map is keyed by reference while lookups go through
  // the Eq instance, so the removed value may not be the reference the adapter was stored under.
  forgetAdapter = (adapter: unknown) => {
    this.adapters.forEach((stored, key) => {
      if (stored === adapter) {
        this.adapters.delete(key)
      }
    })
  }

  error = (t: types.Time, err: Error) => {
    this.dispose()
    this.sink.error(t, err)
  }

  end = (t: types.Time) => {
    this.dispose()
    this.sink.end(t)
  }

  dispose = () => {
    this.current = []
    this.adapters.clear()
  }
}

/**
 * For every value of the first Stream, emit it immediately and again on each event of the
 * second Stream, until the first Stream produces its next value.
 * @since 0.9.2
 * @category Combinator
 */
export const switchFirst =
  <A>(second: Stream<A>) =>
  <B>(first: Stream<B>): Stream<B> =>
    pipe(
      first,
      M.map((a) => pipe(second, M.constant(a), M.startWith(a))),
      M.switchLatest,
    )

/**
 * Run Stream `a` alongside Stream `b`, emitting only the values of `b`.
 * @since 0.9.2
 * @category Combinator
 */
export function mergeFirst<A>(a: Stream<A>) {
  return <B>(b: Stream<B>): Stream<B> =>
    pipe(pipe(a, M.constant(O.none)), M.merge(pipe(b, M.map(O.some))), compact)
}

export * from '@most/core'
export * from '@most/types'