// ets_tracing: off

/* eslint-disable prefer-const */
import * as Tp from "../Collections/Immutable/Tuple/index.js"
import { _A, _E, _R, _U } from "../Effect/commons.js"
import * as E from "../Either/index.js"
import { pipe } from "../Function/index.js"
import type { Option } from "../Option/index.js"
import { Stack } from "../Stack/index.js"
import type * as U from "../Utils/index.js"

/**
 * `Async[R, E, A]` is a purely functional description of an async computation
 * that requires an environment `R` and may either fail with an `E` or succeed
 * with an `A`.
 */
export interface Async<R, E, A> extends U.HasUnify {}
export abstract class Async<R, E, A> {
  // phantom members: they only carry the type parameters, nothing is stored
  readonly [_U]!: "Async";
  readonly [_E]!: () => E;
  readonly [_A]!: () => A;
  readonly [_R]!: (_: R) => void
}

/** An `Async` that needs no environment and cannot fail. */
export interface UIO<A> extends Async<unknown, never, A> {}
/** An `Async` that needs an environment `R` and cannot fail. */
export interface RIO<R, A> extends Async<R, never, A> {}
/** An `Async` that needs no environment and may fail with `E`. */
export interface IO<E, A> extends Async<unknown, E, A> {}

/**
 * Views an `Async` as the union of its concrete instructions (pure cast).
 *
 * @ets_optimize identity
 */
function concrete<R, E, A>(_: Async<R, E, A>): Concrete<R, E, A> {
  return _ as any
}

/** Instruction: succeed with the value `a`. */
class ISucceed<A> extends Async<unknown, never, A> {
  readonly _asyncTag = "Succeed"

  constructor(readonly a: A) {
    super()
  }
}

/** Instruction: build the next computation lazily by calling `f`. */
class ISuspend<R, E, A> extends Async<R, E, A> {
  readonly _asyncTag = "Suspend"

  constructor(readonly f: () => Async<R, E, A>) {
    super()
  }
}

/** Instruction: fail with the error `e`. */
class IFail<E> extends Async<unknown, E, never> {
  readonly _asyncTag = "Fail"

  constructor(readonly e: E) {
    super()
  }
}

/** Instruction: run `value`, then continue with `cont` applied to its result. */
class IFlatMap<R, R1, E, E1, A, B> extends Async<R & R1, E1 | E, B> {
  readonly _asyncTag = "FlatMap"

  constructor(
    readonly value: Async<R, E, A>,
    readonly cont: (a: A) => Async<R1, E1, B>
  ) {
    super()
  }
}

/** Instruction: run `value`, then continue with `failure` or `success`. */
class IFold<R, E1, E2, A, B> extends Async<R, E2, B> {
  readonly _asyncTag = "Fold"

  constructor(
    readonly value: Async<R, E1, A>,
    readonly failure: (e: E1) => Async<R, E2, B>,
    readonly success: (a: A) => Async<R, E2, B>
  ) {
    super()
  }
}

/** Instruction: compute the next computation from the current environment. */
class IAccess<R, E, A> extends Async<R, E, A> {
  readonly _asyncTag = "Access"

  constructor(readonly access: (r: R) => Async<R, E, A>) {
    super()
  }
}

/** Instruction: run `cont` with `r` as the environment. */
class IProvide<R, E, A> extends Async<unknown, E, A> {
  readonly _asyncTag = "Provide"

  constructor(readonly r: R, readonly cont: Async<R, E, A>) {
    super()
  }
}

/**
 * Instruction: run a promise built by `promise`; rejections are mapped to the
 * error channel through `onError`.
 */
class IPromise<E, A> extends Async<unknown, E, A> {
  readonly _asyncTag = "Promise"

  constructor(
    readonly promise: (onInterrupt: (f: () => void) => void) => Promise<A>,
    readonly onError: (u: unknown) => E
  ) {
    super()
  }
}

/** Instruction: complete with an already known `Exit`. */
class IDone<E, A> extends Async<unknown, E, A> {
  readonly _asyncTag = "Done"

  constructor(readonly exit: Exit<E, A>) {
    super()
  }
}

type Concrete<R, E, A> =
  | ISucceed<A>
  | IFail<E>
  | IFlatMap<R, R, E, E, unknown, A>
  | IFold<R, unknown, E, unknown, A>
  | IAccess<R, E, A>
  | IProvide<R, E, A>
  | ISuspend<R, E, A>
  | IPromise<E, A>
  | IDone<E, A>

