import * as Cause from "../../Cause.js"
import * as Context from "../../Context.js"
import * as Effect from "../../Effect.js"
import * as Either from "../../Either.js"
import * as Equal from "../../Equal.js"
import * as Exit from "../../Exit.js"
import type * as FiberId from "../../FiberId.js"
import * as FiberRef from "../../FiberRef.js"
import type { LazyArg } from "../../Function.js"
import { constVoid, dual, pipe } from "../../Function.js"
import * as Hash from "../../Hash.js"
import type * as Option from "../../Option.js"
import { pipeArguments } from "../../Pipeable.js"
import { hasProperty } from "../../Predicate.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as STM from "../../STM.js"
import { internalCall, YieldWrap } from "../../Utils.js"
import { ChannelTypeId } from "../core-stream.js"
import { withFiberRuntime } from "../core.js"
import { effectVariance, StreamTypeId } from "../effectable.js"
import { OP_COMMIT } from "../opCodes/effect.js"
import { SingleShotGen } from "../singleShotGen.js"
import { SinkTypeId } from "../sink.js"
import * as Journal from "./journal.js"
import * as OpCodes from "./opCodes/stm.js"
import * as TExitOpCodes from "./opCodes/tExit.js"
import * as TryCommitOpCodes from "./opCodes/tryCommit.js"
import * as STMState from "./stmState.js"
import * as TExit from "./tExit.js"
import * as TryCommit from "./tryCommit.js"
import * as TxnId from "./txnId.js"

// NOTE(review): several declarations below reference type names such as `A`,
// `E`, `R`, `Tag` and `Body` without declaring them, and some `extends Op ...`
// clauses are syntactically incomplete. It looks like generic parameter lists
// were stripped from this copy of the file -- confirm against the canonical
// source before relying on these signatures.

/**
 * Key used to register the global symbol that brands STM values.
 *
 * @internal
 */
const STMSymbolKey = "effect/STM"

/**
 * Global symbol (`Symbol.for`) used as the STM type brand; see `isSTM`.
 *
 * @internal
 */
export const STMTypeId: STM.STMTypeId = Symbol.for(
  STMSymbolKey
) as STM.STMTypeId

/**
 * Union of every primitive STM instruction shape declared in this module.
 *
 * @internal
 */
export type Primitive =
  | STMEffect
  | STMOnFailure
  | STMOnRetry
  | STMOnSuccess
  | STMProvide
  | STMSync
  | STMSucceed
  | STMRetry
  | STMFail
  | STMDie
  | STMInterrupt

/**
 * Common shape of a primitive: an STM value whose `_op` is `OP_COMMIT` and
 * whose `effect_instruction_i0` carries the STM op code.
 *
 * @internal
 */
type Op = STM.STM & Body & {
  readonly _op: OP_COMMIT
  readonly effect_instruction_i0: Tag
}

/** @internal */
interface STMEffect extends Op ) => STM.STM }> {}

/** @internal */
interface STMOnFailure extends Op readonly effect_instruction_i2: (error: unknown) => STM.STM }> {}

/** @internal */
interface STMOnRetry extends Op readonly effect_instruction_i2: () => STM.STM }> {}

/** @internal */
interface STMOnSuccess extends Op readonly effect_instruction_i2: (a: unknown) => STM.STM }> {}

/** @internal */
interface STMProvide extends Op readonly effect_instruction_i2: (context: Context.Context) => Context.Context }> {}

/** @internal */
interface STMSync extends Op unknown }> {}

/** @internal */
interface STMSucceed extends Op {}

/** @internal */
interface STMRetry extends Op {}

/** @internal */
interface STMFail extends Op }> {}

/** @internal */
interface STMDie extends Op }> {}

/** @internal */
interface STMInterrupt extends Op {}

// Variance marker object returned by the `[STMTypeId]` getter and also
// assigned to the Stream/Sink/Channel type-id slots of every primitive.
const stmVariance = {
  /* c8 ignore next */
  _R: (_: never) => _,
  /* c8 ignore next */
  _E: (_: never) => _,
  /* c8 ignore next */
  _A: (_: never) => _
}

