import type * as Cause from "../../Cause.js" import type * as Deferred from "../../Deferred.js" import * as Duration from "../../Duration.js" import type * as Effect from "../../Effect.js" import * as Effectable from "../../Effectable.js" import * as Equal from "../../Equal.js" import type { Equivalence } from "../../Equivalence.js" import * as Exit from "../../Exit.js" import type * as Fiber from "../../Fiber.js" import * as FiberId from "../../FiberId.js" import type * as FiberRefsPatch from "../../FiberRefsPatch.js" import type { LazyArg } from "../../Function.js" import { dual, pipe } from "../../Function.js" import * as Hash from "../../Hash.js" import * as MutableHashMap from "../../MutableHashMap.js" import * as Option from "../../Option.js" import { pipeArguments } from "../../Pipeable.js" import * as Predicate from "../../Predicate.js" import * as Readable from "../../Readable.js" import type * as Ref from "../../Ref.js" import { currentScheduler } from "../../Scheduler.js" import type * as Scope from "../../Scope.js" import type * as Supervisor from "../../Supervisor.js" import type * as Synchronized from "../../SynchronizedRef.js" import type * as Types from "../../Types.js" import * as internalCause from "../cause.js" import * as effect from "../core-effect.js" import * as core from "../core.js" import * as internalFiber from "../fiber.js" import * as fiberRuntime from "../fiberRuntime.js" import { globalScope } from "../fiberScope.js" import * as internalRef from "../ref.js" import * as supervisor from "../supervisor.js" /** @internal */ class Semaphore { public waiters = new Set<() => void>() public taken = 0 constructor(public permits: number) {} get free() { return this.permits - this.taken } readonly take = (n: number): Effect.Effect => core.asyncInterrupt((resume) => { if (this.free < n) { const observer = () => { if (this.free < n) return this.waiters.delete(observer) resume(core.suspend(() => { if (this.free < n) return this.take(n) this.taken += n return core.succeed(n) })) } this.waiters.add(observer) return core.sync(() => { this.waiters.delete(observer) }) } resume(core.suspend(() => { if (this.free < n) return this.take(n) this.taken += n return core.succeed(n) })) }) updateTakenUnsafe(fiber: Fiber.RuntimeFiber, f: (n: number) => number): Effect.Effect { this.taken = f(this.taken) if (this.waiters.size > 0) { fiber.getFiberRef(currentScheduler).scheduleTask( () => { const iter = this.waiters.values() let item = iter.next() while (item.done === false && this.free > 0) { item.value() item = iter.next() } }, fiber.getFiberRef(core.currentSchedulingPriority), fiber ) } return core.succeed(this.free) } updateTaken(f: (n: number) => number): Effect.Effect { return core.withFiberRuntime((fiber) => this.updateTakenUnsafe(fiber, f)) } readonly resize = (permits: number) => core.asVoid( core.withFiberRuntime((fiber) => { this.permits = permits if (this.free < 0) { return core.void } return this.updateTakenUnsafe(fiber, (taken) => taken) }) ) readonly release = (n: number): Effect.Effect => this.updateTaken((taken) => taken - n) readonly releaseAll: Effect.Effect = this.updateTaken((_) => 0) readonly withPermits = (n: number) => (self: Effect.Effect) => core.uninterruptibleMask((restore) => core.flatMap( restore(this.take(n)), (permits) => fiberRuntime.ensuring(restore(self), this.release(permits)) ) ) readonly withPermitsIfAvailable = (n: number) => (self: Effect.Effect) => core.uninterruptibleMask((restore) => core.suspend(() => { if (this.free < n) { return effect.succeedNone } this.taken += n return fiberRuntime.ensuring(restore(effect.asSome(self)), this.release(n)) }) ) } /** @internal */ export const unsafeMakeSemaphore = (permits: number): Effect.Semaphore => new Semaphore(permits) /** @internal */ export const makeSemaphore = (permits: number) => core.sync(() => unsafeMakeSemaphore(permits)) class Latch extends Effectable.Class implements Effect.Latch { waiters: Array<(_: Effect.Effect) => void> = [] scheduled = false constructor(private isOpen: boolean) { super() } commit() { return this.await } private unsafeSchedule(fiber: Fiber.RuntimeFiber) { if (this.scheduled || this.waiters.length === 0) { return core.void } this.scheduled = true fiber.currentScheduler.scheduleTask(this.flushWaiters, fiber.getFiberRef(core.currentSchedulingPriority), fiber) return core.void } private flushWaiters = () => { this.scheduled = false const waiters = this.waiters this.waiters = [] for (let i = 0; i < waiters.length; i++) { waiters[i](core.exitVoid) } } open = core.withFiberRuntime((fiber) => { if (this.isOpen) { return core.void } this.isOpen = true return this.unsafeSchedule(fiber) }) unsafeOpen() { if (this.isOpen) return this.isOpen = true this.flushWaiters() } release = core.withFiberRuntime((fiber) => { if (this.isOpen) { return core.void } return this.unsafeSchedule(fiber) }) await = core.asyncInterrupt((resume) => { if (this.isOpen) { return resume(core.void) } this.waiters.push(resume) return core.sync(() => { const index = this.waiters.indexOf(resume) if (index !== -1) { this.waiters.splice(index, 1) } }) }) unsafeClose() { this.isOpen = false } close = core.sync(() => { this.isOpen = false }) whenOpen = (self: Effect.Effect): Effect.Effect => { return core.zipRight(this.await, self) } } /** @internal */ export const unsafeMakeLatch = (open?: boolean | undefined): Effect.Latch => new Latch(open ?? false) /** @internal */ export const makeLatch = (open?: boolean | undefined) => core.sync(() => unsafeMakeLatch(open)) /** @internal */ export const awaitAllChildren = (self: Effect.Effect): Effect.Effect => ensuringChildren(self, fiberRuntime.fiberAwaitAll) /** @internal */ export const cached: { ( timeToLive: Duration.DurationInput ): (self: Effect.Effect) => Effect.Effect, never, R> ( self: Effect.Effect, timeToLive: Duration.DurationInput ): Effect.Effect, never, R> } = dual( 2, ( self: Effect.Effect, timeToLive: Duration.DurationInput ): Effect.Effect, never, R> => core.map(cachedInvalidateWithTTL(self, timeToLive), (tuple) => tuple[0]) ) /** @internal */ export const cachedInvalidateWithTTL: { (timeToLive: Duration.DurationInput): ( self: Effect.Effect ) => Effect.Effect<[Effect.Effect, Effect.Effect], never, R> ( self: Effect.Effect, timeToLive: Duration.DurationInput ): Effect.Effect<[Effect.Effect, Effect.Effect], never, R> } = dual( 2, ( self: Effect.Effect, timeToLive: Duration.DurationInput ): Effect.Effect<[Effect.Effect, Effect.Effect], never, R> => { const duration = Duration.decode(timeToLive) return core.flatMap( core.context(), (env) => core.map( makeSynchronized]>>(Option.none()), (cache) => [ core.provideContext(getCachedValue(self, duration, cache), env), invalidateCache(cache) ] as [Effect.Effect, Effect.Effect] ) ) } ) /** @internal */ const computeCachedValue = ( self: Effect.Effect, timeToLive: Duration.DurationInput, start: number ): Effect.Effect]>, never, R> => { const timeToLiveMillis = Duration.toMillis(Duration.decode(timeToLive)) return pipe( core.deferredMake(), core.tap((deferred) => core.intoDeferred(self, deferred)), core.map((deferred) => Option.some([start + timeToLiveMillis, deferred])) ) } /** @internal */ const getCachedValue = ( self: Effect.Effect, timeToLive: Duration.DurationInput, cache: Synchronized.SynchronizedRef]>> ): Effect.Effect => core.uninterruptibleMask((restore) => pipe( effect.clockWith((clock) => clock.currentTimeMillis), core.flatMap((time) => updateSomeAndGetEffectSynchronized(cache, (option) => { switch (option._tag) { case "None": { return Option.some(computeCachedValue(self, timeToLive, time)) } case "Some": { const [end] = option.value return end - time <= 0 ? Option.some(computeCachedValue(self, timeToLive, time)) : Option.none() } } }) ), core.flatMap((option) => Option.isNone(option) ? core.dieMessage( "BUG: Effect.cachedInvalidate - please report an issue at https://github.com/Effect-TS/effect/issues" ) : restore(core.deferredAwait(option.value[1])) ) ) ) /** @internal */ const invalidateCache = ( cache: Synchronized.SynchronizedRef]>> ): Effect.Effect => internalRef.set(cache, Option.none()) /** @internal */ export const ensuringChild = dual< ( f: (fiber: Fiber.Fiber, any>) => Effect.Effect ) => ( self: Effect.Effect ) => Effect.Effect, ( self: Effect.Effect, f: (fiber: Fiber.Fiber, any>) => Effect.Effect ) => Effect.Effect >(2, (self, f) => ensuringChildren(self, (children) => f(fiberRuntime.fiberAll(children)))) /** @internal */ export const ensuringChildren = dual< ( children: (fibers: ReadonlyArray>) => Effect.Effect ) => (self: Effect.Effect) => Effect.Effect, ( self: Effect.Effect, children: (fibers: ReadonlyArray>) => Effect.Effect ) => Effect.Effect >(2, (self, children) => core.flatMap(supervisor.track, (supervisor) => pipe( supervised(self, supervisor), fiberRuntime.ensuring(core.flatMap(supervisor.value, children)) ))) /** @internal */ export const forkAll: { ( options?: { readonly discard?: false | undefined } ): >( effects: Iterable ) => Effect.Effect< Fiber.Fiber>, Effect.Effect.Error>, never, Effect.Effect.Context > (options: { readonly discard: true }): >( effects: Iterable ) => Effect.Effect> >( effects: Iterable, options?: { readonly discard?: false | undefined } ): Effect.Effect< Fiber.Fiber>, Effect.Effect.Error>, never, Effect.Effect.Context > >(effects: Iterable, options: { readonly discard: true }): Effect.Effect> } = dual((args) => Predicate.isIterable(args[0]), (effects: Iterable>, options: { readonly discard: true }): Effect.Effect => options?.discard ? core.forEachSequentialDiscard(effects, fiberRuntime.fork) : core.map(core.forEachSequential(effects, fiberRuntime.fork), fiberRuntime.fiberAll)) /** @internal */ export const forkIn = dual< (scope: Scope.Scope) => (self: Effect.Effect) => Effect.Effect, never, R>, (self: Effect.Effect, scope: Scope.Scope) => Effect.Effect, never, R> >( 2, (self, scope) => core.withFiberRuntime((parent, parentStatus) => { const scopeImpl = scope as fiberRuntime.ScopeImpl const fiber = fiberRuntime.unsafeFork(self, parent, parentStatus.runtimeFlags, globalScope) if (scopeImpl.state._tag === "Open") { const finalizer = () => core.fiberIdWith((fiberId) => Equal.equals(fiberId, fiber.id()) ? core.void : core.asVoid(core.interruptFiber(fiber)) ) const key = {} scopeImpl.state.finalizers.set(key, finalizer) fiber.addObserver(() => { if (scopeImpl.state._tag === "Closed") return scopeImpl.state.finalizers.delete(key) }) } else { fiber.unsafeInterruptAsFork(parent.id()) } return core.succeed(fiber) }) ) /** @internal */ export const forkScoped = ( self: Effect.Effect ): Effect.Effect, never, R | Scope.Scope> => fiberRuntime.scopeWith((scope) => forkIn(self, scope)) /** @internal */ export const fromFiber = (fiber: Fiber.Fiber): Effect.Effect => internalFiber.join(fiber) /** @internal */ export const fromFiberEffect = (fiber: Effect.Effect, E, R>): Effect.Effect => core.suspend(() => core.flatMap(fiber, internalFiber.join)) const memoKeySymbol = Symbol.for("effect/Effect/memoizeFunction.key") class Key implements Equal.Equal { [memoKeySymbol] = memoKeySymbol constructor(readonly a: A, readonly eq?: Equivalence) {} [Equal.symbol](that: Equal.Equal) { if (Predicate.hasProperty(that, memoKeySymbol)) { if (this.eq) { return this.eq(this.a, (that as unknown as Key).a) } else { return Equal.equals(this.a, (that as unknown as Key).a) } } return false } [Hash.symbol]() { return this.eq ? 0 : Hash.cached(this, Hash.hash(this.a)) } } /** @internal */ export const cachedFunction = ( f: (a: A) => Effect.Effect, eq?: Equivalence ): Effect.Effect<(a: A) => Effect.Effect> => { return pipe( core.sync(() => MutableHashMap.empty, Deferred.Deferred>()), core.flatMap(makeSynchronized), core.map((ref) => (a: A) => pipe( ref.modifyEffect((map) => { const result = pipe(map, MutableHashMap.get(new Key(a, eq))) if (Option.isNone(result)) { return pipe( core.deferredMake(), core.tap((deferred) => pipe( effect.diffFiberRefs(f(a)), core.intoDeferred(deferred), fiberRuntime.fork ) ), core.map((deferred) => [deferred, pipe(map, MutableHashMap.set(new Key(a, eq), deferred))] as const) ) } return core.succeed([result.value, map] as const) }), core.flatMap(core.deferredAwait), core.flatMap(([patch, b]) => pipe(effect.patchFiberRefs(patch), core.as(b))) ) ) ) } /** @internal */ export const raceFirst = dual< ( that: Effect.Effect ) => ( self: Effect.Effect ) => Effect.Effect, ( self: Effect.Effect, that: Effect.Effect ) => Effect.Effect >(2, ( self: Effect.Effect, that: Effect.Effect ) => pipe( core.exit(self), fiberRuntime.race(core.exit(that)), (effect: Effect.Effect, never, R | R2>) => core.flatten(effect) )) /** @internal */ export const supervised = dual< (supervisor: Supervisor.Supervisor) => (self: Effect.Effect) => Effect.Effect, (self: Effect.Effect, supervisor: Supervisor.Supervisor) => Effect.Effect >(2, (self, supervisor) => { const supervise = core.fiberRefLocallyWith(fiberRuntime.currentSupervisor, (s) => s.zip(supervisor)) return supervise(self) }) /** @internal */ export const timeout = dual< ( duration: Duration.DurationInput ) => (self: Effect.Effect) => Effect.Effect, ( self: Effect.Effect, duration: Duration.DurationInput ) => Effect.Effect >(2, (self, duration) => timeoutFail(self, { onTimeout: () => core.timeoutExceptionFromDuration(duration), duration })) /** @internal */ export const timeoutFail = dual< ( options: { readonly onTimeout: LazyArg readonly duration: Duration.DurationInput } ) => (self: Effect.Effect) => Effect.Effect, ( self: Effect.Effect, options: { readonly onTimeout: LazyArg readonly duration: Duration.DurationInput } ) => Effect.Effect >(2, (self, { duration, onTimeout }) => core.flatten(timeoutTo(self, { onTimeout: () => core.failSync(onTimeout), onSuccess: core.succeed, duration }))) /** @internal */ export const timeoutFailCause = dual< ( options: { readonly onTimeout: LazyArg> readonly duration: Duration.DurationInput } ) => (self: Effect.Effect) => Effect.Effect, ( self: Effect.Effect, options: { readonly onTimeout: LazyArg> readonly duration: Duration.DurationInput } ) => Effect.Effect >(2, (self, { duration, onTimeout }) => core.flatten(timeoutTo(self, { onTimeout: () => core.failCauseSync(onTimeout), onSuccess: core.succeed, duration }))) /** @internal */ export const timeoutOption = dual< ( duration: Duration.DurationInput ) => (self: Effect.Effect) => Effect.Effect, E, R>, ( self: Effect.Effect, duration: Duration.DurationInput ) => Effect.Effect, E, R> >(2, (self, duration) => timeoutTo(self, { duration, onSuccess: Option.some, onTimeout: Option.none })) /** @internal */ export const timeoutTo = dual< ( options: { readonly onTimeout: LazyArg readonly onSuccess: (a: A) => B readonly duration: Duration.DurationInput } ) => (self: Effect.Effect) => Effect.Effect, ( self: Effect.Effect, options: { readonly onTimeout: LazyArg readonly onSuccess: (a: A) => B readonly duration: Duration.DurationInput } ) => Effect.Effect >( 2, (self, { duration, onSuccess, onTimeout }) => core.fiberIdWith((parentFiberId) => core.uninterruptibleMask((restore) => fiberRuntime.raceFibersWith( restore(self), core.interruptible(effect.sleep(duration)), { onSelfWin: (winner, loser) => core.flatMap( winner.await, (exit) => { if (exit._tag === "Success") { return core.flatMap( winner.inheritAll, () => core.as( core.interruptAsFiber(loser, parentFiberId), onSuccess(exit.value) ) ) } else { return core.flatMap( core.interruptAsFiber(loser, parentFiberId), () => core.exitFailCause(exit.cause) ) } } ), onOtherWin: (winner, loser) => core.flatMap( winner.await, (exit) => { if (exit._tag === "Success") { return core.flatMap( winner.inheritAll, () => core.as( core.interruptAsFiber(loser, parentFiberId), onTimeout() ) ) } else { return core.flatMap( core.interruptAsFiber(loser, parentFiberId), () => core.exitFailCause(exit.cause) ) } } ), otherScope: globalScope } ) ) ) ) // circular with Synchronized /** @internal */ const SynchronizedSymbolKey = "effect/Ref/SynchronizedRef" /** @internal */ export const SynchronizedTypeId: Synchronized.SynchronizedRefTypeId = Symbol.for( SynchronizedSymbolKey ) as Synchronized.SynchronizedRefTypeId /** @internal */ export const synchronizedVariance = { /* c8 ignore next */ _A: (_: any) => _ } /** @internal */ class SynchronizedImpl extends Effectable.Class implements Synchronized.SynchronizedRef { readonly [SynchronizedTypeId] = synchronizedVariance readonly [internalRef.RefTypeId] = internalRef.refVariance readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId constructor( readonly ref: Ref.Ref, readonly withLock: (self: Effect.Effect) => Effect.Effect ) { super() this.get = internalRef.get(this.ref) } readonly get: Effect.Effect commit() { return this.get } modify(f: (a: A) => readonly [B, A]): Effect.Effect { return this.modifyEffect((a) => core.succeed(f(a))) } modifyEffect(f: (a: A) => Effect.Effect): Effect.Effect { return this.withLock( pipe( core.flatMap(internalRef.get(this.ref), f), core.flatMap(([b, a]) => core.as(internalRef.set(this.ref, a), b)) ) ) } } /** @internal */ export const makeSynchronized = (value: A): Effect.Effect> => core.sync(() => unsafeMakeSynchronized(value)) /** @internal */ export const unsafeMakeSynchronized = (value: A): Synchronized.SynchronizedRef => { const ref = internalRef.unsafeMake(value) const sem = unsafeMakeSemaphore(1) return new SynchronizedImpl(ref, sem.withPermits(1)) } /** @internal */ export const updateSomeAndGetEffectSynchronized = dual< ( pf: (a: A) => Option.Option> ) => (self: Synchronized.SynchronizedRef) => Effect.Effect, ( self: Synchronized.SynchronizedRef, pf: (a: A) => Option.Option> ) => Effect.Effect >(2, (self, pf) => self.modifyEffect((value) => { const result = pf(value) switch (result._tag) { case "None": { return core.succeed([value, value] as const) } case "Some": { return core.map(result.value, (a) => [a, a] as const) } } })) // circular with Fiber /** @internal */ export const zipFiber = dual< (that: Fiber.Fiber) => (self: Fiber.Fiber) => Fiber.Fiber<[A, A2], E | E2>, (self: Fiber.Fiber, that: Fiber.Fiber) => Fiber.Fiber<[A, A2], E | E2> >(2, (self, that) => zipWithFiber(self, that, (a, b) => [a, b])) /** @internal */ export const zipLeftFiber = dual< (that: Fiber.Fiber) => (self: Fiber.Fiber) => Fiber.Fiber, (self: Fiber.Fiber, that: Fiber.Fiber) => Fiber.Fiber >(2, (self, that) => zipWithFiber(self, that, (a, _) => a)) /** @internal */ export const zipRightFiber = dual< (that: Fiber.Fiber) => (self: Fiber.Fiber) => Fiber.Fiber, (self: Fiber.Fiber, that: Fiber.Fiber) => Fiber.Fiber >(2, (self, that) => zipWithFiber(self, that, (_, b) => b)) /** @internal */ export const zipWithFiber = dual< ( that: Fiber.Fiber, f: (a: A, b: B) => C ) => (self: Fiber.Fiber) => Fiber.Fiber, ( self: Fiber.Fiber, that: Fiber.Fiber, f: (a: A, b: B) => C ) => Fiber.Fiber >(3, (self, that, f) => ({ ...Effectable.CommitPrototype, commit() { return internalFiber.join(this) }, [internalFiber.FiberTypeId]: internalFiber.fiberVariance, id: () => pipe(self.id(), FiberId.getOrElse(that.id())), await: pipe( self.await, core.flatten, fiberRuntime.zipWithOptions(core.flatten(that.await), f, { concurrent: true }), core.exit ), children: self.children, inheritAll: core.zipRight( that.inheritAll, self.inheritAll ), poll: core.zipWith( self.poll, that.poll, (optionA, optionB) => pipe( optionA, Option.flatMap((exitA) => pipe( optionB, Option.map((exitB) => Exit.zipWith(exitA, exitB, { onSuccess: f, onFailure: internalCause.parallel }) ) ) ) ) ), interruptAsFork: (id) => core.zipRight( self.interruptAsFork(id), that.interruptAsFork(id) ), pipe() { return pipeArguments(this, arguments) } })) /* @internal */ export const bindAll: { < A extends object, X extends Record>, O extends Types.NoExcessProperties<{ readonly concurrency?: Types.Concurrency | undefined readonly batching?: boolean | "inherit" | undefined readonly mode?: "default" | "validate" | "either" | undefined readonly concurrentFinalizers?: boolean | undefined }, O> >( f: (a: A) => [Extract] extends [never] ? X : `Duplicate keys`, options?: undefined | O ): ( self: Effect.Effect ) => [Effect.All.ReturnObject>] extends [Effect.Effect] ? Effect.Effect< { [K in keyof A | keyof Success]: K extends keyof A ? A[K] : K extends keyof Success ? Success[K] : never }, | E1 | Error, R1 | Context > : never < A extends object, X extends Record>, O extends Types.NoExcessProperties<{ readonly concurrency?: Types.Concurrency | undefined readonly batching?: boolean | "inherit" | undefined readonly mode?: "default" | "validate" | "either" | undefined readonly concurrentFinalizers?: boolean | undefined }, O>, E1, R1 >( self: Effect.Effect, f: (a: A) => [Extract] extends [never] ? X : `Duplicate keys`, options?: undefined | { readonly concurrency?: Types.Concurrency | undefined readonly batching?: boolean | "inherit" | undefined readonly mode?: "default" | "validate" | "either" | undefined readonly concurrentFinalizers?: boolean | undefined } ): [Effect.All.ReturnObject>] extends [Effect.Effect] ? Effect.Effect< { [K in keyof A | keyof Success]: K extends keyof A ? A[K] : K extends keyof Success ? Success[K] : never }, | E1 | Error, R1 | Context > : never } = dual((args) => core.isEffect(args[0]), < A extends object, X extends Record>, O extends Types.NoExcessProperties<{ readonly concurrency?: Types.Concurrency | undefined readonly batching?: boolean | "inherit" | undefined readonly mode?: "default" | "validate" | "either" | undefined readonly concurrentFinalizers?: boolean | undefined }, O>, E1, R1 >( self: Effect.Effect, f: (a: A) => X, options?: undefined | O ) => core.flatMap( self, (a) => (fiberRuntime.all(f(a), options) as Effect.All.ReturnObject< X, Effect.All.IsDiscard, Effect.All.ExtractMode >) .pipe( core.map((record) => Object.assign({}, a, record)) ) ))