/** Continuation frame that can resume after either a failure or a success. */
class FoldFrame {
  readonly _asyncTag = "FoldFrame"
  constructor(
    readonly failure: (e: any) => Async<any, any, any>,
    readonly apply: (e: any) => Async<any, any, any>
  ) {}
}

/** Continuation frame that only resumes after a success. */
class ApplyFrame {
  readonly _asyncTag = "ApplyFrame"
  constructor(readonly apply: (e: any) => Async<any, any, any>) {}
}

type Frame = FoldFrame | ApplyFrame

/**
 * Models the state of interruption, allows for listening to interruption
 * events & firing interruption events
 */
export class InterruptionState {
  private isInterrupted = false
  readonly listeners = new Set<() => void>()

  /**
   * Registers `f` to be called on interruption.
   *
   * @returns a function that removes the listener again
   */
  listen(f: () => void) {
    this.listeners.add(f)
    return () => {
      // stop listening
      this.listeners.delete(f)
    }
  }

  /** `true` once `interrupt()` has been called. */
  get interrupted() {
    return this.isInterrupted
  }

  /** Marks the state as interrupted and notifies every listener, once. */
  interrupt() {
    if (!this.isInterrupted) {
      // set to interrupted
      this.isInterrupted = true
      // notify
      this.listeners.forEach((i) => {
        i()
      })
    }
  }
}

export interface Failure<E> {
  readonly _tag: "Failure"
  e: E
}
export interface Interrupt {
  readonly _tag: "Interrupt"
}
export interface Success<A> {
  readonly _tag: "Success"
  a: A
}

export type Rejection<E> = Failure<E> | Interrupt

export type Exit<E, A> = Rejection<E> | Success<A>

/** Builds a `Failure` exit carrying `e`. */
export const failExit = <E>(e: E): Rejection<E> => ({
  _tag: "Failure",
  e
})

/** The shared `Interrupt` exit value. */
export const interruptExit: Exit<never, never> = {
  _tag: "Interrupt"
}

/** Builds a `Success` exit carrying `a`. */
export const successExit = <A>(a: A): Exit<never, A> => ({
  _tag: "Success",
  a
})

/**
 * Models a cancellable promise
 */
class CancelablePromise<E, A> {
  // holds the type information of E
  readonly _E!: () => E

  // gets called with a Rejection, any here is to not break covariance imposed by _E
  private rejection: ((e: Rejection<any>) => void) | undefined = undefined

  // holds the current running promise
  private current: Promise<A> | undefined = undefined

  constructor(
    // creates the promise
    readonly promiseFactory: (onInterrupt: (f: () => void) => void) => Promise<A>,
    // listens for interruption events
    readonly is: InterruptionState
  ) {}

  // creates the computation linking it to the interruption state
  readonly promise: () => Promise<A> = () => {
    if (this.current) {
      throw new Error("Bug: promise() have been called twice")
    } else if (this.is.interrupted) {
      throw new Error("Bug: trying to create a promise already interrupted")
    } else {
      const onInterrupt: Array<() => void> = []

      // we record the current interrupt in the interruption registry
      const removeListener = this.is.listen(() => {
        onInterrupt.forEach((f) => {
          f()
        })
        this.interrupt()
      })

      const p = new Promise<A>((res, rej) => {
        // set the rejection handler
        this.rejection = rej

        // creates the underlying promise
        this.promiseFactory((f) => {
          onInterrupt.push(f)
        })
          .then((a) => {
            // removes the call to interrupt from the interruption registry
            removeListener()
            // if not interrupted we continue
            if (!this.is.interrupted) {
              res(a)
            }
          })
          .catch((e) => {
            // removes the call to interrupt from the interruption registry
            removeListener()
            // if not interrupted we continue
            if (!this.is.interrupted) {
              rej(e)
            }
          })
      })

      // track the current running promise to avoid re-creation
      this.current = p

      // return the promise
      return p
    }
  }

  readonly interrupt = () => {
    // triggers a promise rejection on the current promise with an interrupt exit
    this.rejection?.(interruptExit as any)
  }
}

/** Keeps track of the promises that are currently running. */
export class Tracer {
  private running = new Set<Promise<any>>()

  constructor() {
    this.traced = this.traced.bind(this)
    this.wait = this.wait.bind(this)
    this.clear = this.clear.bind(this)
  }

