// ets_tracing: off

import "../Operator/index.js"

import * as AR from "../Collections/Immutable/Array/index.js"
import * as Chunk from "../Collections/Immutable/Chunk/index.js"
import * as Tp from "../Collections/Immutable/Tuple/index.js"
import * as HS from "../Collections/Mutable/HashSet/index.js"
import * as ES from "../Effect/ExecutionStrategy.js"
import * as T from "../Effect/index.js"
import * as Ex from "../Exit/index.js"
import * as F from "../Fiber/index.js"
import { pipe } from "../Function/index.js"
import * as M from "../Managed/index.js"
import * as RM from "../Managed/ReleaseMap/index.js"
import * as P from "../Promise/index.js"
import * as Q from "../Queue/index.js"
import { XQueueInternal } from "../Queue/index.js"
import * as Ref from "../Ref/index.js"
import * as AB from "../Support/AtomicBoolean/index.js"
import * as MQ from "../Support/MutableQueue/index.js"
import type * as InternalHub from "./_internal/Hub.js"
import * as HF from "./_internal/hubFactory.js"
import * as U from "./_internal/unsafe.js"
import * as PR from "./primitives.js"
import * as S from "./Strategy.js"

/**
 * The read-only (take side) view of a hub subscription.
 */
export type HubDequeue<R, E, A> = Q.XQueue<never, R, unknown, E, never, A>

/**
 * The write-only (offer side) view of a hub, as produced by `toQueue`.
 */
export type HubEnqueue<R, E, A> = Q.XQueue<R, never, E, unknown, A, never>

/**
 * A hub that publishes and delivers the same message type `A`, requires no
 * environment and cannot fail.
 */
export type Hub<A> = XHub<unknown, unknown, never, never, A, A>

export const HubTypeId = Symbol()

/**
 * An `XHub<RA, RB, EA, EB, A, B>` is an asynchronous message hub. Publishers
 * can publish messages of type `A` to the hub and subscribers can subscribe to
 * take messages of type `B` from the hub. Publishing messages can require an
 * environment of type `RA` and fail with an error of type `EA`. Taking
 * messages can require an environment of type `RB` and fail with an error of
 * type `EB`.
 */
export interface XHub<RA, RB, EA, EB, A, B> {
  readonly typeId: typeof HubTypeId
  readonly [PR._RA]: (_: RA) => void
  readonly [PR._RB]: (_: RB) => void
  readonly [PR._EA]: () => EA
  readonly [PR._EB]: () => EB
  readonly [PR._A]: (_: A) => void
  readonly [PR._B]: () => B
}

/**
 * The concrete operations behind an `XHub`. The free functions of this module
 * narrow an `XHub` to this class via `concrete` and delegate to its members.
 */
export abstract class XHubInternal<RA, RB, EA, EB, A, B>
  implements XHub<RA, RB, EA, EB, A, B>
{
  readonly typeId: typeof HubTypeId = HubTypeId;
  // Phantom members: declared for variance only and never assigned (`!`).
  readonly [PR._RA]!: (_: RA) => void;
  readonly [PR._RB]!: (_: RB) => void;
  readonly [PR._EA]!: () => EA;
  readonly [PR._EB]!: () => EB;
  readonly [PR._A]!: (_: A) => void;
  readonly [PR._B]!: () => B

  /**
   * Waits for the hub to be shut down.
   */
  abstract awaitShutdown: T.UIO<void>

  /**
   * The maximum capacity of the hub.
   */
  abstract capacity: number

  /**
   * Checks whether the hub is shut down.
   */
  abstract isShutdown: T.UIO<boolean>

  /**
   * Publishes a message to the hub, returning whether the message was
   * published to the hub.
   */
  abstract publish(a: A): T.Effect<RA, EA, boolean>

  /**
   * Publishes all of the specified messages to the hub, returning whether
   * they were published to the hub.
   */
  abstract publishAll(as: Iterable<A>): T.Effect<RA, EA, boolean>

  /**
   * Shuts down the hub.
   */
  abstract shutdown: T.UIO<void>

  /**
   * The current number of messages in the hub.
   */
  abstract size: T.UIO<number>

  /**
   * Subscribes to receive messages from the hub. The resulting subscription
   * can be evaluated multiple times within the scope of the managed to take a
   * message from the hub each time.
   */
  abstract subscribe: M.Managed<unknown, never, HubDequeue<RB, EB, B>>
}