/**
 * Runtime representation of every STM instruction.
 *
 * `effect_instruction_i0` holds the STM op code; `effect_instruction_i1` and
 * `effect_instruction_i2` are op-specific payload slots filled in by the
 * constructors further down in this module (they default to `undefined`).
 *
 * @internal
 */
class STMPrimitive implements STM.STM {
  public _op = OP_COMMIT
  public effect_instruction_i1: any = undefined
  public effect_instruction_i2: any = undefined;
  [Effect.EffectTypeId]: any;
  [StreamTypeId]: any;
  [SinkTypeId]: any;
  [ChannelTypeId]: any
  get [STMTypeId]() {
    return stmVariance
  }
  constructor(readonly effect_instruction_i0: Primitive["effect_instruction_i0"]) {
    this[Effect.EffectTypeId] = effectVariance
    this[StreamTypeId] = stmVariance
    this[SinkTypeId] = stmVariance
    this[ChannelTypeId] = stmVariance
  }
  /** Equality is reference identity. */
  [Equal.symbol](this: {}, that: unknown) {
    return this === that
  }
  /** Hash is a random value computed via `Hash.random` and passed through `Hash.cached`. */
  [Hash.symbol](this: {}) {
    return Hash.cached(this, Hash.random(this))
  }
  /** Lets an STM value be used with `yield*` by wrapping it in a `YieldWrap`. */
  [Symbol.iterator]() {
    return new SingleShotGen(new YieldWrap(this)) as any
  }
  /** Converts this transaction into an `Effect` via `unsafeAtomically` with no-op callbacks. */
  commit(this: STM.STM): Effect.Effect {
    return unsafeAtomically(this, constVoid, constVoid)
  }
  pipe() {
    return pipeArguments(this, arguments)
  }
}

/**
 * Returns `true` when `u` has the `STMTypeId` property.
 *
 * @internal
 */
export const isSTM = (u: unknown): u is STM.STM => hasProperty(u, STMTypeId)

/**
 * Turns a transaction into an `Effect` by running it through
 * `unsafeAtomically` with no-op `onDone` / `onInterrupt` callbacks.
 *
 * @internal
 */
export const commit = (self: STM.STM): Effect.Effect => unsafeAtomically(self, constVoid, constVoid)

/**
 * Runs the transaction `self` on the current fiber.
 *
 * A first attempt is made synchronously with `tryCommitSync`. If that attempt
 * is done, `onDone` is called with the exit and the exit is returned. If it
 * asks to suspend, an `Effect.async` is created that drives
 * `tryCommitAsync`; the wait itself is interruptible (`restore`) while the
 * surrounding cleanup is not (`uninterruptibleMask`).
 *
 * @param self - the transaction to run
 * @param onDone - called with the exit whenever the transaction completed
 * @param onInterrupt - called when the async wait failed and the transaction
 *   had not completed
 * @internal
 */