  // tracks a lazy promise lifetime
  traced<A>(promise: () => Promise<A>) {
    return async () => {
      const p = promise()
      this.running.add(p)

      try {
        const a = await p
        this.running.delete(p)
        return Promise.resolve(a)
      } catch (e) {
        this.running.delete(p)
        return Promise.reject(e)
      }
    }
  }

  // awaits for all the running promises to complete
  async wait(): Promise<Exit<any, any>[]> {
    const t = await Promise.all(
      Array.from(this.running).map((p) =>
        p.then((a) => successExit(a)).catch((e) => Promise.resolve(e))
      )
    )
    // resolve on a later timer tick instead of immediately
    return await new Promise((r) => {
      setTimeout(() => {
        r(t)
      }, 0)
    })
  }

  // clears itself
  clear() {
    this.running.clear()
  }
}

// create the root tracing context
export const tracingContext = new Tracer()

/**
 * Interprets `self` using `ri` as the initial environment and resolves with
 * the `Exit` of the computation.
 *
 * The run stops as soon as `is` is interrupted or the computation itself
 * completes with an `Interrupt` exit (see `done`); both resolve with
 * `interruptExit`.
 *
 * The returned promise rejects only when a `Promise` instruction throws
 * something that is neither a `Failure` nor an `Interrupt` (for instance when
 * the promise factory or `onError` throws synchronously).
 *
 * @param self the computation to run
 * @param ri the initial environment
 * @param is the interruption state observed during the run
 */
export function runPromiseExitEnv<R, E, A>(
  self: Async<R, E, A>,
  ri: R,
  is: InterruptionState = new InterruptionState()
): Promise<Exit<E, A>> {
  return tracingContext.traced(async () => {
    let stack: Stack<Frame> | undefined = undefined
    let a = null
    let r = ri
    let failed = false
    let curAsync = self as Async<any, any, any> | undefined
    let cnt = 0
    let interruptedLocal = false

    // interrupted either from the outside (`is`) or by the computation itself
    function isInterrupted() {
      return interruptedLocal || is.interrupted
    }

    function pop() {
      const nextInstr = stack
      if (nextInstr) {
        stack = stack?.previous
      }
      return nextInstr?.value
    }

    function push(cont: Frame) {
      stack = new Stack(cont, stack)
    }

    // drops frames until a FoldFrame is found and schedules its failure handler
    function findNextErrorHandler() {
      let unwinding = true
      while (unwinding) {
        const nextInstr = pop()

        if (nextInstr == null) {
          unwinding = false
        } else {
          if (nextInstr._asyncTag === "FoldFrame") {
            unwinding = false
            push(new ApplyFrame(nextInstr.failure))
          }
        }
      }
    }

    while (curAsync != null && !isInterrupted()) {
      // after more than 10_000 steps wait for a zero-delay timer before going on
      if (cnt > 10_000) {
        await new Promise((r) => {
          setTimeout(() => {
            r(undefined)
          }, 0)
        })
        cnt = 0
      }
      cnt += 1

      const xp = concrete(curAsync)

      switch (xp._asyncTag) {
        case "FlatMap": {
          const nested = concrete(xp.value)
          const continuation = xp.cont

          switch (nested._asyncTag) {
            case "Succeed": {
              // fast path: no frame needed when the value is already known
              curAsync = continuation(nested.a)
              break
            }
            default: {
              curAsync = nested
              push(new ApplyFrame(continuation))
            }
          }

          break
        }
        case "Suspend": {
          curAsync = xp.f()
          break
        }
        case "Succeed": {
          a = xp.a
          const nextInstr = pop()
          if (nextInstr) {
            curAsync = nextInstr.apply(a)
          } else {
            curAsync = undefined
          }
          break
        }
        case "Fail": {
          findNextErrorHandler()
          const nextInst = pop()
          if (nextInst) {
            curAsync = nextInst.apply(xp.e)
          } else {
            failed = true
            a = xp.e
            curAsync = undefined
          }
          break
        }
        case "Fold": {
          curAsync = xp.value
          push(new FoldFrame(xp.failure, xp.success))
          break
        }
        case "Done": {
          switch (xp.exit._tag) {
            case "Failure": {
              curAsync = new IFail(xp.exit.e)
              break
            }
            case "Interrupt": {
              interruptedLocal = true
              curAsync = undefined
              break
            }
            case "Success": {
              curAsync = new ISucceed(xp.exit.a)
              break
            }
          }
          break
        }
        case "Access": {
          curAsync = xp.access(r)
          break
        }
        case "Provide": {
          r = xp.r
          curAsync = xp.cont
          break
        }
        case "Promise": {
          try {
            curAsync = new ISucceed(
              await new CancelablePromise(
                (s) =>
                  xp.promise(s).catch((e) => Promise.reject(failExit(xp.onError(e)))),
                is
              ).promise()
            )
          } catch (e) {
            const tag = (e as { _tag?: unknown } | null | undefined)?._tag
            if (tag === "Failure") {
              curAsync = new IFail((e as Failure<E>).e)
            } else if (tag === "Interrupt") {
              interruptedLocal = true
              curAsync = undefined
            } else {
              // Not a Rejection: leaving `curAsync` untouched would re-run the
              // same promise forever, so surface the problem to the caller.
              throw e
            }
          }
          break
        }
      }
    }

    // also covers interruption requested by the computation itself
    if (isInterrupted()) {
      return interruptExit
    }

    if (failed) {
      return failExit(a)
    }

    return successExit(a)
  })()
}