/**
 * Type-level narrowing of an `XHub` to `XHubInternal`. The body is
 * intentionally empty: no runtime check is performed.
 *
 * @ets_optimize remove
 */
export function concrete<RA, RB, EA, EB, A, B>(
  _: XHub<RA, RB, EA, EB, A, B>
): asserts _ is XHubInternal<RA, RB, EA, EB, A, B> {
  //
}

/**
 * Waits for the hub to be shut down.
 */
export function awaitShutdown<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): T.UIO<void> {
  concrete(self)
  return self.awaitShutdown
}

/**
 * The maximum capacity of the hub.
 */
export function capacity<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): number {
  concrete(self)
  return self.capacity
}

/**
 * Checks whether the hub is shut down.
 */
export function isShutdown<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): T.UIO<boolean> {
  concrete(self)
  return self.isShutdown
}

/**
 * Publishes a message to the hub, returning whether the message was
 * published to the hub.
 */
export function publish_<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  a: A
): T.Effect<RA, EA, boolean> {
  concrete(self)
  return self.publish(a)
}

/**
 * Publishes a message to the hub, returning whether the message was
 * published to the hub.
 *
 * @ets_data_first publish_
 */
export function publish<A>(a: A) {
  return <RA, RB, EA, EB, B>(self: XHub<RA, RB, EA, EB, A, B>) => publish_(self, a)
}

/**
 * Publishes all of the specified messages to the hub, returning whether
 * they were published to the hub.
 */
export function publishAll_<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  as: Iterable<A>
): T.Effect<RA, EA, boolean> {
  concrete(self)
  return self.publishAll(as)
}

/**
 * Publishes all of the specified messages to the hub, returning whether
 * they were published to the hub.
 *
 * @ets_data_first publishAll_
 */
export function publishAll<A>(as: Iterable<A>) {
  return <RA, RB, EA, EB, B>(self: XHub<RA, RB, EA, EB, A, B>) =>
    publishAll_(self, as)
}

/**
 * Shuts down the hub.
 */
export function shutdown<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): T.UIO<void> {
  concrete(self)
  return self.shutdown
}

/**
 * The current number of messages in the hub.
 */
export function size<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): T.UIO<number> {
  concrete(self)
  return self.size
}

/**
 * Subscribes to receive messages from the hub. The resulting subscription
 * can be evaluated multiple times within the scope of the managed to take a
 * message from the hub each time.
 */
export function subscribe<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): M.Managed<unknown, never, HubDequeue<RB, EB, B>> {
  concrete(self)
  return self.subscribe
}

/**
 * Transforms messages published to the hub using the specified effectual
 * function.
 */
export function contramapM_<RA, RB, RC, EA, EB, EC, A, B, C>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (c: C) => T.Effect<RC, EC, A>
): XHub<RC & RA, RB, EA | EC, EB, C, B> {
  // The output side is left untouched by mapping it with `T.succeed`.
  return dimapM_(self, f, T.succeed)
}

/**
 * Transforms messages published to the hub using the specified effectual
 * function.
 *
 * @ets_data_first contramapM_
 */
export function contramapM<RC, EC, A, C>(f: (c: C) => T.Effect<RC, EC, A>) {
  return <RA, RB, EA, EB, B>(self: XHub<RA, RB, EA, EB, A, B>) =>
    contramapM_(self, f)
}

/**
 * Transforms messages published to and taken from the hub using the
 * specified functions.
 */
export function dimap_<RA, RB, EA, EB, A, B, C, D>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (c: C) => A,
  g: (b: B) => D
): XHub<RA, RB, EA, EB, C, D> {
  return dimapM_(
    self,
    (c) => T.succeed(f(c)),
    (b) => T.succeed(g(b))
  )
}

/**
 * Transforms messages published to and taken from the hub using the
 * specified functions.
 *
 * @ets_data_first dimap_
 */
export function dimap<A, B, C, D>(f: (c: C) => A, g: (b: B) => D) {
  return <RA, RB, EA, EB>(self: XHub<RA, RB, EA, EB, A, B>) => dimap_(self, f, g)
}

/**
 * Hub view that applies `f` to every published message before forwarding it
 * to `source`, and maps every subscription of `source` with `g`. All
 * lifecycle members are shared with `source`.
 */
