import * as Arr from "../Array.js"
import * as Chunk from "../Chunk.js"
import type * as Deferred from "../Deferred.js"
import type * as Effect from "../Effect.js"
import * as Effectable from "../Effectable.js"
import { dual, pipe } from "../Function.js"
import * as MutableQueue from "../MutableQueue.js"
import * as MutableRef from "../MutableRef.js"
import * as Option from "../Option.js"
import { pipeArguments } from "../Pipeable.js"
import { hasProperty } from "../Predicate.js"
import type * as Queue from "../Queue.js"
import * as core from "./core.js"
import * as fiberRuntime from "./fiberRuntime.js"

/** @internal */
const EnqueueSymbolKey = "effect/QueueEnqueue"

/** @internal */
export const EnqueueTypeId: Queue.EnqueueTypeId = Symbol.for(EnqueueSymbolKey) as Queue.EnqueueTypeId

/** @internal */
const DequeueSymbolKey = "effect/QueueDequeue"

/** @internal */
export const DequeueTypeId: Queue.DequeueTypeId = Symbol.for(DequeueSymbolKey) as Queue.DequeueTypeId

/** @internal */
const QueueStrategySymbolKey = "effect/QueueStrategy"

/** @internal */
export const QueueStrategyTypeId: Queue.QueueStrategyTypeId = Symbol.for(
  QueueStrategySymbolKey
) as Queue.QueueStrategyTypeId

/** @internal */
const BackingQueueSymbolKey = "effect/BackingQueue"

/** @internal */
export const BackingQueueTypeId: Queue.BackingQueueTypeId = Symbol.for(
  BackingQueueSymbolKey
) as Queue.BackingQueueTypeId

// Phantom variance markers stored under the type-id symbols of the classes
// below. Each is an identity function used only for typing.
const queueStrategyVariance = {
  /* c8 ignore next */
  _A: (_: any) => _
}

const backingQueueVariance = {
  /* c8 ignore next */
  _A: (_: any) => _
}

/** @internal */
export const enqueueVariance = {
  /* c8 ignore next */
  _In: (_: unknown) => _
}

/** @internal */
export const dequeueVariance = {
  /* c8 ignore next */
  _Out: (_: never) => _
}

/**
 * Queue implementation combining a backing store, a list of waiting takers
 * (one `Deferred` per pending `take`), a shutdown flag/hook and a surplus
 * strategy that decides what happens when the backing store rejects values.
 *
 * @internal
 */
class QueueImpl<in out A> extends Effectable.Class<A> implements Queue.Queue<A> {
  readonly [EnqueueTypeId] = enqueueVariance
  readonly [DequeueTypeId] = dequeueVariance

  constructor(
    /** @internal */
    readonly queue: Queue.BackingQueue<A>,
    /** @internal */
    readonly takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
    /** @internal */
    readonly shutdownHook: Deferred.Deferred<void>,
    /** @internal */
    readonly shutdownFlag: MutableRef.MutableRef<boolean>,
    /** @internal */
    readonly strategy: Queue.Strategy<A>
  ) {
    super()
  }

  pipe() {
    return pipeArguments(this, arguments)
  }

  /** Using the queue itself as an effect is the same as `take`. */
  commit() {
    return this.take
  }

  capacity(): number {
    return this.queue.capacity()
  }

  /**
   * Effectful view of `unsafeSize`. A `none` result (queue shut down) is
   * turned into `core.interrupt` by the `catchAll` handler.
   */
  get size(): Effect.Effect<number> {
    return core.suspend(() => core.catchAll(this.unsafeSize(), () => core.interrupt))
  }

  /**
   * `none` once the shutdown flag is set; otherwise stored elements minus
   * waiting takers plus whatever the strategy reports as surplus. The value
   * can therefore be negative while takers are waiting.
   */
  unsafeSize() {
    if (MutableRef.get(this.shutdownFlag)) {
      return Option.none<number>()
    }
    return Option.some(
      this.queue.length() -
        MutableQueue.length(this.takers) +
        this.strategy.surplusSize()
    )
  }

