import type { AtomicHub } from "@effect/core/io/Hub/operations/_internal/AtomicHub"
import type { Subscription } from "@effect/core/io/Hub/operations/_internal/Subscription"
import { unsafeCompleteDeferred } from "@effect/core/io/Hub/operations/_internal/unsafeCompleteDeferred"
import { unsafeOfferAll } from "@effect/core/io/Hub/operations/_internal/unsafeOfferAll"
import { unsafePollAllQueue } from "@effect/core/io/Hub/operations/_internal/unsafePollAllQueue"

/**
 * A `Strategy<A>` describes the protocol for how publishers and subscribers
 * will communicate with each other through the hub.
 *
 * @tsplus type effect/core/io/Hub/Strategy
 */
export interface Strategy<A> {
  /**
   * Describes how publishers should signal to subscribers that they are
   * waiting for space to become available in the hub.
   */
  readonly handleSurplus: (
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    as: Collection<A>,
    isShutdown: AtomicBoolean
  ) => Effect<never, never, boolean>

  /**
   * Describes any finalization logic associated with this strategy.
   */
  readonly shutdown: Effect<never, never, void>

  /**
   * Describes how subscribers should signal to publishers waiting for space
   * to become available in the hub that space may be available.
   */
  readonly unsafeOnHubEmptySpace: (
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ) => void

  /**
   * Describes how subscribers waiting for additional values from the hub
   * should take those values and signal to publishers that they are no
   * longer waiting for additional values.
   */
  readonly unsafeCompletePollers: (
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    subscription: Subscription<A>,
    pollers: MutableQueue<Deferred<never, A>>
  ) => void

  /**
   * Describes how publishers should signal to subscribers waiting for
   * additional values from the hub that new values are available.
   */
  readonly unsafeCompleteSubscribers: (
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ) => void
}

/**
 * @tsplus type effect/core/io/Hub/Strategy.Ops
 */
export interface StrategyOps {}
export const Strategy: StrategyOps = {}

/**
 * Shared implementation of the subscriber-side half of a `Strategy`.
 *
 * Concrete strategies supply `handleSurplus`, `shutdown` and
 * `unsafeOnHubEmptySpace`; the poller/subscriber completion logic below is
 * common to all of them.
 */
abstract class BaseStrategy<A> implements Strategy<A> {
  abstract handleSurplus(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    as: Collection<A>,
    isShutdown: AtomicBoolean
  ): Effect<never, never, boolean>

  abstract shutdown: Effect<never, never, void>

  abstract unsafeOnHubEmptySpace(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ): void

  /**
   * Pairs waiting pollers of a single subscription with values available in
   * that subscription, one at a time, for as long as the subscription is
   * non-empty and pollers remain.
   *
   * @param hub - The hub, forwarded to `unsafeOnHubEmptySpace` after each
   * value is handed over.
   * @param subscribers - The set of `[subscription, pollers]` pairs; the pair
   * for this subscription is removed from it once no pollers are left.
   * @param subscription - The subscription values are taken from.
   * @param pollers - The queue of deferreds waiting on `subscription`.
   */
  unsafeCompletePollers(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    subscription: Subscription<A>,
    pollers: MutableQueue<Deferred<never, A>>
  ): void {
    let keepPolling = true
    while (keepPolling && !subscription.isEmpty) {
      const poller = pollers.poll(EmptyMutableQueue)!
      if (poller === EmptyMutableQueue) {
        // No poller is waiting: drop the pair from the subscriber set, then
        // re-check the queue and put the pair back if it is no longer empty.
        const subPollerPair = [subscription, pollers] as const
        subscribers.remove(subPollerPair)
        if (pollers.isEmpty) {
          keepPolling = false
        } else {
          subscribers.add(subPollerPair)
        }
      } else {
        const pollResult = subscription.poll(EmptyMutableQueue)
        if (pollResult === EmptyMutableQueue) {
          // The subscription had nothing after all: return the poller to the
          // FRONT of the queue by draining it, prepending, and re-offering.
          unsafeOfferAll(pollers, unsafePollAllQueue(pollers).prepend(poller))
        } else {
          unsafeCompleteDeferred(poller, pollResult)
          // A value was taken from the subscription, so let the strategy
          // react to space possibly being available in the hub.
          this.unsafeOnHubEmptySpace(hub, subscribers)
        }
      }
    }
  }

