/**
* @since 1.0.0
*/
import * as Arr from "effect/Array"
import * as Cause from "effect/Cause"
import * as Context from "effect/Context"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as FiberMap from "effect/FiberMap"
import * as FiberRef from "effect/FiberRef"
import * as FiberRefs from "effect/FiberRefs"
import * as FiberSet from "effect/FiberSet"
import { dual, identity, pipe } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
import * as Option from "effect/Option"
import type * as ParseResult from "effect/ParseResult"
import type { Pipeable } from "effect/Pipeable"
import { pipeArguments } from "effect/Pipeable"
import * as PubSub from "effect/PubSub"
import * as Queue from "effect/Queue"
import * as Readable from "effect/Readable"
import type { Request } from "effect/Request"
import type * as Schedule from "effect/Schedule"
import * as Schema from "effect/Schema"
import type * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import * as Subscribable from "effect/Subscribable"
import * as Tracer from "effect/Tracer"
import * as Procedure from "./Machine/Procedure.js"
import type { ProcedureList } from "./Machine/ProcedureList.js"
import type { SerializableProcedureList } from "./Machine/SerializableProcedureList.js"
/**
* @since 1.0.0
* @category procedures
*/
export * as procedures from "./Machine/ProcedureList.js"
/**
* @since 1.0.0
* @category procedures
*/
export * as serializable from "./Machine/SerializableProcedureList.js"
export {
/**
* @since 1.0.0
* @category symbols
*/
NoReply
} from "./Machine/Procedure.js"
/**
* Symbol used as the brand key on `Machine` values. It is obtained through
* `Symbol.for`, i.e. from the global symbol registry under the key
* `"@effect/experimental/Machine"`.
*
* @since 1.0.0
* @category type ids
*/
export const TypeId: unique symbol = Symbol.for("@effect/experimental/Machine")
/**
* Type-level counterpart of the `TypeId` symbol.
*
* @since 1.0.0
* @category type ids
*/
export type TypeId = typeof TypeId
/**
* A machine definition: a pipeable value branded with `TypeId` that carries
* an `initialize` function and an optional retry policy.
*
* @since 1.0.0
* @category models
*/
export interface Machine<
State,
Public extends Procedure.TaggedRequest.Any,
Private extends Procedure.TaggedRequest.Any,
Input,
InitErr,
R
> extends Pipeable {
readonly [TypeId]: TypeId
// Invoked by `boot` with the input and an optional previous state.
readonly initialize: Machine.Initialize
// Passed to `Effect.retry` around the run loop by `boot`; `undefined` disables retrying.
readonly retryPolicy: Schedule.Schedule | undefined
}
/**
* Symbol used as the brand key on `SerializableMachine` values, obtained
* through `Symbol.for` under the key
* `"@effect/experimental/Machine/Serializable"`.
*
* @since 1.0.0
* @category type ids
*/
export const SerializableTypeId: unique symbol = Symbol.for("@effect/experimental/Machine/Serializable")
/**
* Type-level counterpart of the `SerializableTypeId` symbol.
*
* @since 1.0.0
* @category type ids
*/
export type SerializableTypeId = typeof SerializableTypeId
/**
* A `Machine` that is additionally branded with `SerializableTypeId` and
* carries schemas for its input and its state. `snapshot` uses these schemas
* to encode an actor's input and state, and `restore` uses them to decode a
* snapshot.
*
* @since 1.0.0
* @category models
*/
export interface SerializableMachine<
State,
Public extends Schema.TaggedRequest.All,
Private extends Schema.TaggedRequest.All,
Input,
InitErr,
R,
SR
> extends
Machine<
State,
Public,
Private,
Input,
InitErr,
R
>
{
readonly [SerializableTypeId]: SerializableTypeId
// Schema for the machine input.
readonly schemaInput: Schema.Schema
// Schema for the machine state.
readonly schemaState: Schema.Schema
}
/**
* Symbol used as the brand key on `Actor` values, obtained through
* `Symbol.for` under the key `"@effect/experimental/Machine/Actor"`.
*
* @since 1.0.0
* @category type ids
*/
export const ActorTypeId: unique symbol = Symbol.for("@effect/experimental/Machine/Actor")
/**
* Type-level counterpart of the `ActorTypeId` symbol.
*
* @since 1.0.0
* @category type ids
*/
export type ActorTypeId = typeof ActorTypeId
/**
* Tagged error (`"MachineDefect"`) whose single field `cause` is described by
* `Schema.Defect`.
*
* @since 1.0.0
* @category errors
*/
export class MachineDefect extends Schema.TaggedError()("MachineDefect", {
  cause: Schema.Defect
}) {
  /**
  * Re-routes every way `effect` can fail into a single `MachineDefect`
  * failure: typed errors are first converted into defects (`Effect.orDie`),
  * then the resulting cause is squashed into one value and wrapped.
  *
  * @since 1.0.0
  */
  static wrap(effect: Effect.Effect): Effect.Effect {
    return Effect.orDie(effect).pipe(
      Effect.catchAllCause((cause) => {
        const squashed = Cause.squash(cause)
        return Effect.fail(new MachineDefect({ cause: squashed }))
      })
    )
  }
}
/**
* Context tag (key `"@effect/experimental/Machine/Context"`) whose service is
* a `Procedure.Procedure.BaseContext`. `boot` provides this service both while
* running a machine's `initialize` and while processing requests.
*
* @since 1.0.0
* @category tags
*/
export class MachineContext extends Context.Tag("@effect/experimental/Machine/Context")<
MachineContext,
Procedure.Procedure.BaseContext
>() {}
/**
* Type-level helpers that accompany the `Machine` interface.
*
* @since 1.0.0
* @category models
*/
export declare namespace Machine {
/**
* Union used as the upper bound for "any machine" (e.g. by `boot`).
*
* @since 1.0.0
* @category models
*/
export type Any =
| Machine
| Machine
| Machine
| Machine
| Machine
/**
* Shape of a machine initializer: a function of the input and an optional
* previous state that returns an effect.
*
* @since 1.0.0
* @category models
*/
export type Initialize<
Input,
State,
Public extends Procedure.TaggedRequest.Any,
Private extends Procedure.TaggedRequest.Any,
R,
E,
InitR
> = (
input: Input,
previousState?: State | undefined
) => Effect.Effect, E, InitR>
/**
* Same shape as `Initialize`, with the request types bounded by
* `Schema.TaggedRequest.All` instead of `Procedure.TaggedRequest.Any`.
*
* @since 1.0.0
* @category models
*/
export type InitializeSerializable<
Input,
State,
Public extends Schema.TaggedRequest.All,
Private extends Schema.TaggedRequest.All,
R,
E,
InitR
> = (
input: Input,
previousState?: State | undefined
) => Effect.Effect, E, InitR>
/**
* Resolves to the `Public` request type when `M` is a `Machine`, otherwise
* `never`.
*
* @since 1.0.0
*/
export type Public = M extends Machine ? Public
: never
/**
* Resolves to the `Private` request type when `M` is a `Machine`, otherwise
* `never`.
*
* @since 1.0.0
*/
export type Private = M extends Machine ?
Private
: never
/**
* Resolves to the `State` type when `M` is a `Machine`, otherwise `never`.
*
* @since 1.0.0
*/
export type State = M extends Machine ? State
: never
/**
* Resolves to the initialization error type (`InitErr`) when `M` is a
* `Machine`, otherwise `never`.
*
* @since 1.0.0
*/
export type InitError = M extends Machine ?
InitErr
: never
/**
* Resolves to the `R` type when `M` is a `Machine`, otherwise `never`.
*
* @since 1.0.0
*/
export type Context = M extends Machine ? R
: never
/**
* Resolves to the `Input` type when `M` is a `Machine`, otherwise `never`.
*
* @since 1.0.0
*/
export type Input = M extends Machine ? Input
: never
/**
* Rebuilds `M` with a widened error and context: the result has
* `InitErr | E` and `R | R2` in the corresponding positions. A
* `SerializableMachine` stays a `SerializableMachine` (this case is checked
* first), a plain `Machine` stays a `Machine`, and anything else resolves to
* `never`.
*
* @since 1.0.0
*/
export type AddContext = M extends SerializableMachine<
infer State,
infer Public,
infer Private,
infer Input,
infer InitErr,
infer R2,
infer SR
> ? SerializableMachine<
State,
Public,
Private,
Input,
InitErr | E,
R | R2,
SR
> :
M extends Machine ? Machine<
State,
Public,
Private,
Input,
InitErr | E,
R | R2
> :
never
}
/**
* A running machine, as returned by `boot`. It is a `Subscribable`, is
* branded with `ActorTypeId`, and keeps a reference to the machine and the
* input it was booted with.
*
* @since 1.0.0
* @category models
*/
export interface Actor extends Subscribable.Subscribable> {
readonly [ActorTypeId]: ActorTypeId
// The machine definition this actor was booted from.
readonly machine: M
// The input value that was passed to `boot`.
readonly input: Machine.Input
// Sends a request to the actor; the effect resolves with the request's result.
readonly send: >(request: Req) => Effect.Effect<
Request.Success,
Request.Error
>
// Joins the fiber that runs the actor's run loop (`Fiber.join` in `boot`).
readonly join: Effect.Effect | MachineDefect>
}
// Prototype shared by the actor objects that `boot` creates with
// `Object.create(ActorProto)`. It carries the Actor / Readable / Subscribable
// type-id brands and a `pipe` method that forwards the receiver and all call
// arguments to `pipeArguments`.
const ActorProto = {
[ActorTypeId]: ActorTypeId,
[Readable.TypeId]: Readable.TypeId,
[Subscribable.TypeId]: Subscribable.TypeId,
pipe() {
return pipeArguments(this, arguments)
}
}
/**
* An `Actor` that additionally accepts requests as unknown values through
* `sendUnknown`, whose effect yields an encoded exit or fails with a
* `ParseResult.ParseError`.
*
* @since 1.0.0
* @category models
*/
export interface SerializableActor extends Actor {
// In `boot` this decodes the value as a request, sends it, and serializes the resulting exit.
readonly sendUnknown: (request: unknown) => Effect.Effect<
Schema.ExitEncoded,
ParseResult.ParseError
>
}
/**
* Creates a `Machine` from either a plain effect or an initializer function.
*
* The resulting object is branded with `TypeId`, has no retry policy
* (`retryPolicy: undefined`) and is pipeable.
*
* @since 1.0.0
* @category constructors
*/
export const make: {
/**
* Overload taking a plain effect as the initializer.
*
* @since 1.0.0
* @category constructors
*/
(
initialize: Effect.Effect, InitErr, R>
): Machine>
/**
* Overload taking an initializer function.
*
* @since 1.0.0
* @category constructors
*/
(
initialize: Machine.Initialize
): Machine>
} = (
initialize:
| Machine.Initialize
| Effect.Effect, InitErr, R>
): Machine> => ({
[TypeId]: TypeId,
// A plain effect is wrapped in a thunk so that `initialize` is always callable.
initialize: Effect.isEffect(initialize) ? (() => initialize) : initialize as any,
retryPolicy: undefined,
pipe() {
return pipeArguments(this, arguments)
}
})
/**
* Returns `make` itself, so `makeWith()(initialize)` behaves like
* `make(initialize)` at runtime.
*
* NOTE(review): presumably this exists so that type parameters can be
* supplied on the outer call — confirm against the full signature.
*
* @since 1.0.0
* @category constructors
*/
export const makeWith = (): {
(
initialize: Effect.Effect, InitErr, R>
): Machine>
(
initialize: Machine.Initialize
): Machine>
} => make
/**
* Creates a `SerializableMachine` from a state schema, an optional input
* schema, and either a plain effect or an initializer function.
*
* The resulting object is branded with both `TypeId` and
* `SerializableTypeId`, has no retry policy and is pipeable.
*
* When `options.input` is omitted, `schemaInput` falls back to `Schema.Void`
* instead of being left `undefined`. `snapshot` and `restore` pass
* `schemaInput` straight into `Schema.encode` / `Schema.Tuple`, which cannot
* work with an `undefined` schema.
*
* @since 1.0.0
* @category constructors
*/
export const makeSerializable: {
  /**
  * Overload for machines without an input schema; the initializer may be a
  * plain effect or a function.
  *
  * @since 1.0.0
  * @category constructors
  */
  <
    State,
    IS,
    RS,
    Public extends Schema.TaggedRequest.All,
    Private extends Schema.TaggedRequest.All,
    InitErr,
    R
  >(
    options: {
      readonly state: Schema.Schema
      readonly input?: undefined
    },
    initialize:
      | Effect.Effect, InitErr, R>
      | Machine.InitializeSerializable
  ): SerializableMachine, RS>
  /**
  * Overload for machines with an input schema; the initializer is a
  * function.
  *
  * @since 1.0.0
  * @category constructors
  */
  <
    State,
    IS,
    RS,
    Input,
    II,
    RI,
    Public extends Schema.TaggedRequest.All,
    Private extends Schema.TaggedRequest.All,
    InitErr,
    R
  >(
    options: {
      readonly state: Schema.Schema
      readonly input: Schema.Schema
    },
    initialize: Machine.InitializeSerializable
  ): SerializableMachine, RS | RI>
} = <
  State,
  IS,
  RS,
  Input,
  II,
  RI,
  Public extends Schema.TaggedRequest.All,
  Private extends Schema.TaggedRequest.All,
  InitErr,
  R