  get isEmpty(): Effect.Effect<boolean> {
    return core.map(this.size, (size) => size <= 0)
  }

  get isFull(): Effect.Effect<boolean> {
    return core.map(this.size, (size) => size >= this.capacity())
  }

  /**
   * Sets the shutdown flag, interrupts every waiting taker with the calling
   * fiber's id, then runs the strategy's own shutdown. The strategy shutdown
   * is gated (`whenEffect`) on `deferredSucceed(shutdownHook)`, and the whole
   * sequence runs uninterruptibly.
   */
  get shutdown(): Effect.Effect<void> {
    return core.uninterruptible(
      core.withFiberRuntime((state) => {
        pipe(this.shutdownFlag, MutableRef.set(true))
        return pipe(
          fiberRuntime.forEachConcurrentDiscard(
            unsafePollAll(this.takers),
            (d) => core.deferredInterruptWith(d, state.id()),
            false,
            false
          ),
          core.zipRight(this.strategy.shutdown),
          core.whenEffect(core.deferredSucceed(this.shutdownHook, void 0)),
          core.asVoid
        )
      })
    )
  }

  get isShutdown(): Effect.Effect<boolean> {
    return core.sync(() => MutableRef.get(this.shutdownFlag))
  }

  get awaitShutdown(): Effect.Effect<void> {
    return core.deferredAwait(this.shutdownHook)
  }

  isActive() {
    return !MutableRef.get(this.shutdownFlag)
  }

  /**
   * Synchronous offer. Returns `false` when the queue is shut down or the
   * backing queue rejected the value; the strategy's `handleSurplus` is NOT
   * consulted on this path.
   */
  unsafeOffer(value: A): boolean {
    if (MutableRef.get(this.shutdownFlag)) {
      return false
    }
    // With nothing buffered, a waiting taker can receive the value directly.
    if (this.queue.length() === 0) {
      const taker = pipe(
        this.takers,
        MutableQueue.poll(MutableQueue.EmptyMutableQueue)
      )
      if (taker !== MutableQueue.EmptyMutableQueue) {
        unsafeCompleteDeferred(taker, value)
        return true
      }
    }
    // Not enough takers, offer to the queue
    const succeeded = this.queue.offer(value)
    unsafeCompleteTakers(this.strategy, this.queue, this.takers)
    return succeeded
  }

  /**
   * Effectful offer. Interrupts if the queue is shut down; hands the value to
   * a waiting taker when the backing queue is empty; otherwise offers to the
   * backing queue and delegates to `strategy.handleSurplus` if it is rejected.
   */
  offer(value: A): Effect.Effect<boolean> {
    return core.suspend(() => {
      if (MutableRef.get(this.shutdownFlag)) {
        return core.interrupt
      }
      if (this.queue.length() === 0) {
        const taker = pipe(
          this.takers,
          MutableQueue.poll(MutableQueue.EmptyMutableQueue)
        )
        if (taker !== MutableQueue.EmptyMutableQueue) {
          unsafeCompleteDeferred(taker, value)
          return core.succeed(true)
        }
      }
      // Not enough takers, offer to the queue
      const succeeded = this.queue.offer(value)
      unsafeCompleteTakers(this.strategy, this.queue, this.takers)
      return succeeded
        ? core.succeed(true)
        : this.strategy.handleSurplus([value], this.queue, this.takers, this.shutdownFlag)
    })
  }