/**
 * Runs a computation that needs no environment (an empty object is provided)
 * and resolves with its `Exit`.
 */
export function runPromiseExit<E, A>(
  self: Async<unknown, E, A>,
  is: InterruptionState = new InterruptionState()
): Promise<Exit<E, A>> {
  return runPromiseExitEnv(self, {}, is)
}

/**
 * Runs as a Promise of the success value: rejects with the error `e` on
 * `Failure` and with the `Interrupt` exit itself on interruption.
 */
export async function runPromise<E, A>(
  task: Async<unknown, E, A>,
  is = new InterruptionState()
): Promise<A> {
  return runPromiseExit(task, is).then((e) =>
    e._tag === "Failure"
      ? Promise.reject(e.e)
      : e._tag === "Interrupt"
      ? Promise.reject(e)
      : Promise.resolve(e.a)
  )
}

/**
 * Runs as a Cancellable: starts the task, passes its `Exit` to `cb` and
 * returns a function that interrupts the run.
 */
export function runAsync<E, A>(
  task: Async<unknown, E, A>,
  cb?: (e: Exit<E, A>) => void
) {
  const is = new InterruptionState()
  runPromiseExit(task, is).then(cb)
  return () => {
    is.interrupt()
  }
}

/**
 * Runs as a Cancellable with the environment `r`: starts the task, passes its
 * `Exit` to `cb` and returns a function that interrupts the run.
 */
export function runAsyncEnv<R, E, A>(
  task: Async<R, E, A>,
  r: R,
  cb?: (e: Exit<E, A>) => void
) {
  const is = new InterruptionState()
  runPromiseExitEnv(task, r, is).then(cb)
  return () => {
    is.interrupt()
  }
}

/**
 * Extends this computation with another computation that depends on the
 * result of this computation by running the first computation, using its
 * result to generate a second computation, and running that computation.
*
 * @ets_data_first chain_
 */
export function chain<A, R1, E1, B>(f: (a: A) => Async<R1, E1, B>) {
  return <R, E>(self: Async<R, E, A>): Async<R & R1, E | E1, B> => chain_(self, f)
}

/**
 * Sequences two computations: runs `self`, feeds its result to `f` and then
 * runs the computation returned by `f`.
 */
export function chain_<R, E, A, R1, E1, B>(
  self: Async<R, E, A>,
  f: (a: A) => Async<R1, E1, B>
): Async<R & R1, E | E1, B> {
  const sequenced = new IFlatMap(self, f)
  return sequenced
}

/**
 * Runs `f` on the success value of the computation for its effect only; the
 * original value is kept as the result.
 *
 * @ets_data_first tap_
 */
export function tap<A, R1, E1, X>(f: (a: A) => Async<R1, E1, X>) {
  return <R, E>(self: Async<R, E, A>): Async<R & R1, E | E1, A> =>
    chain_(self, (value) => map_(f(value), () => value))
}

/**
 * Runs `f` on the success value of `self` for its effect only; the original
 * value is kept as the result.
 */
export function tap_<R, E, A, R1, E1, X>(
  self: Async<R, E, A>,
  f: (a: A) => Async<R1, E1, X>
): Async<R & R1, E | E1, A> {
  return chain_(self, (value) => chain_(f(value), () => succeed(value)))
}

/**
 * Builds a computation that succeeds with `a`.
 */