>(
  options: {
    readonly state: Schema.Schema
    readonly input?: Schema.Schema | undefined
  },
  initialize:
    | Machine.InitializeSerializable
    | Effect.Effect, InitErr, R>
): SerializableMachine, RS | RI> => (({
  [TypeId]: TypeId,
  [SerializableTypeId]: SerializableTypeId,
  // A plain effect is wrapped in a thunk so that `initialize` is always callable.
  initialize: Effect.isEffect(initialize) ? (() => initialize) : initialize as any,
  identifier: "SerializableMachine",
  retryPolicy: undefined,
  // Input-less machines have a `void` input, so default the schema to `Schema.Void`.
  schemaInput: (options.input ?? Schema.Void) as any,
  schemaState: options.state as any,
  pipe() {
    return pipeArguments(this, arguments)
  }
}) as any)
/**
* Attaches a retry policy to a machine. The result is a shallow copy of
* `self` (object spread) with `retryPolicy` replaced; `self` is not mutated.
* `boot` applies the policy with `Effect.retry` around the actor's run loop.
*
* Supports both the data-last (`retry(policy)(self)`) and data-first
* (`retry(self, policy)`) call styles via `dual`.
*
* @since 1.0.0
* @category combinators
*/
export const retry: {
/**
* Data-last form: returns a function awaiting the machine.
*
* @since 1.0.0
* @category combinators
*/
| MachineDefect, R>(policy: Schedule.Schedule): (self: M) => Machine.AddContext
/**
* Data-first form.
*
* @since 1.0.0
* @category combinators
*/
| MachineDefect, R>(self: M, policy: Schedule.Schedule): Machine.AddContext
} = dual(2, | MachineDefect, R>(
self: M,
retryPolicy: Schedule.Schedule
): Machine.AddContext => (({
...self,
retryPolicy
}) as any))
/**
* Fiber ref holding the flag that `boot` reads to decide whether requests
* are wrapped in `Machine.send` / `Machine.sendIgnore` / `Machine.process`
* spans. The initial value is `true`. The ref is created through
* `globalValue` under the key
* `"@effect/experimental/Machine/currentTracingEnabled"`.
*
* @since 1.0.0
* @category tracing
*/
export const currentTracingEnabled: FiberRef.FiberRef = globalValue(
  "@effect/experimental/Machine/currentTracingEnabled",
  () => {
    return FiberRef.unsafeMake(true)
  }
)
/**
* Runs an effect with `currentTracingEnabled` locally set to `enabled`.
*
* @since 1.0.0
* @category tracing
*/
export const withTracingEnabled: {
  /**
  * Data-last form: returns a function awaiting the effect.
  *
  * @since 1.0.0
  * @category tracing
  */
  (enabled: boolean): (effect: Effect.Effect) => Effect.Effect
  /**
  * Data-first form.
  *
  * @since 1.0.0
  * @category tracing
  */
  (effect: Effect.Effect, enabled: boolean): Effect.Effect
} = dual(
  2,
  (effect: Effect.Effect, flag: boolean) => {
    // Scope the override to this effect only.
    return Effect.locally(effect, currentTracingEnabled, flag)
  }
)
/**
* Boots `self` into a running actor.
*
* Sets up an unbounded request queue, a state `PubSub` (shut down on release)
* and a latch, forks the run loop into the current scope (wrapped in
* `Effect.retry` when `self.retryPolicy` is set), waits for the latch, and
* then returns the actor object.
*
* The returned object exposes `get` / `changes` for the current state,
* `send` (public request tags only), `sendUnknown` (decode, send, serialize
* the exit) and `join` (joins the run loop fiber).
*
* @param self - The machine to run.
* @param input - Value forwarded to `self.initialize`.
* @param options - `previousState` is forwarded to `self.initialize` when no
* current state is set yet.
*
* @since 1.0.0
* @category runtime
*/
export const boot = <
M extends Machine.Any
>(
self: M,
...[input, options]: [Machine.Input] extends [void] ? [
input?: Machine.Input,
options?: { readonly previousState?: Machine.State }
] :
[
input: Machine.Input,
options?: { readonly previousState?: Machine.State }
]
): Effect.Effect<
M extends { readonly [SerializableTypeId]: SerializableTypeId } ? SerializableActor : Actor,
never,
Machine.Context | Scope.Scope
> =>
Effect.gen(function*() {
const context = yield* Effect.context>()
// Queue of pending requests: [request, deferred for the reply, parent span, tracing flag].
const requests = yield* Queue.unbounded<
readonly [
Procedure.TaggedRequest.Any,
Deferred.Deferred,
Tracer.AnySpan | undefined,
addSpans: boolean
]
>()
// State changes are broadcast through this pubsub; it is shut down on release.
const pubsub = yield* Effect.acquireRelease(
PubSub.unbounded>(),
PubSub.shutdown
)
// Completed by the run loop once the initial state has been published.
const latch = yield* Deferred.make()
let currentState: Machine.State = undefined as any
// Placeholder until `initialize` has produced the procedure list.
let runState: {
readonly identifier: string
readonly publicTags: Set
readonly decodeRequest: (u: unknown) => Effect.Effect, ParseResult.ParseError>
} = {
identifier: "Unknown",
publicTags: new Set(),
decodeRequest: undefined as any
}
// Builds a queue item for `request`: reads the calling fiber's context to find
// the parent span and the tracing flag, and creates the deferred for the reply.
const requestContext = >(request: R) =>
Effect.sync(() => {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())
const fiberRefs = fiber.getFiberRefs()
const context = FiberRefs.getOrDefault(fiberRefs, FiberRef.currentContext)
const deferred = Deferred.unsafeMake, Request.Error>(fiber.id())
const span: Tracer.AnySpan | undefined = context.unsafeMap.get(Tracer.ParentSpan.key)
const addSpans = FiberRefs.getOrDefault(fiberRefs, currentTracingEnabled)
return [request, deferred, span, addSpans] as const
})
// Enqueues a request and waits for its reply. If the caller is interrupted the
// deferred is interrupted too. With tracing enabled the whole exchange runs
// inside a "client" span.
const send = >(request: R) =>
Effect.flatMap(
requestContext(request),
(item) => {
if (!item[3]) {
return Queue.offer(requests, item).pipe(
Effect.zipRight(Deferred.await(item[1])),
Effect.onInterrupt(() => Deferred.interrupt(item[1]))
)
}
const [, deferred, span] = item
return Effect.useSpan(`Machine.send ${request._tag}`, {
parent: span,
attributes: {
"effect.machine": runState.identifier,
...request
},
kind: "client",
captureStackTrace: false
}, (span) =>
Queue.offer(requests, [request, deferred, span, true]).pipe(
Effect.zipRight(Deferred.await(deferred)),
Effect.onInterrupt(() => Deferred.interrupt(deferred))
))
}
)
// Like `send`, but only enqueues the request and does not wait for the reply.
const sendIgnore = >(request: R) =>
Effect.flatMap(
requestContext(request),
(item) => {
if (!item[3]) {
return Queue.offer(requests, item)
}
const [, deferred, span] = item
return Effect.useSpan(`Machine.sendIgnore ${request._tag}`, {
parent: span,
attributes: {
"effect.machine": runState.identifier,
...request
},
kind: "client",
captureStackTrace: false
}, (span) => Queue.offer(requests, [request, deferred, span, true]))
}
)
// Entry point exposed on the actor: only tags of public procedures are
// accepted, anything else dies.
const sendExternal = >(request: R) =>
Effect.suspend(() =>
runState.publicTags.has(request._tag)
? send(request)
: Effect.die(`Request ${request._tag} marked as internal`)
)
// Decodes an unknown value with `runState.decodeRequest`, sends the decoded
// request and serializes the resulting exit.
const sendUnknown = (u: unknown) =>
Effect.suspend(() =>
runState.decodeRequest(u).pipe(
Effect.flatMap((req) =>
Effect.flatMap(
Effect.exit(send(req)),
(exit) => Schema.serializeExit(req, exit)
)
),
Effect.provide(context)
)
) as Effect.Effect, ParseResult.ParseError>
// Stores and publishes a state only when it is not strictly equal (`!==`) to the current one.
const publishState = (newState: Machine.State) => {
if (currentState !== newState) {
currentState = newState
return PubSub.publish(pubsub, newState)
}
return Effect.void
}
// One run of the machine: initialize, then process requests forever.
const run = Effect.gen(function*() {
// Fibers started through the fork helpers below live in these collections.
const fiberSet = yield* FiberSet.make()
const fiberMap = yield* FiberMap.make()
// fork: run an effect in the fiber set, wrapping its failures in `MachineDefect`.
const fork = (effect: Effect.Effect) =>
Effect.asVoid(FiberSet.run(fiberSet, MachineDefect.wrap(effect)))
// forkWith: `fork`, then pair the result with `state`.
const forkWith: {
(state: Machine.State): (
effect: Effect.Effect
) => Effect.Effect], never, R>
(
effect: Effect.Effect,
state: Machine.State
): Effect.Effect], never, R>
} = dual(2, (
effect: Effect.Effect,
state: Machine.State
): Effect.Effect], never, R> =>
Effect.map(fork(effect), (_) => [_, state] as const))
// forkReplace: run an effect in the fiber map under the key `id`.
const forkReplace: {
(id: string): (effect: Effect.Effect) => Effect.Effect
(effect: Effect.Effect, id: string): Effect.Effect
} = dual(2, (effect: Effect.Effect, id: string): Effect.Effect =>
Effect.asVoid(
FiberMap.run(fiberMap, id, MachineDefect.wrap(effect))
))
// forkReplaceWith: `forkReplace`, then pair the result with `state`.
const forkReplaceWith: {
(
id: string,
state: Machine.State
): (effect: Effect.Effect) => Effect.Effect], never, R>
(
effect: Effect.Effect,
id: string,
state: Machine.State
): Effect.Effect], never, R>
} = dual(3, (
effect: Effect.Effect,
id: string,
state: Machine.State
): Effect.Effect], never, R> =>
Effect.map(forkReplace(effect, id), (_) => [_, state] as const))
// forkOne: like `forkReplace`, but passes `onlyIfMissing: true` to `FiberMap.run`.
const forkOne: {
(id: string): (effect: Effect.Effect) => Effect.Effect
(effect: Effect.Effect, id: string): Effect.Effect
} = dual(2, (effect: Effect.Effect, id: string): Effect.Effect =>
Effect.asVoid(FiberMap.run(fiberMap, id, MachineDefect.wrap(effect), { onlyIfMissing: true })))
// forkOneWith: `forkOne`, then pair the result with `state`.
const forkOneWith: {
(
id: string,
state: Machine.State
): (effect: Effect.Effect) => Effect.Effect], never, R>
(
effect: Effect.Effect,
id: string,
state: Machine.State
): Effect.Effect], never, R>
} = dual(3, (
effect: Effect.Effect,
id: string,
state: Machine.State
): Effect.Effect], never, R> =>
Effect.map(
forkOne(effect, id),
(_) =>
[_, state] as const
))
// Shared part of the context handed to procedures; per-request fields are
// added on an object created from this prototype (see `process`).
const contextProto: Procedure.Procedure.ContextProto, Machine.State> = {
sendAwait: send,
send: sendIgnore,
unsafeSend: sendIgnore as any,
unsafeSendAwait: send as any,
fork,
forkWith,
forkOne,
forkOneWith,
forkReplace,
forkReplaceWith
}
// Prefer the in-memory state (set by an earlier run) over the caller supplied
// `previousState` when (re-)initializing.
const procedures = yield* pipe(
self.initialize(input, currentState ?? options?.previousState) as Effect.Effect<
SerializableProcedureList, Machine.Public, Machine.Private, never>,
Machine.InitError
>,
Effect.provideService(MachineContext, contextProto)
)
// Lookup table from request tag to procedure. Public entries are concatenated
// after private ones, so a public procedure wins when a tag occurs in both.
const procedureMap: Record<
string,
Procedure.Procedure, Machine.Context>
> = Object.fromEntries(
procedures.private.map((p) => [p.tag, p]).concat(
procedures.public.map((p) => [p.tag, p])
)
)
runState = {
identifier: procedures.identifier,
publicTags: new Set(procedures.public.map((p) =>
p.tag
)),
// Only public procedures that are serializable can be decoded from unknown input.
decodeRequest: Schema.decodeUnknown(
Schema.Union(
...Arr.filter(
procedures.public,
Procedure.isSerializable
).map((p) => p.schema)
)
)
}
yield* publishState(procedures.initialState)
// Signal `boot` that the actor is initialized.
yield* Deferred.succeed(latch, void 0)
// Request loop: take one request, run its handler, repeat forever.
const process = pipe(
Queue.take(requests),
Effect.flatMap(([request, deferred, span, addSpan]) =>
Effect.flatMap(Deferred.isDone(deferred), (done) => {
// Skip requests whose deferred is already completed (e.g. interrupted by the sender).
if (done) {
return Effect.void
}
const procedure = procedureMap[request._tag]
if (procedure === undefined) {
return Deferred.die(deferred, `Unknown request ${request._tag}`)
}
// Per-request context: inherits the helpers from `contextProto`.
const context = Object.create(contextProto)
context.state = currentState
context.request = request
context.deferred = deferred
let handler = Effect.matchCauseEffect(
procedure.handler(context),
{
onFailure: (e) => {
// When `Cause.isFailure` holds, the cause is only reported to the sender.
if (Cause.isFailure(e)) {
return Deferred.failCause(deferred, e)
}
// defects kill the actor
return Effect.zipRight(
Deferred.failCause(deferred, e),
Effect.failCause(e)
)
},
onSuccess: ([response, newState]) => {
// `NoReply` publishes the new state without completing the deferred.
if (response === Procedure.NoReply) {
return publishState(newState)
}
return Effect.zipRight(
publishState(newState),
Deferred.succeed(deferred, response)
)
}
}
)
// With tracing: wrap in a "server" span; without: still propagate the sender's parent span.
if (addSpan) {
handler = Effect.withSpan(handler, `Machine.process ${request._tag}`, {
kind: "server",
parent: span,
attributes: {
"effect.machine": runState.identifier
},
captureStackTrace: false
})
} else if (span !== undefined) {
handler = Effect.provideService(handler, Tracer.ParentSpan, span)
}
return handler
})
),
Effect.forever,
Effect.provideService(MachineContext, contextProto)
)
// Run the request loop together with joins on the forked fibers.
yield* pipe(
Effect.all([
process,
FiberSet.join(fiberSet),
FiberMap.join(fiberMap)
], { concurrency: "unbounded", discard: true }),
Effect.onExit((exit) => {
if (exit._tag === "Success") return Effect.die("absurd")
// On exit, fail every request still waiting in the queue with the exit cause.
return Effect.flatMap(
Queue.takeAll(requests),
Effect.forEach(([, deferred]) => Deferred.failCause(deferred, exit.cause))
)
}),
// Log the failure at the level stored in `unhandledErrorLogLevel`, if one is set.
Effect.tapErrorCause((cause) =>
FiberRef.getWith(
FiberRef.unhandledErrorLogLevel,
Option.match({
onNone: () => Effect.void,
onSome: (level) =>
Effect.log(`Unhandled Machine (${runState.identifier}) failure`, cause).pipe(
Effect.locally(FiberRef.currentLogLevel, level)
)
})
)
),
// Surface defects as typed `MachineDefect` failures.
Effect.catchAllDefect((cause) => Effect.fail(new MachineDefect({ cause })))
)
}).pipe(Effect.scoped) as Effect.Effect<
never,
MachineDefect | Machine.InitError
>
// Fork the run loop into the surrounding scope, retrying it when a policy is configured.
const fiber = yield* pipe(
run,
self.retryPolicy ?
Effect.retry(self.retryPolicy) :
identity,
Effect.forkScoped,
Effect.interruptible
)
// NOTE(review): the latch is only completed after a successful `initialize`;
// if the run fiber fails before that point this await does not appear to be
// released — confirm whether that is intended.
yield* Deferred.await(latch)
return identity>(Object.assign(Object.create(ActorProto), {
machine: self,
input: input!,
get: Effect.sync(() => currentState),
// Emits the current state first, then every state published afterwards.
changes: Stream.concat(
Stream.sync(() => currentState),
Stream.fromPubSub(pubsub)
),
send: sendExternal,
sendUnknown,
join: Fiber.join(fiber)
})) as any
})
/**
* Captures an actor as an encoded `[input, state]` pair.
*
* The actor's `input` is encoded with the machine's `schemaInput`, and the
* state read from `self.get` is encoded with the machine's `schemaState`.
* Either encoding may fail with a `ParseResult.ParseError`.
*
* @since 1.0.0
* @category runtime
*/
export const snapshot = <
  State,
  Public extends Schema.TaggedRequest.All,
  Private extends Schema.TaggedRequest.All,
  Input,
  InitErr,
  R,
  SR