  /**
   * Offers every element of `iterable`. When the backing queue is empty, up
   * to `values.length` waiting takers are completed first, in order; the
   * remaining values go to the backing queue and any rejected surplus is
   * passed to `strategy.handleSurplus`.
   */
  offerAll(iterable: Iterable<A>): Effect.Effect<boolean> {
    return core.suspend(() => {
      if (MutableRef.get(this.shutdownFlag)) {
        return core.interrupt
      }
      const values = Arr.fromIterable(iterable)
      // `Arr.empty` must be invoked: the bare function reference only behaved
      // like an empty list because `Function.length` happens to be 0.
      const pTakers: ReadonlyArray<Deferred.Deferred<A>> = this.queue.length() === 0
        ? Arr.fromIterable(unsafePollN(this.takers, values.length))
        : Arr.empty<Deferred.Deferred<A>>()
      const [forTakers, remaining] = pipe(values, Arr.splitAt(pTakers.length))
      for (let i = 0; i < pTakers.length; i++) {
        unsafeCompleteDeferred(pTakers[i], forTakers[i])
      }
      if (remaining.length === 0) {
        return core.succeed(true)
      }
      // Not enough takers, offer to the queue
      const surplus = this.queue.offerAll(remaining)
      unsafeCompleteTakers(this.strategy, this.queue, this.takers)
      return Chunk.isEmpty(surplus)
        ? core.succeed(true)
        : this.strategy.handleSurplus(surplus, this.queue, this.takers, this.shutdownFlag)
    })
  }

  /**
   * Takes one element. Interrupts if the queue is shut down. If nothing is
   * buffered, registers a deferred in `takers` and waits on it; the deferred
   * is removed from `takers` again if the waiting fiber is interrupted.
   */
  get take(): Effect.Effect<A> {
    return core.withFiberRuntime((state) => {
      if (MutableRef.get(this.shutdownFlag)) {
        return core.interrupt
      }
      const item = this.queue.poll(MutableQueue.EmptyMutableQueue)
      if (item !== MutableQueue.EmptyMutableQueue) {
        // A slot was freed: let the strategy react to the empty space.
        this.strategy.unsafeOnQueueEmptySpace(this.queue, this.takers)
        return core.succeed(item)
      } else {
        // Add the deferred to takers, then:
        // - Try to take again in case a value was added since
        // - Wait for the deferred to be completed
        // - Clean up resources in case of interruption
        const deferred = core.deferredUnsafeMake<A>(state.id())
        return pipe(
          core.suspend(() => {
            pipe(this.takers, MutableQueue.offer(deferred))
            unsafeCompleteTakers(this.strategy, this.queue, this.takers)
            return MutableRef.get(this.shutdownFlag)
              ? core.interrupt
              : core.deferredAwait(deferred)
          }),
          core.onInterrupt(() => {
            return core.sync(() => unsafeRemove(this.takers, deferred))
          })
        )
      }
    })
  }

  /** Drains everything currently buffered; never waits. Interrupts if shut down. */
  get takeAll(): Effect.Effect<Chunk.Chunk<A>> {
    return core.suspend(() => {
      return MutableRef.get(this.shutdownFlag)
        ? core.interrupt
        : core.sync(() => {
          const values = this.queue.pollUpTo(Number.POSITIVE_INFINITY)
          this.strategy.unsafeOnQueueEmptySpace(this.queue, this.takers)
          return Chunk.fromIterable(values)
        })
    })
  }

  /** Takes at most `max` buffered elements; never waits. Interrupts if shut down. */
  takeUpTo(max: number): Effect.Effect<Chunk.Chunk<A>> {
    return core.suspend(() =>
      MutableRef.get(this.shutdownFlag)
        ? core.interrupt
        : core.sync(() => {
          const values = this.queue.pollUpTo(max)
          this.strategy.unsafeOnQueueEmptySpace(this.queue, this.takers)
          return Chunk.fromIterable(values)
        })
    )
  }

  /** Takes between `min` and `max` elements; see `takeRemainderLoop`. */
  takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<A>> {
    return core.suspend(() =>
      takeRemainderLoop(
        this,
        min,
        max,
        Chunk.empty()
      )
    )
  }
}