class DimapMImplementation<
  RA,
  RB,
  RC,
  RD,
  EA,
  EB,
  EC,
  ED,
  A,
  B,
  C,
  D
> extends XHubInternal<RC & RA, RD & RB, EA | EC, EB | ED, C, D> {
  awaitShutdown: T.UIO<void>
  capacity: number
  isShutdown: T.UIO<boolean>
  shutdown: T.UIO<void>
  size: T.UIO<number>
  subscribe: M.Managed<unknown, never, HubDequeue<RD & RB, ED | EB, D>>

  constructor(
    readonly source: XHubInternal<RA, RB, EA, EB, A, B>,
    readonly f: (c: C) => T.Effect<RC, EC, A>,
    g: (b: B) => T.Effect<RD, ED, D>
  ) {
    super()
    this.awaitShutdown = source.awaitShutdown
    this.capacity = source.capacity
    this.isShutdown = source.isShutdown
    this.shutdown = source.shutdown
    this.size = source.size
    this.subscribe = M.map_(source.subscribe, Q.mapM(g))
  }

  publish(c: C) {
    return T.chain_(this.f(c), (a) => this.source.publish(a))
  }

  publishAll(cs: Iterable<C>) {
    return T.chain_(T.forEach_(cs, this.f), (as) => this.source.publishAll(as))
  }
}

/**
 * Transforms messages published to and taken from the hub using the
 * specified effectual functions.
 */
export function dimapM_<RA, RB, RC, RD, EA, EB, EC, ED, A, B, C, D>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (c: C) => T.Effect<RC, EC, A>,
  g: (b: B) => T.Effect<RD, ED, D>
): XHub<RC & RA, RD & RB, EA | EC, EB | ED, C, D> {
  concrete(self)
  return new DimapMImplementation(self, f, g)
}

/**
 * Transforms messages published to and taken from the hub using the
 * specified effectual functions.
 *
 * @ets_data_first dimapM_
 */
export function dimapM<A, B, C, D, EC, ED, RC, RD>(
  f: (c: C) => T.Effect<RC, EC, A>,
  g: (b: B) => T.Effect<RD, ED, D>
) {
  return <RA, RB, EA, EB>(self: XHub<RA, RB, EA, EB, A, B>) => dimapM_(self, f, g)
}

/**
 * Hub view that only forwards to `source` the published messages for which
 * `f` yields `true`. Subscriptions and lifecycle members are shared with
 * `source`.
 */
class FilterInputMImplementation<
  RA,
  RA1,
  RB,
  EA,
  EA1,
  EB,
  A,
  B
> extends XHubInternal<RA & RA1, RB, EA | EA1, EB, A, B> {
  awaitShutdown: T.UIO<void>
  capacity: number
  isShutdown: T.UIO<boolean>
  shutdown: T.UIO<void>
  size: T.UIO<number>
  subscribe: M.Managed<unknown, never, HubDequeue<RB, EB, B>>

  constructor(
    readonly source: XHubInternal<RA, RB, EA, EB, A, B>,
    readonly f: (a: A) => T.Effect<RA1, EA1, boolean>
  ) {
    super()
    this.awaitShutdown = source.awaitShutdown
    this.capacity = source.capacity
    this.isShutdown = source.isShutdown
    this.shutdown = source.shutdown
    this.size = source.size
    this.subscribe = source.subscribe
  }

  publish(a: A) {
    // A rejected message is reported as "not published" rather than as an error.
    return T.chain_(this.f(a), (b) => (b ? this.source.publish(a) : T.succeed(false)))
  }

  publishAll(as: Iterable<A>) {
    // When every message is filtered out nothing is forwarded and `false` is
    // returned.
    return T.chain_(T.filter_(as, this.f), (as) =>
      AR.isNonEmpty(as) ? this.source.publishAll(as) : T.succeed(false)
    )
  }
}

/**
 * Filters messages published to the hub using the specified function.
 */
export function filterInput_<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (a: A) => boolean
) {
  return filterInputM_(self, (a) => T.succeed(f(a)))
}

/**
 * Filters messages published to the hub using the specified function.
 *
 * @ets_data_first filterInput_
 */
export function filterInput<A>(f: (a: A) => boolean) {
  return <RA, RB, EA, EB, B>(self: XHub<RA, RB, EA, EB, A, B>) =>
    filterInput_(self, f)
}

