import { unsafeCompleteTakers } from "@effect/core/io/Queue/operations/_internal/unsafeCompleteTakers" /** * @tsplus type effect/core/io/Queue/Strategy */ export interface Strategy { readonly handleSurplus: ( as: Chunk, queue: MutableQueue, takers: MutableQueue>, isShutdown: AtomicBoolean ) => Effect readonly unsafeOnQueueEmptySpace: ( queue: MutableQueue, takers: MutableQueue> ) => void readonly surplusSize: number readonly shutdown: Effect } /** * @tsplus type effect/core/io/Queue/Strategy.Ops */ export interface StrategyOps {} export const Strategy: StrategyOps = {} export class DroppingStrategy implements Strategy { // Do nothing, drop the surplus handleSurplus( _as: Chunk, _queue: MutableQueue, _takers: MutableQueue>, _isShutdown: AtomicBoolean ): Effect { return Effect.succeed(false) } unsafeOnQueueEmptySpace(_queue: MutableQueue): void { // } get surplusSize(): number { return 0 } get shutdown(): Effect { return Effect.unit } } export class SlidingStrategy implements Strategy { handleSurplus( as: Chunk, queue: MutableQueue, takers: MutableQueue>, _isShutdown: AtomicBoolean ): Effect { return Effect.sync(() => { this.unsafeSlidingOffer(queue, as) unsafeCompleteTakers(this, queue, takers) return true }) } unsafeOnQueueEmptySpace(_queue: MutableQueue): void { // } get surplusSize(): number { return 0 } get shutdown(): Effect { return Effect.unit } private unsafeSlidingOffer(queue: MutableQueue, as: Chunk) { let bs = as while (bs.size > 0) { if (queue.capacity === 0) { return } // Poll 1 and retry queue.poll(EmptyMutableQueue) if (queue.offer(bs.unsafeGet(0))) { bs = bs.drop(1) } } } } /** * @tsplus static effect/core/io/Queue/Strategy.Ops Sliding */ export function slidingStrategy() { return new SlidingStrategy() } /** * @tsplus static effect/core/io/Queue/Strategy.Ops Dropping */ export function dropppingStrategy() { return new DroppingStrategy() }