import { Effect } from "@effect/core/io/Effect"
import { _A, RefSym, SynchronizedSym } from "@effect/core/io/Ref/definition"
import { SubscriptionRefSym } from "@effect/core/stream/SubscriptionRef/definition"
import type { Maybe } from "@tsplus/stdlib/data/Maybe"

/**
 * `SubscriptionRef` implementation built from three collaborators:
 *
 * - `ref` holds the current value,
 * - `hub` receives every value written through `set` / `modifyEffect`,
 * - `semaphore` is acquired (via `withPermit`) around every write and around
 *   the subscription performed by `changes`.
 *
 * All update/modify variants are expressed in terms of `modifyEffect`, so they
 * share its "read, compute, store, publish" sequence. Plain `get` reads the
 * underlying `ref` directly and does not acquire the semaphore.
 *
 * NOTE(review): the number of permits of `semaphore` is decided by whoever
 * constructs this class; presumably it is 1 so that writes are serialized.
 */
export class SubscriptionRefInternal<A> implements SubscriptionRef<A> {
  /** Brand identifying this value as a `Ref`. */
  get [RefSym](): RefSym {
    return RefSym
  }

  /** Brand identifying this value as a synchronized `Ref`. */
  get [SynchronizedSym](): SynchronizedSym {
    return SynchronizedSym
  }

  /** Brand identifying this value as a `SubscriptionRef`. */
  get [SubscriptionRefSym](): SubscriptionRefSym {
    return SubscriptionRefSym
  }

  /** Type-level marker carrying `A`; at runtime it is just the identity function. */
  get [_A](): (_: never) => A {
    return (a) => a
  }

  /**
   * @param ref - Storage for the current value.
   * @param hub - Hub to which every written value is published.
   * @param semaphore - Semaphore acquired around writes and subscriptions.
   */
  constructor(
    readonly ref: Ref<A>,
    readonly hub: Hub<A>,
    readonly semaphore: TSemaphore
  ) {}

  /**
   * Stream that starts with the value held by `ref` at subscription time and
   * continues with the values delivered by a scoped subscription to `hub`.
   *
   * Reading the current value and subscribing to the hub both happen while
   * holding the semaphore permit, i.e. inside the same `withPermit` that
   * guards writes.
   */
  get changes(): Stream.UIO<A> {
    return Stream.unwrapScoped(
      this.semaphore.withPermit(
        this.ref.get.flatMap((a) =>
          Stream.fromHubScoped(this.hub).map((stream) => Stream(a).concat(stream))
        )
      )
    )
  }

  /**
   * Core write primitive. While holding the semaphore permit it reads the
   * current value, runs `f` on it, stores the second tuple element in `ref`,
   * publishes that same element to `hub`, and yields the first tuple element.
   *
   * @param f - Effectful function from the current value to `[result, newValue]`.
   * @returns An effect producing the `result` component returned by `f`.
   */
  modifyEffect<R, E, B>(
    this: this,
    f: (a: A) => Effect<R, E, readonly [B, A]>
  ): Effect<R, E, B> {
    return this.semaphore.withPermit(
      this.get.flatMap(f).flatMap((tp) => {
        const [b, a] = tp
        // Store first, then publish, keeping `b` as the effect's result.
        return this.ref.set(a).as(b).tap(() => this.hub.publish(a))
      })
    )
  }

  /**
   * Replaces the value with the result of `f` and yields the previous value.
   *
   * @param f - Effectful function computing the new value from the current one.
   */
  getAndUpdateEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, A> {
    return this.modifyEffect((v) => f(v).map((result) => [v, result] as const))
  }