/**
 * Filters messages published to the hub using the specified effectual
 * function.
 */
export function filterInputM_<RA, RA1, RB, EA, EA1, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (a: A) => T.Effect<RA1, EA1, boolean>
): XHub<RA & RA1, RB, EA | EA1, EB, A, B> {
  concrete(self)
  return new FilterInputMImplementation(self, f)
}

/**
 * Filters messages published to the hub using the specified effectual
 * function.
 *
 * @ets_data_first filterInputM_
 */
export function filterInputM<RA1, EA1, A>(f: (a: A) => T.Effect<RA1, EA1, boolean>) {
  return <RA, RB, EA, EB, B>(self: XHub<RA, RB, EA, EB, A, B>) =>
    filterInputM_(self, f)
}

/**
 * Filters messages taken from the hub using the specified function.
 */
export function filterOutput_<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (b: B) => boolean
): XHub<RA, RB, EA, EB, A, B> {
  return filterOutputM_(self, (b) => T.succeed(f(b)))
}

/**
 * Filters messages taken from the hub using the specified function.
 *
 * @ets_data_first filterOutput_
 */
export function filterOutput<B>(f: (b: B) => boolean) {
  return <RA, RB, EA, EB, A>(self: XHub<RA, RB, EA, EB, A, B>) =>
    filterOutput_(self, f)
}

/**
 * Hub view whose subscriptions are the subscriptions of `source` filtered
 * with `f`. Publishing and lifecycle members are delegated to `source`
 * unchanged.
 */
class FilterOutputMImplementation<
  RA,
  RB,
  RB1,
  EA,
  EB,
  EB1,
  A,
  B
> extends XHubInternal<RA, RB & RB1, EA, EB | EB1, A, B> {
  awaitShutdown: T.UIO<void>
  capacity: number
  isShutdown: T.UIO<boolean>
  shutdown: T.UIO<void>
  size: T.UIO<number>
  subscribe: M.Managed<unknown, never, HubDequeue<RB & RB1, EB | EB1, B>>

  constructor(
    readonly source: XHubInternal<RA, RB, EA, EB, A, B>,
    readonly f: (b: B) => T.Effect<RB1, EB1, boolean>
  ) {
    super()
    this.awaitShutdown = source.awaitShutdown
    this.capacity = source.capacity
    this.isShutdown = source.isShutdown
    this.shutdown = source.shutdown
    this.size = source.size
    this.subscribe = M.map_(source.subscribe, Q.filterOutputM(f))
  }

  publish(a: A) {
    return this.source.publish(a)
  }

  publishAll(as: Iterable<A>) {
    return this.source.publishAll(as)
  }
}

/**
 * Filters messages taken from the hub using the specified effectual
 * function.
 */
export function filterOutputM_<RA, RB, RB1, EA, EB, EB1, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (a: B) => T.Effect<RB1, EB1, boolean>
): XHub<RA, RB & RB1, EA, EB | EB1, A, B> {
  concrete(self)
  return new FilterOutputMImplementation(self, f)
}

/**
 * Filters messages taken from the hub using the specified effectual
 * function.
 *
 * @ets_data_first filterOutputM_
 */
export function filterOutputM<RB1, EB1, B>(
  f: (a: B) => T.Effect<RB1, EB1, boolean>
) {
  return <RA, RB, EA, EB, A>(self: XHub<RA, RB, EA, EB, A, B>) =>
    filterOutputM_(self, f)
}

/**
 * Transforms messages taken from the hub using the specified function.
 */
export function map_<RA, RB, EA, EB, A, B, C>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (b: B) => C
): XHub<RA, RB, EA, EB, A, C> {
  return mapM_(self, (b) => T.succeed(f(b)))
}

/**
 * Transforms messages taken from the hub using the specified function.
 *
 * @ets_data_first map_
 */
export function map<B, C>(f: (b: B) => C) {
  return <RA, RB, EA, EB, A>(self: XHub<RA, RB, EA, EB, A, B>) => map_(self, f)
}

/**
 * Transforms messages taken from the hub using the specified effectual
 * function.
 */
export function mapM_<RA, RB, RC, EA, EB, EC, A, B, C>(
  self: XHub<RA, RB, EA, EB, A, B>,
  f: (b: B) => T.Effect<RC, EC, C>
): XHub<RA, RC & RB, EA, EB | EC, A, C> {
  // The input side is left untouched by mapping it with `T.succeed`.
  return dimapM_(self, (a) => T.succeed(a), f)
}

