/** * ReaderStream is a ReaderT of Most.js' Stream. * @since 0.9.2 */ import { SeedValue as SV } from '@most/core/dist/combinator/loop' import * as H from '@most/hold' import * as App from 'fp-ts/Applicative' import * as Ap from 'fp-ts/Apply' import * as Ch from 'fp-ts/Chain' import { ChainRec2 } from 'fp-ts/ChainRec' import { Compactable2 } from 'fp-ts/Compactable' import { Either, isLeft, isRight } from 'fp-ts/Either' import { Eq } from 'fp-ts/Eq' import * as Filterable_ from 'fp-ts/Filterable' import * as FIO from 'fp-ts/FromIO' import * as FR from 'fp-ts/FromReader' import * as FT from 'fp-ts/FromTask' import { pipe } from 'fp-ts/function' import * as F from 'fp-ts/Functor' import { IO } from 'fp-ts/IO' import { Monad2 } from 'fp-ts/Monad' import { Pointed2 } from 'fp-ts/Pointed' import { not, Predicate } from 'fp-ts/Predicate' import * as Re from 'fp-ts/Reader' import * as RT from 'fp-ts/ReaderT' import * as RA from 'fp-ts/ReadonlyArray' import { Refinement } from 'fp-ts/Refinement' import { Separated } from 'fp-ts/Separated' import { Task } from 'fp-ts/Task' import { settable } from './Disposable' import * as E from './Env' import { deepEqualsEq } from './Eq' import * as FE from './FromEnv' import * as FRe from './FromResume' import * as FS from './FromStream' import * as FN from './function' import { Intersect } from './HKT' import { MonadRec2 } from './MonadRec' import * as O from './Option' import * as P from './Provide' import { async } from './Resume' import { SchedulerEnv } from './Scheduler' import * as S from './Stream' import * as St from './struct' /** * Env is specialization of Reader> * @since 0.9.2 * @category Model */ export interface ReaderStream extends Re.Reader> {} /** * @since 0.9.2 * @category Type-level */ export type RequirementsOf = [A] extends [ReaderStream] ? R : never /** * @since 0.9.2 * @category Type-level */ export type ValueOf = [A] extends [ReaderStream] ? R : never /** * @since 0.9.2 * @category Combinator */ export const ap = RT.ap(S.Apply) /** * @since 0.9.2 * @category Combinator */ export const apW = ap as ( fa: ReaderStream, ) => (fab: ReaderStream>) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const chain = RT.chain(S.Chain) /** * @since 0.9.2 * @category Combinator */ export const chainW = chain as ( f: (a: A) => ReaderStream, ) => (ma: ReaderStream) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const switchMap = RT.chain({ map: S.map, chain: (f) => FN.flow(S.map(f), S.switchLatest), }) /** * @since 0.9.2 * @category Combinator */ export const switchMapW = switchMap as ( f: (a: A) => ReaderStream, ) => (ma: ReaderStream) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const switchFirst = (second: ReaderStream) => (first: ReaderStream): ReaderStream => (r) => pipe(first, withStream(S.switchFirst(second(r))))(r) /** * @since 0.9.2 * @category Constructor */ export const fromReader: (ma: Re.Reader) => ReaderStream = RT.fromReader( S.Pointed, ) /** * @since 0.9.2 * @category Combinator */ export const map: (f: (a: A) => B) => (fa: ReaderStream) => ReaderStream = RT.map(S.Functor) /** * @since 0.9.2 * @category Combinator */ export const constant = (b: B) => map(() => b) /** * @since 0.9.2 * @category Constructor */ export const of: (a: A) => ReaderStream = RT.of(S.Pointed) /** * @since 0.9.2 * @category Combinator */ export function chainRec( f: (value: A) => ReaderStream>, ): (value: A) => ReaderStream { return (value) => (env) => S.chainRec((a: A) => f(a)(env))(value) } /** * @since 0.9.2 * @category URI */ export const URI = '@typed/fp/ReaderStream' /** * @since 0.9.2 * @category URI */ export type URI = typeof URI declare module 'fp-ts/HKT' { export interface URItoKind2 { [URI]: ReaderStream } } declare module './HKT' { export interface URItoVariance { [URI]: V } } /** * @since 0.9.2 * @category Instance */ export const Pointed: Pointed2 = { of, } /** * @since 0.9.2 * @category Instance */ export const Functor: F.Functor2 = { map, } /** * @since 0.9.2 * @category Combinator */ export const bindTo = F.bindTo(Functor) /** * @since 0.9.2 * @category Combinator */ export const flap = F.flap(Functor) /** * @since 0.9.2 * @category Combinator */ export const tupled = F.tupled(Functor) /** * @since 0.9.2 * @category Instance */ export const Apply: Ap.Apply2 = { ...Functor, ap, } /** * @since 0.9.2 * @category Combinator */ export const apFirst = Ap.apFirst(Apply) /** * @since 0.9.2 * @category Combinator */ export const apFirstW = apFirst as ( second: ReaderStream, ) => (first: ReaderStream) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const apS = Ap.apS(Apply) /** * @since 0.9.2 * @category Combinator */ export const apSW = apS as ( name: Exclude, fb: ReaderStream, ) => ( fa: ReaderStream, ) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const apSecond = Ap.apSecond(Apply) /** * @since 0.9.2 * @category Combinator */ export const apSecondW = apSecond as ( second: ReaderStream, ) => (first: ReaderStream) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const apT = Ap.apT(Apply) /** * @since 0.9.2 * @category Combinator */ export const apTW = apT as ( fb: ReaderStream, ) => ( fas: ReaderStream, ) => ReaderStream /** * @since 0.9.2 * @category Typeclass Constructor */ export const getApplySemigroup = Ap.getApplySemigroup(Apply) /** * @since 0.9.2 * @category Combinator */ export const apSEnv: ( name: Exclude, fb: E.Env, ) => ( fa: ReaderStream, ) => ReaderStream = (name, fb) => apS(name, FN.pipe(fb, fromEnv)) /** * @since 0.9.2 * @category Combinator */ export const apSEnvW = apSEnv as ( name: Exclude, fb: E.Env, ) => ( fa: ReaderStream, ) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const apTEnvW: ( fb: E.Env, ) => ( fas: ReaderStream, ) => ReaderStream = (fb) => FN.pipe(fb, fromEnv, apTW) /** * @since 0.9.2 * @category Combinator */ export const apTEnv: ( fb: E.Env, ) => ( fas: ReaderStream, ) => ReaderStream = apTEnvW /** * @since 0.9.2 * @category Instance */ export const Applicative: App.Applicative2 = { ...Apply, ...Pointed, } /** * @since 0.9.2 * @category Typeclass Constructor */ export const getApplicativeMonoid = App.getApplicativeMonoid(Applicative) /** * @since 0.9.2 * @category Instance */ export const Chain: Ch.Chain2 = { ...Functor, chain, } /** * @since 0.9.2 * @category Combinator */ export const chainFirst = Ch.chainFirst(Chain) /** * @since 0.9.2 * @category Combinator */ export const chainFirstW = chainFirst as ( f: (a: A) => ReaderStream, ) => (first: ReaderStream) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const bind = Ch.bind(Chain) /** * @since 0.9.2 * @category Combinator */ export const bindW = bind as ( name: Exclude, f: (a: A) => ReaderStream, ) => ( ma: ReaderStream, ) => ReaderStream /** * @since 0.9.2 * @category Combinator */ export const bindEnv: ( name: Exclude, f: (a: A) => E.Env, ) => ( ma: ReaderStream, ) => ReaderStream = (name, f) => bind(name, FN.flow(f, fromEnv)) /** * @since 0.9.2 * @category Combinator */ export const bindEnvW: ( name: Exclude, f: (a: A) => E.Env, ) => ( ma: ReaderStream, ) => ReaderStream = ( name, f, ) => bindW(name, FN.flow(f, fromEnv)) /** * @since 0.9.2 * @category Instance */ export const Monad: Monad2 = { ...Chain, ...Pointed, } /** * @since 0.9.2 * @category Instance */ export const ChainRec: ChainRec2 = { chainRec, } /** * @since 0.9.2 * @category Instance */ export const MonadRec: MonadRec2 = { ...Monad, chainRec, } /** * @since 0.9.2 * @category Instance */ export const FromReader: FR.FromReader2 = { fromReader, } /** * @since 0.9.2 * @category Constructor */ export const ask = FR.ask(FromReader) /** * @since 0.9.2 * @category Constructor */ export const asks = FR.asks(FromReader) /** * @since 0.9.2 * @category Combinator */ export const chainFirstReaderK = FR.chainFirstReaderK(FromReader, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainReaderK = FR.chainReaderK(FromReader, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromReaderK = FR.fromReaderK(FromReader) /** * @since 0.9.2 * @category Instance */ export const FromResume: FRe.FromResume2 = { fromResume: (r) => () => S.fromResume(r), } /** * @since 0.9.2 * @category Constructor */ export const fromResume = FromResume.fromResume /** * @since 0.9.2 * @category Combinator */ export const chainFirstResumeK = FRe.chainFirstResumeK(FromResume, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainResumeK = FRe.chainResumeK(FromResume, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromResumeK = FRe.fromResumeK(FromResume) /** * @since 0.9.2 * @category Instance */ export const FromEnv: FE.FromEnv2 = { fromEnv: (env) => FN.flow(env, S.fromResume), } /** * @since 0.9.2 * @category Constructor */ export const fromEnv = FromEnv.fromEnv /** * @since 0.9.2 * @category Combinator */ export const chainEnvK = FE.chainEnvK(FromEnv, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainFirstEnvK = FE.chainFirstEnvK(FromEnv, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromEnvK = FE.fromEnvK(FromEnv) /** * @since 0.9.2 * @category Instance */ export const FromIO: FIO.FromIO2 = { fromIO: (io) => () => S.fromIO(io), } /** * @since 0.9.2 * @category Constructor */ export const fromIO = FromIO.fromIO /** * @since 0.9.2 * @category Combinator */ export const chainFirstIOK = FIO.chainFirstIOK(FromIO, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainIOK = FIO.chainIOK(FromIO, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromIOK = FIO.fromIOK(FromIO) /** * @since 0.9.2 * @category Constructor */ export const Do = fromIO((): {} => Object.create(null)) /** * @since 0.9.2 * @category Instance */ export const FromTask: FT.FromTask2 = { ...FromIO, fromTask: (task) => () => S.fromTask(task), } /** * @since 0.9.2 * @category Constructor */ export const fromTask = FromTask.fromTask /** * @since 0.9.2 * @category Combinator */ export const chainFirstTaskK = FT.chainFirstTaskK(FromTask, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainTaskK = FT.chainTaskK(FromTask, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromTaskK = FT.fromTaskK(FromTask) /** * @since 0.9.2 * @category Instance */ export const FromStream: FS.FromStream2 = { fromStream: FN.constant, } /** * @since 0.9.2 * @category Constructor */ export const fromStream = FromStream.fromStream /** * @since 0.9.2 * @category Combinator */ export const chainFirstStreamK = FS.chainFirstStreamK(FromStream, Chain) /** * @since 0.9.2 * @category Combinator */ export const chainStreamK = FS.chainStreamK(FromStream, Chain) /** * @since 0.9.2 * @category Constructor */ export const fromStreamK = FS.fromStreamK(FromStream) /** * @since 0.9.2 * @category Constructor */ export const asksEnv = (f: (e1: E1) => E.Env): ReaderStream => (r) => FN.pipe(r, f(r), S.fromResume) /** * @since 0.9.2 * @category Constructor */ export const asksIO = (f: (e1: E1) => IO): ReaderStream => (r) => FN.pipe(r, f, S.fromIO) /** * @since 0.9.2 * @category Constructor */ export const asksTask = (f: (e1: E1) => Task): ReaderStream => (r) => FN.pipe(r, f, S.fromTask) /** * @since 0.9.2 * @category Combinator */ export function filter( refinement: Refinement, ): (rs: ReaderStream) => ReaderStream export function filter( predicate: Predicate, ): (rs: ReaderStream) => ReaderStream export function filter(predicate: Predicate) { return (rs: ReaderStream): ReaderStream => (r) => FN.pipe(r, rs, S.filter(predicate)) } /** * @since 0.9.2 * @category Combinator */ export function merge(a: ReaderStream) { return (b: ReaderStream): ReaderStream => (r) => FN.pipe(a(r), S.merge(b(r))) } /** * @since 0.9.2 * @category Combinator */ export function mergeFirst(a: ReaderStream) { return (b: ReaderStream): ReaderStream => (r) => FN.pipe(r, b, S.mergeFirst(a(r))) } /** * @since 0.9.2 * @category Combinator */ export function mergeArray>>( streams: A, ): ReaderStream }>, ValueOf> { return (r) => S.mergeArray(streams.map((rs) => rs(r))) } /** * @since 0.9.2 * @category Combinator */ export function concatMap(f: (value: A) => ReaderStream) { return (rs: ReaderStream): ReaderStream => (r) => FN.pipe( r, rs, S.concatMap((a) => FN.pipe(r, f(a))), ) } /** * @since 0.9.2 * @category Combinator */ export const recoverWith = (f: (error: Error) => ReaderStream) => (rs: ReaderStream): ReaderStream => (r) => FN.pipe( r, rs, S.recoverWith((e) => FN.pipe(r, f(e))), ) /** * @since 0.9.2 * @category Constructor */ export const empty = fromStreamK(S.empty) /** * @since 0.9.2 * @category Constructor */ export const never = fromStreamK(S.never) /** * @since 0.9.2 * @category Constructor */ export const periodic = fromStreamK(S.periodic) /** * @since 0.9.2 * @category Combinator */ export const provideSome = (provided: E1) => (rs: ReaderStream): ReaderStream => (e2) => rs({ ...provided, ...e2 }) /** * @since 0.9.2 * @category Combinator */ export const useSome = (provided: E1) => (rs: ReaderStream): ReaderStream => (e2) => rs({ ...e2, ...provided }) /** * @since 0.9.2 * @category Combinator */ export const provideAll = (provided: E1) => (rs: ReaderStream): ReaderStream => (e2) => rs({ ...provided, ...(e2 as any) }) /** * @since 0.9.2 * @category Combinator */ export const useAll = (provided: E1) => (rs: ReaderStream): ReaderStream => () => rs(provided) /** * @since 0.9.2 * @category Instance */ export const ProvideSome: P.ProvideSome2 = { provideSome, } /** * @since 0.9.2 * @category Instance */ export const UseSome: P.UseSome2 = { useSome, } /** * @since 0.9.2 * @category Instance */ export const ProvideAll: P.ProvideAll2 = { provideAll, } /** * @since 0.9.2 * @category Instance */ export const UseAll: P.UseAll2 = { useAll, } /** * @since 0.9.2 * @category Instance */ export const Provide: P.Provide2 = { provideAll, provideSome, useAll, useSome, } /** * @since 0.9.2 * @category Combinator */ export const askAndProvide = P.askAndProvide({ ...ProvideAll, ...Chain, ...FromReader }) /** * @since 0.9.2 * @category Combinator */ export const askAndUse = P.askAndUse({ ...UseAll, ...Chain, ...FromReader }) /** * @since 0.9.2 * @category Combinator */ export const provideAllWith = P.provideAllWith({ ...ProvideAll, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const provideSomeWith = P.provideSomeWith({ ...ProvideSome, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const useAllWith = P.useAllWith({ ...UseAll, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const useSomeWith = P.useSomeWith({ ...UseSome, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const provideSomeWithEnv = FE.provideSomeWithEnv({ ...FromEnv, ...ProvideSome, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const provideAllWithEnv = FE.provideAllWithEnv({ ...FromEnv, ...ProvideAll, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const useSomeWithEnv = FE.useSomeWithEnv({ ...FromEnv, ...UseSome, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const useAllWithEnv = FE.useAllWithEnv({ ...FromEnv, ...UseAll, ...Chain }) /** * @since 0.9.2 * @category Combinator */ export const combine = (f: (a: A, b: B) => C) => (rsa: ReaderStream) => (rsb: ReaderStream): ReaderStream => (e) => S.combine(f, rsa(e), rsb(e)) /** * @since 0.9.2 * @category Combinator */ export const combineAll = []>( ...rss: A ): ReaderStream< Intersect<{ readonly [K in keyof A]: RequirementsOf }>, { readonly [K in keyof A]: ValueOf } > => (e) => S.combineAll(...rss.map((rs) => rs(e))) /** * @since 0.13.2 * @category Combinator */ export const struct = >>>( props: Props, ) => pipe( combineAll( ...pipe( Object.entries(props), RA.map(([k, stream]) => pipe( stream, map((v) => St.make(k, v)), ), ), ), ), map((o) => Object.assign({}, ...o) as { readonly [K in keyof Props]: ValueOf }), ) /** * @since 0.9.2 * @category Combinator */ export const withStream = (f: (stream: S.Stream) => B) => (rs: ReaderStream): Re.Reader => (e) => FN.pipe(e, rs, f) /** * @since 0.9.2 * @category Combinator */ export const tap = (f: (value: A) => any) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.tap(f))) /** * @since 0.9.2 * @category Combinator */ export const take: (n: number) => (rs: ReaderStream) => ReaderStream = FN.flow( S.take, withStream, ) /** * @since 0.9.2 * @category Combinator */ export const skip: (n: number) => (rs: ReaderStream) => ReaderStream = FN.flow( S.skip, withStream, ) /** * @since 0.9.2 * @category Combinator */ export const startWith = (value: A) => (stream: ReaderStream): ReaderStream => withStream(S.startWith(value))(stream) /** * @since 0.9.2 * @category Combinator */ export const exhaustLatest = (rs: ReaderStream>): ReaderStream => (e) => S.exhaustMapLatest((rs: ReaderStream) => rs(e))(rs(e)) /** * @since 0.9.2 * @category Combinator */ export const exhaustMapLatest = (f: (value: A) => ReaderStream) => (rs: ReaderStream): ReaderStream => (e) => S.exhaustMapLatest((a: A) => f(a)(e))(rs(e)) /** * @since 0.9.2 * @category Combinator */ export const exhaustLatestEnv = (env: E.Env) => (rs: ReaderStream): ReaderStream => exhaustMapLatest(() => fromEnv(env))(rs) /** * @since 0.9.2 * @category Combinator */ export const exhaustMapLatestEnv = (f: (value: A) => E.Env) => (rs: ReaderStream): ReaderStream => exhaustMapLatest((a: A) => fromEnv(f(a)))(rs) /** * @since 0.9.2 * @category Combinator */ export const onDispose = ( disposable: S.Disposable, ): ((rs: ReaderStream) => ReaderStream) => withStream(S.onDispose(disposable)) /** * @since 0.9.2 * @category Combinator */ export const collectEvents = (scheduler: S.Scheduler) => (rs: ReaderStream): Re.Reader> => FN.pipe(rs, withStream(S.collectEvents(scheduler))) /** * @since 0.9.2 * @category Constructor */ export const now = FN.flow(S.now, fromStream) /** * @since 0.9.2 * @category Constructor */ export const at = FN.flow(S.at, fromStream) /** * @since 0.9.2 * @category Combinator */ export const scan = (f: (acc: A, value: B) => A, seed: A) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.scan(f, seed))) /** * @since 0.9.2 * @category Combinator */ export const skipRepeatsWith = (Eq: Eq) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.skipRepeatsWith((a, b) => Eq.equals(a)(b)))) /** * @since 0.9.2 * @category Combinator */ export const skipRepeats: (rs: ReaderStream) => ReaderStream = skipRepeatsWith(deepEqualsEq) /** * @since 0.9.2 * @category Combinator */ export const compact: (rs: ReaderStream>) => ReaderStream = withStream( S.compact, ) /** * @since 0.9.2 * @category Combinator */ export const continueWith = (f: () => ReaderStream) => (rs: ReaderStream): ReaderStream => (e) => FN.pipe( e, rs, S.continueWith(() => f()(e)), ) /** * @since 0.9.2 * @category Combinator */ export const debounce = (delay: S.Time) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.debounce(delay))) /** * @since 0.9.2 * @category Combinator */ export const delay = (delay: S.Time) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.delay(delay))) /** * @since 0.9.2 * @category Combinator */ export const join = (rs: ReaderStream>): ReaderStream => (e) => FN.pipe( e, rs, S.chain((f) => f(e)), ) /** * @since 0.9.2 * @category Combinator */ export const during = (timeWindow: ReaderStream>) => (values: ReaderStream): ReaderStream => (e) => FN.pipe(e, values, S.during(join(timeWindow)(e))) /** * @since 0.9.2 * @category Combinator */ export const filterMap = (f: (a: A) => O.Option) => (fa: ReaderStream): ReaderStream => FN.pipe(fa, withStream(S.filterMap(f))) /** * @since 0.9.2 * @category Combinator */ export const loop = (f: (a: A, b: B) => SV, seed: A) => (fa: ReaderStream): ReaderStream => (e) => FN.pipe(e, fa, S.loop(f, seed)) /** * @since 0.9.2 * @category Combinator */ export const mergeConcurrently = (concurrency: number) => (rs: ReaderStream>): ReaderStream => (e) => FN.pipe( e, rs, S.mergeMapConcurrently((rs) => rs(e), concurrency), ) /** * @since 0.9.2 * @category Combinator */ export const multicast = (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.multicast)) /** * @since 0.9.2 * @category Combinator */ export const partition = (predicate: Predicate) => (fa: ReaderStream): Separated, ReaderStream> => ({ left: FN.pipe(fa, filter(not(predicate))), right: FN.pipe(fa, filter(predicate)), }) /** * @since 0.9.2 * @category Combinator */ export const partitionMap = (f: (a: A) => Either) => (fa: ReaderStream): Separated, ReaderStream> => ({ left: FN.pipe( fa, map(f), filter(isLeft), map((x) => x.left), ), right: FN.pipe( fa, map(f), filter(isRight), map((x) => x.right), ), }) /** * @since 0.9.2 * @category Combinator */ export const race = (second: ReaderStream) => (first: ReaderStream): ReaderStream => (e) => FN.pipe( e, first, S.race(() => second(e)), ) /** * @since 0.9.2 * @category Combinator */ export const separate = (rs: ReaderStream>) => FN.pipe( rs, partitionMap((e) => e), ) /** * @since 0.9.2 * @category Combinator */ export const since = (timeWindow: ReaderStream) => (values: ReaderStream): ReaderStream => (e) => FN.pipe(e, values, S.since(timeWindow(e))) /** * @since 0.9.2 * @category Combinator */ export const skipAfter = (p: (a: A) => boolean) => (s: ReaderStream): ReaderStream => FN.pipe(s, withStream(S.skipAfter(p))) /** * @since 0.9.2 * @category Combinator */ export const skipWhile = (p: (a: A) => boolean) => (s: ReaderStream): ReaderStream => FN.pipe(s, withStream(S.skipWhile(p))) /** * @since 0.9.2 * @category Combinator */ export const slice = (skip: number, take: number) => (rs: ReaderStream): ReaderStream => FN.pipe(rs, withStream(S.slice(skip, take))) /** * @since 0.9.2 * @category Combinator */ export const switchLatest = (rs: ReaderStream>): ReaderStream => (e) => FN.pipe( e, rs, S.map((f) => f(e)), S.switchLatest, ) /** * @since 0.9.2 * @category Combinator */ export const takeWhile = (p: (a: A) => boolean) => (s: ReaderStream): ReaderStream => FN.pipe(s, withStream(S.takeWhile(p))) /** * @since 0.9.2 * @category Combinator */ export const throttle = (period: number) => (s: ReaderStream): ReaderStream => FN.pipe(s, withStream(S.throttle(period))) /** * @since 0.9.2 * @category Combinator */ export const throwError = fromStreamK(S.throwError) /** * @since 0.9.2 * @category Combinator */ export const until = (timeWindow: ReaderStream) => (values: ReaderStream): ReaderStream => (e) => FN.pipe(e, values, S.until(timeWindow(e))) /** * @since 0.9.2 * @category Combinator */ export const zero = FN.flow(S.zero, fromStream) /** * @since 0.9.2 * @category Instance */ export const Filterable: Filterable_.Filterable2 = { partitionMap, partition, filterMap, filter, } /** * @since 0.9.2 * @category Instance */ export const Compactable: Compactable2 = { compact, separate, } /** * @since 0.9.2 * @category Combinator */ export const keyed = (Eq: Eq) => (rs: ReaderStream): ReaderStream[]> => pipe(rs, withStream(S.keyed(Eq))) /** * @since 0.9.2 * @category Combinator */ export const mergeMapWhen = (Eq: Eq = deepEqualsEq) => (f: (value: V) => ReaderStream) => (values: ReaderStream>): ReaderStream> => (e) => pipe(values, withStream(S.mergeMapWhen(Eq)((v) => f(v)(e))))(e) /** * Listens to the next value of a stream. */ /** * @since 0.9.2 * @category Natural Transformation */ export const toEnv = (rs: ReaderStream): E.Env => (e) => async((resume) => { const disposable = settable() disposable.addDisposable( pipe(e, rs, S.take(1)).run( S.createSink({ event: (_, x) => disposable.addDisposable(resume(x)) }), e.scheduler, ), ) return disposable }) /** * @since 0.9.2 * @category Combinator */ export const hold = (rs: ReaderStream): ReaderStream => pipe(rs, withStream(H.hold))