  /**
   * Runs `unsafeCompletePollers` for every `[subscription, pollers]` pair in
   * `subscribers`.
   */
  unsafeCompleteSubscribers(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ): void {
    for (const [subscription, pollers] of subscribers) {
      this.unsafeCompletePollers(hub, subscribers, subscription, pollers)
    }
  }
}

/**
 * A strategy that applies back pressure to publishers when the hub is at
 * capacity. This guarantees that all subscribers will receive all messages
 * published to the hub while they are subscribed. However, it creates the
 * risk that a slow subscriber will slow down the rate at which messages
 * are published and received by other subscribers.
 */
export class BackPressure<A> extends BaseStrategy<A> {
  /**
   * Values waiting to be published, as `[value, deferred, isLast]` tuples.
   * All values of one `handleSurplus` call share the same deferred; only the
   * final value of the batch carries `isLast === true`.
   */
  publishers: MutableQueue<readonly [A, Deferred<never, boolean>, boolean]> = MutableQueue
    .unbounded()

  /**
   * Queues `as` for publication and returns an effect that waits on the
   * deferred shared by the queued values.
   *
   * After queueing, it immediately attempts to publish into the hub and to
   * complete waiting subscribers. If `isShutdown` is set the returned effect
   * interrupts instead of waiting. When the effect is interrupted, the
   * entries queued by this call are removed from `publishers`.
   */
  handleSurplus(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    as: Collection<A>,
    isShutdown: AtomicBoolean
  ): Effect<never, never, boolean> {
    return Effect.withFiberRuntime((state) => {
      const deferred: Deferred<never, boolean> = Deferred.unsafeMake(state.id)
      return Effect.suspendSucceed(() => {
        this.unsafeOffer(as, deferred)
        this.unsafeOnHubEmptySpace(hub, subscribers)
        this.unsafeCompleteSubscribers(hub, subscribers)
        return isShutdown.get ? Effect.interrupt : deferred.await
      }).onInterrupt(() => Effect.sync(() => this.unsafeRemove(deferred)))
    })
  }

  /**
   * Drains the publisher queue and interrupts, as the current fiber, the
   * deferred of every drained entry flagged as the last of its batch.
   */
  get shutdown(): Effect<never, never, void> {
    return Effect.Do()
      .bind("fiberId", () => Effect.fiberId)
      .bind("publishers", () => Effect.sync(() => unsafePollAllQueue(this.publishers)))
      .tap(({ fiberId, publishers }) =>
        Effect.forEachParDiscard(
          publishers,
          // Each batch shares one deferred, so only its last entry triggers
          // the interruption.
          ([_, deferred, last]) => last ? deferred.interruptAs(fiberId) : Effect.unit
        )
      )
      .unit
  }

  /**
   * Moves queued values into the hub for as long as the hub is not full and
   * queued publishers remain, completing subscribers after each attempt.
   */
  unsafeOnHubEmptySpace(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ): void {
    let keepPolling = true
    while (keepPolling && !hub.isFull) {
      const publisher = this.publishers.poll(EmptyMutableQueue)!
      if (publisher === EmptyMutableQueue) {
        keepPolling = false
      } else {
        const published = hub.publish(publisher[0])
        if (published && publisher[2]) {
          // The last value of a batch went through: complete its deferred.
          unsafeCompleteDeferred(publisher[1], true)
        } else if (!published) {
          // The hub rejected the value: put the entry back at the FRONT of
          // the queue so ordering is preserved.
          unsafeOfferAll(
            this.publishers,
            unsafePollAllQueue(this.publishers).prepend(publisher)
          )
        }
        this.unsafeCompleteSubscribers(hub, subscribers)
      }
    }
  }

  /**
   * Enqueues every element of `as` together with `deferred`, flagging only
   * the final element as last. Does nothing when `as` is empty.
   */
  private unsafeOffer(as: Collection<A>, deferred: Deferred<never, boolean>): void {
    const it = as[Symbol.iterator]()
    let curr = it.next()
    if (!curr.done) {
      let next
      // Look one element ahead so the final element can be told apart from
      // the ones before it.
      while ((next = it.next()) && !next.done) {
        this.publishers.offer([curr.value, deferred, false] as const)
        curr = next
      }
      this.publishers.offer([curr.value, deferred, true] as const)
    }
  }