export const unsafeAtomically = (
  self: STM.STM,
  onDone: (exit: Exit.Exit) => unknown,
  onInterrupt: LazyArg
): Effect.Effect =>
  withFiberRuntime((state) => {
    const fiberId = state.id()
    const env = state.getFiberRef(FiberRef.currentContext) as Context.Context
    const scheduler = state.getFiberRef(FiberRef.currentScheduler)
    const priority = state.getFiberRef(FiberRef.currentSchedulingPriority)
    const commitResult = tryCommitSync(fiberId, self, env, scheduler, priority)
    switch (commitResult._tag) {
      case TryCommitOpCodes.OP_DONE: {
        onDone(commitResult.exit)
        return commitResult.exit
      }
      case TryCommitOpCodes.OP_SUSPEND: {
        const txnId = TxnId.make()
        // Mutable cell shared with `tryCommitAsync` / `tryCommit`; named
        // `txnState` so it does not shadow the fiber runtime `state` above.
        const txnState: { value: STMState.STMState } = { value: STMState.running }
        const effect = Effect.async(
          (k: (effect: Effect.Effect) => unknown): void =>
            tryCommitAsync(fiberId, self, txnId, txnState, env, scheduler, priority, k)
        )
        return Effect.uninterruptibleMask((restore) =>
          pipe(
            restore(effect),
            Effect.catchAllCause((cause) => {
              let currentState = txnState.value
              // Flip a still-running transaction to interrupted so that a
              // later `tryCommitAsync` invocation becomes a no-op.
              if (STMState.isRunning(currentState)) {
                txnState.value = STMState.interrupted
              }
              currentState = txnState.value
              // The transaction completed before we got here: surface its
              // own exit instead of the cause that reached this handler.
              if (STMState.isDone(currentState)) {
                onDone(currentState.exit)
                return currentState.exit
              }
              onInterrupt()
              return Effect.failCause(cause)
            })
          )
        )
      }
    }
  })

/**
 * Executes `stm` once against a fresh journal (used by the asynchronous
 * re-run path) and translates the resulting `TExit` into a `TryCommit`.
 *
 * The journal is committed only when it contains changes
 * (`JournalAnalysisReadWrite`) AND the transaction succeeded, matching
 * `tryCommitSync`: writes made by an attempt that failed, died, was
 * interrupted or asked to retry must not be published. For every outcome
 * other than retry, `state.value` is updated from the `TExit` and pending
 * todos are completed; for retry the journal is handed back via
 * `TryCommit.suspend`.
 *
 * @throws Error when the journal analysis is `JournalAnalysisInvalid`
 * @internal
 */
