import type { Either } from "../../Either"; import * as E from "../../Either"; import { constant, pipe, tuple } from "../../Function"; import type { Option } from "../../Option"; import * as O from "../../Option"; import { nextDouble } from "../Random"; import * as T from "../Task/_core"; import { makeSchedule } from "./constructors"; import type { Decision, StepFunction } from "./Decision"; import { makeContinue, makeDone } from "./Decision"; import type { Schedule } from "./model"; const repeatLoop = ( init: StepFunction, self: StepFunction = init ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return repeatLoop(init, self)(now, i); } case "Continue": { return T.pure(makeContinue(d.out, d.interval, repeatLoop(init, d.next))); } } }); const modifyDelayMLoop = ( sf: StepFunction, f: (o: O, d: number) => T.Task ): StepFunction => (now, i) => T.chain_(sf(now, i), (d) => { switch (d._tag) { case "Done": { return T.pure>(makeDone(d.out)); } case "Continue": { const delay = d.interval - now; return T.map_(f(d.out, delay), (n) => makeContinue(d.out, d.interval + n, modifyDelayMLoop(d.next, f))); } } }); /** * Returns a new schedule that loops this one continuously, resetting the state * when this schedule is done. */ export const repeat = (sc: Schedule): Schedule => makeSchedule(repeatLoop(sc.step)); export const modifyDelayM_ = ( sc: Schedule, f: (o: O, d: number) => T.Task ): Schedule => makeSchedule(modifyDelayMLoop(sc.step, f)); export const modifyDelayM = (f: (o: O, d: number) => T.Task) => ( sc: Schedule ): Schedule => modifyDelayM_(sc, f); export const modifyDelay_ = (sc: Schedule, f: (o: O, d: number) => number): Schedule => modifyDelayM_(sc, (o, d) => T.pure(f(o, d))); export const modifyDelay = (f: (o: O, d: number) => number) => (sc: Schedule): Schedule => modifyDelay_(sc, f); /** * Returns a new schedule with the effectfully calculated delay added to every update. */ export const addDelayM_ = ( sc: Schedule, f: (o: O) => T.Task ): Schedule => modifyDelayM_(sc, (o, d) => T.map_(f(o), (i) => i + d)); /** * Returns a new schedule with the effectfully calculated delay added to every update. */ export const addDelayM = (f: (o: O) => T.Task) => ( sc: Schedule ): Schedule => addDelayM_(sc, f); /** * Returns a new schedule with the given delay added to every update. */ export const addDelay_ = (sc: Schedule, f: (o: O) => number) => addDelayM_(sc, (o) => T.pure(f(o))); /** * Returns a new schedule with the given delay added to every update. */ export const addDelay = (f: (o: O) => number) => (sc: Schedule) => addDelay_(sc, f); /** * Returns a new `Schedule` with the specified effectfully computed delay added before the start * of each interval produced by the this `Schedule`. */ export const delayedM_ = (sc: Schedule, f: (d: number) => T.Task) => modifyDelayM_(sc, (_, d) => f(d)); /** * Returns a new `Schedule` with the specified effectfully computed delay added before the start * of each interval produced by the this `Schedule`. */ export const delayedM = (f: (d: number) => T.Task) => (sc: Schedule) => delayedM_(sc, f); /** * Returns a new schedule with the specified computed delay added before the start * of each interval produced by this schedule. */ export const delayed_ = (sc: Schedule, f: (d: number) => number) => delayedM_(sc, (d) => T.pure(f(d))); /** * Returns a new schedule with the specified computed delay added before the start * of each interval produced by this schedule. */ export const delayed = (f: (d: number) => number) => (sc: Schedule) => delayed_(sc, f); export const duration = (n: number) => makeSchedule((now, _: unknown) => T.pure(makeContinue(0, now + n, () => T.pure(makeDone(n))))); /** * Returns a new schedule with the specified computed delay added before the start * of each interval produced by this schedule. */ export const delayedFrom = (sc: Schedule) => addDelay_(sc, (x) => x); const andThenEitherLoop = ( sc: StepFunction, that: StepFunction, onLeft: boolean ): StepFunction> => (now, i) => onLeft ? T.chain_(sc(now, i), (d) => { switch (d._tag) { case "Continue": { return T.pure(makeContinue(E.left(d.out), d.interval, andThenEitherLoop(d.next, that, true))); } case "Done": { return andThenEitherLoop(sc, that, false)(now, i); } } }) : T.map_(that(now, i), (d) => { switch (d._tag) { case "Done": { return makeDone(E.right(d.out)); } case "Continue": { return makeContinue(E.right(d.out), d.interval, andThenEitherLoop(sc, d.next, false)); } } }); /** * Returns a new schedule that first executes this schedule to completion, and then executes the * specified schedule to completion. */ export const andThenEither_ = (sc: Schedule, that: Schedule) => makeSchedule(andThenEitherLoop(sc.step, that.step, true)); /** * Returns a new schedule that first executes this schedule to completion, and then executes the * specified schedule to completion. */ export const andThenEither = (that: Schedule) => (sc: Schedule) => andThenEither_(sc, that); const mapMLoop = ( self: StepFunction, f: (o: O) => T.Task ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.map_(f(d.out), (o): Decision => makeDone(o)); } case "Continue": { return T.map_(f(d.out), (o) => makeContinue(o, d.interval, mapMLoop(d.next, f))); } } }); export const mapM_ = (sc: Schedule, f: (o: O) => T.Task) => makeSchedule(mapMLoop(sc.step, (o) => f(o))); export const mapM = (f: (o: O) => T.Task) => (sc: Schedule) => mapM_(sc, f); export const map_ = (fa: Schedule, f: (a: A) => B): Schedule => mapM_(fa, (o) => T.pure(f(o))); export const map = (f: (a: A) => B) => (fa: Schedule): Schedule => map_(fa, f); export const as_ = (sc: Schedule, o: O1) => map_(sc, () => o); export const as = (o: O1) => (sc: Schedule) => as_(sc, o); /** * Returns a new schedule that maps the output of this schedule to unit. */ export const unit = (sc: Schedule): Schedule => as(undefined)(sc); const combineWithLoop = ( sc: StepFunction, that: StepFunction, f: (d1: number, d2: number) => number ): StepFunction => (now, i) => { const left = sc(now, i); const right = that(now, i); return T.map_(T.both_(left, right), ([l, r]) => { switch (l._tag) { case "Done": { switch (r._tag) { case "Done": { return makeDone<[O, O1]>([l.out, r.out]); } case "Continue": { return makeDone<[O, O1]>([l.out, r.out]); } } } /* eslint-disable-next-line no-fallthrough */ case "Continue": { switch (r._tag) { case "Done": { return makeDone<[O, O1]>([l.out, r.out]); } case "Continue": { return makeContinue([l.out, r.out], f(l.interval, r.interval), combineWithLoop(l.next, r.next, f)); } } } } }); }; export const combineWith_ = ( sc: Schedule, that: Schedule, f: (d1: number, d2: number) => number ) => makeSchedule(combineWithLoop(sc.step, that.step, f)); export const combineWith = (that: Schedule) => (f: (d1: number, d2: number) => number) => < R, I, O >( sc: Schedule ) => combineWith_(sc, that, f); export const either_ = (sc: Schedule, that: Schedule) => combineWith_(sc, that, (d1, d2) => Math.min(d1, d2)); export const either = (that: Schedule) => (sc: Schedule) => either_(sc, that); export const eitherWith_ = ( sc: Schedule, that: Schedule, f: (o: O, o1: O1) => O2 ) => map_(either_(sc, that), ([o, o1]) => f(o, o1)); export const eitherWith = (that: Schedule) => (f: (o: O, o1: O1) => O2) => ( sc: Schedule ) => eitherWith_(sc, that, f); const bothLoop = ( sc: StepFunction, that: StepFunction ): StepFunction => (now, [in1, in2]) => T.map_(T.both_(sc(now, in1), that(now, in2)), ([d1, d2]) => { switch (d1._tag) { case "Done": { switch (d2._tag) { case "Done": { return makeDone(tuple(d1.out, d2.out)); } case "Continue": { return makeDone(tuple(d1.out, d2.out)); } } } /* eslint-disable-next-line no-fallthrough */ case "Continue": { switch (d2._tag) { case "Done": { return makeDone(tuple(d1.out, d2.out)); } case "Continue": { return makeContinue( tuple(d1.out, d2.out), Math.min(d1.interval, d2.interval), bothLoop(d1.next, d2.next) ); } } } } }); /** * Returns a new schedule that has both the inputs and outputs of this and the specified * schedule. */ export const bothInOut_ = (sc: Schedule, that: Schedule) => makeSchedule(bothLoop(sc.step, that.step)); /** * Returns a new schedule that has both the inputs and outputs of this and the specified * schedule. */ export const bothInOut = (that: Schedule) => (sc: Schedule) => bothInOut_(sc, that); /** * Returns a new schedule that performs a geometric intersection on the intervals defined * by both schedules. */ export const both_ = (sc: Schedule, that: Schedule) => combineWith_(sc, that, (l, r) => Math.max(l, r)); /** * Returns a new schedule that performs a geometric intersection on the intervals defined * by both schedules. */ export const both = (that: Schedule) => ( self: Schedule ): Schedule => both_(self, that); const checkMLoop = ( self: StepFunction, test: (i: I, o: O) => T.Task ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.pure(makeDone(d.out)); } case "Continue": { return T.map_(test(i, d.out), (b) => b ? makeContinue(d.out, d.interval, checkMLoop(d.next, test)) : makeDone(d.out) ); } } }); /** * Returns a new schedule that passes each input and output of this schedule to the specified * function, and then determines whether or not to continue based on the return value of the * function. */ export const checkM_ = (sc: Schedule, test: (i: I, o: O) => T.Task) => makeSchedule(checkMLoop(sc.step, test)); /** * Returns a new schedule that passes each input and output of this schedule to the specified * function, and then determines whether or not to continue based on the return value of the * function. */ export const checkM = (test: (i: I, o: O) => T.Task) => (sc: Schedule) => checkM_(sc, test); /** * Returns a new schedule that passes each input and output of this schedule to the spefcified * function, and then determines whether or not to continue based on the return value of the * function. */ export const check_ = (sc: Schedule, test: (i: I, o: O) => boolean) => checkM_(sc, (i, o) => T.pure(test(i, o))); /** * Returns a new schedule that passes each input and output of this schedule to the spefcified * function, and then determines whether or not to continue based on the return value of the * function. */ export const check = (test: (i: I, o: O) => boolean) => (sc: Schedule) => check_(sc, test); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilInput_ = (sc: Schedule, f: (i: I) => boolean) => check_(sc, (i) => !f(i)); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilInput = (f: (i: I) => boolean) => (sc: Schedule) => untilInput_(sc, f); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilInputM_ = (sc: Schedule, f: (i: I) => T.Task) => checkM_(sc, (i) => T.map_(f(i), (b) => !b)); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilInputM = (f: (i: I) => T.Task) => (sc: Schedule) => untilInputM_(sc, f); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilOutput_ = (sc: Schedule, f: (o: O) => boolean) => check_(sc, (_, o) => !f(o)); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilOutput = (f: (o: O) => boolean) => (sc: Schedule) => untilOutput_(sc, f); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilOutputM_ = (sc: Schedule, f: (o: O) => T.Task) => checkM_(sc, (_, o) => T.map_(f(o), (b) => !b)); /** * Returns a new schedule that continues until the specified predicate on the input evaluates * to true. */ export const untilOutputM = (f: (o: O) => T.Task) => (sc: Schedule) => untilOutputM_(sc, f); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileInput_ = (sc: Schedule, f: (i: I) => boolean) => check_(sc, (i) => f(i)); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileInput = (f: (i: I) => boolean) => (sc: Schedule) => whileInput_(sc, f); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileInputM_ = (sc: Schedule, f: (i: I) => T.Task) => checkM_(sc, (i) => f(i)); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileInputM = (f: (i: I) => T.Task) => (sc: Schedule) => whileInputM_(sc, f); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileOutput_ = (sc: Schedule, f: (o: O) => boolean) => check_(sc, (_, o) => f(o)); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileOutput = (f: (o: O) => boolean) => (sc: Schedule) => whileOutput_(sc, f); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileOutputM_ = (sc: Schedule, f: (o: O) => T.Task) => checkM_(sc, (_, o) => f(o)); /** * Returns a new schedule that continues for as long the specified effectful predicate on the * input evaluates to true. */ export const whileOutputM = (f: (o: O) => T.Task) => (sc: Schedule) => whileOutputM_(sc, f); const chooseLoop = ( sc: StepFunction, that: StepFunction ): StepFunction, Either> => (now, either) => E.fold_( either, (i) => T.map_(sc(now, i), (d) => { switch (d._tag) { case "Done": { return makeDone(E.left(d.out)); } case "Continue": { return makeContinue(E.left(d.out), d.interval, chooseLoop(d.next, that)); } } }), (i2) => T.map_(that(now, i2), (d) => { switch (d._tag) { case "Done": { return makeDone(E.right(d.out)); } case "Continue": { return makeContinue(E.right(d.out), d.interval, chooseLoop(sc, d.next)); } } }) ); /** * Returns a new schedule that allows choosing between feeding inputs to this schedule, or * feeding inputs to the specified schedule. */ export const choose_ = (sc: Schedule, that: Schedule) => makeSchedule(chooseLoop(sc.step, that.step)); /** * Returns a new schedule that allows choosing between feeding inputs to this schedule, or * feeding inputs to the specified schedule. */ export const choose = (that: Schedule) => (sc: Schedule) => choose_(sc, that); /** * Returns a new schedule that allows choosing between feeding inputs to this schedule, or * feeding inputs to the specified schedule. */ export const chooseMerge_ = (sc: Schedule, that: Schedule) => map_(choose_(sc, that), E.merge); /** * Returns a new schedule that allows choosing between feeding inputs to this schedule, or * feeding inputs to the specified schedule. */ export const chooseMerge = (that: Schedule) => (sc: Schedule) => chooseMerge_(sc, that); const ensuringLoop = ( self: StepFunction, finalizer: T.Task ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.as_(finalizer, makeDone(d.out)); } case "Continue": { return T.pure(makeContinue(d.out, d.interval, ensuringLoop(d.next, finalizer))); } } }); /** * Returns a new schedule that will run the specified finalizer as soon as the schedule is * complete. Note that unlike `Task#ensuring`, this method does not guarantee the finalizer * will be run. The `Schedule` may not initialize or the driver of the schedule may not run * to completion. However, if the `Schedule` ever decides not to continue, then the * finalizer will be run. */ export const ensuring_ = (sc: Schedule, finalizer: T.Task) => makeSchedule(ensuringLoop(sc.step, finalizer)); /** * Returns a new schedule that will run the specified finalizer as soon as the schedule is * complete. Note that unlike `Task#ensuring`, this method does not guarantee the finalizer * will be run. The `Schedule` may not initialize or the driver of the schedule may not run * to completion. However, if the `Schedule` ever decides not to continue, then the * finalizer will be run. */ export const ensuring = (finalizer: T.Task) => (sc: Schedule) => ensuring_(sc, finalizer); /** * A schedule that recurs on a fixed interval. Returns the number of * repetitions of the schedule so far. * * If the action run between updates takes longer than the interval, then the * action will be run immediately, but re-runs will not "pile up". * *
 * |-----interval-----|-----interval-----|-----interval-----|
 * |---------action--------||action|-----|action|-----------|
 * 
*/ export const fixed = (interval: number): Schedule => { type State = { startMillis: number; lastRun: number }; const loop = (startMillis: Option, n: number): StepFunction => (now, _) => T.pure( O.fold_( startMillis, () => makeContinue(n + 1, now + interval, loop(O.some({ startMillis: now, lastRun: now }), n + 1)), ({ lastRun, startMillis }) => { const runningBehind = now > lastRun + interval; const boundary = (now - startMillis) % interval; const sleepTime = boundary === 0 ? now : boundary; const nextRun = runningBehind ? now : now + sleepTime; return makeContinue( n + 1, nextRun, loop( O.some({ startMillis, lastRun: nextRun }), n + 1 ) ); } ) ); return makeSchedule(loop(O.none(), 0)); }; const foldMLoop = ( sf: StepFunction, b: B, f: (b: B, o: O) => T.Task ): StepFunction => (now, i) => T.chain_(sf(now, i), (d) => { switch (d._tag) { case "Done": { return T.pure>(makeDone(b)); } case "Continue": { return T.map_(f(b, d.out), (b2) => makeContinue(b2, d.interval, foldMLoop(d.next, b2, f))); } } }); /** * Returns a new `Schedule` that effectfully folds over the outputs of a `Schedule`. */ export const foldM_ = (sc: Schedule, b: B, f: (b: B, o: O) => T.Task) => makeSchedule(foldMLoop(sc.step, b, f)); /** * Returns a new `Schedule` that effectfully folds over the outputs of a `Schedule`. */ export const foldM = (b: B, f: (b: B, o: O) => T.Task) => (sc: Schedule) => foldM_(sc, b, f); /** * Returns a new `Schedule` that folds over the outputs of a `Schedule`. */ export const fold_ = (sc: Schedule, b: B, f: (b: B, o: O) => B) => foldM_(sc, b, (b, o) => T.pure(f(b, o))); /** * Returns a new `Schedule` that folds over the outputs of a `Schedule`. */ export const fold = (b: B, f: (b: B, o: O) => B) => (sc: Schedule) => fold_(sc, b, f); const unfoldLoop = (a: A, f: (a: A) => A): StepFunction => (now, _) => T.pure(makeContinue(a, now, unfoldLoop(f(a), f))); /** * Unfolds a schedule that repeats one time from the specified state and iterator. */ export const unfold_ = (a: () => A, f: (a: A) => A) => makeSchedule((now) => pipe( T.total(a), T.map((a) => makeContinue(a, now, unfoldLoop(f(a), f))) ) ); /** * Unfolds a schedule that repeats one time from the specified state and iterator. */ export const unfold = (f: (a: A) => A) => (a: () => A) => unfold_(a, f); const unfoldMLoop = (a: A, f: (a: A) => T.Task): StepFunction => (now, _) => T.pure(makeContinue(a, now, (n, i) => T.chain_(f(a), (x) => unfoldMLoop(x, f)(n, i)))); /** * Taskfully unfolds a schedule that repeats one time from the specified state and iterator. */ export const unfoldM_ = (a: A, f: (a: A) => T.Task) => makeSchedule(unfoldMLoop(a, f)); /** * Taskfully unfolds a schedule that repeats one time from the specified state and iterator. */ export const unfoldM = (f: (a: A) => T.Task) => (a: A) => unfoldM_(a, f); /** * A schedule that recurs forever and produces a count of repeats. */ export const forever = unfold_(constant(0), (n) => n + 1); /** * A schedule spanning all time, which can be stepped only the specified number of times before * it terminates. */ export const recur = (n: number) => whileOutput_(forever, (x) => x < n); /** * A `Schedule` that recurs one time. */ export const once = unit(recur(1)); /** * A `Schedule` that stops */ export const stop = unit(recur(0)); /** * Returns a new schedule that randomly modifies the size of the intervals of this schedule. */ export const jittered_ = ( sc: Schedule, { max = 0.1, min = 0 }: { min?: number; max?: number } = {} ) => delayedM_(sc, (d) => T.map_(nextDouble, (r) => d * min * (1 - r) + d * max * r)); /** * Returns a new schedule that randomly modifies the size of the intervals of this schedule. */ export const jittered = ({ max = 0.1, min = 0 }: { min?: number; max?: number } = {}) => ( sc: Schedule ) => jittered_(sc, { min, max }); /** * A schedule that always recurs, but will repeat on a linear time * interval, given by `base * n` where `n` is the number of * repetitions so far. Returns the current duration between recurrences. */ export const linear = (base: number) => delayedFrom(map_(forever, (i) => base * (i + 1))); /** * Returns a new schedule that collects the outputs of a `Schedule` into an array. */ export const collectAll = (sc: Schedule) => fold_(sc, [] as ReadonlyArray, (xs, x) => [...xs, x]); const identityLoop = (): StepFunction => (now, i) => T.pure(makeContinue(i, now, identityLoop())); /** * A schedule that always recurs and returns inputs as outputs. */ export const identity = () => makeSchedule(identityLoop()); /** * Returns a new schedule that makes this schedule available on the `Left` side of an `Either` * input, allowing propagating some type `X` through this channel on demand. */ export const left = () => (sc: Schedule) => choose_(sc, identity()); /** * Returns a new schedule that makes this schedule available on the `Right` side of an `Either` * input, allowing propagating some type `X` through this channel on demand. */ export const right = () => (sc: Schedule) => choose_(identity(), sc); /** * Returns a new schedule that packs the input and output of this schedule into the first * element of a tuple. This allows carrying information through this schedule. */ export const first = () => (sc: Schedule) => bothInOut_(sc, identity()); /** * Returns a new schedule that packs the input and output of this schedule into the second * element of a tuple. This allows carrying information through this schedule. */ export const second = () => (sc: Schedule) => bothInOut_(identity(), sc); /** * A schedule that always recurs, mapping input values through the * specified function. */ export const fromFunction = (f: (a: A) => B) => map_(identity(), f); /** * Returns a schedule that recurs continuously, each repetition spaced the specified duration * from the last run. */ export const spaced = (duration: number) => addDelay_(forever, () => duration); const provideAllLoop = (self: StepFunction, r: R): StepFunction => (now, i) => T.giveAll(r)( T.map_(self(now, i), (d) => { switch (d._tag) { case "Done": { return makeDone(d.out); } case "Continue": { return makeContinue(d.out, d.interval, provideAllLoop(d.next, r)); } } }) ); /** * Returns a new schedule with its environment provided to it, so the resulting * schedule does not require any environment. */ export const provideAll_ = (sc: Schedule, r: R): Schedule => makeSchedule(provideAllLoop(sc.step, r)); /** * Returns a new schedule with its environment provided to it, so the resulting * schedule does not require any environment. */ export const provideAll = (r: R) => (sc: Schedule) => provideAll_(sc, r); const provideSomeLoop = (self: StepFunction, r: (_: R1) => R): StepFunction => ( now, i ) => T.local_( T.map_(self(now, i), (d) => { switch (d._tag) { case "Done": { return makeDone(d.out); } case "Continue": { return makeContinue(d.out, d.interval, provideSomeLoop(d.next, r)); } } }), r ); /** * Returns a new schedule with part of its environment provided to it, so the * resulting schedule does not require any environment. */ export const provideSome_ = (sc: Schedule, r: (_: R1) => R): Schedule => makeSchedule(provideSomeLoop(sc.step, r)); /** * Returns a new schedule with part of its environment provided to it, so the * resulting schedule does not require any environment. */ export const provideSome = (r: (_: R1) => R) => (sc: Schedule) => provideSome_(sc, r); const reconsiderMLoop = ( self: StepFunction, f: (_: Decision) => T.Task> ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.map_( f(d), E.fold( (o2) => makeDone(o2), ([o2]) => makeDone(o2) ) ); } case "Continue": { return T.map_( f(d), E.fold( (o2) => makeDone(o2), ([o2, int]) => makeContinue(o2, int, reconsiderMLoop(d.next, f)) ) ); } } }); /** * Returns a new schedule that effectfully reconsiders every decision made by this schedule, * possibly modifying the next interval and the output type in the process. */ export const reconsiderM_ = ( sc: Schedule, f: (d: Decision) => T.Task> ): Schedule => makeSchedule(reconsiderMLoop(sc.step, f)); /** * Returns a new schedule that effectfully reconsiders every decision made by this schedule, * possibly modifying the next interval and the output type in the process. */ export const reconsiderM = ( f: (d: Decision) => T.Task> ) => (sc: Schedule) => reconsiderM_(sc, f); /** * Returns a new schedule that reconsiders every decision made by this schedule, * possibly modifying the next interval and the output type in the process. */ export const reconsider_ = ( sc: Schedule, f: (d: Decision) => Either ) => reconsiderM_(sc, (d) => T.pure(f(d))); /** * Returns a new schedule that reconsiders every decision made by this schedule, * possibly modifying the next interval and the output type in the process. */ export const reconsider = (f: (d: Decision) => Either) => ( sc: Schedule ) => reconsider_(sc, f); /** * A schedule that recurs for as long as the effectful predicate evaluates to true. */ export const recurWhileM = (f: (a: A) => T.Task) => whileInputM_(identity(), f); /** * A schedule that recurs for as long as the predicate evaluates to true. */ export const recurWhile = (f: (a: A) => boolean) => whileInput_(identity(), f); /** * A schedule that recurs for as long as the predicate evaluates to true. */ export const recurWhileEqual = (a: A) => whileInput_(identity(), (x) => a === x); /** * A schedule that recurs until the effectful predicate evaluates to true. */ export const recurUntilM = (f: (a: A) => T.Task) => untilInputM_(identity(), f); /** * A schedule that recurs until the predicate evaluates to true. */ export const recurUntil = (f: (a: A) => boolean) => untilInput_(identity(), f); /** * A schedule that recurs until the predicate evaluates to true. */ export const recurUntilEqual = (a: A) => untilInput_(identity(), (x) => x === a); /** * Returns a new schedule that outputs the number of repetitions of this one. */ export const repetitions = (sc: Schedule) => fold_(sc, 0, (n) => n + 1); const resetWhenLoop = ( sc: Schedule, step: StepFunction, f: (o: O) => boolean ): StepFunction => (now, i) => T.chain_(step(now, i), (d) => { switch (d._tag) { case "Done": { return f(d.out) ? sc.step(now, i) : T.pure(makeDone(d.out)); } case "Continue": { return f(d.out) ? sc.step(now, i) : T.pure(makeContinue(d.out, d.interval, resetWhenLoop(sc, d.next, f))); } } }); /** * Resets the schedule when the specified predicate on the schedule output evaluates to true. */ export const resetWhen_ = (sc: Schedule, f: (o: O) => boolean) => makeSchedule(resetWhenLoop(sc, sc.step, f)); /** * Resets the schedule when the specified predicate on the schedule output evaluates to true. */ export const resetWhen = (f: (o: O) => boolean) => (sc: Schedule) => resetWhen_(sc, f); const runLoop = ( self: StepFunction, now: number, xs: readonly I[], acc: readonly O[] ): T.Task => xs.length > 0 ? T.chain_(self(now, xs[0]), (d) => { switch (d._tag) { case "Done": { return T.pure([...acc, d.out]); } case "Continue": { return runLoop(d.next, d.interval, xs, [...acc, d.out]); } } }) : T.pure(acc); export const run_ = (sc: Schedule, now: number, i: Iterable) => runLoop(sc.step, now, Array.from(i), []); export const run = (now: number, i: Iterable) => (sc: Schedule) => run_(sc, now, i); const onDecisionLoop = ( self: StepFunction, f: (d: Decision) => T.Task ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.as_(f(d), makeDone(d.out)); } case "Continue": { return T.as_(f(d), makeContinue(d.out, d.interval, onDecisionLoop(d.next, f))); } } }); /** * Returns a new schedule that applies the current one but runs the specified effect * for every decision of this schedule. This can be used to create schedules * that log failures, decisions, or computed values. */ export const onDecision_ = (sc: Schedule, f: (d: Decision) => T.Task) => makeSchedule(onDecisionLoop(sc.step, f)); /** * Returns a new schedule that applies the current one but runs the specified effect * for every decision of this schedule. This can be used to create schedules * that log failures, decisions, or computed values. */ export const onDecision = (f: (d: Decision) => T.Task) => ( sc: Schedule ) => onDecision_(sc, f); const tapInputLoop = ( self: StepFunction, f: (i: I) => T.Task ): StepFunction => (now, i) => T.chain_(f(i), () => T.map_(self(now, i), (d) => { switch (d._tag) { case "Done": { return makeDone(d.out); } case "Continue": { return makeContinue(d.out, d.interval, tapInputLoop(d.next, f)); } } }) ); export const tapInput_ = ( sc: Schedule, f: (i: I) => T.Task ): Schedule => makeSchedule(tapInputLoop(sc.step, f)); export const tapInput = (f: (i: I) => T.Task) => (sc: Schedule) => tapInput_(sc, f); const tapOutputLoop = ( self: StepFunction, f: (o: O) => T.Task ): StepFunction => (now, i) => T.chain_(self(now, i), (d) => { switch (d._tag) { case "Done": { return T.as_(f(d.out), makeDone(d.out)); } case "Continue": { return T.as_(f(d.out), makeContinue(d.out, d.interval, tapOutputLoop(d.next, f))); } } }); export const tapOutput_ = ( sc: Schedule, f: (o: O) => T.Task ): Schedule => makeSchedule(tapOutputLoop(sc.step, f)); export const tapOutput = (f: (o: O) => T.Task) => (sc: Schedule) => tapOutput_(sc, f); const windowedLoop = ( interval: number, startMillis: Option, n: number ): StepFunction => (now, _) => T.pure( O.fold_( startMillis, () => makeContinue(n + 1, now + interval, windowedLoop(interval, O.some(now), n + 1)), (startMillis) => makeContinue( n + 1, now + ((now - startMillis) % interval), windowedLoop(interval, O.some(startMillis), n + 1) ) ) ); export const windowed = (interval: number) => makeSchedule(windowedLoop(interval, O.none(), 0)); /** * Returns a new schedule that performs a geometric intersection on the intervals defined * by both schedules. */ export const zip_ = (sc: Schedule, that: Schedule) => combineWith_(sc, that, (d, d2) => Math.max(d, d2)); /** * Returns a new schedule that performs a geometric intersection on the intervals defined * by both schedules. */ export const zip = (that: Schedule) => (sc: Schedule) => zip_(sc, that); /** * Same as zip but ignores the right output. */ export const zipl_ = (sc: Schedule, that: Schedule) => map_(zip_(sc, that), ([_]) => _); /** * Same as zip but ignores the right output. */ export const zipl = (that: Schedule) => (sc: Schedule) => zipl_(sc, that); /** * Same as zip but ignores the left output. */ export const zipr_ = (sc: Schedule, that: Schedule) => map_(zip_(sc, that), ([_, __]) => __); /** * Same as zip but ignores the left output. */ export const zipr = (that: Schedule) => (sc: Schedule) => zipr_(sc, that); /** * Equivalent to `zip` followed by `map`. */ export const zipWith_ = ( sc: Schedule, that: Schedule, f: (o: O, o1: O1) => O2 ) => map_(zip_(sc, that), ([o, o1]) => f(o, o1)); /** * Equivalent to `zip` followed by `map`. */ export const zipWith = (that: Schedule, f: (o: O, o1: O1) => O2) => ( sc: Schedule ) => zipWith_(sc, that, f);