import type { AtomicHub } from "@effect/core/io/Hub/operations/_internal/AtomicHub"
import type { Subscription } from "@effect/core/io/Hub/operations/_internal/Subscription"
import { unsafeCompleteDeferred } from "@effect/core/io/Hub/operations/_internal/unsafeCompleteDeferred"
import { unsafeOfferAll } from "@effect/core/io/Hub/operations/_internal/unsafeOfferAll"
import { unsafePollAllQueue } from "@effect/core/io/Hub/operations/_internal/unsafePollAllQueue"
/**
* A `Strategy` describes the protocol for how publishers and subscribers
* will communicate with each other through the hub.
*
* @tsplus type effect/core/io/Hub/Strategy
*/
export interface Strategy {
/**
* Describes how publishers should signal to subscribers that they are
* waiting for space to become available in the hub.
*/
readonly handleSurplus: (
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
as: Collection,
isShutdown: AtomicBoolean
) => Effect
/**
* Describes any finalization logic associated with this strategy.
*/
readonly shutdown: Effect
/**
* Describes how subscribers should signal to publishers waiting for space
* to become available in the hub that space may be available.
*/
readonly unsafeOnHubEmptySpace: (
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>
) => void
/**
* Describes how subscribers waiting for additional values from the hub
* should take those values and signal to publishers that they are no
* longer waiting for additional values.
*/
readonly unsafeCompletePollers: (
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
subscription: Subscription,
pollers: MutableQueue>
) => void
/**
* Describes how publishers should signal to subscribers waiting for
* additional values from the hub that new values are available.
*/
readonly unsafeCompleteSubscribers: (
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>
) => void
}
/**
* @tsplus type effect/core/io/Hub/Strategy.Ops
*/
export interface StrategyOps {}
export const Strategy: StrategyOps = {}
abstract class BaseStrategy implements Strategy {
abstract handleSurplus(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
as: Collection,
isShutdown: AtomicBoolean
): Effect
abstract shutdown: Effect
abstract unsafeOnHubEmptySpace(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>
): void
unsafeCompletePollers(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
subscription: Subscription,
pollers: MutableQueue>
): void {
let keepPolling = true
while (keepPolling && !subscription.isEmpty) {
const poller = pollers.poll(EmptyMutableQueue)!
if (poller === EmptyMutableQueue) {
const subPollerPair = [subscription, pollers] as const
subscribers.remove(subPollerPair)
if (pollers.isEmpty) {
keepPolling = false
} else {
subscribers.add(subPollerPair)
}
} else {
const pollResult = subscription.poll(EmptyMutableQueue)
if (pollResult == EmptyMutableQueue) {
unsafeOfferAll(pollers, unsafePollAllQueue(pollers).prepend(poller))
} else {
unsafeCompleteDeferred(poller, pollResult)
this.unsafeOnHubEmptySpace(hub, subscribers)
}
}
}
}
unsafeCompleteSubscribers(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>
): void {
for (
const [subscription, pollers] of subscribers
) {
this.unsafeCompletePollers(hub, subscribers, subscription, pollers)
}
}
}
/**
* A strategy that applies back pressure to publishers when the hub is at
* capacity. This guarantees that all subscribers will receive all messages
* published to the hub while they are subscribed. However, it creates the
* risk that a slow subscriber will slow down the rate at which messages
* are published and received by other subscribers.
*/
export class BackPressure extends BaseStrategy {
publishers: MutableQueue, boolean]> = MutableQueue
.unbounded()
handleSurplus(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
as: Collection,
isShutdown: AtomicBoolean
): Effect {
return Effect.withFiberRuntime((state) => {
const deferred: Deferred = Deferred.unsafeMake(state.id)
return Effect.suspendSucceed(() => {
this.unsafeOffer(as, deferred)
this.unsafeOnHubEmptySpace(hub, subscribers)
this.unsafeCompleteSubscribers(hub, subscribers)
return isShutdown.get ? Effect.interrupt : deferred.await
}).onInterrupt(() => Effect.sync(this.unsafeRemove(deferred)))
})
}
get shutdown(): Effect {
return Effect.Do()
.bind("fiberId", () => Effect.fiberId)
.bind("publishers", () => Effect.sync(unsafePollAllQueue(this.publishers)))
.tap(({ fiberId, publishers }) =>
Effect.forEachParDiscard(
publishers,
([_, deferred, last]) => last ? deferred.interruptAs(fiberId) : Effect.unit
)
)
.unit
}
unsafeOnHubEmptySpace(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>
): void {
let keepPolling = true
while (keepPolling && !hub.isFull) {
const publisher = this.publishers.poll(EmptyMutableQueue)!
if (publisher === EmptyMutableQueue) {
keepPolling = false
} else {
const published = hub.publish(publisher[0])
if (published && publisher[2]) {
unsafeCompleteDeferred(publisher[1], true)
} else if (!published) {
unsafeOfferAll(
this.publishers,
unsafePollAllQueue(this.publishers).prepend(publisher)
)
}
this.unsafeCompleteSubscribers(hub, subscribers)
}
}
}
private unsafeOffer(as: Collection, deferred: Deferred): void {
const it = as[Symbol.iterator]()
let curr = it.next()
if (!curr.done) {
let next
while ((next = it.next()) && !next.done) {
this.publishers.offer([curr.value, deferred, false] as const)
curr = next
}
this.publishers.offer([curr.value, deferred, true] as const)
}
}
private unsafeRemove(deferred: Deferred): void {
unsafeOfferAll(
this.publishers,
unsafePollAllQueue(this.publishers).filter(([_, a]) => a !== deferred)
)
}
}
/**
* A strategy that drops new messages when the hub is at capacity. This
* guarantees that a slow subscriber will not slow down the rate at which
* messages are published. However, it creates the risk that a slow
* subscriber will slow down the rate at which messages are received by
* other subscribers and that subscribers may not receive all messages
* published to the hub while they are subscribed.
*/
export class Dropping extends BaseStrategy {
handleSurplus(
_hub: AtomicHub,
_subscribers: MutableHashSet, MutableQueue>]>,
_as: Collection,
_isShutdown: AtomicBoolean
): Effect {
return Effect.succeed(false)
}
shutdown: Effect = Effect.unit
unsafeOnHubEmptySpace(
_hub: AtomicHub,
_subscribers: MutableHashSet, MutableQueue>]>
): void {
//
}
}
/**
* A strategy that adds new messages and drops old messages when the hub is
* at capacity. This guarantees that a slow subscriber will not slow down
* the rate at which messages are published and received by other
* subscribers. However, it creates the risk that a slow subscriber will
* not receive some messages published to the hub while it is subscribed.
*/
export class Sliding extends BaseStrategy {
private unsafeSlidingPublish(hub: AtomicHub, as: Collection): void {
const it = as[Symbol.iterator]()
let next = it.next()
if (!next.done && hub.capacity > 0) {
let a = next.value
let loop = true
while (loop) {
hub.slide()
const pub = hub.publish(a)
if (pub && (next = it.next()) && !next.done) {
a = next.value
} else if (pub) {
loop = false
}
}
}
}
handleSurplus(
hub: AtomicHub,
subscribers: MutableHashSet, MutableQueue>]>,
as: Collection,
_isShutdown: AtomicBoolean
): Effect {
return Effect.sync(() => {
this.unsafeSlidingPublish(hub, as)
this.unsafeCompleteSubscribers(hub, subscribers)
return true
})
}
shutdown: Effect = Effect.unit
unsafeOnHubEmptySpace(
_hub: AtomicHub,
_subscribers: MutableHashSet, MutableQueue>]>
): void {
//
}
}
/**
* @tsplus static effect/core/io/Hub/Strategy.Ops BackPressure
*/
export function backPressureStrategy(): Strategy {
return new BackPressure()
}
/**
* @tsplus static effect/core/io/Hub/Strategy.Ops Dropping
*/
export function droppingStrategy(): Strategy {
return new Dropping()
}
/**
* @tsplus static effect/core/io/Hub/Strategy.Ops Sliding
*/
export function slidingStrategy(): Strategy {
return new Sliding()
}