const tryCommit = (
  fiberId: FiberId.FiberId,
  stm: STM.STM,
  state: { value: STMState.STMState },
  env: Context.Context,
  scheduler: Scheduler.Scheduler,
  priority: number
): TryCommit.TryCommit => {
  const journal: Journal.Journal = new Map()
  const tExit = new STMDriver(stm, journal, fiberId, env).run()
  const analysis = Journal.analyzeJournal(journal)
  if (analysis === Journal.JournalAnalysisReadWrite && TExit.isSuccess(tExit)) {
    Journal.commitJournal(journal)
  } else if (analysis === Journal.JournalAnalysisInvalid) {
    throw new Error(
      "BUG: STM.TryCommit.tryCommit - please report an issue at https://github.com/Effect-TS/effect/issues"
    )
  }
  switch (tExit._tag) {
    case TExitOpCodes.OP_SUCCEED: {
      state.value = STMState.fromTExit(tExit)
      return completeTodos(Exit.succeed(tExit.value), journal, scheduler, priority)
    }
    case TExitOpCodes.OP_FAIL: {
      state.value = STMState.fromTExit(tExit)
      const cause = Cause.fail(tExit.error)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_DIE: {
      state.value = STMState.fromTExit(tExit)
      const cause = Cause.die(tExit.defect)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_INTERRUPT: {
      state.value = STMState.fromTExit(tExit)
      const cause = Cause.interrupt(fiberId)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_RETRY: {
      return TryCommit.suspend(journal)
    }
  }
}

/**
 * Executes `stm` once against a fresh journal for the initial, synchronous
 * attempt made by `unsafeAtomically`.
 *
 * Same outcome mapping as `tryCommit`, except that no shared transaction
 * state cell is updated. The journal is committed only for a successful exit
 * with a `JournalAnalysisReadWrite` journal.
 *
 * @throws Error when the journal analysis is `JournalAnalysisInvalid`
 * @internal
 */
const tryCommitSync = (
  fiberId: FiberId.FiberId,
  stm: STM.STM,
  env: Context.Context,
  scheduler: Scheduler.Scheduler,
  priority: number
): TryCommit.TryCommit => {
  const journal: Journal.Journal = new Map()
  const tExit = new STMDriver(stm, journal, fiberId, env).run()
  const analysis = Journal.analyzeJournal(journal)
  if (analysis === Journal.JournalAnalysisReadWrite && TExit.isSuccess(tExit)) {
    Journal.commitJournal(journal)
  } else if (analysis === Journal.JournalAnalysisInvalid) {
    throw new Error(
      "BUG: STM.TryCommit.tryCommitSync - please report an issue at https://github.com/Effect-TS/effect/issues"
    )
  }
  switch (tExit._tag) {
    case TExitOpCodes.OP_SUCCEED: {
      return completeTodos(Exit.succeed(tExit.value), journal, scheduler, priority)
    }
    case TExitOpCodes.OP_FAIL: {
      const cause = Cause.fail(tExit.error)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_DIE: {
      const cause = Cause.die(tExit.defect)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_INTERRUPT: {
      const cause = Cause.interrupt(fiberId)
      return completeTodos(
        Exit.failCause(cause),
        journal,
        scheduler,
        priority
      )
    }
    case TExitOpCodes.OP_RETRY: {
      return TryCommit.suspend(journal)
    }
  }
}

/**
 * One asynchronous commit attempt.
 *
 * Does nothing unless `state.value` is still running. Otherwise runs
 * `tryCommit`; when done, the exit is passed to the async callback `k`; when
 * suspended, this same function is registered as a todo on the returned
 * journal (via `Journal.addTodo`, keyed by `txnId`) so it can be re-run later.
 *
 * @internal
 */
const tryCommitAsync = (
  fiberId: FiberId.FiberId,
  self: STM.STM,
  txnId: TxnId.TxnId,
  state: { value: STMState.STMState },
  context: Context.Context,
  scheduler: Scheduler.Scheduler,
  priority: number,
  k: (effect: Effect.Effect) => unknown
) => {
  if (STMState.isRunning(state.value)) {
    const result = tryCommit(fiberId, self, state, context, scheduler, priority)
    switch (result._tag) {
      case TryCommitOpCodes.OP_DONE: {
        completeTryCommit(result.exit, k)
        break
      }
      case TryCommitOpCodes.OP_SUSPEND: {
        Journal.addTodo(
          txnId,
          result.journal,
          () => tryCommitAsync(fiberId, self, txnId, state, context, scheduler, priority, k)
        )
        break
      }
    }
  }
}

/**
 * Collects the todos attached to `journal`, schedules their execution on
 * `scheduler` with the given `priority` when there are any, and wraps `exit`
 * in `TryCommit.done`.
 *
 * @internal
 */
const completeTodos = (
  exit: Exit.Exit,
  journal: Journal.Journal,
  scheduler: Scheduler.Scheduler,
  priority: number
): TryCommit.TryCommit => {
  const todos = Journal.collectTodos(journal)
  if (todos.size > 0) {
    scheduler.scheduleTask(() => Journal.execTodos(todos), priority)
  }
  return TryCommit.done(exit)
}

/**
 * Hands the final `exit` to the async callback `k`.
 *
 * @internal
 */
const completeTryCommit = (
  exit: Exit.Exit,
  k: (effect: Effect.Effect) => unknown
): void => {
  k(exit)
}

/**
 * Instructions that `STMDriver` pushes onto its continuation stack.
 *
 * @internal
 */
type Continuation = STMOnFailure | STMOnSuccess | STMOnRetry

/**
 * Transaction that succeeds with the driver's current environment.
 *
 * @internal
 */
export const context = (): STM.STM, never, R> => effect>((_, __, env) => env)

/**
 * Reads the environment and maps it with the pure function `f`.
 *
 * @internal
 */
export const contextWith = (f: (environment: Context.Context) => R): STM.STM => map(context(), f)

/**
 * Reads the environment and continues with the transaction returned by `f`.
 *
 * @internal
 */
export const contextWithSTM = (
  f: (environment: Context.Context) => STM.STM
): STM.STM => flatMap(context(), f)

/**
 * Interpreter that evaluates an STM instruction tree to a `TExit`.
 *
 * It keeps an explicit stack of continuations (`OP_ON_SUCCESS`,
 * `OP_ON_FAILURE`, `OP_ON_RETRY` instructions) and a mutable current
 * environment that `OP_PROVIDE` swaps temporarily.
 *
 * @internal
 */
export class STMDriver {
  private contStack: Array = []
  private env: Context.Context
  constructor(
    readonly self: STM.STM,
    readonly journal: Journal.Journal,
    readonly fiberId: FiberId.FiberId,
    r0: Context.Context
  ) {
    this.env = r0 as Context.Context
  }
  /** Returns the environment currently in effect. */
  getEnv(): Context.Context {
    return this.env
  }
  pushStack(cont: Continuation) {
    this.contStack.push(cont)
  }
  popStack() {
    return this.contStack.pop()
  }
  /**
   * Pops continuations until an `OP_ON_SUCCESS` one is found (everything
   * popped before it is discarded); `undefined` when the stack runs out.
   */
  nextSuccess() {
    let current = this.popStack()
    while (current !== undefined && current.effect_instruction_i0 !== OpCodes.OP_ON_SUCCESS) {
      current = this.popStack()
    }
    return current
  }
  /**
   * Pops continuations until an `OP_ON_FAILURE` one is found (everything
   * popped before it is discarded); `undefined` when the stack runs out.
   */
  nextFailure() {
    let current = this.popStack()
    while (current !== undefined && current.effect_instruction_i0 !== OpCodes.OP_ON_FAILURE) {
      current = this.popStack()
    }
    return current
  }
  /**
   * Pops continuations until an `OP_ON_RETRY` one is found (everything
   * popped before it is discarded); `undefined` when the stack runs out.
   */
  nextRetry() {
    let current = this.popStack()
    while (current !== undefined && current.effect_instruction_i0 !== OpCodes.OP_ON_RETRY) {
      current = this.popStack()
    }
    return current
  }
  /**
   * Main interpreter loop: evaluates instructions until a `TExit` is
   * produced. Any exception thrown while handling an instruction is turned
   * into a `die` instruction and fed back into the loop.
   */
  run(): TExit.TExit {
    let curr = this.self as Primitive | Context.Tag | Either.Either | Option.Option | undefined
    let exit: TExit.TExit | undefined = undefined
    while (exit === undefined && curr !== undefined) {
      try {
        const current = curr
        if (current) {
          switch (current._op) {
            // Non-STM values accepted by the driver are first lowered to
            // STM primitives and then interpreted on the next iteration.
            case "Tag": {
              curr = effect((_, __, env) => Context.unsafeGet(env, current)) as Primitive
              break
            }
            case "Left": {
              curr = fail(current.left) as Primitive
              break
            }
            case "None": {
              curr = fail(new Cause.NoSuchElementException()) as Primitive
              break
            }
            case "Right": {
              curr = succeed(current.right) as Primitive
              break
            }
            case "Some": {
              curr = succeed(current.value) as Primitive
              break
            }
            case "Commit": {
              switch (current.effect_instruction_i0) {
                case OpCodes.OP_DIE: {
                  exit = TExit.die(internalCall(() => current.effect_instruction_i1()))
                  break
                }
                case OpCodes.OP_FAIL: {
                  const cont = this.nextFailure()
                  if (cont === undefined) {
                    exit = TExit.fail(internalCall(() => current.effect_instruction_i1()))
                  } else {
                    curr = internalCall(() =>
                      cont.effect_instruction_i2(
                        internalCall(() => current.effect_instruction_i1())
                      ) as Primitive
                    )
                  }
                  break
                }
                case OpCodes.OP_RETRY: {
                  const cont = this.nextRetry()
                  if (cont === undefined) {
                    exit = TExit.retry
                  } else {
                    curr = internalCall(() => cont.effect_instruction_i2() as Primitive)
                  }
                  break
                }
                case OpCodes.OP_INTERRUPT: {
                  // NOTE(review): the driver's own fiber id is used here; the
                  // id stored in `effect_instruction_i1` by `interruptAs` is
                  // not read in this file -- confirm that is intended.
                  exit = TExit.interrupt(this.fiberId)
                  break
                }
                case OpCodes.OP_WITH_STM_RUNTIME: {
                  curr = internalCall(() =>
                    current.effect_instruction_i1(this as STMDriver) as Primitive
                  )
                  break
                }
                case OpCodes.OP_ON_SUCCESS:
                case OpCodes.OP_ON_FAILURE:
                case OpCodes.OP_ON_RETRY: {
                  // Remember the continuation, then evaluate the wrapped STM.
                  this.pushStack(current)
                  curr = current.effect_instruction_i1 as Primitive
                  break
                }
                case OpCodes.OP_PROVIDE: {
                  // Swap in the transformed environment and restore the
                  // previous one through `ensuring` once the inner STM
                  // succeeds or fails.
                  const env = this.env
                  this.env = internalCall(() => current.effect_instruction_i2(env))
                  curr = pipe(
                    current.effect_instruction_i1,
                    ensuring(sync(() => (this.env = env)))
                  ) as Primitive
                  break
                }
                case OpCodes.OP_SUCCEED: {
                  const value = current.effect_instruction_i1
                  const cont = this.nextSuccess()
                  if (cont === undefined) {
                    exit = TExit.succeed(value)
                  } else {
                    curr = internalCall(() => cont.effect_instruction_i2(value) as Primitive)
                  }
                  break
                }
                case OpCodes.OP_SYNC: {
                  const value = internalCall(() => current.effect_instruction_i1())
                  const cont = this.nextSuccess()
                  if (cont === undefined) {
                    exit = TExit.succeed(value)
                  } else {
                    curr = internalCall(() => cont.effect_instruction_i2(value) as Primitive)
                  }
                  break
                }
              }
              break
            }
          }
        }
      } catch (e) {
        curr = die(e) as Primitive
      }
    }
    return exit as TExit.TExit
  }
}

/**
 * Builds an `OP_ON_FAILURE` instruction: run `self`, and on a failure
 * continue with the transaction returned by `f`.
 *
 * @internal
 */
export const catchAll = dual<
  (
    f: (e: E) => STM.STM
  ) => (
    self: STM.STM
  ) => STM.STM,
  (
    self: STM.STM,
    f: (e: E) => STM.STM
  ) => STM.STM
>(2, (self, f) => {
  const stm = new STMPrimitive(OpCodes.OP_ON_FAILURE)
  stm.effect_instruction_i1 = self
  stm.effect_instruction_i2 = f
  return stm
})

/**
 * Builds an `OP_PROVIDE` instruction: run `self` with the environment
 * transformed by `f`.
 *
 * @internal
 */
export const mapInputContext = dual<
  (
    f: (context: Context.Context) => Context.Context
  ) => (
    self: STM.STM
  ) => STM.STM,
  (
    self: STM.STM,
    f: (context: Context.Context) => Context.Context
  ) => STM.STM
>(2, (self, f) => {
  const stm = new STMPrimitive(OpCodes.OP_PROVIDE)
  stm.effect_instruction_i1 = self
  stm.effect_instruction_i2 = f
  return stm
})

/**
 * Transaction that dies with the given defect.
 *
 * @internal
 */
export const die = (defect: unknown): STM.STM => dieSync(() => defect)

/**
 * Transaction that dies with a `Cause.RuntimeException` carrying `message`.
 *
 * @internal
 */
export const dieMessage = (message: string): STM.STM => dieSync(() => new Cause.RuntimeException(message))

/**
 * Builds an `OP_DIE` instruction whose defect is produced by calling
 * `evaluate` when the instruction is interpreted.
 *
 * @internal
 */
export const dieSync = (evaluate: LazyArg): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_DIE)
  stm.effect_instruction_i1 = evaluate
  return stm as any
}

/**
 * Lifts a function of the driver's journal, fiber id and current environment
 * into a transaction that succeeds with the function's result.
 *
 * @internal
 */
export const effect = (
  f: (journal: Journal.Journal, fiberId: FiberId.FiberId, environment: Context.Context) => A
): STM.STM => withSTMRuntime((_) => succeed(f(_.journal, _.fiberId, _.getEnv())))

/**
 * Runs `finalizer` after `self` succeeds or fails, then re-emits the original
 * success value or error. Built on `matchSTM`, so only the success and
 * failure outcomes are covered here.
 *
 * @internal
 */
export const ensuring = dual<
  (finalizer: STM.STM) => (self: STM.STM) => STM.STM,
  (self: STM.STM, finalizer: STM.STM) => STM.STM
>(2, (self, finalizer) =>
  matchSTM(self, {
    onFailure: (e) => zipRight(finalizer, fail(e)),
    onSuccess: (a) => zipRight(finalizer, succeed(a))
  }))

/**
 * Transaction that fails with the given error.
 *
 * @internal
 */
export const fail = (error: E): STM.STM => failSync(() => error)

/**
 * Builds an `OP_FAIL` instruction whose error is produced by calling
 * `evaluate` when the instruction is interpreted.
 *
 * @internal
 */
export const failSync = (evaluate: LazyArg): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_FAIL)
  stm.effect_instruction_i1 = evaluate
  return stm as any
}

/**
 * Builds an `OP_ON_SUCCESS` instruction: run `self`, and on success continue
 * with the transaction returned by `f`.
 *
 * @internal
 */
export const flatMap = dual<
  (f: (a: A) => STM.STM) => (self: STM.STM) => STM.STM,
  (self: STM.STM, f: (a: A) => STM.STM) => STM.STM
>(2, (self, f) => {
  const stm = new STMPrimitive(OpCodes.OP_ON_SUCCESS)
  stm.effect_instruction_i1 = self
  stm.effect_instruction_i2 = f
  return stm
})

/**
 * Folds over the success and failure outcomes of `self`.
 *
 * Implementation: a success value is tagged `Right`; a failure is passed to
 * `onFailure` and that handler's result is tagged `Left`; a final `flatMap`
 * returns the `Left` payload as-is and feeds the `Right` payload to
 * `onSuccess`. The tagging keeps `onSuccess` from being applied to the
 * result of `onFailure`.
 *
 * @internal
 */
export const matchSTM = dual<
  (
    options: {
      readonly onFailure: (e: E) => STM.STM
      readonly onSuccess: (a: A) => STM.STM
    }
  ) => (self: STM.STM) => STM.STM,
  (
    self: STM.STM,
    options: {
      readonly onFailure: (e: E) => STM.STM
      readonly onSuccess: (a: A) => STM.STM
    }
  ) => STM.STM
>(2, (
  self: STM.STM,
  { onFailure, onSuccess }: {
    readonly onFailure: (e: E) => STM.STM
    readonly onSuccess: (a: A) => STM.STM
  }
): STM.STM =>
  pipe(
    self,
    map(Either.right),
    catchAll((e) => pipe(onFailure(e), map(Either.left))),
    flatMap((either): STM.STM => {
      switch (either._tag) {
        case "Left": {
          return succeed(either.left)
        }
        case "Right": {
          return onSuccess(either.right)
        }
      }
    })
  ))

/**
 * Builds an `OP_WITH_STM_RUNTIME` instruction: `f` receives the running
 * `STMDriver` and returns the transaction to continue with.
 *
 * @internal
 */
export const withSTMRuntime = (
  f: (runtime: STMDriver) => STM.STM
): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_WITH_STM_RUNTIME)
  stm.effect_instruction_i1 = f
  return stm
}