/**
 * Accumulates elements into `acc` until at least `min` have been collected.
 *
 * Returns `acc` unchanged when `max < min`. Otherwise it grabs up to `max`
 * buffered elements via `takeUpTo`; if that still leaves a shortfall it takes
 * one more element with `take` and, when more than one was missing, recurses
 * with the reduced bounds.
 *
 * @internal
 */
const takeRemainderLoop = <A>(
  self: Queue.Dequeue<A>,
  min: number,
  max: number,
  acc: Chunk.Chunk<A>
): Effect.Effect<Chunk.Chunk<A>> => {
  if (max < min) {
    return core.succeed(acc)
  }
  return pipe(
    takeUpTo(self, max),
    core.flatMap((bs) => {
      const remaining = min - bs.length
      if (remaining === 1) {
        return pipe(
          take(self),
          core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b)))
        )
      }
      if (remaining > 1) {
        return pipe(
          take(self),
          core.flatMap((b) =>
            takeRemainderLoop(
              self,
              remaining - 1,
              max - bs.length - 1,
              pipe(acc, Chunk.appendAll(bs), Chunk.append(b))
            )
          )
        )
      }
      return core.succeed(pipe(acc, Chunk.appendAll(bs)))
    })
  )
}

/** @internal */
export const isQueue = (u: unknown): u is Queue.Queue<unknown> => isEnqueue(u) && isDequeue(u)

/** @internal */
export const isEnqueue = (u: unknown): u is Queue.Enqueue<unknown> => hasProperty(u, EnqueueTypeId)

/** @internal */
export const isDequeue = (u: unknown): u is Queue.Dequeue<unknown> => hasProperty(u, DequeueTypeId)

/**
 * Bounded queue using the back-pressure strategy.
 *
 * @internal
 */
export const bounded = <A>(requestedCapacity: number): Effect.Effect<Queue.Queue<A>> =>
  pipe(
    core.sync(() => MutableQueue.bounded<A>(requestedCapacity)),
    core.flatMap((queue) => make(backingQueueFromMutableQueue(queue), backPressureStrategy()))
  )

/**
 * Bounded queue using the dropping strategy.
 *
 * @internal
 */
export const dropping = <A>(requestedCapacity: number): Effect.Effect<Queue.Queue<A>> =>
  pipe(
    core.sync(() => MutableQueue.bounded<A>(requestedCapacity)),
    core.flatMap((queue) => make(backingQueueFromMutableQueue(queue), droppingStrategy()))
  )

/**
 * Bounded queue using the sliding strategy.
 *
 * @internal
 */
export const sliding = <A>(requestedCapacity: number): Effect.Effect<Queue.Queue<A>> =>
  pipe(
    core.sync(() => MutableQueue.bounded<A>(requestedCapacity)),
    core.flatMap((queue) => make(backingQueueFromMutableQueue(queue), slidingStrategy()))
  )

/**
 * Unbounded queue; built with the dropping strategy.
 *
 * @internal
 */
export const unbounded = <A>(): Effect.Effect<Queue.Queue<A>> =>
  pipe(
    core.sync(() => MutableQueue.unbounded<A>()),
    core.flatMap((queue) => make(backingQueueFromMutableQueue(queue), droppingStrategy()))
  )

/** @internal */
const unsafeMake = <A>(
  queue: Queue.BackingQueue<A>,
  takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
  shutdownHook: Deferred.Deferred<void>,
  shutdownFlag: MutableRef.MutableRef<boolean>,
  strategy: Queue.Strategy<A>
): Queue.Queue<A> => {
  return new QueueImpl(queue, takers, shutdownHook, shutdownFlag, strategy)
}

/**
 * Builds a queue from a backing queue and a strategy, with a fresh shutdown
 * hook, an empty unbounded taker list and the shutdown flag cleared.
 *
 * @internal
 */
