import { Effect } from "@effect/core/io/Effect/definition"
import { HubSym } from "@effect/core/io/Hub/definition"
import type { AtomicHub } from "@effect/core/io/Hub/operations/_internal/AtomicHub"
import { makeSubscription } from "@effect/core/io/Hub/operations/_internal/makeSubscription"
import type { Subscription } from "@effect/core/io/Hub/operations/_internal/Subscription"
import { unsafePublishAll } from "@effect/core/io/Hub/operations/_internal/unsafePublishAll"
import type { Strategy } from "@effect/core/io/Hub/operations/strategy"
import type { Dequeue } from "@effect/core/io/Queue/definition"
import { _In, _Out, QueueSym } from "@effect/core/io/Queue/definition"
import { Scope } from "@effect/core/io/Scope/definition"
import type { Collection } from "@tsplus/stdlib/collections/Collection"
/**
 * Creates a hub with the specified strategy.
 *
 * Allocates a fresh `Scope` and a `Deferred`, then hands them to
 * `unsafeMakeHub` together with an empty subscriber set and a shutdown flag
 * initialised to `false`.
 *
 * @param hub - The atomic hub that stores published values.
 * @param strategy - The strategy passed through to the created hub.
 * @returns An effect that yields the newly created hub.
 */
export function makeHub<A>(
  hub: AtomicHub<A>,
  strategy: Strategy<A>
): Effect<never, never, Hub<A>> {
  return Scope.make.flatMap((scope) =>
    Deferred.make<never, void>().map((deferred) =>
      unsafeMakeHub(
        hub,
        MutableHashSet.empty(),
        scope,
        deferred,
        new AtomicBoolean(false),
        strategy
      )
    )
  )
}
/**
 * Hub implementation backed by an `AtomicHub`, a set of subscribers and a
 * `Strategy`.
 *
 * Shutdown state is tracked by `shutdownFlag` (checked by `publish`,
 * `publishAll` and `size`) and by `shutdownHook` (awaited by
 * `awaitShutdown`).
 */
class HubImpl<A> implements Hub<A> {
  /** Type-level marker for the input type; the returned function is the identity. */
  get [_In](): (_: A) => unknown {
    return (a) => a
  }

  get [HubSym](): HubSym {
    return HubSym
  }

  get [QueueSym](): QueueSym {
    return QueueSym
  }