/**
 * Transaction that interrupts, storing the driver's fiber id in the
 * `OP_INTERRUPT` instruction it creates.
 *
 * @internal
 */
export const interrupt: STM.STM = withSTMRuntime((_) => {
  const stm = new STMPrimitive(OpCodes.OP_INTERRUPT)
  stm.effect_instruction_i1 = _.fiberId
  return stm as any
})

/**
 * Builds an `OP_INTERRUPT` instruction storing the supplied `fiberId`.
 *
 * @internal
 */
export const interruptAs = (fiberId: FiberId.FiberId): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_INTERRUPT)
  stm.effect_instruction_i1 = fiberId
  return stm as any
}

/**
 * Transforms the success value of `self` with `f`. `f` runs inside `sync`,
 * so an exception it throws is handled by the driver's `catch`.
 *
 * @internal
 */
export const map = dual<
  (f: (a: A) => B) => (self: STM.STM) => STM.STM,
  (self: STM.STM, f: (a: A) => B) => STM.STM
>(2, (self, f) => pipe(self, flatMap((a) => sync(() => f(a)))))

/**
 * Builds an `OP_ON_RETRY` instruction: run `self`, and if it retries continue
 * with the transaction produced by calling `that`.
 *
 * @internal
 */
export const orTry = dual<
  (
    that: LazyArg>
  ) => (
    self: STM.STM
  ) => STM.STM,
  (
    self: STM.STM,
    that: LazyArg>
  ) => STM.STM
