import * as Effect from "../Effect.js"
import * as Effectable from "../Effectable.js"
import { dual, pipe } from "../Function.js"
import * as PubSub from "../PubSub.js"
import * as Readable from "../Readable.js"
import * as Ref from "../Ref.js"
import type { Stream } from "../Stream.js"
import * as Subscribable from "../Subscribable.js"
import type * as SubscriptionRef from "../SubscriptionRef.js"
import * as Synchronized from "../SynchronizedRef.js"
import * as circular_ from "./effect/circular.js"
import * as ref_ from "./ref.js"
import * as stream from "./stream.js"

/** @internal */
const SubscriptionRefSymbolKey = "effect/SubscriptionRef"

/**
 * Type identifier for subscription refs. It is obtained through `Symbol.for`,
 * so the same symbol is returned for every lookup of
 * `SubscriptionRefSymbolKey` in the global symbol registry.
 *
 * @internal
 */
export const SubscriptionRefTypeId: SubscriptionRef.SubscriptionRefTypeId = Symbol.for(
  SubscriptionRefSymbolKey
) as SubscriptionRef.SubscriptionRefTypeId

/** Variance marker stored under `SubscriptionRefTypeId` on every instance. */
const subscriptionRefVariance = {
  /* c8 ignore next */
  _A: (_: any) => _
}

/**
 * Concrete subscription ref: a `Ref` holding the current value, a `PubSub`
 * that every write publishes the new value to, and a semaphore that every
 * write and every `changes` subscription acquires one permit from.
 *
 * @internal
 */
class SubscriptionRefImpl<in out A> extends Effectable.Class<A> implements SubscriptionRef.SubscriptionRef<A> {
  readonly [Readable.TypeId]: Readable.TypeId = Readable.TypeId
  readonly [Subscribable.TypeId]: Subscribable.TypeId = Subscribable.TypeId
  readonly [Ref.RefTypeId] = ref_.refVariance
  readonly [Synchronized.SynchronizedRefTypeId] = circular_.synchronizedVariance
  readonly [SubscriptionRefTypeId] = subscriptionRefVariance

  /**
   * @param ref - holds the current value
   * @param pubsub - receives every value written through `modifyEffect`
   * @param semaphore - acquired (one permit) by writes and by `changes`
   */
  constructor(
    readonly ref: Ref.Ref<A>,
    readonly pubsub: PubSub.PubSub<A>,
    readonly semaphore: Effect.Semaphore
  ) {
    super()
    this.get = Ref.get(this.ref)
  }

  /** Using the ref itself as an effect is the same as running `get`. */
  commit() {
    return this.get
  }

  /** Reads the current value directly from the underlying `Ref`. */
  readonly get: Effect.Effect<A>

  /**
   * Stream that emits the value held at subscription time, followed by every
   * value subsequently published to the pubsub.
   *
   * The read of the current value and the pubsub subscription are performed
   * together while holding one semaphore permit, the same permit that
   * `modifyEffect` holds while it sets and publishes.
   */
  get changes(): Stream<A> {
    return pipe(
      Ref.get(this.ref),
      Effect.flatMap((a) =>
        Effect.map(
          stream.fromPubSub(this.pubsub, { scoped: true }),
          (s) =>
            stream.concat(
              stream.make(a),
              s
            )
        )
      ),
      this.semaphore.withPermits(1),
      stream.unwrapScoped
    )
  }

  /**
   * Pure variant of `modifyEffect`: `f` maps the current value to a
   * `[result, nextValue]` pair.
   */
  modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B> {
    return this.modifyEffect((a) => Effect.succeed(f(a)))
  }

  /**
   * Reads the current value, runs `f` on it, stores the returned next value in
   * the `Ref`, publishes that value to the pubsub, and yields the returned
   * result. The whole sequence runs while holding one semaphore permit.
   *
   * @param f - maps the current value to an effect producing `[result, nextValue]`
   * @returns an effect yielding `result`
   */
  modifyEffect<B, E, R>(f: (a: A) => Effect.Effect<readonly [B, A], E, R>): Effect.Effect<B, E, R> {
    return pipe(
      Ref.get(this.ref),
      Effect.flatMap(f),
      Effect.flatMap(([b, a]) =>
        pipe(
          Ref.set(this.ref, a),
          Effect.as(b),
          // `zipLeft` runs the publish but keeps `b` as the result.
          Effect.zipLeft(PubSub.publish(this.pubsub, a))
        )
      ),
      this.semaphore.withPermits(1)
    )
  }
}

/**
 * Reads the current value from the underlying `Ref`. The semaphore is not
 * acquired for this read.
 *
 * @internal
 */
export const get = <A>(self: SubscriptionRef.SubscriptionRef<A>): Effect.Effect<A> => Ref.get(self.ref)

/**
 * Creates a subscription ref initialised with `value`, backed by an unbounded
 * `PubSub`, a fresh `Ref`, and a semaphore with a single permit.
 *
 * @internal
 */
export const make = <A>(value: A): Effect.Effect<SubscriptionRef.SubscriptionRef<A>> =>
  pipe(
    Effect.all([
      PubSub.unbounded<A>(),
      Ref.make(value),
      Effect.makeSemaphore(1)
    ]),
    Effect.map(([pubsub, ref, semaphore]) => new SubscriptionRefImpl(ref, pubsub, semaphore))
  )

/**
 * Data-first / data-last wrapper around `self.modify(f)`.
 *
 * @internal
 */
export const modify = dual<
  <A, B>(f: (a: A) => readonly [B, A]) => (self: SubscriptionRef.SubscriptionRef<A>) => Effect.Effect<B>,
  <A, B>(
    self: SubscriptionRef.SubscriptionRef<A>,
    f: (a: A) => readonly [B, A]
  ) => Effect.Effect<B>
>(2, <A, B>(
  self: SubscriptionRef.SubscriptionRef<A>,
  f: (a: A) => readonly [B, A]
): Effect.Effect<B> => self.modify(f))

/**
 * Data-first / data-last wrapper around `self.modifyEffect(f)`.
 *
 * @internal
 */
export const modifyEffect = dual<
  <A, B, E, R>(
    f: (a: A) => Effect.Effect<readonly [B, A], E, R>
  ) => (self: SubscriptionRef.SubscriptionRef<A>) => Effect.Effect<B, E, R>,
  <A, B, E, R>(
    self: SubscriptionRef.SubscriptionRef<A>,
    f: (a: A) => Effect.Effect<readonly [B, A], E, R>
  ) => Effect.Effect<B, E, R>
>(2, <A, B, E, R>(
  self: SubscriptionRef.SubscriptionRef<A>,
  f: (a: A) => Effect.Effect<readonly [B, A], E, R>
): Effect.Effect<B, E, R> => self.modifyEffect(f))

/**
 * Stores `value` in the underlying `Ref` and publishes it to the pubsub, both
 * while holding one semaphore permit.
 *
 * @internal
 */
export const set = dual<
  <A>(value: A) => (self: SubscriptionRef.SubscriptionRef<A>) => Effect.Effect<void>,
  <A>(
    self: SubscriptionRef.SubscriptionRef<A>,
    value: A
  ) => Effect.Effect<void>
>(2, <A>(
  self: SubscriptionRef.SubscriptionRef<A>,
  value: A
): Effect.Effect<void> =>
  pipe(
    Ref.set(self.ref, value),
    // `zipLeft` runs the publish but keeps the `void` result of `Ref.set`.
    Effect.zipLeft(PubSub.publish(self.pubsub, value)),
    self.semaphore.withPermits(1)
  ))