/*!
* Copyright (c) 2017-2018 by The Funfix Project Developers.
* Some rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { HK, Monad } from "funland"
import {
Either,
Try,
Success,
Failure,
Throwable,
TimeoutError,
Option, Some, None,
coreInternals
} from "funfix-core"
import {
ICancelable,
Cancelable,
StackedCancelable,
Scheduler,
Future, ExecutionModel,
execInternals, Duration
} from "funfix-exec"
import {
IteratorLike,
iteratorOf
} from "./internals"
/**
* `IO` represents a specification for a possibly lazy or
* asynchronous computation, which when executed will produce an `A`
* as a result, along with possible side-effects.
*
* Compared with Funfix's
* [Future](https://funfix.org/api/exec/classes/future.html) (see
* [funfix-exec](https://funfix.org/api/exec/)) or JavaScript's
* [Promise](https://promisesaplus.com/),
 * `IO` does not represent a running computation or a value detached
 * from time. Nothing gets executed when working with its builders
 * or operators and no work is submitted to the
 * [Scheduler](https://funfix.org/api/exec/classes/scheduler.html) or any
 * run-loop; execution takes place only after {@link IO.run} is
 * called, and not before.
*
* In order to understand `IO`, here's the design space:
*
* | | Strict | Lazy |
* |------------------|:--------------------------:|:----------------------------------:|
* | **Synchronous** | `A` | `() => A` |
* | | | [Eval<A>]{@link Eval} |
 * | **Asynchronous** | `(Try<A> => void) => void` | `() => ((Try<A> => void) => void)` |
 * |                  | `Future<A>` / `Promise<A>` | [IO<A>]{@link IO}                   |
*
* JavaScript is a language (and runtime) that's strict by default,
* meaning that expressions are evaluated immediately instead of
* being evaluated on a by-need basis, like in Haskell.
*
 * So a value of type `A` is said to be strict. To turn it into a
 * lazy value, you wrap its expression in a parameterless function
 * of type `() => A`, also called a "thunk".
*
* A [Future](https://funfix.org/api/exec/classes/future.html) is a
* value that's produced by an asynchronous process, but it is said
* to have strict behavior, meaning that when you receive a `Future`
* reference, whatever process that's supposed to complete the
* `Future` has probably started already. This goes for
* [JavaScript's Promise](https://promisesaplus.com) as well.
*
 * But there are cases where we don't want strict values, but
 * lazily evaluated ones: plain functions, or `Future`-generators.
 * We might want better handling of parallelism, or we might want
 * to suspend *side effects*. Without suspending *side effects* we
 * don't have *referential transparency*, which helps with reasoning
 * about the code and is the essence of *functional programming*.
*
* This `IO` type is thus the complement to `Future`, a lazy, lawful
* monadic type that can describe any side effectful action, including
* asynchronous ones, also capable of suspending side effects.
*
* ## Getting Started
*
* To build an `IO` from a parameterless function returning a value
* (a thunk), we can use `IO.of`:
*
* ```typescript
* const hello = IO.of(() => "Hello ")
* const world = IO.of(() => "World!")
* ```
*
 * Nothing gets executed yet, as `IO` is lazy: nothing runs until
 * you trigger [run]{@link IO.run} on it.
*
 * To combine `IO` values we can use `map` and `flatMap`, which
 * describe sequencing, and this time in a very real sense, because
 * of the laziness involved:
*
* ```typescript
* const sayHello = hello
* .flatMap(h => world.map(w => h + w))
* .map(console.info)
* ```
*
 * This `IO` reference will trigger a side effect on evaluation,
 * but nothing has executed yet. To make the above print its message:
*
* ```typescript
 * const f: Future<void> = sayHello.run()
*
* //=> Hello World!
* ```
*
* The returned type is a
* [Future](https://funfix.org/api/exec/classes/future.html), a value
* that can be completed already or might be completed at some point
* in the future, once the running asynchronous process finishes.
 * It's the equivalent of JavaScript's `Promise`, only better,
 * being cancelable (see the next topic).
*
* ## Laziness
*
 * The fact that `IO` is lazy, whereas `Future` and `Promise` are
 * not, has real consequences. For example with `IO` you can do this:
*
* ```typescript
 * function retryOnFailure<A>(times: number, io: IO<A>): IO<A> {
 * return io.recoverWith(err => {
* // No more retries left? Re-throw error:
* if (times <= 0) return IO.raise(err)
* // Recursive call, yes we can!
* return retryOnFailure(times - 1, io)
* // Adding 500 ms delay for good measure
* .delayExecution(500)
* })
* }
* ```
*
 * `Future` being a strict value-wannabe means that the actual value
 * gets "memoized" (i.e. cached), however `IO` is basically a function
 * that can be repeated as many times as you want. `IO` can also
 * do memoization of course:
*
* ```typescript
* io.memoize()
* ```
*
* The difference between this and just calling `run()` is that
* `memoize()` still returns an `IO` and the actual memoization
* happens on the first `run()` (with idempotency guarantees of
* course).
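 *
 * For example (a sketch illustrating the idempotency):
 *
 * ```typescript
 * const io = IO.of(() => console.info("Effect!")).memoize()
 *
 * io.run() //=> Effect!
 * io.run() // the effect is not repeated, the result is cached
 * ```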
*
* But here's something else that `Future` or your favorite
* `Promise`-like data type cannot do:
*
* ```typescript
* io.memoizeOnSuccess()
* ```
*
* This keeps repeating the computation for as long as the result is a
* failure and caches it only on success. Yes we can!
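 *
 * For example (a sketch, with the counter only there to
 * illustrate re-execution):
 *
 * ```typescript
 * let attempts = 0
 *
 * const io = IO.of(() => {
 *   attempts += 1
 *   if (attempts < 3) throw new Error("transient")
 *   return attempts
 * }).memoizeOnSuccess()
 *
 * io.run() //=> Failure(transient)
 * io.run() //=> Failure(transient)
 * io.run() //=> Success(3), cached for all subsequent runs
 * ```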
*
* ### Parallelism
*
* Because of laziness, invoking {@link IO.sequence} will not work like
* it does for `Future.sequence` or `Promise.all`, the given `IO` values
* being evaluated one after another, in *sequence*, not in *parallel*.
* If you want parallelism, then you need to use {@link IO.gather} and
* thus be explicit about it.
*
* This is great because it gives you the possibility of fine tuning the
* execution. For example, say you want to execute things in parallel,
* but with a maximum limit of 30 tasks being executed in parallel.
* One way of doing that is to process your list in batches.
*
* This sample assumes you have [lodash](https://lodash.com/) installed,
* for manipulating our array:
*
* ```typescript
* import * as _ from "lodash"
* import { IO } from "funfix"
*
* // Some array of IOs, you come up with something good :-)
 * const list: IO<number>[] = ???
*
* // Split our list in chunks of 30 items per chunk,
* // this being the maximum parallelism allowed
 * const chunks = _.chunk(list, 30)
* // Specify that each batch should process stuff in parallel
* const batchedIOs = _.map(chunks, chunk => IO.gather(chunk))
* // Sequence the batches
* const allBatches = IO.sequence(batchedIOs)
*
* // Flatten the result, within the context of IO
 * const all: IO<number[]> =
* allBatches.map(batches => _.flatten(batches))
* ```
*
 * Note that the built `IO` reference is just a specification at
 * this point, or you can view it as a function: nothing has
 * executed yet, so you need to call {@link IO.run .run} explicitly.
*
* ## Cancellation
*
* The logic described by an `IO` task could be cancelable, depending
* on how the `IO` gets built. This is where the `IO`-`Future`
* symbiosis comes into play.
*
 * Futures can also be canceled, in case the described computation
 * can be canceled. When describing `IO` tasks with `IO.of` nothing
 * can be cancelled, since there's nothing about a plain function
 * that you can cancel, but we can build cancelable tasks with
 * {@link IO.async}:
*
* ```typescript
* import { Cancelable, Success, IO } from "funfix"
*
* const delayedHello = IO.async((scheduler, callback) => {
* const task = scheduler.scheduleOnce(1000, () => {
* console.info("Delayed Hello!")
* // Signaling successful completion
* // ("undefined" inhabits type "void")
* callback(Success(undefined))
* })
*
* return Cancelable.of(() => {
* console.info("Cancelling!")
* task.cancel()
* })
* })
* ```
*
* The sample above prints a message with a delay, where the delay
* itself is scheduled with the injected `Scheduler`. The `Scheduler`
* is in fact an optional parameter to {@link IO.run} and if one
* isn't explicitly provided, then `Scheduler.global` is assumed.
*
 * This action can be cancelled, because it specifies cancellation
 * logic. If we didn't return an explicit `Cancelable` there,
 * then cancellation wouldn't work. But for this `IO` reference
 * it does:
*
* ```typescript
* // Triggering execution, which sends a task to execute by means
* // of JavaScript's setTimeout (under the hood):
 * const f: Future<void> = delayedHello.run()
*
* // If we change our mind before the timespan has passed:
* f.cancel()
* ```
*
* Also, given an `IO` task, we can specify actions that need to be
* triggered in case of cancellation:
*
* ```typescript
* const io = IO.of(() => console.info("Hello!"))
* .executeForked()
*
 * io.doOnCancel(IO.of(() => {
 *   console.info("A cancellation attempt was made!")
 * }))
*
 * const f: Future<void> = io.run()
*
* // Note that in this case cancelling the resulting Future
* // will not stop the actual execution, since it doesn't know
* // how, but it will trigger our on-cancel callback:
*
* f.cancel()
* //=> A cancellation attempt was made!
* ```
*
* ## Note on the ExecutionModel
*
* `IO` is conservative in how it introduces async boundaries.
* Transformations like `map` and `flatMap` for example will default
* to being executed on the current call stack on which the
* asynchronous computation was started. But one shouldn't make
* assumptions about how things will end up executed, as ultimately
* it is the implementation's job to decide on the best execution
* model. All you are guaranteed is asynchronous execution after
* executing `run`.
*
 * The default `ExecutionModel` specifies batched execution and
 * `IO`, in its evaluation, respects the injected `ExecutionModel`.
 * If you want a different behavior, you need to execute the `IO`
 * reference with a different scheduler.
*
* In order to configure a different execution model, this config
* can be injected by means of a custom scheduler:
*
* ```typescript
* import { Scheduler, ExecutionModel } from "funfix"
*
* const ec = Scheduler.global.get()
* .withExecutionModel(ExecutionModel.alwaysAsync())
*
* // ...
* io.run(ec)
* ```
*
* Or you can configure an `IO` reference to execute with a certain
* execution model that overrides the configuration of the injected
* scheduler, by means of {@link IO.executeWithModel}:
*
* ```typescript
* io.executeWithModel(ExecutionModel.batched(256))
* ```
*
* ## Versus Eval
*
* For dealing with lazy evaluation, the other alternative is
* the {@link Eval} data type.
*
* Differences between `Eval` and `IO`:
*
* 1. `IO` is capable of describing asynchronous computations as well
* 2. `IO` is capable of error handling (it implements `MonadError`),
* whereas `Eval` does not provide error handling capabilities,
* being meant to be used for pure expressions (it implements
* `Comonad`, which is incompatible with `MonadError`)
* 3. You cannot rely on `IO` to produce a value immediately, since
* we cannot block threads on top of JavaScript engines
*
* So if you need error handling capabilities
* (i.e. `MonadError`), or if you need to describe
* asynchronous processes, then `IO` is for you. {@link Eval}
* is a simpler data type with the sole purpose of controlling the
* evaluation of expressions (i.e. strict versus lazy).
*
* ## Credits
*
* This type is inspired by `cats.effect.IO` from
* {@link http://typelevel.org/cats/|Typelevel Cats},
* by `monix.eval.Task` from {@link https://monix.io|Monix}, by
* `scalaz.effect.IO` from [Scalaz](https://github.com/scalaz/scalaz),
* which are all inspired by Haskell's `IO` data type.
*
* @final
*/
export class IO<A> implements HK<"funfix/io", A> {
/**
* Triggers the asynchronous execution.
*
 * Without invoking `run` on an `IO`, nothing gets evaluated, as an
* `IO` has lazy behavior.
*
* ```typescript
* // Describing a side effect
* const io = IO.of(() => console.log("Hello!"))
* // Delaying it for 1 second, for didactical purposes
* .delayExecution(1000)
*
* // Nothing executes until we call run on it, which gives
* // us a Future in return:
 * const f: Future<void> = io.run()
*
* // The given Future is cancelable, in case the logic
 * // described by our IO is cancelable, so we can do this:
* f.cancel()
* ```
*
* Note that `run` takes a
* [Scheduler](https://funfix.org/api/exec/classes/scheduler.html)
* as an optional parameter and if one isn't provided, then the
* default scheduler gets used. The `Scheduler` is in charge
* of scheduling asynchronous boundaries, executing tasks
* with a delay (e.g. `setTimeout`) or of reporting failures
* (with `console.error` by default).
*
* Also see {@link IO.runOnComplete} for a version that takes a
* callback as parameter.
*
* @return a `Future` that will eventually complete with the
* result produced by this `IO` on evaluation
*/
run(ec: Scheduler = Scheduler.global.get()): Future<A> {
return taskToFutureRunLoop(this, ec)
}
/**
* Triggers the asynchronous execution.
*
 * Without invoking `run` on an `IO`, nothing gets evaluated, as an
* `IO` has lazy behavior.
*
 * `runOnComplete` starts the evaluation and takes a callback which
* will be triggered when the computation is complete.
*
* Compared with JavaScript's `Promise.then` the provided callback
* is a function that receives a
* [Try](https://funfix.org/api/core/classes/try.html) value, a data
* type which is what's called a "logical disjunction", or a "tagged
* union type", a data type that can represent both successful
* results and failures. This is because in Funfix we don't work
* with `null`.
*
* Also the returned value is an
* [ICancelable](https://funfix.org/api/exec/interfaces/icancelable.html)
* reference, which can be used to cancel the running computation,
* in case the logic described by our `IO` is cancelable (note that
* some procedures cannot be cancelled, it all depends on how the
* `IO` value was described, see {@link IO.async} for how cancelable
* `IO` values can be built).
*
* Example:
*
* ```typescript
* // Describing a side effect
* const io = IO.of(() => console.log("Hello!"))
* .delayExecution(1000)
*
* // Nothing executes until we explicitly run our `IO`:
* const c: ICancelable = io.runOnComplete(r =>
* r.fold(
* err => console.error(err),
* _ => console.info("Done!")
* ))
*
* // In case we change our mind and the logic described by
* // our `IO` is cancelable, we can cancel it:
* c.cancel()
* ```
*
* Note that `runOnComplete` takes a
* [Scheduler](https://funfix.org/api/exec/classes/scheduler.html)
* as an optional parameter and if one isn't provided, then the
* default scheduler gets used. The `Scheduler` is in charge
* of scheduling asynchronous boundaries, executing tasks
* with a delay (e.g. `setTimeout`) or of reporting failures
* (with `console.error` by default).
*
* Also see {@link IO.run} for a version that returns a `Future`,
* which might be easier to work with, especially since a `Future`
* is `Promise`-like.
*
* @param cb is the callback that will be eventually called with
* the final result, or error, when the evaluation completes
*
* @param ec is the scheduler that controls the triggering of
* asynchronous boundaries (e.g. `setTimeout`)
*
* @return a cancelable action that can be triggered to cancel
* the running computation, assuming that the implementation
* of the source `IO` can be cancelled
*/
runOnComplete(
cb: (result: Try<A>) => void,
ec: Scheduler = Scheduler.global.get()): ICancelable {
const ref = ioGenericRunLoop(this, ec, null, cb, null, null, null)
return ref || Cancelable.empty()
}
/**
* Handle errors by lifting results into `Either` values.
*
* If there's an error, then a `Left` value will be signaled. If
* there is no error, then a `Right` value will be signaled instead.
*
* The returned type is an
* [Either](https://funfix.org/api/core/classes/either.html) value,
* which is what's called a "logical disjunction" or a "tagged union
* type", representing a choice between two values, in this case
* errors on the "Left" and successful results on the "Right".
*
* ```typescript
* // Describing an IO that can fail on execution:
 * const io: IO<number> = IO.of(() => {
* const n = Math.random() * 1000
* const m = n & n // to integer
* if (m % 2) throw new Error("No odds please!")
* return m
* })
*
* // By using attempt() we can observe and use errors
* // in `map` and `flatMap` transformations:
* io.attempt().map(either =>
* either.fold(
* err => "odd",
* val => "even"
* ))
* ```
*
* For other error handling capabilities, see {@link IO.recoverWith}
* and {@link IO.transformWith}.
*/
attempt(): IO<Either<Throwable, A>> {
return this.transform(
_ => Either.left(_),
Either.right)
}
/**
* Introduces an asynchronous boundary at the current stage in the
* asynchronous processing pipeline (after the source has been
* evaluated).
*
* Consider the following example:
*
* ```typescript
 * const readPath: () => string = () => "path/to/file"
*
* const io = IO.of(readPath)
* .asyncBoundary()
* .map(fs.readFileSync)
* ```
*
* Between reading the path and then reading the file from that
* path, we schedule an async boundary (it usually happens with
* JavaScript's `setTimeout` under the hood).
*
* This is equivalent with:
*
* ```typescript
* self.flatMap(a => IO.shift(ec).map(_ => a))
*
* // ... or ...
*
* self.forEffect(IO.shift(ec))
* ```
*
* Also see {@link IO.shift} and {@link IO.fork}.
*
* @param ec is an optional `Scheduler` implementation that can
* be used for scheduling the async boundary, however if
* not specified, the `IO`'s default scheduler (the one
* passed to `run()`) gets used
*/
asyncBoundary(ec?: Scheduler): IO<A> {
return this.flatMap(a => IO.shift(ec).map(() => a))
}
/**
* Alias for {@link IO.flatMap .flatMap}.
*/
chain<B>(f: (a: A) => IO<B>): IO<B> {
return this.flatMap(f)
}
/**
* Delays the evaluation of this `IO` by the specified duration.
*
* ```typescript
* const fa = IO.of(() => "Hello")
*
* // Delays the evaluation by 1 second
* fa.delayExecution(1000)
* ```
*
* @param delay is the duration to wait before signaling the
* final result
*/
delayExecution(delay: number | Duration): IO<A> {
return IO.delayedTick(delay).flatMap(() => this)
}
/**
* Delays signaling the result of this `IO` on evaluation by the
* specified duration.
*
* It works for successful results:
*
* ```typescript
* const fa = IO.of(() => "Alex")
*
* // Delays the signaling by 1 second
* fa.delayResult(1000)
* ```
*
* And for failures as well:
*
* ```typescript
 * IO.raise(new TimeoutError()).delayResult(1000)
* ```
*
* @param delay is the duration to wait before signaling the
* final result
*/
delayResult(delay: number | Duration): IO<A> {
return this.transformWith(
err => IO.delayedTick(delay).flatMap(() => IO.raise(err)),
a => IO.delayedTick(delay).map(() => a)
)
}
/**
* Returns a new `IO` in which `f` is scheduled to be run on
* completion. This would typically be used to release any
* resources acquired by this `IO`.
*
* The returned `IO` completes when both the source and the task
* returned by `f` complete.
*
* NOTE: The given function is only called when the task is
* complete. However the function does not get called if the task
* gets canceled. Cancellation is a process that's concurrent with
* the execution of a task and hence needs special handling.
*
* See {@link IO.doOnCancel} for specifying a callback to call on
* canceling a task.
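 *
 * For example (a sketch, `releaseResources` being a hypothetical
 * cleanup function):
 *
 * ```typescript
 * io.doOnFinish(exit =>
 *   IO.of(() => {
 *     // exit is None on success, Some(error) on failure
 *     releaseResources()
 *   }))
 * ```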
*/
doOnFinish(f: (e: Option<Throwable>) => IO<void>): IO<A> {
return this.transformWith(
e => f(Some(e)).flatMap(() => IO.raise(e)),
a => f(None).map(() => a)
)
}
/**
* Returns a new `IO` that will mirror the source, but that will
* execute the given `callback` if the task gets canceled before
* completion.
*
* This only works for premature cancellation. See
* {@link IO.doOnFinish} for triggering callbacks when the
* source finishes.
*
* @param callback is the `IO` value to execute if the task gets
* canceled prematurely
*/
doOnCancel(callback: IO<void>): IO<A> {
return IO.asyncUnsafe((ctx, cb) => {
const ec = ctx.scheduler
ec.trampoline(() => {
const conn = ctx.connection
conn.push(Cancelable.of(() => callback.run(ec)))
IO.unsafeStart(this, ctx, ioSafeCallback(ec, conn, cb))
})
})
}
/**
* Ensures that an asynchronous boundary happens before the
* execution, managed by the provided scheduler.
*
* Alias for {@link IO.fork}.
*
* Calling this is equivalent with:
*
* ```typescript
* IO.shift(ec).flatMap(_ => self)
*
* // ... or ...
*
* IO.shift(ec).followedBy(self)
* ```
*
* See {@link IO.fork}, {@link IO.asyncBoundary} and {@link IO.shift}.
*/
executeForked(ec?: Scheduler): IO<A> {
return IO.fork(this, ec)
}
/**
 * Override the `ExecutionModel` of the injected scheduler.
*
* ```typescript
* import { ExecutionModel } from "funfix"
*
* io.executeWithModel(ExecutionModel.alwaysAsync())
* ```
*/
executeWithModel(em: ExecutionModel): IO<A> {
return IO.asyncUnsafe((ctx, cb) => {
const ec = ctx.scheduler.withExecutionModel(em)
const ctx2 = new IOContext(ec, ctx.connection, ctx.options)
ec.trampoline(() => IO.unsafeStart(this, ctx2, cb))
})
}
/**
* Returns a new `IO` that upon evaluation will execute with the
* given set of {@link IOOptions}, allowing for tuning the run-loop.
*
* This allows for example making run-loops "auto-cancelable",
* an option that's off by default due to safety concerns:
*
* ```typescript
* io.executeWithOptions({
* autoCancelableRunLoops: true
* })
* ```
*/
executeWithOptions(set: IOOptions): IO<A> {
return IO.asyncUnsafe((ctx, cb) => {
const ec = ctx.scheduler
const ctx2 = new IOContext(ec, ctx.connection, set)
ec.trampoline(() => IO.unsafeStart(this, ctx2, cb))
})
}
/**
* Creates a new `IO` by applying a function to the successful
* result of the source, and returns a new instance equivalent to
* the result of the function.
*
* ```typescript
* const rndInt = IO.of(() => {
* const nr = Math.random() * 1000000
* return nr & nr
* })
*
 * const evenInt = (): IO<number> =>
 * rndInt.flatMap(int => {
 * if (int % 2 == 0)
 * return IO.now(int)
 * else // Retry until we have an even number!
 * return evenInt()
 * })
* ```
*/
flatMap<B>(f: (a: A) => IO<B>): IO<B> {
return new IOFlatMap(this, f)
}
/**
* `Applicative` apply operator.
*
 * Resembles {@link map}, but the passed mapping function is
 * lifted in the `IO` context.
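 *
 * A minimal sketch:
 *
 * ```typescript
 * const fn = IO.of(() => (x: number) => x + 1)
 *
 * // Yields 42 on run()
 * IO.now(41).ap(fn)
 * ```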
*/
ap<B>(ff: IO<(a: A) => B>): IO<B> {
return ff.flatMap(f => this.map(f))
}
/**
* Sequentially compose two `IO` actions, discarding any value
* produced by the first.
*
* So this:
*
* ```typescript
* ioA.followedBy(ioB)
* ```
*
* Is equivalent with this:
*
* ```typescript
 * ioA.flatMap(_ => ioB)
* ```
*/
followedBy<B>(fb: IO<B>): IO<B> {
return this.flatMap(() => fb)
}
/**
 * Returns a new `IO` that upon evaluation will execute the given
 * function on the produced value, transforming the source into
 * an `IO<void>`.
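 *
 * For example:
 *
 * ```typescript
 * // Prints "Hello!" when evaluated with run()
 * IO.now("Hello!").forEach(msg => console.info(msg))
 * ```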
*/
forEach(cb: (a: A) => void): IO<void> {
return this.map(cb)
}
/**
* Sequentially compose two actions, discarding any value
* produced by the second.
*
* So this:
*
* ```typescript
* ioA.forEffect(ioB)
* ```
*
* Is equivalent with this:
*
* ```typescript
* ioA.flatMap(a => ioB.map(_ => a))
* ```
*/
forEffect<B>(fb: IO<B>): IO<A> {
return this.flatMap(a => fb.map(() => a))
}
/**
* Returns a new `IO` that applies the mapping function to the
* successful result emitted by the source.
*
* ```typescript
 * IO.now(111).map(_ => _ * 2) // yields 222 on run()
* ```
*
* Note there's a correspondence between `flatMap` and `map`:
*
* ```typescript
* fa.map(f) <-> fa.flatMap(x => IO.pure(f(x)))
* ```
*/
map<B>(f: (a: A) => B): IO<B> {
return new IOFlatMap(this, (a: A) => IO.now(f(a)))
}
/**
* Memoizes (caches) the result of the source `IO` and reuses it on
* subsequent invocations of `run`.
*
* The resulting task will be idempotent, meaning that
* evaluating the resulting task multiple times will have the
* same effect as evaluating it once.
*
* @see {@link IO.memoizeOnSuccess} for a version that only caches
* successful results.
*/
memoize(): IO<A> {
switch (this._tag) {
case "pure":
return this
case "always":
const always = (this as any) as IOAlways<A>
return new IOOnce(always.thunk, false)
case "memoize":
const mem = (this as any) as IOMemoize<A>
if (!mem.onlySuccess) return mem
return new IOMemoize(this, false)
default: // flatMap | async
return new IOMemoize(this, false)
}
}
/**
* Memoizes (caches) the successful result of the source task
* and reuses it on subsequent invocations of `run`.
* Thrown exceptions are not cached.
*
* The resulting task will be idempotent, but only if the
* result is successful.
*
* @see {@link IO.memoize} for a version that caches both successful
* results and failures
*/
memoizeOnSuccess(): IO<A> {
switch (this._tag) {
case "pure":
case "once":
case "memoize":
return this
case "always":
const always = (this as any) as IOAlways<A>
return new IOOnce(always.thunk, true)
default: // flatMap | async
return new IOMemoize(this, true)
}
}
/**
* Creates a new `IO` that will mirror the source on success,
* but on failure it will try to recover and yield a successful
* result by applying the given function `f` to the thrown error.
*
* This function is the equivalent of a `try/catch` statement,
* or the equivalent of {@link IO.map .map} for errors.
*
* ```typescript
 * io.recover(err => {
 *   console.error(err)
 *   return fallback
 * })
* ```
*/
recover<AA>(f: (e: Throwable) => AA): IO<A | AA> {
return this.recoverWith(a => IO.now(f(a)))
}
/**
* Creates a new `IO` that will mirror the source on success,
* but on failure it will try to recover and yield a successful
* result by applying the given function `f` to the thrown error.
*
* This function is the equivalent of a `try/catch` statement,
* or the equivalent of {@link IO.flatMap .flatMap} for errors.
*
 * Note that because of `IO`'s laziness, this can describe a retry
 * loop:
*
* ```typescript
 * function retryOnFailure<A>(times: number, io: IO<A>): IO<A> {
 * return io.recoverWith(err => {
* // No more retries left? Re-throw error:
* if (times <= 0) return IO.raise(err)
* // Recursive call, yes we can!
* return retryOnFailure(times - 1, io)
* // Adding 500 ms delay for good measure
* .delayExecution(500)
* })
* }
* ```
*/
recoverWith<AA>(f: (e: Throwable) => IO<AA>): IO<A | AA> {
return this.transformWith(f, IO.now as any)
}
/**
* Returns an `IO` that mirrors the source in case the result of
* the source is signaled within the required `after` duration
* on evaluation, otherwise it fails with a `TimeoutError`,
* cancelling the source.
*
* ```typescript
* const fa = IO.of(() => 1).delayResult(10000)
*
* // Will fail with a TimeoutError on run()
* fa.timeout(1000)
* ```
*
* @param after is the duration to wait until it triggers
* the timeout error
*/
timeout(after: number | Duration): IO<A> {
const fb = IO.raise(new TimeoutError(Duration.of(after).toString()))
return this.timeoutTo(after, fb)
}
/**
* Returns an `IO` value that mirrors the source in case the result
* of the source is signaled within the required `after` duration
* when evaluated (with `run()`), otherwise it triggers the
* execution of the given `fallback` after the duration has passed,
* cancelling the source.
*
* This is literally the implementation of {@link IO.timeout}:
*
* ```typescript
* const fa = IO.of(() => 1).delayResult(10000)
*
* fa.timeoutTo(1000, IO.raise(new TimeoutError()))
* ```
*
* @param after is the duration to wait until it triggers the `fallback`
* @param fallback is a fallback `IO` to timeout to
*/
timeoutTo<AA>(after: number | Duration, fallback: IO<AA>): IO<A | AA> {
const other = IO.delayedTick(after).flatMap(() => fallback)
const lst: IO<A | AA>[] = [this, other]
return IO.firstCompletedOf(lst)
}
/**
* Creates a new `IO` by applying the 'success' function to the
* successful result of the source, or the 'error' function to the
* potential errors that might happen.
*
* This function is similar with {@link IO.map .map}, except that
* it can also transform errors and not just successful results.
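 *
 * For example (a sketch that folds both outcomes, so the result
 * of `run()` can no longer be a failure):
 *
 * ```typescript
 * io.transform(
 *   err => `failure: ${err}`,
 *   val => `success: ${val}`
 * )
 * ```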
*
 * @param failure is a function for transforming failures
 * @param success is a function for transforming a successful result
*/
transform<R>(failure: (e: Throwable) => R, success: (a: A) => R): IO<R> {
return this.transformWith(
e => IO.now(failure(e)),
a => IO.now(success(a))
)
}
/**
* Creates a new `IO` by applying the 'success' function to the
* successful result of the source, or the 'error' function to the
* potential errors that might happen.
*
* This function is similar with {@link IO.flatMap .flatMap},
* except that it can also transform errors and not just successful
* results.
*
 * @param failure is a function for transforming failures
 * @param success is a function for transforming a successful result
*/
transformWith<R>(failure: (e: Throwable) => IO<R>, success: (a: A) => IO<R>): IO<R> {
return new IOFlatMap(this, success, failure)
}
/**
* Identifies the `IO` reference type, useful for debugging and
* for pattern matching in the implementation.
*
* @hidden
*/
readonly _tag!: "pure" | "always" | "once" | "flatMap" | "async" | "memoize"
// Implements HK
/** @hidden */ readonly _URI!: "funfix/io"
/** @hidden */ readonly _A!: A
// Implements Constructor<T>
/** @hidden */ static readonly _Class: IO<any>
/**
* Promote a `thunk` function to an `IO`, catching exceptions in
* the process.
*
 * Note that since `IO` is not memoized by default, this will
 * recompute the value each time the `IO` is executed.
*
* ```typescript
* const io = IO.always(() => { console.log("Hello!") })
*
* io.run()
* //=> Hello!
* io.run()
* //=> Hello!
* io.run()
* //=> Hello!
* ```
*/
static always<A>(thunk: () => A): IO<A> {
return new IOAlways(thunk)
}
/**
 * Create an `IO` from an asynchronous computation, which takes
* the form of a function with which we can register a callback.
*
* This can be used to translate from a callback-based API to a
* straightforward monadic version.
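 *
 * For example, wrapping a callback-based API (a sketch, with
 * `readFile` being a hypothetical Node-style function):
 *
 * ```typescript
 * const io = IO.async<string>((ec, cb) =>
 *   readFile(path, (err, data) =>
 *     err ? cb(Failure(err)) : cb(Success(data))))
 * ```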
*/
static async<A>(register: (ec: Scheduler, cb: (a: Try<A>) => void) => ICancelable | void): IO<A> {
return IO.asyncUnsafe((ctx, cb) => {
const ec = ctx.scheduler
const conn = ctx.connection
// Forcing a light asynchronous boundary, otherwise
// stack overflows are possible
ec.trampoline(() => {
// Wrapping the callback in a safe implementation that
// provides idempotency guarantees and that pops from
// the given `StackedCancelable` at the right time
const safe = ioSafeCallback(ec, conn, cb)
try {
const ref = register(ec, safe)
// This `push` can be executed after `register`, even if the
// `safe` callback gets executed immediately, because of
// the light async boundary in `ioSafeCallback`
conn.push(ref || Cancelable.empty())
} catch (e) {
safe(Failure(e))
}
})
})
}
/**
* Constructs a lazy [[IO]] instance whose result will be computed
* asynchronously.
*
* **WARNING:** Unsafe to use directly, only use if you know
* what you're doing. For building `IO` instances safely
* see {@link IO.async}.
*
* Rules of usage:
*
* - the received `StackedCancelable` can be used to store
* cancelable references that will be executed upon cancel;
* every `push` must happen at the beginning, before any
* execution happens and `pop` must happen afterwards
* when the processing is finished, before signaling the
* result
 * - before execution, an asynchronous boundary is recommended,
 * to avoid stack overflow errors, which can be done using the
 * scheduler's facilities for trampolined execution
 * - on signaling the result (`Success` or `Failure`),
 * another async boundary is necessary, which can also
 * be done with the scheduler's facilities for trampolined
 * execution (e.g. `Scheduler.trampoline`)
*
* **WARNING:** note that not only is this builder unsafe, but also
* unstable, as the {@link IORegister} callback type is exposing
* volatile internal implementation details. This builder is meant
* to create optimized asynchronous tasks, but for normal usage
* prefer {@link IO.async}.
*/
static asyncUnsafe<A>(register: IORegister<A>): IO<A> {
return new IOAsync(register)
}
/**
* Promote a `thunk` function generating `IO` results to an `IO`
* of the same type.
*
* Alias for {@link IO.suspend}.
*/
static defer<A>(thunk: () => IO<A>): IO<A> {
return IO.unit().flatMap(() => thunk())
}
/**
* Defers the creation of an `IO` by using the provided function,
* which has the ability to inject a needed `Scheduler`.
*
* Example:
*
* ```typescript
 * function measureLatency<A>(source: IO<A>): IO<[A, number]> {
 * return IO.deferAction<[A, number]>(s => {
* // We have our Scheduler, which can inject time, we
* // can use it for side-effectful operations
* const start = s.currentTimeMillis()
*
* return source.map(a => {
* const finish = s.currentTimeMillis()
* return [a, finish - start]
* })
* })
* }
* ```
*
* @param f is the function that's going to be called when the
* resulting `IO` gets evaluated
*/
static deferAction<A>(f: (ec: Scheduler) => IO<A>): IO<A> {
return IO.asyncUnsafe((ctx, cb) => {
const ec = ctx.scheduler
let ioa: IO<A>
try { ioa = f(ec) } catch (e) { ioa = IO.raise(e) }
ec.trampoline(() => IO.unsafeStart(ioa, ctx, cb))
})
}
/**
* Given a `thunk` that produces `Future` values, suspends it
* in the `IO` context, evaluating it on demand whenever the
* resulting `IO` gets evaluated.
*
* See {@link IO.fromFuture} for the strict version.
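 *
 * A minimal sketch:
 *
 * ```typescript
 * // The Future is created, and its side effects started,
 * // only when the resulting IO gets evaluated:
 * const io = IO.deferFuture(() => Future.of(() => 1 + 1))
 * ```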
*/
static deferFuture<A>(thunk: () => Future<A>): IO<A> {
return IO.suspend(() => IO.fromFuture(thunk()))
}
/**
* Wraps calls that generate `Future` results into `IO`, provided
* a callback with an injected `Scheduler`.
*
* This builder helps with wrapping `Future`-enabled APIs that need
* a `Scheduler` to work.
*
* @param f is the function that's going to be executed when the task
* gets evaluated, generating the wrapped `Future`
*/
static deferFutureAction<A>(f: (ec: Scheduler) => Future<A>): IO<A> {
return IO.deferAction(ec => IO.fromFuture(f(ec)))
}
/**
* Returns an `IO` that on evaluation will complete after the
* given `delay`.
*
* This can be used to do delayed execution. For example:
*
* ```typescript
* IO.delayedTick(1000).flatMap(_ =>
* IO.of(() => console.info("Hello!"))
* )
* ```
*
* @param delay is the duration to wait before signaling the tick
*/
static delayedTick(delay: number | Duration): IO<void> {
return IO.asyncUnsafe((ctx, cb) => {
const conn = ctx.connection
const task = ctx.scheduler.scheduleOnce(delay, () => {
conn.pop()
cb(Try.unit())
})
conn.push(task)
})
}
/**
* Creates a race condition between multiple `IO` values, on
* evaluation returning the result of the first one that completes,
* cancelling the rest.
*
* ```typescript
* const failure = IO.raise(new TimeoutError()).delayResult(2000)
*
* // Will yield 1
* const fa1 = IO.of(() => 1).delayResult(1000)
* IO.firstCompletedOf([fa1, failure])
*
* // Will yield a TimeoutError
* const fa2 = IO.of(() => 1).delayResult(10000)
* IO.firstCompletedOf([fa2, failure])
* ```
*
* @param list is the list of `IO` values for which the
* race is started
*
* @return a new `IO` that will evaluate to the result of the first
* in the list to complete, the rest being cancelled
*/
static firstCompletedOf<A>(list: IO<A>[] | Iterable<IO<A>>): IO<A> {
return ioListToFutureProcess(list, Future.firstCompletedOf)
}
/**
* Converts any strict `Future` value into an {@link IO}.
*
* Note that this builder does not suspend any side effects, since
* the given parameter is strict (and not a function) and because
* `Future` has strict behavior.
*
* See {@link IO.deferFuture} for an alternative that evaluates
* lazy thunks that produce future results.
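 *
 * For example:
 *
 * ```typescript
 * // The Future's evaluation has already started here:
 * const future = Future.of(() => 1 + 1)
 *
 * // ...but its eventual result can still be lifted:
 * const io: IO<number> = IO.fromFuture(future)
 * ```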
*/
static fromFuture<A>(fa: Future<A>): IO<A> {
if (!fa.value().isEmpty())
return IO.fromTry(fa.value().get() as any)
return IO.asyncUnsafe((ctx, cb) => {
ctx.connection.push(fa)
fa.onComplete(result => {
ctx.connection.pop()
cb(result as any)
})
})
}
/**
 * Returns an `IO` reference that will signal the result of the
* given `Try` reference upon evaluation.
*/
static fromTry<A>(a: Try<A>): IO<A> { return new IOPure(a) }
/**
 * Mirrors the given source `IO`, but before execution triggers
 * an asynchronous boundary (usually by means of `setTimeout` on
* top of JavaScript, depending on the provided `Scheduler`
* implementation).
*
* If a `Scheduler` is not explicitly provided, the implementation
* ends up using the one provided in {@link IO.run}.
*
* Note that {@link IO.executeForked} is the method version of this
 * function (e.g. `io.executeForked() == IO.fork(io)`).
*
* ```typescript
* IO.of(() => fs.readFileSync(path))
* .executeForked()
* ```
*
* Also see {@link IO.shift} and {@link IO.asyncBoundary}.
*
* @param fa is the task that will get executed asynchronously
*
* @param ec is the `Scheduler` used for triggering the async
* boundary, or if not provided it will default to the
* scheduler passed on evaluation in {@link IO.run}
*/
static fork<A>(fa: IO<A>, ec?: Scheduler): IO<A> {
return IO.shift(ec).flatMap(() => fa)
}
/**
* Maps 2 `IO` values by the mapping function, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.sequence} operation and as such
* on cancellation or failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
*
*
* // Yields Success(3)
* IO.map2(fa1, fa2, (a, b) => a + b)
*
* // Yields Failure, because the second arg is a Failure
* IO.map2(fa1, IO.raise("error"),
* (a, b) => a + b
* )
* ```
*
* This operation is the `Applicative.map2`.
*/
static map2<A1, A2, R>(
fa1: IO<A1>, fa2: IO<A2>,
f: (a1: A1, a2: A2) => R): IO<R> {
const fl: IO<any[]> = IO.sequence([fa1, fa2] as any[])
return fl.map(lst => f(lst[0], lst[1]))
}
/**
* Maps 3 `IO` values by the mapping function, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.sequence} operation and as such
* on cancellation or failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
*
*
* // Yields Success(6)
* IO.map3(fa1, fa2, fa3, (a, b, c) => a + b + c)
*
* // Yields Failure, because the second arg is a Failure
* IO.map3(
* fa1, fa2, IO.raise("error"),
* (a, b, c) => a + b + c
* )
* ```
*/
static map3<A1, A2, A3, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>,
f: (a1: A1, a2: A2, a3: A3) => R): IO<R> {
const fl: IO<any[]> = IO.sequence([fa1, fa2, fa3] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2]))
}
/**
* Maps 4 `IO` values by the mapping function, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.sequence} operation and as such
* on cancellation or failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
*
* // Yields Success(10)
* IO.map4(fa1, fa2, fa3, fa4, (a, b, c, d) => a + b + c + d)
*
* // Yields Failure, because the second arg is a Failure
* IO.map4(
* fa1, fa2, fa3, IO.raise("error"),
* (a, b, c, d) => a + b + c + d
* )
* ```
*/
static map4<A1, A2, A3, A4, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>,
f: (a1: A1, a2: A2, a3: A3, a4: A4) => R): IO<R> {
const fl: IO<any[]> = IO.sequence([fa1, fa2, fa3, fa4] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3]))
}
/**
* Maps 5 `IO` values by the mapping function, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.sequence} operation and as such
* on cancellation or failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
* const fa5 = IO.of(() => 5)
*
* // Yields Success(15)
* IO.map5(fa1, fa2, fa3, fa4, fa5,
* (a, b, c, d, e) => a + b + c + d + e
* )
*
* // Yields Failure, because the second arg is a Failure
* IO.map5(
* fa1, fa2, fa3, fa4, IO.raise("error"),
* (a, b, c, d, e) => a + b + c + d + e
* )
* ```
*/
static map5<A1, A2, A3, A4, A5, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>, fa5: IO<A5>,
f: (a1: A1, a2: A2, a3: A3, a4: A4, a5: A5) => R): IO<R> {
const fl: IO<any[]> = IO.sequence([fa1, fa2, fa3, fa4, fa5] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3], lst[4]))
}
/**
* Maps 6 `IO` values by the mapping function, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.sequence} operation and as such
* on cancellation or failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
* const fa5 = IO.of(() => 5)
* const fa6 = IO.of(() => 6)
*
* // Yields Success(21)
* IO.map6(
* fa1, fa2, fa3, fa4, fa5, fa6,
* (a, b, c, d, e, f) => a + b + c + d + e + f
* )
*
* // Yields Failure, because the second arg is a Failure
* IO.map6(
* fa1, fa2, fa3, fa4, fa5, IO.raise("error"),
* (a, b, c, d, e, f) => a + b + c + d + e + f
* )
* ```
*/
static map6<A1, A2, A3, A4, A5, A6, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>, fa5: IO<A5>, fa6: IO<A6>,
f: (a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6) => R): IO<R> {
const fl: IO<any[]> = IO.sequence([fa1, fa2, fa3, fa4, fa5, fa6] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3], lst[4], lst[5]))
}
/**
* Returns an `IO` that on execution is always successful,
* emitting the given strict value.
*/
static now<A>(value: A): IO<A> { return new IOPure(Success(value)) }
/**
* Alias for {@link IO.always}.
*/
static of<A>(thunk: () => A): IO<A> {
return IO.always(thunk)
}
/**
 * Promote a `thunk` function to an `IO` that is memoized on the
* first evaluation, the result being then available on subsequent
* evaluations.
*
* Note this is equivalent with:
*
* ```typescript
* IO.always(thunk).memoize()
* ```
*/
static once<A>(thunk: () => A): IO<A> {
return new IOOnce(thunk, false)
}
/**
* Maps 2 `IO` values evaluated nondeterministically, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.gather} operation. As such
* the `IO` operations are potentially executed in parallel
* (if the operations are asynchronous) and on cancellation or
* failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
*
*
* // Yields Success(3)
* IO.parMap2(fa1, fa2, (a, b) => a + b)
*
* // Yields Failure, because the second arg is a Failure
* IO.parMap2(fa1, IO.raise("error"),
* (a, b) => a + b
* )
* ```
*/
static parMap2<A1, A2, R>(
fa1: IO<A1>, fa2: IO<A2>,
f: (a1: A1, a2: A2) => R): IO<R> {
const fl: IO<any[]> = IO.gather([fa1, fa2] as any[])
return fl.map(lst => f(lst[0], lst[1]))
}
/**
* Maps 3 `IO` values evaluated nondeterministically, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.gather} operation. As such
* the `IO` operations are potentially executed in parallel
* (if the operations are asynchronous) and on cancellation or
* failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
*
*
* // Yields Success(6)
* IO.parMap3(fa1, fa2, fa3, (a, b, c) => a + b + c)
*
* // Yields Failure, because the second arg is a Failure
* IO.parMap3(
* fa1, fa2, IO.raise("error"),
* (a, b, c) => a + b + c
* )
* ```
*/
static parMap3<A1, A2, A3, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>,
f: (a1: A1, a2: A2, a3: A3) => R): IO<R> {
const fl: IO<any[]> = IO.gather([fa1, fa2, fa3] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2]))
}
/**
* Maps 4 `IO` values evaluated nondeterministically, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.gather} operation. As such
* the `IO` operations are potentially executed in parallel
* (if the operations are asynchronous) and on cancellation or
* failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
*
* // Yields Success(10)
* IO.parMap4(fa1, fa2, fa3, fa4, (a, b, c, d) => a + b + c + d)
*
* // Yields Failure, because the second arg is a Failure
* IO.parMap4(
* fa1, fa2, fa3, IO.raise("error"),
* (a, b, c, d) => a + b + c + d
* )
* ```
*/
static parMap4<A1, A2, A3, A4, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>,
f: (a1: A1, a2: A2, a3: A3, a4: A4) => R): IO<R> {
const fl: IO<any[]> = IO.gather([fa1, fa2, fa3, fa4] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3]))
}
/**
* Maps 5 `IO` values evaluated nondeterministically, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.gather} operation. As such
* the `IO` operations are potentially executed in parallel
* (if the operations are asynchronous) and on cancellation or
* failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
* const fa5 = IO.of(() => 5)
*
* // Yields Success(15)
* IO.parMap5(fa1, fa2, fa3, fa4, fa5,
* (a, b, c, d, e) => a + b + c + d + e
* )
*
* // Yields Failure, because the second arg is a Failure
* IO.parMap5(
* fa1, fa2, fa3, fa4, IO.raise("error"),
* (a, b, c, d, e) => a + b + c + d + e
* )
* ```
*/
static parMap5<A1, A2, A3, A4, A5, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>, fa5: IO<A5>,
f: (a1: A1, a2: A2, a3: A3, a4: A4, a5: A5) => R): IO<R> {
const fl: IO<any[]> = IO.gather([fa1, fa2, fa3, fa4, fa5] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3], lst[4]))
}
/**
* Maps 6 `IO` values evaluated nondeterministically, returning a new
* `IO` reference that completes with the result of mapping that
* function to the successful values of the futures, or in failure in
* case either of them fails.
*
* This is a specialized {@link IO.gather} operation. As such
* the `IO` operations are potentially executed in parallel
* (if the operations are asynchronous) and on cancellation or
* failure all pending tasks get cancelled.
*
* ```typescript
* const fa1 = IO.of(() => 1)
* const fa2 = IO.of(() => 2)
* const fa3 = IO.of(() => 3)
* const fa4 = IO.of(() => 4)
* const fa5 = IO.of(() => 5)
* const fa6 = IO.of(() => 6)
*
* // Yields Success(21)
* IO.parMap6(
* fa1, fa2, fa3, fa4, fa5, fa6,
* (a, b, c, d, e, f) => a + b + c + d + e + f
* )
*
* // Yields Failure, because the second arg is a Failure
* IO.parMap6(
* fa1, fa2, fa3, fa4, fa5, IO.raise("error"),
* (a, b, c, d, e, f) => a + b + c + d + e + f
* )
* ```
*/
static parMap6<A1, A2, A3, A4, A5, A6, R>(
fa1: IO<A1>, fa2: IO<A2>, fa3: IO<A3>, fa4: IO<A4>, fa5: IO<A5>, fa6: IO<A6>,
f: (a1: A1, a2: A2, a3: A3, a4: A4, a5: A5, a6: A6) => R): IO<R> {
const fl: IO<any[]> = IO.gather([fa1, fa2, fa3, fa4, fa5, fa6] as any[])
return fl.map(lst => f(lst[0], lst[1], lst[2], lst[3], lst[4], lst[5]))
}
/**
* Lifts a value into the `IO` context.
*
* Alias for {@link IO.now}.
*/
static pure<A>(value: A): IO<A> { return IO.now(value) }
/**
 * Returns an `IO` that on execution always finishes in error,
 * emitting the specified exception.
*/
static raise<A = never>(e: Throwable): IO<A> { return new IOPure(Failure(e)) }
/**
* Transforms a list of `IO` values into an `IO` of a list,
* ordering both results and side effects.
*
* This operation would be the equivalent of `Promise.all` or of
* `Future.sequence`, however because of the laziness of `IO`
* the given values are processed in order.
*
* Sequencing means that on evaluation the tasks won't get processed
* in parallel. If parallelism is desired, see {@link IO.gather}.
*
* Sample:
*
* ```typescript
* const io1 = IO.of(() => 1)
* const io2 = IO.of(() => 2)
* const io3 = IO.of(() => 3)
*
* // Yields [1, 2, 3]
 * const all: IO<number[]> = IO.sequence([io1, io2, io3])
* ```
*/
static sequence<A>(list: IO<A>[] | Iterable<IO<A>>): IO<A[]> {
return ioSequence(list)
}
/**
* Nondeterministically gather results from the given collection of
* tasks, returning a task that will signal the same type of
* collection of results once all tasks are finished.
*
* This function is the nondeterministic analogue of `sequence`
* and should behave identically to `sequence` so long as there is
* no interaction between the effects being gathered. However,
* unlike `sequence`, which decides on a total order of effects,
* the effects in a `gather` are unordered with respect to each
* other.
*
* In other words `gather` can execute `IO` tasks in parallel,
* whereas {@link IO.sequence} forces an execution order.
*
* Although the effects are unordered, the order of results matches
* the order of the input sequence.
*
* ```typescript
* const io1 = IO.of(() => 1)
* const io2 = IO.of(() => 2)
* const io3 = IO.of(() => 3)
*
* // Yields [1, 2, 3]
 * const all: IO<number[]> = IO.gather([io1, io2, io3])
* ```
*/
static gather<A>(list: IO<A>[] | Iterable<IO<A>>): IO<A[]> {
return ioListToFutureProcess(list, Future.sequence)
}
/**
* Shifts the bind continuation of the `IO` onto the specified
* scheduler, for triggering asynchronous execution.
*
* Asynchronous actions cannot be shifted, since they are scheduled
* rather than run. Also, no effort is made to re-shift synchronous
* actions which *follow* asynchronous actions within a bind chain;
* those actions will remain on the continuation call stack inherited
* from their preceding async action. The only computations which
* are shifted are those which are defined as synchronous actions and
* are contiguous in the bind chain *following* the `shift`.
*
* For example this sample forces an asynchronous boundary
* (which usually means that the continuation is scheduled
* for asynchronous execution with `setTimeout`) before the
* file will be read synchronously:
*
* ```typescript
 * IO.shift().map(_ => fs.readFileSync(path))
* ```
*
* On the other hand in this example the asynchronous boundary
* is inserted *after* the file has been read:
*
* ```typescript
* IO.of(() => fs.readFileSync(path)).flatMap(content =>
* IO.shift().map(_ => content))
* ```
*
 * The definition of {@link IO.asyncBoundary} is literally:
*
* ```typescript
* source.flatMap(a => IO.shift(ec).map(_ => a))
* ```
*
* And the definition of {@link IO.fork} is:
*
* ```typescript
* IO.shift(ec).flatMap(_ => source)
* ```
*
* @param ec is the `Scheduler` used for triggering the async
* boundary, or if not provided it will default to the
* scheduler passed on evaluation in {@link IO.run}
*/
static shift(ec?: Scheduler): IO<void> {
if (!ec) return ioShiftDefaultRef
return ioShift(ec)
}
/**
* Promote a `thunk` function generating `IO` results to an `IO`
* of the same type.
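 *
 * For example (a sketch, assuming Node's `fs` and a `path` value):
 *
 * ```typescript
 * // The file is only read when the IO is evaluated and it
 * // gets re-read on every run():
 * const io = IO.suspend(() => IO.now(fs.readFileSync(path)))
 * ```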
*/
static suspend<A>(thunk: () => IO<A>): IO<A> {
return IO.unit().flatMap(() => thunk())
}
/**
* Keeps calling `f` until a `Right(b)` is returned.
*
* Based on Phil Freeman's
* [Stack Safety for Free]{@link http://functorial.com/stack-safety-for-free/index.pdf}.
*
* Described in `FlatMap.tailRecM`.
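 *
 * A sketch that counts down without growing the stack:
 *
 * ```typescript
 * // Yields 0 on run()
 * IO.tailRecM(10, n =>
 *   IO.now(n > 0 ? Either.left(n - 1) : Either.right(n)))
 * ```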
*/
static tailRecM<A, B>(a: A, f: (a: A) => IO<Either<A, B>>): IO<B> {
try {
return f(a).flatMap(either => {
if (either.isRight()) {
return IO.now(either.get())
} else {
// Recursive call
return IO.tailRecM(either.swap().get(), f)
}
})
} catch (e) {
return IO.raise(e)
}
}
/**
* Shorthand for `now(undefined as void)`, always returning
* the same reference as optimization.
*/
static unit(): IO<void> {
return ioUnitRef
}
/**
* Unsafe utility - starts the execution of an `IO`.
*
* This function allows for specifying a custom {@link IOContext}
* when evaluating the `IO` reference.
*
* DO NOT use directly, as it is UNSAFE to use, unless you know
* what you're doing. Prefer {@link IO.run} instead.
*/
static unsafeStart<A>(source: IO<A>, context: IOContext, cb: (r: Try<A>) => void): void | ICancelable {
return ioGenericRunLoop(source, context.scheduler, context, cb, null, null, null)
}
}
/**
* `Pure` is an internal `IO` state that wraps any strict
* value in an `IO` reference. Returned by {@link IO.now}
* and {@link IO.raise}.
*
* @private
*/
class IOPure<A> extends IO<A> {
readonly _tag: "pure" = "pure"
/**
* @param value is the value that's going to be returned
* when `get()` is called.
*/
constructor(public value: Try<A>) { super() }
}
/**
* Reusable reference, to use in {@link IO.unit}.
*
* @private
*/
const ioUnitRef: IOPure<void> = new IOPure(Try.unit())
/**
 * `Once` is an internal `IO` state that executes the given `thunk`
 * only once, upon calling `get()`, and then memoizes its result for
 * subsequent invocations.
*
* Returned by [[IO.once]].
*
* @private
*/
class IOOnce<A> extends IO<A> {
readonly _tag: "once" = "once"
private _thunk: () => A
public cache!: Try<A>
public onlyOnSuccess: boolean
constructor(thunk: () => A, onlyOnSuccess: boolean) {
super()
this._thunk = thunk
this.onlyOnSuccess = onlyOnSuccess
}
memoize(): IO<A> {
if (this.onlyOnSuccess && this._thunk)
return new IOOnce(this._thunk, false)
else
return this
}
runTry(): Try<A> {
if (this._thunk) {
const result = Try.of(this._thunk)
if (result.isSuccess() || !this.onlyOnSuccess) {
// GC purposes
delete this._thunk
delete this.onlyOnSuccess
this.cache = result
}
return result
}
return this.cache
}
}
/**
* `Always` is an internal `IO` state that executes the given `thunk`
* every time the call to `get()` happens. Returned by [[IO.always]].
*
* @private
*/
class IOAlways<A> extends IO<A> {
readonly _tag: "always" = "always"
constructor(public thunk: () => A) { super() }
}
/**
* `FlatMap` is an internal `IO` state that represents a
* [[IO.flatMap .flatMap]], [[IO.map .map]], [[IO.transform .transform]]
* or a [[IO.transformWith .transformWith]] operation, all of them
* being expressed with this state.
*
* @private
*/
class IOFlatMap<A, B> extends IO<B> {
readonly _tag: "flatMap" = "flatMap"
constructor(
public readonly source: IO<A>,
public readonly f: ((a: A) => IO<B>),
public readonly g?: ((e: Throwable) => IO<B>)) { super() }
}
/**
* Type alias representing registration callbacks for tasks
* created with `asyncUnsafe`, that are going to get executed
* when the asynchronous task gets evaluated.
*/
export type IORegister<A> =
(context: IOContext, callback: (result: Try<A>) => void) => void
/**
* Constructs a lazy [[IO]] instance whose result will
* be computed asynchronously.
*
* Unsafe to build directly, only use if you know what you're doing.
* For building `Async` instances safely, see {@link IO.async}.
*
* @private
* @hidden
*/
class IOAsync<A> extends IO<A> {
readonly _tag: "async" = "async"
constructor(public readonly register: IORegister<A>) { super() }
}
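/**
 * `Memoize` is an internal `IO` state that caches the result of
 * its `source` on the first evaluation, being returned by
 * {@link IO.memoize} and {@link IO.memoizeOnSuccess}.
 *
 * @private
 */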
class IOMemoize<A> extends IO<A> {
readonly _tag: "memoize" = "memoize"
public result: Try<A> | Future<A> | null
public source?: IO<A>
public readonly onlySuccess: boolean
constructor(source: IO<A>, onlySuccess: boolean) {
super()
this.source = source
this.result = null
this.onlySuccess = onlySuccess
}
}
/**
* The `Context` under which {@link IO} is supposed to be executed.
*
* This definition is of interest only when creating
* tasks with {@link IO.asyncUnsafe}, which exposes internals and
* is considered unsafe to use.
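 *
 * A sketch of direct usage, together with {@link IO.unsafeStart}:
 *
 * ```typescript
 * const ctx = new IOContext(Scheduler.global.get())
 * IO.unsafeStart(IO.now(1), ctx, r => console.info(r))
 * ```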
*
* @final
*/
export class IOContext {
/**
* The `Scheduler` in charge of evaluating asynchronous boundaries
* on `run`.
*/
public readonly scheduler: Scheduler
/**
* Is the `StackedCancelable` that accumulates cancelable
* actions, to be triggered if cancellation happens.
*/
public readonly connection: StackedCancelable
/**
* Options passed to the run-loop implementation, determining
* its behavior. See {@link IOOptions} for the available
* options.
*/
public readonly options: IOOptions
constructor(
scheduler: Scheduler,
connection: StackedCancelable = new StackedCancelable(),
options: IOOptions = { autoCancelableRunLoops: false }) {
this.scheduler = scheduler
this.options = options
this.connection = connection
// Enables auto-cancelable run-loops
if (options.autoCancelableRunLoops)
this.shouldCancel = () => connection.isCanceled()
}
/**
* Resets the stored `frameIndex`.
*
* Calling this method inside the logic of a {@link IO.asyncUnsafe}
* lets the run-loop know that an async boundary happened. This
* works in tandem with the logic for `ExecutionModel.batched(n)`,
* for better detection of synchronous cycles, to avoid introducing
* forced async boundaries where not needed.
*/
markAsyncBoundary(): void {
this.scheduler.batchIndex = 0
}
/**
* Returns `true` in case the run-loop should be canceled,
* but this can only happen if `autoCancelableRunLoops` is
* set to `true`.
*/
shouldCancel(): boolean { return false }
}
/**
* Set of options for customizing IO's behavior.
*
* @param autoCancelableRunLoops should be set to `true` in
* case you want `flatMap` driven loops to be
* auto-cancelable. Defaults to `false` because of
* safety concerns.
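*
* A sketch of wiring these options through an {@link IOContext}:
*
* ```typescript
* const ctx = new IOContext(
*   Scheduler.global.get(),
*   new StackedCancelable(),
*   { autoCancelableRunLoops: true })
* ```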
*/
export type IOOptions = {
autoCancelableRunLoops: boolean
}
/**
* Type enumerating the type classes implemented by `IO`.
*/
export type IOTypes = Monad<"funfix/io">
/**
* Type-class implementations, compatible with the `static-land`
* specification.
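*
* Usage sketch, calling the `static-land` functions directly:
*
* ```typescript
* const one = IOModule.of(1)
* const two = IOModule.map((x: number) => x + 1, one)
* const four = IOModule.chain((x: number) => IO.pure(x * 2), two)
* ```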
*/
export const IOModule: IOTypes = {
// Functor
map: <A, B>(f: (a: A) => B, fa: IO<A>): IO<B> =>
fa.map(f),
// Apply
ap: <A, B>(ff: IO<(a: A) => B>, fa: IO<A>): IO<B> =>
fa.ap(ff),
// Applicative
of: IO.pure,
// Chain
chain: <A, B>(f: (a: A) => IO<B>, fa: IO<A>): IO<B> =>
fa.flatMap(f),
// ChainRec
chainRec: <A, B, C>(f: (next: (a: A) => C, done: (b: B) => C, a: A) => IO<C>, a: A): IO<B> =>
IO.tailRecM(a, a => f(Either.left as any, Either.right as any, a))
}
// Registers Fantasy-Land compatible symbols
coreInternals.fantasyLandRegister(IO, IOModule)
/** @hidden */
function ioShift(ec?: Scheduler): IO<void> {
return IO.asyncUnsafe((ctx, cb) => {
(ec || ctx.scheduler).executeAsync(() => cb(Try.unit()))
})
}
/** @hidden */
const ioShiftDefaultRef: IO<void> = ioShift()
/** @hidden */
type Current = IO<any>
/** @hidden */
type Bind = ((a: any) => IO<any>)
/**
 * A run-loop continuation: either a plain success continuation, or a
 * `[success, error]` pair, as pushed by `transform`/`transformWith`.
 * @hidden
 */
type BindT = Bind | [Bind, Bind]
/** @hidden */
type CallStack = Array<BindT>
/** @hidden */
function _ioPopNextBind(bFirst: BindT | null, bRest: CallStack | null): Bind | null {
let f: Bind | [Bind, Bind] | null | undefined = undefined
if (bFirst) f = bFirst
else if (bRest && bRest.length > 0) f = bRest.pop()
if (f) return typeof f === "function" ? f : f[0]
return null
}
/** @hidden */
function _ioFindErrorHandler(bFirst: BindT | null, bRest: CallStack | null): Bind | null {
let cursor: any = bFirst
do {
if (cursor && typeof cursor !== "function") return cursor[1]
cursor = bRest ? bRest.pop() : null
} while (cursor)
return null
}
/**
* We need to build a callback on each cycle involving an `IOAsync`
* state. This class builds a mutable callback to reuse on each
* cycle in order to reduce GC pressure.
*
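* Usage, as in `ioExecuteAsync` below:
*
* ```typescript
* const restart = rcb || new RestartCallback(context, cb)
* restart.prepare(bFirst, bRest)
* register(context, restart.asFunction)
* ```
*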
* @hidden
* @final
*/
class RestartCallback {
private canCall = false
private bFirst: BindT | null = null
private bRest: CallStack | null = null
public readonly asFunction: (result: Try<any>) => void
constructor(
private context: IOContext,
private callback: (r: Try<any>) => void) {
this.asFunction = this.signal.bind(this)
}
prepare(bFirst: BindT | null, bRest: CallStack | null) {
this.bFirst = bFirst
this.bRest = bRest
this.canCall = true
}
signal(result: Try<any>): void {
if (this.canCall) {
this.canCall = false
ioGenericRunLoop(
new IOPure(result),
this.context.scheduler,
this.context,
this.callback,
this,
this.bFirst,
this.bRest
)
} else if (result.isFailure()) {
this.context.scheduler.reportFailure(result.failed().get())
}
}
}
/** @hidden */
function ioExecuteAsync(
register: IORegister,
context: IOContext,
cb: (result: Try<any>) => void,
rcb: RestartCallback | null,
bFirst: BindT | null,
bRest: CallStack | null,
frameIndex: number) {
if (!context.shouldCancel()) {
context.scheduler.batchIndex = frameIndex
const restart = rcb || new RestartCallback(context, cb)
restart.prepare(bFirst, bRest)
register(context, restart.asFunction)
}
}
/** @hidden */
function ioRestartAsync(
start: IO<any>,
context: IOContext,
cb: (result: Try<any>) => void,
rcb: RestartCallback | null,
bFirstInit: BindT | null,
bRestInit: CallStack | null): void {
if (!context.shouldCancel())
context.scheduler.executeAsync(() => {
ioGenericRunLoop(start, context.scheduler, context, cb, rcb, bFirstInit, bRestInit)
})
}
/** @hidden */
function ioGenericRunLoop(
start: IO<any>,
scheduler: Scheduler,
context: IOContext | null,
cb: (result: Try<any>) => void,
rcb: RestartCallback | null,
bFirstInit: BindT | null,
bRestInit: CallStack | null): ICancelable | void {
let current: Current | Try<any> = start
let bFirst: BindT | null = bFirstInit
let bRest: CallStack | null = bRestInit
const modulus = scheduler.executionModel.recommendedBatchSize - 1
let frameIndex = scheduler.batchIndex
while (true) {
if (current instanceof Try) {
if (current.isSuccess()) {
const bind = _ioPopNextBind(bFirst, bRest)
if (!bind) {
scheduler.batchIndex = frameIndex
return cb(current)
}
try {
current = bind(current.get())
} catch (e) {
current = Try.failure(e)
}
} else {
const bind = _ioFindErrorHandler(bFirst, bRest)
if (!bind) {
scheduler.batchIndex = frameIndex
return cb(current)
}
try {
current = bind((current as Try<any>).failed().get())
} catch (e) {
current = Try.failure(e)
}
}
bFirst = null
const nextIndex = (frameIndex + 1) & modulus
// Should we force an asynchronous boundary?
if (nextIndex) {
frameIndex = nextIndex
} else {
const ctx = context || new IOContext(scheduler)
/* istanbul ignore next */
const boxed = current instanceof Try ? new IOPure(current) : current
ioRestartAsync(boxed, ctx, cb, rcb, bFirst, bRest)
return ctx.connection
}
}
else switch (current._tag) {
case "pure":
current = (current as IOPure<any>).value
break
case "always":
current = Try.of((current as IOAlways<any>).thunk)
break
case "once":
current = (current as IOOnce<any>).runTry()
break
case "flatMap":
const flatM: IOFlatMap<any, any> = current as any
if (bFirst) {
if (!bRest) bRest = []
bRest.push(bFirst)
}
bFirst = !flatM.g ? flatM.f : [flatM.f, flatM.g]
current = flatM.source
break
case "async":
const async: IOAsync<any> = current as any
const ctx = context || new IOContext(scheduler)
ioExecuteAsync(async.register, ctx, cb, rcb, bFirst, bRest, frameIndex)
return ctx.connection
case "memoize":
const mem: IOMemoize<any> = current as any
return ioStartMemoize(mem, scheduler, context, cb, bFirst, bRest, frameIndex)
}
}
}
/** @hidden */
function ioToFutureGoAsync<A>(
start: IO<A>,
scheduler: Scheduler,
bFirst: BindT | null,
bRest: CallStack | null,
forcedAsync: boolean): Future<A> {
return Future.create<A>(cb => {
const ctx = new IOContext(scheduler)
if (forcedAsync)
ioRestartAsync(start as any, ctx, cb as any, null, bFirst, bRest)
else
ioGenericRunLoop(start as any, scheduler, ctx, cb as any, null, bFirst, bRest)
return ctx.connection
})
}
/** @hidden */
function taskToFutureRunLoop<A>(
start: IO<A>,
scheduler: Scheduler): Future<A> {
let current: Current | Try<any> = start
let bFirst: BindT | null = null
let bRest: CallStack | null = null
const modulus = scheduler.executionModel.recommendedBatchSize - 1
let frameIndex = scheduler.batchIndex
while (true) {
if (current instanceof Try) {
if (current.isSuccess()) {
const bind = _ioPopNextBind(bFirst, bRest)
if (!bind) {
scheduler.batchIndex = frameIndex
return Future.pure(current.get())
}
try {
current = bind(current.get())
} catch (e) {
current = new IOPure(Try.failure(e))
}
} else {
const err = (current as Try<any>).failed().get()
const bind = _ioFindErrorHandler(bFirst, bRest)
if (!bind) {
scheduler.batchIndex = frameIndex
return Future.raise(err)
}
try {
current = bind(err)
} catch (e) {
current = new IOPure(Try.failure(e))
}
}
bFirst = null
const nextIndex = (frameIndex + 1) & modulus
// Should we force an asynchronous boundary?
if (nextIndex) {
frameIndex = nextIndex
} else {
return ioToFutureGoAsync(current, scheduler, bFirst, bRest, true)
}
}
else switch (current._tag) {
case "pure":
current = (current as IOPure<any>).value
break
case "always":
current = Try.of((current as IOAlways<any>).thunk)
break
case "once":
current = (current as IOOnce<any>).runTry()
break
case "flatMap":
const flatM: IOFlatMap<any, any> = current as any
if (bFirst) {
if (!bRest) bRest = []
bRest.push(bFirst)
}
bFirst = !flatM.g ? flatM.f : [flatM.f, flatM.g]
current = flatM.source
break
case "async":
case "memoize":
return ioToFutureGoAsync(current, scheduler, bFirst, bRest, false)
}
}
}
/**
* Internal utility used in the implementation of `IO.async`.
*
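* A sketch of the at-most-once guarantee it provides:
*
* ```typescript
* const safe = ioSafeCallback(ec, conn, cb)
* safe(Success(1)) // delivered on `ec.trampoline`, after `conn.pop()`
* safe(Success(2)) // ignored, since the callback was already invoked
* ```
*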
* @hidden
*/
function ioSafeCallback<A>(
ec: Scheduler,
conn: StackedCancelable,
cb: (r: Try<A>) => void): ((r: Try<A>) => void) {
let called = false
return (r: Try<A>) => {
if (!called) {
called = true
// Inserting a light async boundary, otherwise we can have
// stack overflow issues, but also ordering issues with
// StackedCancelable.push in IO.async!
ec.trampoline(() => {
conn.pop()
cb(r)
})
} else if (r.isFailure()) {
ec.reportFailure(r.failed().get())
}
}
}
/** @hidden */
function ioStartMemoize<A>(
fa: IOMemoize<A>,
ec: Scheduler,
context: IOContext | null,
cb: (r: Try<A>) => void,
bFirstInit: BindT | null,
bRestInit: CallStack | null,
frameIndex: number): ICancelable | void {
// Storing the current frameIndex because invoking this
// function effectively ends the current run-loop
ec.batchIndex = frameIndex
// The state that we'll use for subscribing listeners below
let state: Try<A> | Future<A>
// The first evaluation has to trigger the initial run-loop that
// will eventually set our completed state
if (fa.result) {
state = fa.result
} else {
// NOTE this isn't using the passed `IOContext`, or the bindings
// stack because it would be wrong. This has to be executed
// independently, within its own context.
const f = ioToFutureGoAsync(fa.source as any, ec, null, null, false)
if (f.value().isEmpty()) {
fa.result = f
state = f
f.onComplete(r => {
if (r.isSuccess() || !fa.onlySuccess) {
// Caching result for subsequent listeners
fa.result = r as any
// GC purposes
delete fa.source
} else {
// Reverting the state to the original IO reference, such
// that it can be retried again
fa.result = null
}
})
} else {
state = (f.value().get() as any) as Try
// Not storing the state on memoizeOnSuccess if it's a failure
if (state.isSuccess() || !fa.onlySuccess)
fa.result = state as any
}
}
// We have the IOMemoize in an already completed state,
// so continue the run-loop with it
const io: IO<A> = state instanceof Try
? new IOPure(state)
: IO.fromFuture(state)
ioGenericRunLoop(io, ec, context, cb, null, bFirstInit, bRestInit)
}
/**
* Implementation for `IO.sequence`.
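*
* A behavior sketch of the user-facing `IO.sequence`:
*
* ```typescript
* IO.sequence([IO.pure(1), IO.pure(2), IO.pure(3)])
*   .run() // eventually completes with [1, 2, 3]
* ```
*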
* @hidden
*/
function ioSequence<A>(list: IO<A>[] | Iterable<IO<A>>): IO<A[]> {
return IO.of(() => iteratorOf(list))
.flatMap(cursor => ioSequenceLoop([], cursor))
}
/**
* Recursive loop that goes through the given `cursor`, element by
* element, gathering the results of all generated `IO` elements.
*
* @hidden
*/
function ioSequenceLoop<A>(acc: A[], cursor: IteratorLike<IO<A>>): IO<A[]> {
while (true) {
const elem = cursor.next()
const isDone = elem.done
if (elem.value) {
const io: IO<A> = elem.value
return io.flatMap(a => {
acc.push(a)
if (isDone) return IO.pure(acc)
return ioSequenceLoop(acc, cursor)
})
} else {
/* istanbul ignore else */
if (isDone) return IO.pure(acc)
}
}
}
/** @hidden */
function ioListToFutureProcess<A, B>(
list: IO<A>[] | Iterable<IO<A>>,
f: (list: Future<A>[], ec: Scheduler) => Future<B>): IO<B> {
return IO.asyncUnsafe((ctx, cb) => {
ctx.scheduler.trampoline(() => {
let streamErrors = true
try {
const futures: Future<A>[] = []
const array: IO<A>[] = execInternals.iterableToArray(list)
streamErrors = false
for (let i = 0; i < array.length; i++) {
// using a distinct name, so the `f` parameter isn't shadowed
const future = array[i].run(ctx.scheduler)
futures.push(future)
}
const all = f(futures, ctx.scheduler)
ctx.connection.push(all)
all.onComplete(ioSafeCallback(ctx.scheduler, ctx.connection, cb) as any)
} catch (e) {
/* istanbul ignore else */
if (streamErrors) cb(Failure(e))
else ctx.scheduler.reportFailure(e)
}
})
})
}