>(2, (self, that) => {
  const stm = new STMPrimitive(OpCodes.OP_ON_RETRY)
  stm.effect_instruction_i1 = self
  stm.effect_instruction_i2 = that
  return stm
})

/**
 * The `OP_RETRY` instruction (a single shared instance).
 *
 * @internal
 */
export const retry: STM.STM = new STMPrimitive(OpCodes.OP_RETRY)

/**
 * Builds an `OP_SUCCEED` instruction carrying `value`.
 *
 * @internal
 */
export const succeed = (value: A): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_SUCCEED)
  stm.effect_instruction_i1 = value
  return stm as any
}

/**
 * Builds an `OP_SYNC` instruction whose value is produced by calling
 * `evaluate` when the instruction is interpreted.
 *
 * @internal
 */
export const sync = (evaluate: () => A): STM.STM => {
  const stm = new STMPrimitive(OpCodes.OP_SYNC)
  stm.effect_instruction_i1 = evaluate
  return stm as any
}

/**
 * Runs `self` then `that` and succeeds with both results as a tuple.
 *
 * @internal
 */
export const zip = dual<
  (
    that: STM.STM
  ) => (
    self: STM.STM
  ) => STM.STM<[A, A1], E1 | E, R1 | R>,
  (
    self: STM.STM,
    that: STM.STM
  ) => STM.STM<[A, A1], E1 | E, R1 | R>