/**
 * Transforms messages taken from the hub using the specified effectual
 * function.
 *
 * @ets_data_first mapM_
 */
export function mapM<B, C, EC, RC>(f: (b: B) => T.Effect<RC, EC, C>) {
  return <RA, RB, EA, EB, A>(self: XHub<RA, RB, EA, EB, A, B>) => mapM_(self, f)
}

/**
 * Write-only queue backed by a hub: `offer`/`offerAll` publish to `source`,
 * `take` never completes and `takeAll`/`takeUpTo` always yield an empty
 * chunk.
 */
class ToQueueImplementation<RA, RB, EA, EB, A, B> extends XQueueInternal<
  RA,
  never,
  EA,
  unknown,
  A,
  never
> {
  awaitShutdown: T.UIO<void>
  capacity: number
  isShutdown: T.UIO<boolean>
  shutdown: T.UIO<void>
  size: T.UIO<number>
  take: T.Effect<unknown, never, never>
  takeAll: T.Effect<unknown, never, Chunk.Chunk<never>>

  constructor(readonly source: XHubInternal<RA, RB, EA, EB, A, B>) {
    super()
    this.awaitShutdown = source.awaitShutdown
    this.capacity = source.capacity
    this.isShutdown = source.isShutdown
    this.shutdown = source.shutdown
    this.size = source.size
    this.take = T.never
    this.takeAll = T.succeed(Chunk.empty())
  }

  offer(a: A): T.Effect<RA, EA, boolean> {
    return this.source.publish(a)
  }

  offerAll(as: Iterable<A>): T.Effect<RA, EA, boolean> {
    return this.source.publishAll(as)
  }

  takeUpTo(): T.Effect<unknown, never, Chunk.Chunk<never>> {
    return T.succeed(Chunk.empty())
  }
}

/**
 * Views the hub as a queue that can only be written to.
 */
export function toQueue<RA, RB, EA, EB, A, B>(
  self: XHub<RA, RB, EA, EB, A, B>
): HubEnqueue<RA, EA, A> {
  concrete(self)
  return new ToQueueImplementation(self)
}

/**
 * Creates a bounded hub with the back pressure strategy. The hub will retain
 * messages until they have been taken by all subscribers, applying back
 * pressure to publishers if the hub is at capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function makeBounded<A>(requestedCapacity: number): T.UIO<Hub<A>> {
  return T.chain_(
    T.succeedWith(() => HF.makeBounded<A>(requestedCapacity)),
    (_) => makeHub(_, new S.BackPressure<A>())
  )
}

/**
 * Synchronous variant of `makeBounded`: builds the hub directly instead of
 * returning an effect. The hub will retain messages until they have been
 * taken by all subscribers, applying back pressure to publishers if the hub
 * is at capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function unsafeMakeBounded<A>(requestedCapacity: number): Hub<A> {
  const releaseMap = new RM.ReleaseMap(
    Ref.unsafeMakeRef<RM.State>(new RM.Running(0, new Map()))
  )

  return unsafeMakeHub(
    HF.makeBounded<A>(requestedCapacity),
    makeSubscribersHashSet<A>(),
    releaseMap,
    P.unsafeMake<never, void>(F.None),
    new AB.AtomicBoolean(false),
    new S.BackPressure<A>()
  )
}

/**
 * Creates a bounded hub with the dropping strategy. The hub will drop new
 * messages if the hub is at capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function makeDropping<A>(requestedCapacity: number): T.UIO<Hub<A>> {
  return T.chain_(
    T.succeedWith(() => {
      return HF.makeBounded<A>(requestedCapacity)
    }),
    (_) => makeHub(_, new S.Dropping<A>())
  )
}

/**
 * Synchronous variant of `makeDropping`: builds the hub directly instead of
 * returning an effect. The hub will drop new messages if the hub is at
 * capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function unsafeMakeDropping<A>(requestedCapacity: number): Hub<A> {
  const releaseMap = new RM.ReleaseMap(
    Ref.unsafeMakeRef<RM.State>(new RM.Running(0, new Map()))
  )

  return unsafeMakeHub(
    HF.makeBounded<A>(requestedCapacity),
    makeSubscribersHashSet<A>(),
    releaseMap,
    P.unsafeMake<never, void>(F.None),
    new AB.AtomicBoolean(false),
    new S.Dropping<A>()
  )
}

/**
 * Creates a bounded hub with the sliding strategy. The hub will add new
 * messages and drop old messages if the hub is at capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function makeSliding<A>(requestedCapacity: number): T.UIO<Hub<A>> {
  return T.chain_(
    T.succeedWith(() => {
      return HF.makeBounded<A>(requestedCapacity)
    }),
    (_) => makeHub(_, new S.Sliding<A>())
  )
}

/**
 * Synchronous variant of `makeSliding`: builds the hub directly instead of
 * returning an effect. The hub will add new messages and drop old messages
 * if the hub is at capacity.
 *
 * For best performance use capacities that are powers of two.
 */
