import { Effect } from "@effect/core/io/Effect"
import { _A, RefSym, SynchronizedSym } from "@effect/core/io/Ref/definition"
import { SubscriptionRefSym } from "@effect/core/stream/SubscriptionRef/definition"
import type { Maybe } from "@tsplus/stdlib/data/Maybe"
/**
 * Concrete `SubscriptionRef` backed by three collaborators:
 *
 * - `ref`       holds the current value,
 * - `hub`       receives every value written through this class,
 * - `semaphore` wraps every write (and the setup of `changes`) via `withPermit`.
 *
 * All update helpers are thin wrappers over `modify` / `modifyEffect`, so the
 * "store the value, then publish it to the hub" sequence lives in one place
 * (plus `set`, which performs the same sequence directly).
 *
 * NOTE(review): the generic parameter lists (`<A>`, `<R, E, B>`, ...) were
 * reconstructed from how each method builds its result; confirm them against
 * the `SubscriptionRef` / `Ref.Synchronized` interface declarations.
 */
export class SubscriptionRefInternal<A> implements SubscriptionRef<A> {
  /** Type-brand accessor: returns the `RefSym` symbol. */
  get [RefSym](): RefSym {
    return RefSym
  }

  /** Type-brand accessor: returns the `SynchronizedSym` symbol. */
  get [SynchronizedSym](): SynchronizedSym {
    return SynchronizedSym
  }

  /** Type-brand accessor: returns the `SubscriptionRefSym` symbol. */
  get [SubscriptionRefSym](): SubscriptionRefSym {
    return SubscriptionRefSym
  }

  /** Type-level marker for `A`; the returned function is the identity. */
  get [_A](): (_: never) => A {
    return (a) => a
  }

  /**
   * @param ref - Storage for the current value.
   * @param hub - Hub every newly written value is published to.
   * @param semaphore - Semaphore whose `withPermit` wraps writes and the
   *   construction of `changes`.
   */
  constructor(readonly ref: Ref<A>, readonly hub: Hub<A>, readonly semaphore: TSemaphore) {}

  /**
   * Stream that emits the value read from `ref` followed by the stream
   * obtained from `Stream.fromHubScoped(this.hub)`.
   *
   * The read of `ref` and the `fromHubScoped` effect both run inside the same
   * `withPermit` call, so no write performed through this class can happen
   * between them (assuming the semaphore was created with a single permit —
   * TODO confirm at the construction site).
   */
  get changes(): Stream.UIO<A> {
    return Stream.unwrapScoped(
      this.semaphore.withPermit(
        this.ref.get.flatMap((a) =>
          Stream.fromHubScoped(this.hub).map((stream) => Stream(a).concat(stream))
        )
      )
    )
  }

  /**
   * Core effectful update. Inside `withPermit`: reads the current value, runs
   * `f` on it, stores the second tuple element in `ref`, publishes that same
   * value to `hub`, and yields the first tuple element.
   *
   * @param f - Maps the current value to an effect producing
   *   `[result, newValue]`.
   * @returns An effect producing `result`; it fails with whatever `f` fails with.
   */
  modifyEffect<R, E, B>(
    this: this,
    f: (a: A) => Effect<R, E, readonly [B, A]>
  ): Effect<R, E, B> {
    return this.semaphore.withPermit(
      this.get.flatMap(f).flatMap((tp) => {
        const [b, a] = tp
        // Store first, then publish; `as(b)` keeps `b` as the final result.
        return this.ref.set(a).as(b).tap(() => this.hub.publish(a))
      })
    )
  }

  /**
   * Replaces the value with the result of `f` and yields the previous value.
   *
   * @param f - Effectful computation of the new value from the current one.
   */
  getAndUpdateEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, A> {
    return this.modifyEffect((v) => f(v).map((result) => [v, result] as const))
  }

