import * as A from "../../Array"; import { pipe } from "../../Function"; import type { MutableQueue } from "../../support"; import { AtomicBoolean, Bounded, Unbounded } from "../../support"; import type { XPromise } from "../XPromise"; import * as XP from "../XPromise"; import * as T from "./_internal/task"; import type { Queue } from "./model"; import { XQueue } from "./model"; export const unsafeOfferAll = (q: MutableQueue, as: readonly A[]): readonly A[] => { const bs = Array.from(as); while (bs.length > 0) { if (!q.offer(bs[0])) { return bs; } else { bs.shift(); } } return bs; }; export const unsafePollAll = (q: MutableQueue): readonly A[] => { const as = [] as A[]; while (!q.isEmpty) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion as.push(q.poll(undefined)!); } return as; }; export const unsafeCompletePromise = (p: XPromise, a: A) => XP.unsafeDone(T.pure(a))(p); export const unsafeRemove = (q: MutableQueue, a: A) => { unsafeOfferAll(q, unsafePollAll(q)).filter((b) => a !== b); }; export const unsafePollN = (q: MutableQueue, max: number): readonly A[] => { let j = 0; const as = [] as A[]; while (j < max) { const p = q.poll(undefined); if (p != null) { as.push(p); } else { return as; } j += 1; } return as; }; export const unsafeCompleteTakers = ( strategy: Strategy, queue: MutableQueue, takers: MutableQueue> ) => { let keepPolling = true; while (keepPolling && !queue.isEmpty) { const taker = takers.poll(undefined); if (taker != null) { const element = queue.poll(undefined); if (element != null) { unsafeCompletePromise(taker, element); strategy.unsafeOnQueueEmptySpace(queue); } else { unsafeOfferAll(takers, [taker, ...unsafePollAll(takers)]); } keepPolling = true; } else { keepPolling = false; } } }; export interface Strategy { readonly handleSurplus: ( as: readonly A[], queue: MutableQueue, takers: MutableQueue>, isShutdown: AtomicBoolean ) => T.IO; readonly unsafeOnQueueEmptySpace: (queue: MutableQueue) => void; readonly surplusSize: number; readonly shutdown: T.IO; } export class BackPressureStrategy implements Strategy { private putters = new Unbounded<[A, XPromise, boolean]>(); handleSurplus( as: readonly A[], queue: MutableQueue, takers: MutableQueue>, isShutdown: AtomicBoolean ): T.IO { return T.checkDescriptor((d) => T.suspend(() => { const p = XP.unsafeMake(d.id); return T.onInterrupt_( T.suspend(() => { this.unsafeOffer(as, p); this.unsafeOnQueueEmptySpace(queue); unsafeCompleteTakers(this, queue, takers); if (isShutdown.get) { return T.interrupt; } else { return XP.await(p); } }), () => T.total(() => this.unsafeRemove(p)) ); }) ); } unsafeRemove(p: XPromise) { unsafeOfferAll( this.putters, unsafePollAll(this.putters).filter(([_, __]) => __ !== p) ); } unsafeOffer(as: readonly A[], p: XPromise) { const bs = Array.from(as); while (bs.length > 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const head = bs.shift()!; if (bs.length === 0) { this.putters.offer([head, p, true]); } else { this.putters.offer([head, p, false]); } } } unsafeOnQueueEmptySpace(queue: MutableQueue) { let keepPolling = true; while (keepPolling && !queue.isFull) { const putter = this.putters.poll(undefined); if (putter != null) { const offered = queue.offer(putter[0]); if (offered && putter[2]) { unsafeCompletePromise(putter[1], true); } else if (!offered) { unsafeOfferAll(this.putters, [putter, ...unsafePollAll(this.putters)]); } } else { keepPolling = false; } } } get shutdown(): T.IO { return pipe( T.do, T.bindS("fiberId", () => T.checkFiberId()), T.bindS("putters", () => T.total(() => unsafePollAll(this.putters))), T.tap((s) => T.traverseIPar_(s.putters, ([_, p, lastItem]) => (lastItem ? XP.interruptAs(s.fiberId)(p) : T.unit())) ), T.asUnit ); } get surplusSize(): number { return this.putters.size; } } export class DroppingStrategy implements Strategy { handleSurplus( _as: readonly A[], _queue: MutableQueue, _takers: MutableQueue>, _isShutdown: AtomicBoolean ): T.IO { return T.pure(false); } unsafeOnQueueEmptySpace(_queue: MutableQueue) { // } get shutdown(): T.IO { return T.unit(); } get surplusSize(): number { return 0; } } export class SlidingStrategy implements Strategy { handleSurplus( as: readonly A[], queue: MutableQueue, takers: MutableQueue>, _isShutdown: AtomicBoolean ): T.IO { return T.total(() => { this.unsafeSlidingOffer(queue, as); unsafeCompleteTakers(this, queue, takers); return true; }); } unsafeOnQueueEmptySpace(_queue: MutableQueue) { // } get shutdown(): T.IO { return T.unit(); } get surplusSize(): number { return 0; } private unsafeSlidingOffer(queue: MutableQueue, as: readonly A[]) { const bs = Array.from(as); while (bs.length > 0) { if (queue.capacity === 0) { return; } // poll 1 and retry queue.poll(undefined); if (queue.offer(bs[0])) { bs.shift(); } } } } export const unsafeCreate = ( queue: MutableQueue, takers: MutableQueue>, shutdownHook: XPromise, shutdownFlag: AtomicBoolean, strategy: Strategy ): Queue => new (class extends XQueue { awaitShutdown: T.IO = XP.await(shutdownHook); capacity: number = queue.capacity; isShutdown: T.IO = T.total(() => shutdownFlag.get); offer: (a: A) => T.Task = (a) => T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } else { const taker = takers.poll(undefined); if (taker != null) { unsafeCompletePromise(taker, a); return T.pure(true); } else { const succeeded = queue.offer(a); if (succeeded) { return T.pure(true); } else { return strategy.handleSurplus([a], queue, takers, shutdownFlag); } } } }); offerAll: (as: Iterable) => T.Task = (as) => { const arr = Array.from(as); return T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } else { const pTakers = queue.isEmpty ? unsafePollN(takers, arr.length) : []; const [forTakers, remaining] = A.splitAt(pTakers.length)(arr); A.zip(forTakers)(pTakers).forEach(([taker, item]) => { unsafeCompletePromise(taker, item); }); if (remaining.length === 0) { return T.pure(true); } const surplus = unsafeOfferAll(queue, remaining); unsafeCompleteTakers(strategy, queue, takers); if (surplus.length === 0) { return T.pure(true); } else { return strategy.handleSurplus(surplus, queue, takers, shutdownFlag); } } }); }; shutdown: T.IO = T.checkDescriptor((d) => T.suspend(() => { shutdownFlag.set(true); return T.makeUninterruptible( T.whenM(XP.succeed(undefined)(shutdownHook))( T.chain_(T.traverseIPar_(unsafePollAll(takers), XP.interruptAs(d.id)), () => strategy.shutdown) ) ); }) ); size: T.IO = T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } else { return T.pure(queue.size - takers.size + strategy.surplusSize); } }); take: T.Task = T.checkDescriptor((d) => T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } const item = queue.poll(undefined); if (item != null) { strategy.unsafeOnQueueEmptySpace(queue); return T.pure(item); } else { const p = XP.unsafeMake(d.id); return T.onInterrupt_( T.suspend(() => { takers.offer(p); unsafeCompleteTakers(strategy, queue, takers); if (shutdownFlag.get) { return T.interrupt; } else { return XP.await(p); } }), () => T.total(() => unsafeRemove(takers, p)) ); } }) ); takeAll: T.Task = T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } else { return T.total(() => { const as = unsafePollAll(queue); strategy.unsafeOnQueueEmptySpace(queue); return as; }); } }); takeUpTo: (n: number) => T.Task = (max) => T.suspend(() => { if (shutdownFlag.get) { return T.interrupt; } else { return T.total(() => { const as = unsafePollN(queue, max); strategy.unsafeOnQueueEmptySpace(queue); return as; }); } }); })(); export const createQueue = (strategy: Strategy) => (queue: MutableQueue) => T.map_(XP.make(), (p) => unsafeCreate(queue, new Unbounded(), p, new AtomicBoolean(false), strategy)); export const makeSliding = (capacity: number): T.IO> => T.chain_( T.total(() => new Bounded(capacity)), createQueue(new SlidingStrategy()) ); export const makeUnbounded = (): T.IO> => T.chain_( T.total(() => new Unbounded()), createQueue(new DroppingStrategy()) ); export const makeDropping = (capacity: number): T.IO> => T.chain_( T.total(() => new Bounded(capacity)), createQueue(new DroppingStrategy()) ); export const makeBounded = (capacity: number): T.IO> => T.chain_( T.total(() => new Bounded(capacity)), createQueue(new BackPressureStrategy()) );