export function unsafeMakeSliding<A>(requestedCapacity: number): Hub<A> {
  const releaseMap = new RM.ReleaseMap(
    Ref.unsafeMakeRef<RM.State>(new RM.Running(0, new Map()))
  )

  return unsafeMakeHub(
    HF.makeBounded<A>(requestedCapacity),
    makeSubscribersHashSet<A>(),
    releaseMap,
    P.unsafeMake<never, void>(F.None),
    new AB.AtomicBoolean(false),
    new S.Sliding<A>()
  )
}

/**
 * Creates an unbounded hub.
 */
export function makeUnbounded<A>(): T.UIO<Hub<A>> {
  return T.chain_(
    T.succeedWith(() => {
      return HF.makeUnbounded<A>()
    }),
    // NOTE(review): the dropping strategy is used here; presumably an
    // unbounded hub never produces surplus, so the choice is immaterial.
    (_) => makeHub(_, new S.Dropping<A>())
  )
}

/**
 * Synchronous variant of `makeUnbounded`: builds the hub directly instead of
 * returning an effect.
 */
export function unsafeMakeUnbounded<A>(): Hub<A> {
  const releaseMap = new RM.ReleaseMap(
    Ref.unsafeMakeRef<RM.State>(new RM.Running(0, new Map()))
  )

  return unsafeMakeHub(
    HF.makeUnbounded<A>(),
    makeSubscribersHashSet<A>(),
    releaseMap,
    P.unsafeMake<never, void>(F.None),
    new AB.AtomicBoolean(false),
    new S.Dropping<A>()
  )
}

/**
 * The primary hub implementation: wraps an internal hub data structure
 * together with its subscribers set, a shutdown flag and a publishing
 * strategy.
 */
class UnsafeMakeHubImplementation<A> extends XHubInternal<
  unknown,
  unknown,
  never,
  never,
  A,
  A
> {
  awaitShutdown: T.UIO<void>
  capacity: number
  isShutdown: T.UIO<boolean>
  shutdown: T.UIO<void>
  size: T.UIO<number>
  subscribe: M.Managed<unknown, never, HubDequeue<unknown, never, A>>

  constructor(
    private hub: InternalHub.Hub<A>,
    private subscribers: HS.HashSet<
      Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
    >,
    releaseMap: RM.ReleaseMap,
    shutdownHook: P.Promise<never, void>,
    private shutdownFlag: AB.AtomicBoolean,
    private strategy: S.Strategy<A>
  ) {
    super()
    this.awaitShutdown = P.await(shutdownHook)
    this.capacity = hub.capacity
    this.isShutdown = T.succeedWith(() => shutdownFlag.get)
    this.shutdown = T.uninterruptible(
      T.suspend((_, fiberId) => {
        // The flag is raised first so that concurrent operations observe the
        // shutdown before the finalizers run.
        shutdownFlag.set(true)

        // The release of all subscriptions plus the strategy shutdown is
        // guarded by the result of completing the shutdown hook.
        return T.asUnit(
          T.whenM_(
            T.zipRight_(
              RM.releaseAll(Ex.interrupt(fiberId), ES.parallel)(releaseMap),
              strategy.shutdown
            ),
            P.succeed_(shutdownHook, undefined)
          )
        )
      })
    )
    this.size = T.suspend(() => {
      if (shutdownFlag.get) {
        return T.interrupt
      }

      return T.succeed(hub.size())
    })
    this.subscribe = pipe(
      M.do,
      M.bind("dequeue", () =>
        T.toManaged(makeSubscription(hub, subscribers, strategy))
      ),
      // Registers the subscription in the hub's release map so that shutting
      // down the hub also shuts down the subscription; the finalizer returned
      // by `RM.add` is run when the managed scope closes.
      M.tap(({ dequeue }) =>
        M.makeExit_(RM.add((_) => Q.shutdown(dequeue))(releaseMap), (finalizer, exit) =>
          finalizer(exit)
        )
      ),
      M.map(({ dequeue }) => dequeue)
    )
  }

  publish(a: A): T.Effect<unknown, never, boolean> {
    return T.suspend(() => {
      if (this.shutdownFlag.get) {
        return T.interrupt
      }

      if (this.hub.publish(a)) {
        this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)
        return T.succeed(true)
      }

      // The hub rejected the message: let the strategy decide what to do.
      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        Chunk.single(a),
        this.shutdownFlag
      )
    })
  }

  publishAll(as: Iterable<A>): T.Effect<unknown, never, boolean> {
    return T.suspend(() => {
      if (this.shutdownFlag.get) {
        return T.interrupt
      }

      // `surplus` holds the messages that could not be published.
      const surplus = U.unsafePublishAll(this.hub, as)

      this.strategy.unsafeCompleteSubscribers(this.hub, this.subscribers)

      if (Chunk.isEmpty(surplus)) {
        return T.succeed(true)
      }

      return this.strategy.handleSurplus(
        this.hub,
        this.subscribers,
        surplus,
        this.shutdownFlag
      )
    })
  }
}

