import { unsafeCompleteTakers } from "@effect/core/io/Queue/operations/_internal/unsafeCompleteTakers"
/**
* @tsplus type effect/core/io/Queue/Strategy
*/
export interface Strategy {
readonly handleSurplus: (
as: Chunk,
queue: MutableQueue,
takers: MutableQueue>,
isShutdown: AtomicBoolean
) => Effect
readonly unsafeOnQueueEmptySpace: (
queue: MutableQueue,
takers: MutableQueue>
) => void
readonly surplusSize: number
readonly shutdown: Effect
}
/**
* @tsplus type effect/core/io/Queue/Strategy.Ops
*/
export interface StrategyOps {}
export const Strategy: StrategyOps = {}
export class DroppingStrategy implements Strategy {
// Do nothing, drop the surplus
handleSurplus(
_as: Chunk,
_queue: MutableQueue,
_takers: MutableQueue>,
_isShutdown: AtomicBoolean
): Effect {
return Effect.succeed(false)
}
unsafeOnQueueEmptySpace(_queue: MutableQueue): void {
//
}
get surplusSize(): number {
return 0
}
get shutdown(): Effect {
return Effect.unit
}
}
export class SlidingStrategy implements Strategy {
handleSurplus(
as: Chunk,
queue: MutableQueue,
takers: MutableQueue>,
_isShutdown: AtomicBoolean
): Effect {
return Effect.sync(() => {
this.unsafeSlidingOffer(queue, as)
unsafeCompleteTakers(this, queue, takers)
return true
})
}
unsafeOnQueueEmptySpace(_queue: MutableQueue): void {
//
}
get surplusSize(): number {
return 0
}
get shutdown(): Effect {
return Effect.unit
}
private unsafeSlidingOffer(queue: MutableQueue, as: Chunk) {
let bs = as
while (bs.size > 0) {
if (queue.capacity === 0) {
return
}
// Poll 1 and retry
queue.poll(EmptyMutableQueue)
if (queue.offer(bs.unsafeGet(0))) {
bs = bs.drop(1)
}
}
}
}
/**
* @tsplus static effect/core/io/Queue/Strategy.Ops Sliding
*/
export function slidingStrategy() {
return new SlidingStrategy()
}
/**
* @tsplus static effect/core/io/Queue/Strategy.Ops Dropping
*/
export function dropppingStrategy() {
return new DroppingStrategy()
}