import { Effect } from "@effect/core/io/Effect/definition"
import { HubSym } from "@effect/core/io/Hub/definition"
import type { AtomicHub } from "@effect/core/io/Hub/operations/_internal/AtomicHub"
import { makeSubscription } from "@effect/core/io/Hub/operations/_internal/makeSubscription"
import type { Subscription } from "@effect/core/io/Hub/operations/_internal/Subscription"
import { unsafePublishAll } from "@effect/core/io/Hub/operations/_internal/unsafePublishAll"
import type { Strategy } from "@effect/core/io/Hub/operations/strategy"
import type { Dequeue } from "@effect/core/io/Queue/definition"
import { _In, _Out, QueueSym } from "@effect/core/io/Queue/definition"
import { Scope } from "@effect/core/io/Scope/definition"
import type { Collection } from "@tsplus/stdlib/collections/Collection"

/**
 * Creates a hub with the specified strategy.
 *
 * A fresh scope and a fresh deferred (used as the shutdown hook) are created,
 * then handed to `unsafeMakeHub` together with an empty subscriber set and a
 * shutdown flag initialised to `false`.
 *
 * NOTE(review): the generic type arguments throughout this file were
 * reconstructed after they had been stripped from the text; confirm them
 * against the `Effect`, `Deferred`, `Hub` and `AtomicHub` definitions.
 *
 * @param hub - The underlying atomic hub that stores published values.
 * @param strategy - The strategy consulted for surplus values and shutdown.
 * @returns An effect producing the new hub.
 */
export function makeHub<A>(hub: AtomicHub<A>, strategy: Strategy<A>): Effect<never, never, Hub<A>> {
  return Scope.make.flatMap((scope) =>
    Deferred.make<never, void>().map((deferred) =>
      unsafeMakeHub(
        hub,
        MutableHashSet.empty(),
        scope,
        deferred,
        new AtomicBoolean(false),
        strategy
      )
    )
  )
}

/**
 * `Hub` implementation backed by an `AtomicHub`, a mutable set of subscribers,
 * a closeable scope holding subscription finalizers, and a shutdown
 * hook/flag pair.
 */
class HubImpl<A> implements Hub<A> {
  get [_In](): (_: A) => unknown {
    return (a) => a
  }

  get [HubSym](): HubSym {
    return HubSym
  }

  get [QueueSym](): QueueSym {
    return QueueSym
  }