export function succeed<A>(a: A): Async<unknown, never, A> {
  return new ISucceed(a)
}

/**
 * Builds a computation that fails with the given error.
 */
export function fail<E>(a: E): Async<unknown, E, never> {
  return new IFail(a)
}

/**
 * Transforms the success value of `self` with the function `f`.
 */
export function map_<R, E, A, B>(self: Async<R, E, A>, f: (a: A) => B) {
  const lifted = (value: A) => succeed(f(value))
  return chain_(self, lifted)
}

/**
 * Transforms the success value of the computation with the function `f`.
 *
 * @ets_data_first map_
 */
export function map<A, B>(f: (a: A) => B) {
  return <R, E>(self: Async<R, E, A>) =>
    chain_(self, (value) => succeed(f(value)))
}

/**
 * Recovers from errors by accepting one computation to execute for the case
 * of an error, and one computation to execute for the case of success.
*/
export function foldM_<R, E, A, R1, E1, B, R2, E2, C>(
  self: Async<R, E, A>,
  failure: (e: E) => Async<R1, E1, B>,
  success: (a: A) => Async<R2, E2, C>
): Async<R & R1 & R2, E1 | E2, B | C> {
  // widen the inputs to the combined environment / error / result types
  const source = self as Async<R & R1 & R2, E, A>
  const onFailure = failure as (e: E) => Async<R & R1 & R2, E1 | E2, B | C>
  return new IFold<R & R1 & R2, E, E1 | E2, A, B | C>(source, onFailure, success)
}

/**
 * Continues with `failure` when the computation fails and with `success`
 * when it succeeds.
 *
 * @ets_data_first foldM_
 */
export function foldM<E, A, R1, E1, B, R2, E2, C>(
  failure: (e: E) => Async<R1, E1, B>,
  success: (a: A) => Async<R2, E2, C>
) {
  return <R>(self: Async<R, E, A>) => foldM_(self, failure, success)
}

/**
 * Turns both outcomes of the computation into a success: the result is the
 * value of `failure` applied to the error or of `success` applied to the
 * value.
 *
 * @ets_data_first fold_
 */
export function fold<E, A, B, C>(failure: (e: E) => B, success: (a: A) => C) {
  return <R>(self: Async<R, E, A>) => fold_(self, failure, success)
}

/**
 * Turns both outcomes of `self` into a success: the result is the value of
 * `failure` applied to the error or of `success` applied to the value.
 */
export function fold_<R, E, A, B, C>(
  self: Async<R, E, A>,
  failure: (e: E) => B,
  success: (a: A) => C
): Async<R, never, B | C> {
  const onFailure = (error: E) => succeed(failure(error))
  const onSuccess = (value: A) => succeed(success(value))
  return foldM_(self, onFailure, onSuccess)
}

/**
 * Handles every error of the computation with `failure`.
 *
 * @ets_data_first catchAll_
 */
export function catchAll<A, R1, E, E1, B>(failure: (e: E) => Async<R1, E1, B>) {
  return <R>(self: Async<R, E, A>) => catchAll_(self, failure)
}

/**
 * Handles every error of `self` with `failure`; successes pass through.
 */
export function catchAll_<R, E, A, R1, E1, B>(
  self: Async<R, E, A>,
  failure: (e: E) => Async<R1, E1, B>
) {
  const passThrough = (value: A) => succeed(value)
  return foldM_(self, failure, passThrough)
}

/**
 * Maps the error channel with `f` and the success channel with `g`.
 *
 * @ets_data_first bimap_
 */
export function bimap<E, A, E1, A1>(f: (e: E) => E1, g: (a: A) => A1) {
  return <R>(self: Async<R, E, A>) => bimap_(self, f, g)
}

/**
 * Returns a computation whose error and success channels have been mapped
 * by the specified functions, `f` and `g`.
*/
export function bimap_<R, E, A, E1, A1>(
  self: Async<R, E, A>,
  f: (e: E) => E1,
  g: (a: A) => A1
) {
  return foldM_(
    self,
    (e) => fail(f(e)),
    // succeed with the mapped value itself, not with a thunk producing it
    (a) => succeed(g(a))
  )
}

/**
 * Transforms the error type of this computation with the specified
 * function.
 *
 * @ets_data_first mapError_
 */
export function mapError<E, E1>(f: (e: E) => E1) {
  return <R, A>(self: Async<R, E, A>) => mapError_(self, f)
}