/**
 * Creates a hub around the given internal hub and strategy, allocating a
 * fresh release map, shutdown promise, subscribers set and shutdown flag.
 */
function makeHub<A>(hub: InternalHub.Hub<A>, strategy: S.Strategy<A>): T.UIO<Hub<A>> {
  return T.chain_(RM.makeReleaseMap, (releaseMap) => {
    return T.map_(P.make<never, void>(), (promise) => {
      return unsafeMakeHub(
        hub,
        makeSubscribersHashSet<A>(),
        releaseMap,
        promise,
        new AB.AtomicBoolean(false),
        strategy
      )
    })
  })
}

/**
 * Unsafely creates a hub with the specified strategy.
 */
function unsafeMakeHub<A>(
  hub: InternalHub.Hub<A>,
  subscribers: HS.HashSet<
    Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
  >,
  releaseMap: RM.ReleaseMap,
  shutdownHook: P.Promise<never, void>,
  shutdownFlag: AB.AtomicBoolean,
  strategy: S.Strategy<A>
): Hub<A> {
  return new UnsafeMakeHubImplementation(
    hub,
    subscribers,
    releaseMap,
    shutdownHook,
    shutdownFlag,
    strategy
  )
}

/**
 * Creates a subscription with the specified strategy.
 */
function makeSubscription<A>(
  hub: InternalHub.Hub<A>,
  subscribers: HS.HashSet<
    Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
  >,
  strategy: S.Strategy<A>
): T.UIO<Q.Dequeue<A>> {
  return T.map_(P.make<never, void>(), (promise) => {
    return unsafeMakeSubscription(
      hub,
      subscribers,
      hub.subscribe(),
      new MQ.Unbounded<P.Promise<never, A>>(),
      promise,
      new AB.AtomicBoolean(false),
      strategy
    )
  })
}

/**
 * A take-only queue over a single hub subscription. `pollers` holds the
 * promises of takers that are waiting for a message.
 */
class UnsafeMakeSubscriptionImplementation<A> extends XQueueInternal<
  never,
  unknown,
  unknown,
  never,
  never,
  A
