import * as RA from "../../Array.js" import * as Chunk from "../../Chunk.js" import { dual, pipe } from "../../Function.js" import * as Option from "../../Option.js" import { hasProperty, type Predicate } from "../../Predicate.js" import type * as STM from "../../STM.js" import type * as TQueue from "../../TQueue.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" import * as OpCodes from "./opCodes/strategy.js" import * as stm from "./stm.js" import * as tRef from "./tRef.js" const TEnqueueSymbolKey = "effect/TQueue/TEnqueue" /** @internal */ export const TEnqueueTypeId: TQueue.TEnqueueTypeId = Symbol.for(TEnqueueSymbolKey) as TQueue.TEnqueueTypeId const TDequeueSymbolKey = "effect/TQueue/TDequeue" /** @internal */ export const TDequeueTypeId: TQueue.TDequeueTypeId = Symbol.for(TDequeueSymbolKey) as TQueue.TDequeueTypeId /** * A `Strategy` describes how the queue will handle values if the queue is at * capacity. * * @internal */ export type TQueueStrategy = BackPressure | Dropping | Sliding /** * A strategy that retries if the queue is at capacity. * * @internal */ export interface BackPressure { readonly _tag: OpCodes.OP_BACKPRESSURE_STRATEGY } /** * A strategy that drops new values if the queue is at capacity. * * @internal */ export interface Dropping { readonly _tag: OpCodes.OP_DROPPING_STRATEGY } /** * A strategy that drops old values if the queue is at capacity. * * @internal */ export interface Sliding { readonly _tag: OpCodes.OP_SLIDING_STRATEGY } /** @internal */ export const BackPressure: TQueueStrategy = { _tag: OpCodes.OP_BACKPRESSURE_STRATEGY } /** @internal */ export const Dropping: TQueueStrategy = { _tag: OpCodes.OP_DROPPING_STRATEGY } /** @internal */ export const Sliding: TQueueStrategy = { _tag: OpCodes.OP_SLIDING_STRATEGY } /** @internal */ export const tDequeueVariance = { /* c8 ignore next */ _Out: (_: never) => _ } /** @internal */ export const tEnqueueVariance = { /* c8 ignore next */ _In: (_: unknown) => _ } class TQueueImpl implements TQueue.TQueue { readonly [TDequeueTypeId] = tDequeueVariance readonly [TEnqueueTypeId] = tEnqueueVariance constructor( readonly ref: TRef.TRef | undefined>, readonly requestedCapacity: number, readonly strategy: TQueueStrategy ) {} capacity(): number { return this.requestedCapacity } size: STM.STM = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } return core.succeed(queue.length) }) isFull: STM.STM = core.map(this.size, (size) => size === this.requestedCapacity) isEmpty: STM.STM = core.map(this.size, (size) => size === 0) shutdown: STM.STM = core.withSTMRuntime((runtime) => { tRef.unsafeSet(this.ref, void 0, runtime.journal) return stm.void }) isShutdown: STM.STM = core.effect((journal) => { const queue = tRef.unsafeGet(this.ref, journal) return queue === undefined }) awaitShutdown: STM.STM = core.flatMap( this.isShutdown, (isShutdown) => isShutdown ? stm.void : core.retry ) offer(value: A): STM.STM { return core.withSTMRuntime((runtime) => { const queue = pipe(this.ref, tRef.unsafeGet(runtime.journal)) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } if (queue.length < this.requestedCapacity) { queue.push(value) tRef.unsafeSet(this.ref, queue, runtime.journal) return core.succeed(true) } switch (this.strategy._tag) { case OpCodes.OP_BACKPRESSURE_STRATEGY: { return core.retry } case OpCodes.OP_DROPPING_STRATEGY: { return core.succeed(false) } case OpCodes.OP_SLIDING_STRATEGY: { if (queue.length === 0) { return core.succeed(true) } queue.shift() queue.push(value) tRef.unsafeSet(this.ref, queue, runtime.journal) return core.succeed(true) } } }) } offerAll(iterable: Iterable): STM.STM { return core.withSTMRuntime((runtime) => { const as = Array.from(iterable) const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } if (queue.length + as.length <= this.requestedCapacity) { tRef.unsafeSet(this.ref, [...queue, ...as], runtime.journal) return core.succeed(true) } switch (this.strategy._tag) { case OpCodes.OP_BACKPRESSURE_STRATEGY: { return core.retry } case OpCodes.OP_DROPPING_STRATEGY: { const forQueue = as.slice(0, this.requestedCapacity - queue.length) tRef.unsafeSet(this.ref, [...queue, ...forQueue], runtime.journal) return core.succeed(false) } case OpCodes.OP_SLIDING_STRATEGY: { const forQueue = as.slice(0, this.requestedCapacity - queue.length) const toDrop = queue.length + forQueue.length - this.requestedCapacity const newQueue = queue.slice(toDrop) tRef.unsafeSet(this.ref, [...newQueue, ...forQueue], runtime.journal) return core.succeed(true) } } }) } peek: STM.STM = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } if (queue.length === 0) { return core.retry } return core.succeed(queue[0]) }) peekOption: STM.STM> = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } return core.succeed(Option.fromNullable(queue[0])) }) take: STM.STM = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } if (queue.length === 0) { return core.retry } const dequeued = queue.shift()! tRef.unsafeSet(this.ref, queue, runtime.journal) return core.succeed(dequeued) }) takeAll: STM.STM> = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } tRef.unsafeSet(this.ref, [], runtime.journal) return core.succeed(queue) }) takeUpTo(max: number): STM.STM> { return core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { return core.interruptAs(runtime.fiberId) } const [toTake, remaining] = Chunk.splitAt(Chunk.unsafeFromArray(queue), max) tRef.unsafeSet | undefined>(this.ref, Array.from(remaining), runtime.journal) return core.succeed(Array.from(toTake)) }) } } /** @internal */ export const isTQueue = (u: unknown): u is TQueue.TQueue => { return isTEnqueue(u) && isTDequeue(u) } /** @internal */ export const isTEnqueue = (u: unknown): u is TQueue.TEnqueue => hasProperty(u, TEnqueueTypeId) /** @internal */ export const isTDequeue = (u: unknown): u is TQueue.TDequeue => hasProperty(u, TDequeueTypeId) /** @internal */ export const awaitShutdown = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.awaitShutdown /** @internal */ export const bounded = (requestedCapacity: number): STM.STM> => makeQueue(requestedCapacity, BackPressure) /** @internal */ export const capacity = (self: TQueue.TDequeue | TQueue.TEnqueue): number => { return self.capacity() } /** @internal */ export const dropping = (requestedCapacity: number): STM.STM> => makeQueue(requestedCapacity, Dropping) /** @internal */ export const isEmpty = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.isEmpty /** @internal */ export const isFull = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.isFull /** @internal */ export const isShutdown = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.isShutdown /** @internal */ export const offer = dual< (value: A) => (self: TQueue.TEnqueue) => STM.STM, (self: TQueue.TEnqueue, value: A) => STM.STM >(2, (self, value) => self.offer(value)) /** @internal */ export const offerAll = dual< (iterable: Iterable) => (self: TQueue.TEnqueue) => STM.STM, (self: TQueue.TEnqueue, iterable: Iterable) => STM.STM >(2, (self, iterable) => self.offerAll(iterable)) /** @internal */ export const peek = (self: TQueue.TDequeue): STM.STM => self.peek /** @internal */ export const peekOption = (self: TQueue.TDequeue): STM.STM> => self.peekOption /** @internal */ export const poll = (self: TQueue.TDequeue): STM.STM> => pipe(self.takeUpTo(1), core.map(RA.head)) /** @internal */ export const seek = dual< (predicate: Predicate) => (self: TQueue.TDequeue) => STM.STM, (self: TQueue.TDequeue, predicate: Predicate) => STM.STM >(2, (self, predicate) => seekLoop(self, predicate)) const seekLoop = (self: TQueue.TDequeue, predicate: Predicate): STM.STM => core.flatMap( self.take, (a) => predicate(a) ? core.succeed(a) : seekLoop(self, predicate) ) /** @internal */ export const shutdown = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.shutdown /** @internal */ export const size = (self: TQueue.TDequeue | TQueue.TEnqueue): STM.STM => self.size /** @internal */ export const sliding = (requestedCapacity: number): STM.STM> => makeQueue(requestedCapacity, Sliding) /** @internal */ export const take = (self: TQueue.TDequeue): STM.STM => self.take /** @internal */ export const takeAll = (self: TQueue.TDequeue): STM.STM> => self.takeAll /** @internal */ export const takeBetween = dual< (min: number, max: number) => (self: TQueue.TDequeue) => STM.STM>, (self: TQueue.TDequeue, min: number, max: number) => STM.STM> >( 3, (self: TQueue.TDequeue, min: number, max: number): STM.STM> => stm.suspend(() => { const takeRemainder = ( min: number, max: number, acc: Chunk.Chunk ): STM.STM> => { if (max < min) { return core.succeed(acc) } return pipe( self.takeUpTo(max), core.flatMap((taken) => { const remaining = min - taken.length if (remaining === 1) { return pipe( self.take, core.map((a) => pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken)), Chunk.append(a))) ) } if (remaining > 1) { return pipe( self.take, core.flatMap((a) => takeRemainder( remaining - 1, max - taken.length - 1, pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken)), Chunk.append(a)) ) ) ) } return core.succeed(pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken)))) }) ) } return core.map(takeRemainder(min, max, Chunk.empty()), (c) => Array.from(c)) }) ) /** @internal */ export const takeN = dual< (n: number) => (self: TQueue.TDequeue) => STM.STM>, (self: TQueue.TDequeue, n: number) => STM.STM> >(2, (self, n) => pipe(self, takeBetween(n, n))) /** @internal */ export const takeUpTo = dual< (max: number) => (self: TQueue.TDequeue) => STM.STM>, (self: TQueue.TDequeue, max: number) => STM.STM> >(2, (self, max) => self.takeUpTo(max)) /** @internal */ export const unbounded = (): STM.STM> => makeQueue(Number.MAX_SAFE_INTEGER, Dropping) const makeQueue = (requestedCapacity: number, strategy: TQueueStrategy): STM.STM> => core.map( tRef.make | undefined>([]), (ref) => new TQueueImpl(ref, requestedCapacity, strategy) )