  constructor(
    readonly hub: AtomicHub<A>,
    readonly subscribers: MutableHashSet<
      readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]
    >,
    readonly scope: Scope.Closeable,
    readonly shutdownHook: Deferred<never, void>,
    readonly shutdownFlag: AtomicBoolean,
    readonly strategy: Strategy<A>
  ) {}

  /**
   * Publishes a single value.
   *
   * Interrupts if the shutdown flag is set. If the underlying hub accepts the
   * value, waiting subscribers are completed through the strategy and the
   * effect succeeds with `true`; otherwise the value is handed to the
   * strategy as surplus and the strategy's result is returned.
   */
  publish(this: this, a: A): Effect<never, never, boolean> {
    return Effect.suspendSucceed(() => {
      if (this.shutdownFlag.get) {
        return Effect.interrupt
      }

      if ((this.hub as AtomicHub<unknown>).publish(a)) {
        this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)
        return Effect.succeed(true)
      }

      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        Chunk.single(a),
        this.shutdownFlag
      )
    })
  }

  /**
   * Publishes every value of the collection.
   *
   * Interrupts if the shutdown flag is set. Whatever `unsafePublishAll`
   * returns as surplus is handed to the strategy; when there is no surplus
   * the effect succeeds with `true`. Subscribers are completed through the
   * strategy in both cases, before the surplus is examined.
   */
  publishAll(this: this, as: Collection<A>): Effect<never, never, boolean> {
    return Effect.suspendSucceed(() => {
      if (this.shutdownFlag.get) {
        return Effect.interrupt
      }

      const surplus = unsafePublishAll(this.hub, as)

      this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)

      if (surplus.isEmpty) {
        return Effect.succeed(true)
      }

      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        surplus,
        this.shutdownFlag
      )
    })
  }

  /**
   * Subscribes to the hub.
   *
   * The subscription is acquired via `makeSubscription`; its `shutdown` is
   * registered both as a finalizer on the hub's own scope and as the release
   * action of the `acquireRelease`.
   */
  get subscribe(): Effect<Scope, never, Dequeue<A>> {
    return Effect.acquireRelease(
      makeSubscription(this.hub, this.subscribers, this.strategy).tap((dequeue) =>
        this.scope.addFinalizer(dequeue.shutdown)
      ),
      (dequeue) => dequeue.shutdown
    )
  }

  /** Queue-style alias for `publish`. */
  offer(this: this, a: A): Effect<never, never, boolean> {
    return this.publish(a)
  }

  /** Queue-style alias for `publishAll`. */
  offerAll(this: this, as: Collection<A>): Effect<never, never, boolean> {
    return this.publishAll(as)
  }

  /** Capacity reported by the underlying hub. */
  get capacity(): number {
    return this.hub.capacity
  }

  /**
   * Size reported by the underlying hub; interrupts when the shutdown flag
   * is set.
   */
  get size(): Effect<never, never, number> {
    return Effect.suspendSucceed(
      this.shutdownFlag.get ?
        Effect.interrupt :
        Effect.sync(this.hub.size)
    )
  }

  /** Waits on the shutdown hook. */
  get awaitShutdown(): Effect<never, never, void> {
    return this.shutdownHook.await
  }

  /** Reads the shutdown flag. */
  get isShutdown(): Effect<never, never, boolean> {
    return Effect.sync(this.shutdownFlag.get)
  }

  /**
   * Shuts the hub down.
   *
   * Sets the shutdown flag, then passes the completion of the shutdown hook
   * to `Effect.whenEffect` as the condition guarding "close the scope with an
   * interrupt exit for the current fiber, then shut the strategy down". The
   * whole effect is marked uninterruptible.
   *
   * NOTE(review): presumably `whenEffect` runs the teardown only when
   * completing the hook yields `true`, and `>` is the tsplus sequencing
   * operator on effects; confirm against the library.
   */
  get shutdown(): Effect<never, never, void> {
    return Effect.withFiberRuntime<never, never, void>((state) => {
      this.shutdownFlag.set(true)
      return Effect.whenEffect(
        this.shutdownHook.succeed(undefined),
        this.scope.close(Exit.interrupt(state.id)) > this.strategy.shutdown
      ).unit
    }).uninterruptible
  }

  /** `true` when the current size equals the capacity. */
  get isFull(): Effect<never, never, boolean> {
    return this.size.map((size) => size === this.capacity)
  }

  /** `true` when the current size is zero. */
  get isEmpty(): Effect<never, never, boolean> {
    return this.size.map((size) => size === 0)
  }
}

/**
 * Unsafely creates a hub with the specified strategy.
 *
 * Wraps the supplied, already-constructed parts in a `HubImpl` without
 * creating anything itself.
 *
 * @param hub - The underlying atomic hub.
 * @param subscribers - Mutable set of subscription / pending-poller pairs.
 * @param scope - Scope that collects subscription finalizers.
 * @param shutdownHook - Deferred completed when the hub is shut down.
 * @param shutdownFlag - Flag read by the hub's operations to detect shutdown.
 * @param strategy - The strategy consulted for surplus values and shutdown.
 * @returns The new hub.
 */
export function unsafeMakeHub<A>(
  hub: AtomicHub<A>,
  subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
  scope: Scope.Closeable,
  shutdownHook: Deferred<never, void>,
  shutdownFlag: AtomicBoolean,
  strategy: Strategy<A>
): Hub<A> {
  return new HubImpl(hub, subscribers, scope, shutdownHook, shutdownFlag, strategy)
}