export const make = <A>(
  queue: Queue.BackingQueue<A>,
  strategy: Queue.Strategy<A>
): Effect.Effect<Queue.Queue<A>> =>
  pipe(
    core.deferredMake<void>(),
    core.map((deferred) =>
      unsafeMake(
        queue,
        MutableQueue.unbounded(),
        deferred,
        MutableRef.make(false),
        strategy
      )
    )
  )

/**
 * `BackingQueue` adapter that forwards every operation to a `MutableQueue`.
 *
 * @internal
 */
export class BackingQueueFromMutableQueue<in out A> implements Queue.BackingQueue<A> {
  readonly [BackingQueueTypeId] = backingQueueVariance
  constructor(readonly mutable: MutableQueue.MutableQueue<A>) {}
  poll<Def>(def: Def): A | Def {
    return MutableQueue.poll(this.mutable, def)
  }
  pollUpTo(limit: number): Chunk.Chunk<A> {
    return MutableQueue.pollUpTo(this.mutable, limit)
  }
  offerAll(elements: Iterable<A>): Chunk.Chunk<A> {
    return MutableQueue.offerAll(this.mutable, elements)
  }
  offer(element: A): boolean {
    return MutableQueue.offer(this.mutable, element)
  }
  capacity(): number {
    return MutableQueue.capacity(this.mutable)
  }
  length(): number {
    return MutableQueue.length(this.mutable)
  }
}

/** @internal */
export const backingQueueFromMutableQueue = <A>(mutable: MutableQueue.MutableQueue<A>): Queue.BackingQueue<A> =>
  new BackingQueueFromMutableQueue(mutable)

/** @internal */
export const capacity = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): number => self.capacity()

/** @internal */
export const size = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<number> => self.size

/** @internal */
export const isFull = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<boolean> => self.isFull

/** @internal */
export const isEmpty = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<boolean> => self.isEmpty

/** @internal */
export const isShutdown = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<boolean> => self.isShutdown

/** @internal */
export const awaitShutdown = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<void> => self.awaitShutdown

/** @internal */
export const shutdown = <A>(self: Queue.Dequeue<A> | Queue.Enqueue<A>): Effect.Effect<void> => self.shutdown

/** @internal */
export const offer = dual<
  <A>(value: A) => (self: Queue.Enqueue<A>) => Effect.Effect<boolean>,
  <A>(self: Queue.Enqueue<A>, value: A) => Effect.Effect<boolean>
>(2, (self, value) => self.offer(value))

/** @internal */
export const unsafeOffer = dual<
  <A>(value: A) => (self: Queue.Enqueue<A>) => boolean,
  <A>(self: Queue.Enqueue<A>, value: A) => boolean
>(2, (self, value) => self.unsafeOffer(value))

/** @internal */
export const offerAll = dual<
  <A>(
    iterable: Iterable<A>
  ) => (self: Queue.Enqueue<A>) => Effect.Effect<boolean>,
  <A>(
    self: Queue.Enqueue<A>,
    iterable: Iterable<A>
  ) => Effect.Effect<boolean>
>(2, (self, iterable) => self.offerAll(iterable))

/**
 * Non-waiting take of a single element: `takeUpTo(1)` mapped through
 * `Chunk.head`.
 *
 * @internal
 */
export const poll = <A>(self: Queue.Dequeue<A>): Effect.Effect<Option.Option<A>> =>
  core.map(self.takeUpTo(1), Chunk.head)

/** @internal */
export const take = <A>(self: Queue.Dequeue<A>): Effect.Effect<A> => self.take

/** @internal */
export const takeAll = <A>(self: Queue.Dequeue<A>): Effect.Effect<Chunk.Chunk<A>> => self.takeAll

/** @internal */
export const takeUpTo = dual<
  (max: number) => <A>(self: Queue.Dequeue<A>) => Effect.Effect<Chunk.Chunk<A>>,
  <A>(self: Queue.Dequeue<A>, max: number) => Effect.Effect<Chunk.Chunk<A>>
