import { Effect } from "@effect/core/io/Effect/definition" import type { AtomicHub } from "@effect/core/io/Hub/operations/_internal/AtomicHub" import type { Subscription } from "@effect/core/io/Hub/operations/_internal/Subscription" import { unsafePollAllSubscription } from "@effect/core/io/Hub/operations/_internal/unsafePollAllSubscription" import { unsafePollN } from "@effect/core/io/Hub/operations/_internal/unsafePollN" import { unsafeRemove } from "@effect/core/io/Hub/operations/_internal/unsafeRemove" import type { Strategy } from "@effect/core/io/Hub/operations/strategy" import { _In, _Out, QueueSym } from "@effect/core/io/Queue/definition" import { unsafePollAll } from "@effect/core/io/Queue/operations/_internal/unsafePollAll" import { Chunk } from "@tsplus/stdlib/collections/Chunk" import type { Maybe } from "@tsplus/stdlib/data/Maybe" /** * Creates a subscription with the specified strategy. */ export function makeSubscription( hub: AtomicHub, subscribers: MutableHashSet, MutableQueue>]>, strategy: Strategy ): Effect> { return Deferred.make().map((deferred) => unsafeMakeSubscription( hub, subscribers, hub.subscribe(), MutableQueue.unbounded>(), deferred, new AtomicBoolean(false), strategy ) ) } class SubscriptionImpl implements Dequeue { get [QueueSym](): QueueSym { return QueueSym } get [_Out](): (_: never) => A { return (a) => a } constructor( readonly hub: AtomicHub, readonly subscribers: MutableHashSet< readonly [Subscription, MutableQueue>] >, readonly subscription: Subscription, readonly pollers: MutableQueue>, readonly shutdownHook: Deferred, readonly shutdownFlag: AtomicBoolean, readonly strategy: Strategy ) {} get take(): Effect { return Effect.withFiberRuntime((state) => { if (this.shutdownFlag.get) { return Effect.interrupt } const message = this.pollers.isEmpty ? this.subscription.poll(EmptyMutableQueue) : EmptyMutableQueue if (message === EmptyMutableQueue) { const deferred = Deferred.unsafeMake(state.id) return Effect.suspendSucceed(() => { this.pollers.offer(deferred) this.subscribers.add([this.subscription, this.pollers] as const) this.strategy.unsafeCompletePollers( this.hub, this.subscribers, this.subscription, this.pollers ) return this.shutdownFlag.get ? Effect.interrupt : deferred.await }).onInterrupt(() => Effect.sync(unsafeRemove(this.pollers, deferred))) } else { this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers) return Effect.succeed(message) } }) } get takeAll(): Effect> { return Effect.suspendSucceed(() => { if (this.shutdownFlag.get) { return Effect.interrupt } const as = this.pollers.isEmpty ? unsafePollAllSubscription(this.subscription) : Chunk.empty() this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers) return Effect.succeed(as) }) } takeUpTo(this: this, max: number): Effect> { return Effect.suspendSucceed(() => { if (this.shutdownFlag.get) { return Effect.interrupt } const as = this.pollers.isEmpty ? unsafePollN(this.subscription, max) : Chunk.empty() this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers) return Effect.succeed(as) }) } takeRemainderLoop( self: Dequeue, min: number, max: number, acc: Chunk ): Effect> { if (max < min) { return Effect.succeed(acc) } return self.takeUpTo(max).flatMap((bs) => { const remaining = min - bs.length if (remaining === 1) { return self.take.map((b) => (acc + bs).append(b)) } if (remaining > 1) { return self.take.flatMap((b) => this.takeRemainderLoop( self, remaining - 1, max - bs.length - 1, (acc + bs).append(b) ) ) } return Effect.succeed(acc + bs) }) } takeBetween(this: this, min: number, max: number): Effect> { return Effect.suspendSucceed(this.takeRemainderLoop(this, min, max, Chunk.empty())) } takeN(this: this, n: number): Effect> { return this.takeBetween(n, n) } get poll(): Effect> { return this.takeUpTo(1).map((chunk) => chunk.head) } get capacity(): number { return this.hub.capacity } get size(): Effect { return Effect.suspendSucceed( this.shutdownFlag.get ? Effect.interrupt : Effect.succeed(this.subscription.size) ) } get awaitShutdown(): Effect { return this.shutdownHook.await } get isShutdown(): Effect { return Effect.sync(this.shutdownFlag.get) } get shutdown(): Effect { return Effect.withFiberRuntime((state) => { this.shutdownFlag.set(true) return Effect.whenEffect( this.shutdownHook.succeed(undefined), Effect.forEachPar( unsafePollAll(this.pollers), (deferred) => deferred.interruptAs(state.id) ) > Effect.sync(this.subscription.unsubscribe()) > Effect.sync(this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers)) ).unit }).uninterruptible } get isFull(): Effect { return this.size.map((size) => size === this.capacity) } get isEmpty(): Effect { return this.size.map((size) => size === 0) } } /** * Unsafely creates a subscription with the specified strategy. */ export function unsafeMakeSubscription( hub: AtomicHub, subscribers: MutableHashSet, MutableQueue>]>, subscription: Subscription, pollers: MutableQueue>, shutdownHook: Deferred, shutdownFlag: AtomicBoolean, strategy: Strategy ): Dequeue { return new SubscriptionImpl( hub, subscribers, subscription, pollers, shutdownHook, shutdownFlag, strategy ) }