>(2, (self, that) => pipe(self, zipWith(that, (a, a1) => [a, a1])))

/**
 * Runs `self` then `that`, keeping the result of `self`.
 *
 * @internal
 */
export const zipLeft = dual<
  (that: STM.STM) => (self: STM.STM) => STM.STM,
  (self: STM.STM, that: STM.STM) => STM.STM
>(2, (self, that) => pipe(self, flatMap((a) => pipe(that, map(() => a)))))

/**
 * Runs `self` then `that`, keeping the result of `that`.
 *
 * @internal
 */
export const zipRight = dual<
  (that: STM.STM) => (self: STM.STM) => STM.STM,
  (self: STM.STM, that: STM.STM) => STM.STM
>(2, (self, that) => pipe(self, flatMap(() => that)))

/**
 * Runs `self` then `that` and combines both results with `f`.
 *
 * @internal
 */
export const zipWith = dual<
  (
    that: STM.STM,
    f: (a: A, b: A1) => A2
  ) => (
    self: STM.STM
  ) => STM.STM,
  (
    self: STM.STM,
    that: STM.STM,
    f: (a: A, b: A1) => A2
  ) => STM.STM
>(
  3,
  (self, that, f) => pipe(self, flatMap((a) => pipe(that, map((b) => f(a, b)))))
)