>(2, (self, max) => self.takeUpTo(max))

/** @internal */
export const takeBetween = dual<
  (min: number, max: number) => <A>(self: Queue.Dequeue<A>) => Effect.Effect<Chunk.Chunk<A>>,
  <A>(self: Queue.Dequeue<A>, min: number, max: number) => Effect.Effect<Chunk.Chunk<A>>
>(3, (self, min, max) => self.takeBetween(min, max))

/**
 * Takes exactly `n` elements by calling `takeBetween(n, n)`.
 *
 * @internal
 */
export const takeN = dual<
  (n: number) => <A>(self: Queue.Dequeue<A>) => Effect.Effect<Chunk.Chunk<A>>,
  <A>(self: Queue.Dequeue<A>, n: number) => Effect.Effect<Chunk.Chunk<A>>
>(2, (self, n) => self.takeBetween(n, n))

// -----------------------------------------------------------------------------
// Strategy
// -----------------------------------------------------------------------------

/** @internal */
export const backPressureStrategy = <A>(): Queue.Strategy<A> => new BackPressureStrategy()

/** @internal */
export const droppingStrategy = <A>(): Queue.Strategy<A> => new DroppingStrategy()

/** @internal */
export const slidingStrategy = <A>(): Queue.Strategy<A> => new SlidingStrategy()

/**
 * Strategy that parks surplus values in `putters` and makes the offering
 * fiber wait on a deferred until its values have been accepted.
 *
 * Each putter entry is `[value, deferred, isLastItem]`; the deferred is only
 * completed (or interrupted) through the entry flagged as the last item of
 * its batch.
 *
 * @internal
 */
class BackPressureStrategy<in out A> implements Queue.Strategy<A> {
  readonly [QueueStrategyTypeId] = queueStrategyVariance

  readonly putters = MutableQueue.unbounded<readonly [A, Deferred.Deferred<boolean>, boolean]>()

  surplusSize(): number {
    return MutableQueue.length(this.putters)
  }

  /**
   * Pairs waiting putters with waiting takers one-to-one, handing each
   * putter's value straight to a taker, until either list is empty.
   */
  onCompleteTakersWithEmptyQueue(takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>): void {
    while (!MutableQueue.isEmpty(this.putters) && !MutableQueue.isEmpty(takers)) {
      // Both polls are guarded by the loop condition above.
      const taker = MutableQueue.poll(takers, void 0)!
      const putter = MutableQueue.poll(this.putters, void 0)!
      if (putter[2]) {
        unsafeCompleteDeferred(putter[1], true)
      }
      unsafeCompleteDeferred(taker, putter[0])
    }
  }

  /**
   * Drains `putters` and interrupts, with the current fiber id, the deferred
   * of every entry flagged as last item; other entries are ignored.
   */
  get shutdown(): Effect.Effect<void> {
    return pipe(
      core.fiberId,
      core.flatMap((fiberId) =>
        pipe(
          core.sync(() => unsafePollAll(this.putters)),
          core.flatMap((putters) =>
            fiberRuntime.forEachConcurrentDiscard(
              putters,
              ([_, deferred, isLastItem]) =>
                isLastItem ?
                  pipe(
                    core.deferredInterruptWith(deferred, fiberId),
                    core.asVoid
                  ) :
                  core.void,
              false,
              false
            )
          )
        )
      )
    )
  }