/**
 * Transforms the error type of this computation with the specified
 * function.
 */
export function mapError_<R, E, A, E1>(self: Async<R, E, A>, f: (e: E) => E1) {
  return catchAll_(self, (e) => fail(f(e)))
}

/**
 * A computation that always succeeds with `undefined`.
 */
export const unit = succeed<void>(undefined)

/**
 * Provides the environment `R1` required by this computation by applying `f`
 * to the outer environment `R0`.
 */
export function provideSome<R0, R1>(f: (s: R0) => R1) {
  return <E, A>(self: Async<R1, E, A>) =>
    accessM((r: R0) => provideAll(f(r))(self))
}

/**
 * Provides this computation with its required environment.
 *
 * @ets_data_first provideAll_
 */
export function provideAll<R>(r: R) {
  return <E, A>(self: Async<R, E, A>): Async<unknown, E, A> => new IProvide(r, self)
}

/**
 * Provides this computation with its required environment.
 */
export function provideAll_<R, E, A>(self: Async<R, E, A>, r: R): Async<unknown, E, A> {
  return new IProvide(r, self)
}

/**
 * Provides some of the environment required to run this effect,
 * leaving the remainder `R0` and combining it automatically using spread
 * (properties of `r` override those of the outer environment).
 */
export function provide<R>(r: R) {
  return <E, A, R0>(next: Async<R & R0, E, A>): Async<R0, E, A> =>
    provideSome((r0: R0) => ({ ...r0, ...r }))(next)
}

/**
 * Access the environment monadically
 */
export function accessM<R, R1, E, A>(
  f: (_: R) => Async<R1, E, A>
): Async<R1 & R, E, A> {
  return new IAccess<R1 & R, E, A>(f)
}

/**
 * Access the environment with the function f
 */
export function access<R, A>(f: (_: R) => A): Async<R, never, A> {
  return accessM((r: R) => succeed(f(r)))
}

/**
 * Access the environment
 */
export function environment<R>(): Async<R, never, R> {
  return accessM((r: R) => succeed(r))
}

/**
 * Returns a computation whose failure and success have been lifted into an
 * `Either`. The resulting computation cannot fail, because the failure case
 * has been exposed as part of the `Either` success case.
 */
export function either<R, E, A>(self: Async<R, E, A>): Async<R, never, E.Either<E, A>> {
  return fold_(self, E.left, E.right)
}

/**
 * Executes this computation and returns its value as a `Left` if it
 * succeeds, but otherwise executes the specified computation and returns its
 * value as a `Right`.
 *
 * @ets_data_first orElseEither_
 */
export function orElseEither<R2, E2, A2>(that: () => Async<R2, E2, A2>) {
  return <R, E, A>(self: Async<R, E, A>): Async<R & R2, E2, E.Either<A, A2>> =>
    orElseEither_(self, that)
}

/**
 * Executes this computation and returns its value as a `Left` if it
 * succeeds, but otherwise executes the specified computation and returns its
 * value as a `Right`.
 */
export function orElseEither_<R, E, A, R2, E2, A2>(
  self: Async<R, E, A>,
  that: () => Async<R2, E2, A2>
): Async<R & R2, E2, E.Either<A, A2>> {
  return foldM_(
    self,
    () => map_(that(), (a) => E.right(a)),
    (a) => succeed(E.left(a))
  )
}

/**
 * Runs this computation followed by the specified computation and combines
 * the results of both using the specified function.
 *
 * @ets_data_first zipWith_
 */
export function zipWith<R1, E1, A, B, C>(
  that: Async<R1, E1, B>,
  f: (a: A, b: B) => C
) {
  return <R, E>(self: Async<R, E, A>): Async<R & R1, E1 | E, C> =>
    zipWith_(self, that, f)
}

/**
 * Runs this computation followed by the specified computation and combines
 * the results of both using the specified function.
 */
export function zipWith_<R, E, A, R1, E1, B, C>(
  self: Async<R, E, A>,
  that: Async<R1, E1, B>,
  f: (a: A, b: B) => C
) {
  return chain_(self, (a) => map_(that, (b) => f(a, b)))
}

/**
 * Runs this computation followed by the specified computation and combines
 * the results of both into a tuple.
 *
 * @ets_data_first zip_
 */
export function zip<R1, E1, B>(that: Async<R1, E1, B>) {
  return <R, E, A>(self: Async<R, E, A>) => zip_(self, that)
}