  /**
   * Like `getAndUpdateEffect`, but `pf` may decline to produce an update. In
   * that case the current value is used as the "new" value; because this still
   * goes through `modifyEffect`, the unchanged value is stored and published
   * again.
   *
   * @param pf - Partial effectful update.
   * @returns An effect producing the value held before the update.
   */
  getAndUpdateSomeEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, A> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [v, result] as const)
    )
  }

  /**
   * Like `modifyEffect`, but `pf` may decline. When it does, `fallback` is
   * yielded and the current value is written back (and published) unchanged.
   *
   * @param fallback - Result to yield when `pf` produces nothing.
   * @param pf - Partial effectful function to `[result, newValue]`.
   */
  modifySomeEffect<R, E, B>(
    this: this,
    fallback: B,
    pf: (a: A) => Maybe<Effect<R, E, readonly [B, A]>>
  ): Effect<R, E, B> {
    return this.modifyEffect((v) => pf(v).getOrElse(Effect.succeed([fallback, v] as const)))
  }

  /**
   * Replaces the value with the result of `f`, yielding nothing useful.
   *
   * @param f - Effectful function computing the new value from the current one.
   */
  updateEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, void> {
    return this.modifyEffect((v) => f(v).map((result) => [undefined as void, result] as const))
  }

  /**
   * Replaces the value with the result of `f` and yields the new value.
   *
   * @param f - Effectful function computing the new value from the current one.
   */
  updateAndGetEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, A> {
    return this.modifyEffect((v) => f(v).map((result) => [result, result] as const))
  }

  /**
   * Like `updateEffect`, but `pf` may decline; the current value is then
   * written back (and published) unchanged.
   *
   * @param pf - Partial effectful update.
   */
  updateSomeEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, void> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [undefined as void, result] as const)
    )
  }

  /**
   * Like `updateAndGetEffect`, but `pf` may decline; the current value is then
   * written back (and published) unchanged and also yielded.
   *
   * @param pf - Partial effectful update.
   */
  updateSomeAndGetEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, A> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [result, result] as const)
    )
  }

  /** Reads the current value straight from `ref`, without taking the semaphore. */
  get get(): Effect<never, never, A> {
    return this.ref.get
  }

  /**
   * Pure counterpart of `modifyEffect`: applies `f` to the current value,
   * stores and publishes the second tuple element, yields the first.
   *
   * NOTE(review): `f(a)` is handed to `Effect.sync` as an expression rather
   * than an explicit thunk; presumably the tsplus compiler's lazy-argument
   * handling wraps it — confirm against the build setup.
   *
   * @param f - Function from the current value to `[result, newValue]`.
   */
  modify<B>(this: this, f: (a: A) => readonly [B, A]): Effect<never, never, B> {
    return this.modifyEffect((a) => Effect.sync(f(a)))
  }

  /**
   * Stores `a` in `ref` and then publishes it to `hub`, both while holding the
   * semaphore permit.
   *
   * @param a - The value to store and publish.
   */
  set(this: this, a: A): Effect<never, never, void> {
    return this.semaphore.withPermit(this.ref.set(a).tap(() => this.hub.publish(a)))
  }

  /**
   * Stores `a` and yields the value it replaced.
   *
   * @param a - The value to store.
   */
  getAndSet(this: this, a: A): Effect<never, never, A> {
    return this.modify((v) => [v, a] as const)
  }

  /**
   * Replaces the value with `f(current)` and yields the previous value.
   *
   * @param f - Function computing the new value from the current one.
   */
  getAndUpdate(this: this, f: (a: A) => A): Effect<never, never, A> {
    return this.modify((v) => [v, f(v)] as const)
  }

  /**
   * Like `getAndUpdate`, but `pf` may decline; the current value is then
   * written back (and published) unchanged.
   *
   * @param pf - Partial update.
   * @returns An effect producing the value held before the update.
   */
  getAndUpdateSome(this: this, pf: (a: A) => Maybe<A>): Effect<never, never, A> {
    return this.modify((v) => [v, pf(v).getOrElse(v)] as const)
  }

  /**
   * Like `modify`, but `pf` may decline. When it does, `fallback` is yielded
   * and the current value is written back (and published) unchanged.
   *
   * @param fallback - Result to yield when `pf` produces nothing.
   * @param pf - Partial function to `[result, newValue]`.
   */
  modifySome<B>(
    this: this,
    fallback: B,
    pf: (a: A) => Maybe<readonly [B, A]>
  ): Effect<never, never, B> {
    return this.modify((v) => pf(v).getOrElse([fallback, v] as const))
  }

  /**
   * Replaces the value with `f(current)`.
   *
   * @param f - Function computing the new value from the current one.
   */
  update(this: this, f: (a: A) => A): Effect<never, never, void> {
    return this.modify((v) => [undefined as void, f(v)] as const)
  }

  /**
   * Replaces the value with `f(current)` and yields the new value.
   *
   * @param f - Function computing the new value from the current one.
   */
  updateAndGet(this: this, f: (a: A) => A): Effect<never, never, A> {
    return this.modify((v) => {
      const result = f(v)
      return [result, result] as const
    })
  }

  /**
   * Like `update`, but `pf` may decline; the current value is then written
   * back (and published) unchanged.
   *
   * @param pf - Partial update.
   */
  updateSome(this: this, pf: (a: A) => Maybe<A>): Effect<never, never, void> {
    return this.modify((v) => [undefined as void, pf(v).getOrElse(v)] as const)
  }

  /**
   * Like `updateAndGet`, but `pf` may decline; the current value is then
   * written back (and published) unchanged and also yielded.
   *
   * @param pf - Partial update.
   */
  updateSomeAndGet(this: this, pf: (a: A) => Maybe<A>): Effect<never, never, A> {
    return this.modify((v) => {
      const result = pf(v).getOrElse(v)
      return [result, result] as const
    })
  }
}