  /**
   * Registers the surplus values as putters, tries to move them into the
   * queue / onto takers right away, then waits for the batch's deferred
   * (or interrupts if the queue has been shut down). On interruption the
   * batch's entries are removed from `putters`.
   */
  handleSurplus(
    iterable: Iterable<A>,
    queue: Queue.BackingQueue<A>,
    takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
    isShutdown: MutableRef.MutableRef<boolean>
  ): Effect.Effect<boolean> {
    return core.withFiberRuntime((state) => {
      const deferred = core.deferredUnsafeMake<boolean>(state.id())
      return pipe(
        core.suspend(() => {
          this.unsafeOffer(iterable, deferred)
          this.unsafeOnQueueEmptySpace(queue, takers)
          unsafeCompleteTakers(this, queue, takers)
          return MutableRef.get(isShutdown) ? core.interrupt : core.deferredAwait(deferred)
        }),
        core.onInterrupt(() => core.sync(() => this.unsafeRemove(deferred)))
      )
    })
  }

  /**
   * While the queue has room (or is unbounded), moves parked putter values
   * into it. A putter whose value is rejected is put back at the front of
   * `putters`.
   */
  unsafeOnQueueEmptySpace(
    queue: Queue.BackingQueue<A>,
    takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>
  ): void {
    let keepPolling = true
    while (keepPolling && (queue.capacity() === Number.POSITIVE_INFINITY || queue.length() < queue.capacity())) {
      const putter = pipe(this.putters, MutableQueue.poll(MutableQueue.EmptyMutableQueue))
      if (putter === MutableQueue.EmptyMutableQueue) {
        keepPolling = false
      } else {
        const offered = queue.offer(putter[0])
        if (offered && putter[2]) {
          unsafeCompleteDeferred(putter[1], true)
        } else if (!offered) {
          // Re-insert at the head by draining and re-offering everything.
          unsafeOfferAll(this.putters, pipe(unsafePollAll(this.putters), Chunk.prepend(putter)))
        }
        unsafeCompleteTakers(this, queue, takers)
      }
    }
  }

  /**
   * Appends one putter entry per value, all sharing `deferred`; only the
   * final entry is flagged as the last item.
   */
  unsafeOffer(iterable: Iterable<A>, deferred: Deferred.Deferred<boolean>): void {
    const stuff = Arr.fromIterable(iterable)
    for (let i = 0; i < stuff.length; i++) {
      const value = stuff[i]
      const isLastItem: boolean = i === stuff.length - 1
      pipe(this.putters, MutableQueue.offer([value, deferred, isLastItem] as const))
    }
  }

  /** Drops every putter entry that belongs to `deferred`. */
  unsafeRemove(deferred: Deferred.Deferred<boolean>): void {
    unsafeOfferAll(
      this.putters,
      pipe(unsafePollAll(this.putters), Chunk.filter(([, _]) => _ !== deferred))
    )
  }
}

/**
 * Strategy that discards surplus values: `handleSurplus` succeeds with
 * `false` and every other hook is a no-op.
 *
 * @internal
 */
class DroppingStrategy<in out A> implements Queue.Strategy<A> {
  readonly [QueueStrategyTypeId] = queueStrategyVariance

  surplusSize(): number {
    return 0
  }

  get shutdown(): Effect.Effect<void> {
    return core.void
  }

  onCompleteTakersWithEmptyQueue(): void {
  }

  handleSurplus(
    _iterable: Iterable<A>,
    _queue: Queue.BackingQueue<A>,
    _takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
    _isShutdown: MutableRef.MutableRef<boolean>
  ): Effect.Effect<boolean> {
    return core.succeed(false)
  }

  unsafeOnQueueEmptySpace(
    _queue: Queue.BackingQueue<A>,
    _takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>
  ): void {
    //
  }
}

/**
 * Strategy that makes room for surplus values by polling one element from
 * the queue before offering each new one.
 *
 * @internal
 */
class SlidingStrategy<in out A> implements Queue.Strategy<A> {
  readonly [QueueStrategyTypeId] = queueStrategyVariance

  surplusSize(): number {
    return 0
  }

  get shutdown(): Effect.Effect<void> {
    return core.void
  }

  onCompleteTakersWithEmptyQueue(): void {
  }