/**
 * Runs this computation followed by the specified computation and combines
 * the results of both into a tuple.
*/
export function zip_<R, E, A, R1, E1, B>(self: Async<R, E, A>, that: Async<R1, E1, B>) {
  return zipWith_(self, that, Tp.tuple)
}

/**
 * Suspend a computation, useful in recursion
 */
export function suspend<R, E, A>(f: () => Async<R, E, A>): Async<R, E, A> {
  return new ISuspend(f)
}

/**
 * Lift a sync (non failable) computation
 */
export function succeedWith<A>(f: () => A) {
  return suspend(() => succeed(f()))
}

/**
 * Lift a sync computation that may throw: a thrown value is converted to a
 * failure through `onThrow`.
 */
export function tryCatch<E, A>(
  f: () => A,
  onThrow: (u: unknown) => E
): Async<unknown, E, A> {
  return suspend<unknown, E, A>(() => {
    try {
      return succeed(f())
    } catch (u) {
      return fail(onThrow(u))
    }
  })
}

/**
 * Construct from a promise; a rejection is mapped to a failure with
 * `onError`. Callbacks registered through `onInterrupt` are called when the
 * run is interrupted while the promise is pending.
 */
export function promise<E, A>(
  promise: (onInterrupt: (f: () => void) => void) => Promise<A>,
  onError: (u: unknown) => E
): Async<unknown, E, A> {
  return new IPromise(promise, onError)
}

/**
 * Construct from a non failable promise.
 */
export function unfailable<A>(
  promise: (onInterrupt: (f: () => void) => void) => Promise<A>
): Async<unknown, never, A> {
  return new IPromise(promise, () => undefined as never)
}

/**
 * Construct a computation from an exit value.
 */
export function done<E, A>(exit: Exit<E, A>): Async<unknown, E, A> {
  return new IDone(exit)
}

/**
 * Runs `f` on the error of the computation, ignoring the output of `f`, and
 * then fails again with the original error. Useful for logging or doing
 * things that should not change the result.
 */
export function tapError<EA, B, EB, R>(f: (_: EA) => Async<R, EB, B>) {
  return <R1, A>(self: Async<R1, EA, A>) =>
    catchAll_(self, (e: EA) => chain_(f(e), (_) => fail(e)))
}

/**
 * Sleeps for `ms` milliseconds; the timer is cleared on interruption.
 */
export function sleep(ms: number) {
  return unfailable(
    (onInterrupt) =>
      new Promise<void>((res) => {
        const timer = setTimeout(() => {
          res(undefined)
        }, ms)
        onInterrupt(() => {
          clearTimeout(timer)
        })
      })
  )
}

/**
 * Delay the computation prepending a sleep of `ms` milliseconds.
 */
export function delay(ms: number) {
  return <R, E, A>(self: Async<R, E, A>) =>
    pipe(
      sleep(ms),
      chain(() => self)
    )
}

/**
 * Lift an `Either`: `Right` becomes a success, `Left` becomes a failure.
 */
export function fromEither<E, A>(e: E.Either<E, A>) {
  return e._tag === "Right" ? succeed(e.right) : fail(e.left)
}

/**
 * Compact the union produced by the result of f
 *
 * @ets_optimize identity
 */
export function unionFn<ARGS extends any[], Ret extends Async<any, any, any>>(
  _: (...args: ARGS) => Ret
): (...args: ARGS) => Async<U._R<Ret>, U._E<Ret>, U._A<Ret>> {
  return _ as any
}

/**
 * Compact the union
 *
 * @ets_optimize identity
 */
export function union<Ret extends Async<any, any, any>>(
  _: Ret
): Async<U._R<Ret>, U._E<Ret>, U._A<Ret>> {
  return _ as any
}

/**
 * Get the A from an option, failing with the result of `onNone` otherwise.
 *
 * Exported by name like every other data-first variant in this module so
 * that `@ets_data_first tryCatchOption_` and namespace imports can find it.
 */
export function tryCatchOption_<A, E>(ma: Option<A>, onNone: () => E) {
  return pipe(E.fromOption_(ma, onNone), fromEither)
}

// kept so that existing default imports of this module keep working
export default tryCatchOption_

/**
 * Get the A from an option
 *
 * @ets_data_first tryCatchOption_
 */
export function tryCatchOption<A, E>(onNone: () => E) {
  return (ma: Option<A>) => tryCatchOption_(ma, onNone)
}