  /**
   * Like `getAndUpdateEffect`, but when `pf` returns an empty `Maybe` the
   * current value is written back unchanged (and still published).
   *
   * @param pf - Partial effectful update.
   * @returns An effect producing the previous value.
   */
  getAndUpdateSomeEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, A> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [v, result] as const)
    )
  }

  /**
   * Partial variant of `modifyEffect`: when `pf` returns an empty `Maybe`,
   * the result is `fallback` and the current value is written back unchanged.
   *
   * @param fallback - Result used when `pf` is not defined for the value.
   * @param pf - Partial effectful modification producing `[result, newValue]`.
   */
  modifySomeEffect<R, E, B>(
    this: this,
    fallback: B,
    pf: (a: A) => Maybe<Effect<R, E, readonly [B, A]>>
  ): Effect<R, E, B> {
    return this.modifyEffect((v) => pf(v).getOrElse(Effect.succeed([fallback, v] as const)))
  }

  /**
   * Replaces the value with the result of `f`, yielding nothing.
   *
   * @param f - Effectful computation of the new value from the current one.
   */
  updateEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, void> {
    return this.modifyEffect((v) => f(v).map((result) => [undefined as void, result] as const))
  }

  /**
   * Replaces the value with the result of `f` and yields the new value.
   *
   * @param f - Effectful computation of the new value from the current one.
   */
  updateAndGetEffect<R, E>(this: this, f: (a: A) => Effect<R, E, A>): Effect<R, E, A> {
    return this.modifyEffect((v) => f(v).map((result) => [result, result] as const))
  }

  /**
   * Like `updateEffect`, but keeps the current value when `pf` returns an
   * empty `Maybe`.
   *
   * @param pf - Partial effectful update.
   */
  updateSomeEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, void> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [undefined as void, result] as const)
    )
  }

  /**
   * Like `updateAndGetEffect`, but keeps (and yields) the current value when
   * `pf` returns an empty `Maybe`.
   *
   * @param pf - Partial effectful update.
   */
  updateSomeAndGetEffect<R, E>(
    this: this,
    pf: (a: A) => Maybe<Effect<R, E, A>>
  ): Effect<R, E, A> {
    return this.modifyEffect((v) =>
      pf(v).getOrElse(Effect.succeed(v)).map((result) => [result, result] as const)
    )
  }

  /** Effect reading the current value straight from `ref` (no permit taken). */
  get get(): Effect<never, never, A> {
    return this.ref.get
  }

  /**
   * Pure counterpart of `modifyEffect`: applies `f` to the current value,
   * stores the second tuple element and yields the first.
   *
   * @param f - Maps the current value to `[result, newValue]`.
   */
  modify<B>(this: this, f: (a: A) => readonly [B, A]): Effect<never, never, B> {
    return this.modifyEffect((a) => Effect.sync(f(a)))
  }

  /**
   * Stores `a` in `ref` and then publishes it to `hub`, inside `withPermit`.
   *
   * @param a - The new value.
   */
  set(this: this, a: A): Effect<never, never, void> {
    return this.semaphore.withPermit(this.ref.set(a).tap(() => this.hub.publish(a)))
  }

  /**
   * Stores `a` and yields the previous value.
   *
   * @param a - The new value.
   */
  getAndSet(this: this, a: A): Effect<never, never, A> {
    return this.modify((v) => [v, a] as const)
  }

  /**
   * Stores `f(current)` and yields the previous value.
   *
   * @param f - Pure computation of the new value.
   */
  getAndUpdate(this: this, f: (a: A) => A): Effect<never, never, A> {
    return this.modify((v) => [v, f(v)] as const)
  }

  /**
   * Like `getAndUpdate`, but keeps the current value when `pf` returns an
   * empty `Maybe`.
   *
   * @param pf - Partial pure update.
   * @returns An effect producing the previous value.
   */
  getAndUpdateSome(
    this: this,
    pf: (a: A) => Maybe<A>
  ): Effect<never, never, A> {
    return this.modify((v) => [v, pf(v).getOrElse(v)] as const)
  }

  /**
   * Partial variant of `modify`: when `pf` returns an empty `Maybe`, yields
   * `fallback` and keeps the current value.
   *
   * @param fallback - Result used when `pf` is not defined for the value.
   * @param pf - Partial pure modification producing `[result, newValue]`.
   */
  modifySome<B>(
    this: this,
    fallback: B,
    pf: (a: A) => Maybe<readonly [B, A]>
  ): Effect<never, never, B> {
    return this.modify((v) => pf(v).getOrElse([fallback, v] as const))
  }

  /**
   * Stores `f(current)`, yielding nothing.
   *
   * @param f - Pure computation of the new value.
   */
  update(this: this, f: (a: A) => A): Effect<never, never, void> {
    return this.modify((v) => [undefined as void, f(v)] as const)
  }

  /**
   * Stores `f(current)` and yields the new value.
   *
   * @param f - Pure computation of the new value.
   */
  updateAndGet(this: this, f: (a: A) => A): Effect<never, never, A> {
    return this.modify((v) => {
      // Evaluate `f` once so the stored and the returned value are the same.
      const result = f(v)
      return [result, result] as const
    })
  }

  /**
   * Like `update`, but keeps the current value when `pf` returns an empty
   * `Maybe`.
   *
   * @param pf - Partial pure update.
   */
  updateSome(
    this: this,
    pf: (a: A) => Maybe<A>
  ): Effect<never, never, void> {
    return this.modify((v) => [undefined as void, pf(v).getOrElse(v)] as const)
  }

  /**
   * Like `updateAndGet`, but keeps (and yields) the current value when `pf`
   * returns an empty `Maybe`.
   *
   * @param pf - Partial pure update.
   */
  updateSomeAndGet(
    this: this,
    pf: (a: A) => Maybe<A>
  ): Effect<never, never, A> {
    return this.modify((v) => {
      // Evaluate `pf` once so the stored and the returned value are the same.
      const result = pf(v).getOrElse(v)
      return [result, result] as const
    })
  }
}