  handleSurplus(
    iterable: Iterable<A>,
    queue: Queue.BackingQueue<A>,
    takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
    _isShutdown: MutableRef.MutableRef<boolean>
  ): Effect.Effect<boolean> {
    return core.sync(() => {
      this.unsafeOffer(queue, iterable)
      unsafeCompleteTakers(this, queue, takers)
      return true
    })
  }

  unsafeOnQueueEmptySpace(
    _queue: Queue.BackingQueue<A>,
    _takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>
  ): void {
    //
  }

  /**
   * For each value: poll one element from `queue`, then offer the value.
   * Stops when an offer is rejected, the iterable is exhausted, or the queue
   * has capacity 0.
   */
  unsafeOffer(queue: Queue.BackingQueue<A>, iterable: Iterable<A>): void {
    const iterator = iterable[Symbol.iterator]()
    let next: IteratorResult<A>
    let offering = true
    while (!(next = iterator.next()).done && offering) {
      if (queue.capacity() === 0) {
        return
      }
      // Poll 1 and retry
      queue.poll(MutableQueue.EmptyMutableQueue)
      offering = queue.offer(next.value)
    }
  }
}

/** @internal */
const unsafeCompleteDeferred = <A>(deferred: Deferred.Deferred<A>, a: A): void => {
  return core.deferredUnsafeDone(deferred, core.succeed(a))
}

/** @internal */
const unsafeOfferAll = <A>(queue: MutableQueue.MutableQueue<A>, as: Iterable<A>): Chunk.Chunk<A> => {
  return pipe(queue, MutableQueue.offerAll(as))
}

/** @internal */
const unsafePollAll = <A>(queue: MutableQueue.MutableQueue<A>): Chunk.Chunk<A> => {
  return pipe(queue, MutableQueue.pollUpTo(Number.POSITIVE_INFINITY))
}

/** @internal */
const unsafePollN = <A>(queue: MutableQueue.MutableQueue<A>, max: number): Chunk.Chunk<A> => {
  return pipe(queue, MutableQueue.pollUpTo(max))
}

/**
 * Removes every occurrence of `a` (compared with `!==`) from `queue` by
 * draining it and re-offering the elements that are kept.
 *
 * @internal
 */
export const unsafeRemove = <A>(queue: MutableQueue.MutableQueue<A>, a: A): void => {
  unsafeOfferAll(
    queue,
    pipe(unsafePollAll(queue), Chunk.filter((b) => a !== b))
  )
}

/**
 * Matches waiting takers with buffered elements for as long as both exist.
 * After each hand-off the strategy is told that space was freed. If the loop
 * ends with the queue empty while takers are still waiting, the strategy's
 * `onCompleteTakersWithEmptyQueue` hook is invoked.
 *
 * @internal
 */
export const unsafeCompleteTakers = <A>(
  strategy: Queue.Strategy<A>,
  queue: Queue.BackingQueue<A>,
  takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>
): void => {
  // Check both a taker and an item are in the queue, starting with the taker
  let keepPolling = true
  while (keepPolling && queue.length() !== 0) {
    const taker = pipe(takers, MutableQueue.poll(MutableQueue.EmptyMutableQueue))
    if (taker !== MutableQueue.EmptyMutableQueue) {
      const element = queue.poll(MutableQueue.EmptyMutableQueue)
      if (element !== MutableQueue.EmptyMutableQueue) {
        unsafeCompleteDeferred(taker, element)
        strategy.unsafeOnQueueEmptySpace(queue, takers)
      } else {
        // The element vanished in between: put the taker back at the front.
        unsafeOfferAll(takers, pipe(unsafePollAll(takers), Chunk.prepend(taker)))
      }
      keepPolling = true
    } else {
      keepPolling = false
    }
  }
  if (keepPolling && queue.length() === 0 && !MutableQueue.isEmpty(takers)) {
    strategy.onCompleteTakersWithEmptyQueue(takers)
  }
}