  constructor(
    readonly hub: AtomicHub<A>,
    readonly subscribers: MutableHashSet<
      readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]
    >,
    readonly scope: Scope.Closeable,
    readonly shutdownHook: Deferred<never, void>,
    readonly shutdownFlag: AtomicBoolean,
    readonly strategy: Strategy<A>
  ) {}

  /**
   * Publishes a single value.
   *
   * Interrupts if the shutdown flag is set. If the underlying hub accepts the
   * value, subscribers are completed via the strategy and `true` is returned;
   * otherwise the value is handed to `strategy.handleSurplus` as a
   * single-element chunk.
   */
  publish(this: this, a: A): Effect<never, never, boolean> {
    return Effect.suspendSucceed(() => {
      if (this.shutdownFlag.get) {
        return Effect.interrupt
      }
      if (this.hub.publish(a)) {
        this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)
        return Effect.succeed(true)
      }
      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        Chunk.single(a),
        this.shutdownFlag
      )
    })
  }

  /**
   * Publishes a collection of values.
   *
   * Interrupts if the shutdown flag is set. Subscribers are completed after
   * `unsafePublishAll` runs, whether or not there is surplus. Returns `true`
   * when the surplus is empty; otherwise the surplus is handed to
   * `strategy.handleSurplus`.
   */
  publishAll(this: this, as: Collection<A>): Effect<never, never, boolean> {
    return Effect.suspendSucceed(() => {
      if (this.shutdownFlag.get) {
        return Effect.interrupt
      }
      const surplus = unsafePublishAll(this.hub, as)
      this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)
      if (surplus.isEmpty) {
        return Effect.succeed(true)
      }
      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        surplus,
        this.shutdownFlag
      )
    })
  }

  /**
   * Creates a subscription as an acquire/release resource.
   *
   * The dequeue's `shutdown` is registered twice: as a finalizer on this
   * hub's scope, and as the release action of the acquired resource.
   */
  get subscribe(): Effect<Scope, never, Dequeue<A>> {
    return Effect.acquireRelease(
      makeSubscription(this.hub, this.subscribers, this.strategy).tap((dequeue) =>
        this.scope.addFinalizer(dequeue.shutdown)
      ),
      (dequeue) => dequeue.shutdown
    )
  }

  /** Alias for `publish`. */
  offer(this: this, a: A): Effect<never, never, boolean> {
    return this.publish(a)
  }

  /** Alias for `publishAll`. */
  offerAll(this: this, as: Collection<A>): Effect<never, never, boolean> {
    return this.publishAll(as)
  }

  /** The capacity reported by the underlying hub. */
  get capacity(): number {
    return this.hub.capacity
  }

  /**
   * The size reported by the underlying hub, or interruption if the shutdown
   * flag is set.
   */
  get size(): Effect<never, never, number> {
    // NOTE(review): unlike `publish`, the arguments here are not wrapped in an
    // explicit thunk; presumably this relies on the tsplus lazy-argument
    // transform so the flag is read when the effect runs — confirm.
    return Effect.suspendSucceed(
      this.shutdownFlag.get ?
        Effect.interrupt :
        Effect.sync(this.hub.size)
    )
  }

  /** Waits on the shutdown hook, which `shutdown` completes. */
  get awaitShutdown(): Effect<never, never, void> {
    return this.shutdownHook.await
  }

  /** Reads the shutdown flag. */
  get isShutdown(): Effect<never, never, boolean> {
    // NOTE(review): no explicit thunk; see the note on `size`.
    return Effect.sync(this.shutdownFlag.get)
  }

  /**
   * Shuts the hub down, uninterruptibly.
   *
   * Sets the shutdown flag, then passes two effects to `Effect.whenEffect`:
   * completing the shutdown hook, and closing the hub's scope with an
   * interrupt exit for the current fiber combined with `strategy.shutdown`.
   */
  get shutdown(): Effect<never, never, void> {
    return Effect.withFiberRuntime<never, never, void>((state) => {
      this.shutdownFlag.set(true)
      // NOTE(review): `>` is presumably the tsplus sequencing operator
      // (run left, then right) and `whenEffect` presumably runs the second
      // effect only when the first yields `true` — confirm.
      return Effect.whenEffect(
        this.shutdownHook.succeed(undefined),
        this.scope.close(Exit.interrupt(state.id)) >
          this.strategy.shutdown
      ).unit
    }).uninterruptible
  }

  /** `true` when `size` equals `capacity`. */
  get isFull(): Effect<never, never, boolean> {
    return this.size.map((size) => size === this.capacity)
  }

  /** `true` when `size` is zero. */
  get isEmpty(): Effect<never, never, boolean> {
    return this.size.map((size) => size === 0)
  }
}
/**
 * Unsafely creates a hub with the specified strategy.
 *
 * Constructs the hub directly from already-allocated parts; nothing is
 * wrapped in an effect.
 *
 * @param hub - The atomic hub that stores published values.
 * @param subscribers - The mutable set of subscription/queue pairs.
 * @param scope - The closeable scope that `shutdown` closes.
 * @param shutdownHook - The deferred completed by `shutdown`.
 * @param shutdownFlag - The flag set by `shutdown` and checked by publishing.
 * @param strategy - The strategy used for surplus handling and shutdown.
 * @returns The constructed hub.
 */
export function unsafeMakeHub<A>(
  hub: AtomicHub<A>,
  subscribers: MutableHashSet<readonly [Subscription<A>, MutableQueue<Deferred<never, A>>]>,
  scope: Scope.Closeable,
  shutdownHook: Deferred<never, void>,
  shutdownFlag: AtomicBoolean,
  strategy: Strategy<A>
): Hub<A> {
  return new HubImpl(hub, subscribers, scope, shutdownHook, shutdownFlag, strategy)
}