  /**
   * Removes every queued entry that belongs to `deferred` by draining the
   * queue and re-offering the remaining entries.
   */
  private unsafeRemove(deferred: Deferred<never, boolean>): void {
    unsafeOfferAll(
      this.publishers,
      unsafePollAllQueue(this.publishers).filter(([_, a]) => a !== deferred)
    )
  }
}

/**
 * A strategy that drops new messages when the hub is at capacity. This
 * guarantees that a slow subscriber will not slow down the rate at which
 * messages are published. However, it creates the risk that a slow
 * subscriber will slow down the rate at which messages are received by
 * other subscribers and that subscribers may not receive all messages
 * published to the hub while they are subscribed.
 */
export class Dropping<A> extends BaseStrategy<A> {
  /**
   * Ignores the surplus values and succeeds with `false`.
   */
  handleSurplus(
    _hub: AtomicHub<A>,
    _subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    _as: Collection<A>,
    _isShutdown: AtomicBoolean
  ): Effect<never, never, boolean> {
    return Effect.succeed(false)
  }

  shutdown: Effect<never, never, void> = Effect.unit

  unsafeOnHubEmptySpace(
    _hub: AtomicHub<A>,
    _subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ): void {
    // Nothing to do: this strategy never queues publishers.
  }
}

/**
 * A strategy that adds new messages and drops old messages when the hub is
 * at capacity. This guarantees that a slow subscriber will not slow down
 * the rate at which messages are published and received by other
 * subscribers. However, it creates the risk that a slow subscriber will
 * not receive some messages published to the hub while it is subscribed.
 */
export class Sliding<A> extends BaseStrategy<A> {
  /**
   * Publishes every element of `as` in order, calling `hub.slide()` before
   * each publish attempt and retrying the same element until the hub accepts
   * it. Does nothing when `as` is empty or the hub capacity is not positive.
   */
  private unsafeSlidingPublish(hub: AtomicHub<A>, as: Collection<A>): void {
    const it = as[Symbol.iterator]()
    let next = it.next()
    if (!next.done && hub.capacity > 0) {
      let a = next.value
      let loop = true
      while (loop) {
        hub.slide()
        const pub = hub.publish(a)
        if (pub && (next = it.next()) && !next.done) {
          // Published and more input remains: advance to the next element.
          a = next.value
        } else if (pub) {
          // Published and the input is exhausted.
          loop = false
        }
        // Otherwise the publish failed: loop again with the same element.
      }
    }
  }

  /**
   * Publishes the surplus values via `unsafeSlidingPublish`, completes
   * waiting subscribers, and succeeds with `true`.
   */
  handleSurplus(
    hub: AtomicHub<A>,
    subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
    as: Collection<A>,
    _isShutdown: AtomicBoolean
  ): Effect<never, never, boolean> {
    return Effect.sync(() => {
      this.unsafeSlidingPublish(hub, as)
      this.unsafeCompleteSubscribers(hub, subscribers)
      return true
    })
  }

  shutdown: Effect<never, never, void> = Effect.unit

  unsafeOnHubEmptySpace(
    _hub: AtomicHub<A>,
    _subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>
  ): void {
    // Nothing to do: this strategy never queues publishers.
  }
}

/**
 * Creates a new `BackPressure` strategy.
 *
 * @tsplus static effect/core/io/Hub/Strategy.Ops BackPressure
 */
export function backPressureStrategy<A>(): Strategy<A> {
  return new BackPressure<A>()
}

/**
 * Creates a new `Dropping` strategy.
 *
 * @tsplus static effect/core/io/Hub/Strategy.Ops Dropping
 */
export function droppingStrategy<A>(): Strategy<A> {
  return new Dropping<A>()
}

/**
 * Creates a new `Sliding` strategy.
 *
 * @tsplus static effect/core/io/Hub/Strategy.Ops Sliding
 */
export function slidingStrategy<A>(): Strategy<A> {
  return new Sliding<A>()
}