> {
  constructor(
    private hub: InternalHub.Hub<A>,
    private subscribers: HS.HashSet<
      Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
    >,
    private subscription: InternalHub.Subscription<A>,
    private pollers: MQ.MutableQueue<P.Promise<never, A>>,
    private shutdownHook: P.Promise<never, void>,
    private shutdownFlag: AB.AtomicBoolean,
    private strategy: S.Strategy<A>
  ) {
    super()
  }

  awaitShutdown: T.UIO<void> = P.await(this.shutdownHook)

  capacity: number = this.hub.capacity

  isShutdown: T.UIO<boolean> = T.succeedWith(() => this.shutdownFlag.get)

  shutdown: T.UIO<void> = T.uninterruptible(
    T.suspend((_, fiberId) => {
      this.shutdownFlag.set(true)

      // Interrupts every waiting taker and unsubscribes from the hub; the
      // whole cleanup is guarded by the result of completing the shutdown
      // hook.
      return T.asUnit(
        T.whenM_(
          T.zipRight_(
            T.forEachPar_(U.unsafePollAllQueue(this.pollers), (_) => {
              return P.interruptAs(fiberId)(_)
            }),
            T.succeedWith(() => this.subscription.unsubscribe())
          ),
          P.succeed_(this.shutdownHook, undefined)
        )
      )
    })
  )

  size: T.UIO<number> = T.suspend(() => {
    if (this.shutdownFlag.get) {
      return T.interrupt
    }

    return T.succeed(this.subscription.size())
  })

  /**
   * A subscription cannot be written to: always yields `false`.
   */
  offer(_: never): T.Effect<never, unknown, boolean> {
    return T.succeed(false)
  }

  /**
   * A subscription cannot be written to: always yields `false`.
   */
  offerAll(_: Iterable<never>): T.Effect<never, unknown, boolean> {
    return T.succeed(false)
  }

  take: T.Effect<unknown, never, A> = T.suspend((_, fiberId) => {
    if (this.shutdownFlag.get) {
      return T.interrupt
    }

    // The subscription is only polled directly when no other taker is
    // already waiting; otherwise this taker queues up behind them.
    const message = this.pollers.isEmpty
      ? this.subscription.poll(MQ.EmptyQueue)
      : MQ.EmptyQueue

    if (message === MQ.EmptyQueue) {
      const promise = P.unsafeMake<never, A>(fiberId)

      return T.onInterrupt_(
        T.suspend(() => {
          this.pollers.offer(promise)
          this.subscribers.add(Tp.tuple(this.subscription, this.pollers))
          this.strategy.unsafeCompletePollers(
            this.hub,
            this.subscribers,
            this.subscription,
            this.pollers
          )
          // Re-check: the subscription may have been shut down while the
          // poller was being registered.
          if (this.shutdownFlag.get) {
            return T.interrupt
          } else {
            return P.await(promise)
          }
        }),
        // An interrupted taker must not stay registered as a poller.
        () =>
          T.succeedWith(() => {
            U.unsafeRemove(this.pollers, promise)
          })
      )
    } else {
      this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers)
      return T.succeed(message)
    }
  })

  takeAll: T.Effect<unknown, never, Chunk.Chunk<A>> = T.suspend(() => {
    if (this.shutdownFlag.get) {
      return T.interrupt
    }

    // Yields nothing while other takers are waiting.
    const as = this.pollers.isEmpty
      ? U.unsafePollAllSubscription(this.subscription)
      : Chunk.empty<A>()

    this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers)

    return T.succeed(as)
  })

  takeUpTo(n: number): T.Effect<unknown, never, Chunk.Chunk<A>> {
    return T.suspend(() => {
      if (this.shutdownFlag.get) {
        return T.interrupt
      }

      // Yields nothing while other takers are waiting.
      const as = this.pollers.isEmpty
        ? U.unsafePollN(this.subscription, n)
        : Chunk.empty<A>()

      this.strategy.unsafeOnHubEmptySpace(this.hub, this.subscribers)

      return T.succeed(as)
    })
  }
}

/**
 * Unsafely creates a subscription with the specified strategy.
 */
function unsafeMakeSubscription<A>(
  hub: InternalHub.Hub<A>,
  subscribers: HS.HashSet<
    Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
  >,
  subscription: InternalHub.Subscription<A>,
  pollers: MQ.MutableQueue<P.Promise<never, A>>,
  shutdownHook: P.Promise<never, void>,
  shutdownFlag: AB.AtomicBoolean,
  strategy: S.Strategy<A>
): Q.Dequeue<A> {
  return new UnsafeMakeSubscriptionImplementation(
    hub,
    subscribers,
    subscription,
    pollers,
    shutdownHook,
    shutdownFlag,
    strategy
  )
}

/**
 * Creates the empty mutable set that tracks, per subscription, the queue of
 * promises of its waiting takers.
 */
function makeSubscribersHashSet<A>(): HS.HashSet<
  Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
> {
  return HS.make<
    Tp.Tuple<[InternalHub.Subscription<A>, MQ.MutableQueue<P.Promise<never, A>>]>
  >()
}