>(
  self: Actor<
    SerializableMachine<
      State,
      Public,
      Private,
      Input,
      InitErr,
      R,
      SR
    >
  >
): Effect.Effect<[input: unknown, state: unknown], ParseResult.ParseError, SR> => {
  // First element of the pair: the encoded input the actor was booted with.
  const encodedInput = Schema.encode(self.machine.schemaInput)(self.input)
  // Second element: the encoded state, read when the effect runs.
  const encodedState = Effect.flatMap(self.get, Schema.encode(self.machine.schemaState))
  return Effect.zip(encodedInput, encodedState)
}
/**
* Boots an actor from a snapshot.
*
* The `[input, state]` pair is decoded with a tuple of the machine's
* `schemaInput` and `schemaState`. `boot` is then called with the decoded
* input and the decoded state as `previousState`. Decoding may fail with a
* `ParseResult.ParseError`.
*
* NOTE(review): `boot` declares a `Scope.Scope` requirement that does not
* appear in this signature because of the `as any` cast — confirm that
* callers supply a scope.
*
* @since 1.0.0
* @category runtime
*/
export const restore = <
  State,
  Public extends Schema.TaggedRequest.All,
  Private extends Schema.TaggedRequest.All,
  Input,
  InitErr,
  R,
  SR
>(
  self: SerializableMachine<
    State,
    Public,
    Private,
    Input,
    InitErr,
    R,
    SR
  >,
  snapshot: readonly [input: unknown, state: unknown]
): Effect.Effect<
  Actor<
    SerializableMachine<
      State,
      Public,
      Private,
      Input,
      InitErr,
      R,
      SR
    >
  >,
  ParseResult.ParseError,
  R | SR
> => {
  // Schema of the snapshot pair, in the same order `snapshot` produces it.
  const pairSchema = Schema.Tuple(self.schemaInput, self.schemaState)
  const decodedPair = Schema.decodeUnknown(pairSchema)(snapshot)
  return Effect.flatMap(
    decodedPair,
    ([input, previousState]) => (boot as any)(self, input, { previousState })
  )
}