import * as A from "../../Array";
import { pipe } from "../../Function";
import type { MutableQueue } from "../../support";
import { AtomicBoolean, Bounded, Unbounded } from "../../support";
import type { XPromise } from "../XPromise";
import * as XP from "../XPromise";
import * as T from "./_internal/task";
import type { Queue } from "./model";
import { XQueue } from "./model";
export const unsafeOfferAll = (q: MutableQueue, as: readonly A[]): readonly A[] => {
const bs = Array.from(as);
while (bs.length > 0) {
if (!q.offer(bs[0])) {
return bs;
} else {
bs.shift();
}
}
return bs;
};
export const unsafePollAll = (q: MutableQueue): readonly A[] => {
const as = [] as A[];
while (!q.isEmpty) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
as.push(q.poll(undefined)!);
}
return as;
};
export const unsafeCompletePromise = (p: XPromise, a: A) => XP.unsafeDone(T.pure(a))(p);
export const unsafeRemove = (q: MutableQueue, a: A) => {
unsafeOfferAll(q, unsafePollAll(q)).filter((b) => a !== b);
};
export const unsafePollN = (q: MutableQueue, max: number): readonly A[] => {
let j = 0;
const as = [] as A[];
while (j < max) {
const p = q.poll(undefined);
if (p != null) {
as.push(p);
} else {
return as;
}
j += 1;
}
return as;
};
export const unsafeCompleteTakers = (
strategy: Strategy,
queue: MutableQueue,
takers: MutableQueue>
) => {
let keepPolling = true;
while (keepPolling && !queue.isEmpty) {
const taker = takers.poll(undefined);
if (taker != null) {
const element = queue.poll(undefined);
if (element != null) {
unsafeCompletePromise(taker, element);
strategy.unsafeOnQueueEmptySpace(queue);
} else {
unsafeOfferAll(takers, [taker, ...unsafePollAll(takers)]);
}
keepPolling = true;
} else {
keepPolling = false;
}
}
};
export interface Strategy {
readonly handleSurplus: (
as: readonly A[],
queue: MutableQueue,
takers: MutableQueue>,
isShutdown: AtomicBoolean
) => T.IO;
readonly unsafeOnQueueEmptySpace: (queue: MutableQueue) => void;
readonly surplusSize: number;
readonly shutdown: T.IO;
}
export class BackPressureStrategy implements Strategy {
private putters = new Unbounded<[A, XPromise, boolean]>();
handleSurplus(
as: readonly A[],
queue: MutableQueue,
takers: MutableQueue>,
isShutdown: AtomicBoolean
): T.IO {
return T.checkDescriptor((d) =>
T.suspend(() => {
const p = XP.unsafeMake(d.id);
return T.onInterrupt_(
T.suspend(() => {
this.unsafeOffer(as, p);
this.unsafeOnQueueEmptySpace(queue);
unsafeCompleteTakers(this, queue, takers);
if (isShutdown.get) {
return T.interrupt;
} else {
return XP.await(p);
}
}),
() => T.total(() => this.unsafeRemove(p))
);
})
);
}
unsafeRemove(p: XPromise) {
unsafeOfferAll(
this.putters,
unsafePollAll(this.putters).filter(([_, __]) => __ !== p)
);
}
unsafeOffer(as: readonly A[], p: XPromise) {
const bs = Array.from(as);
while (bs.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const head = bs.shift()!;
if (bs.length === 0) {
this.putters.offer([head, p, true]);
} else {
this.putters.offer([head, p, false]);
}
}
}
unsafeOnQueueEmptySpace(queue: MutableQueue) {
let keepPolling = true;
while (keepPolling && !queue.isFull) {
const putter = this.putters.poll(undefined);
if (putter != null) {
const offered = queue.offer(putter[0]);
if (offered && putter[2]) {
unsafeCompletePromise(putter[1], true);
} else if (!offered) {
unsafeOfferAll(this.putters, [putter, ...unsafePollAll(this.putters)]);
}
} else {
keepPolling = false;
}
}
}
get shutdown(): T.IO {
return pipe(
T.do,
T.bindS("fiberId", () => T.checkFiberId()),
T.bindS("putters", () => T.total(() => unsafePollAll(this.putters))),
T.tap((s) =>
T.traverseIPar_(s.putters, ([_, p, lastItem]) => (lastItem ? XP.interruptAs(s.fiberId)(p) : T.unit()))
),
T.asUnit
);
}
get surplusSize(): number {
return this.putters.size;
}
}
export class DroppingStrategy implements Strategy {
handleSurplus(
_as: readonly A[],
_queue: MutableQueue,
_takers: MutableQueue>,
_isShutdown: AtomicBoolean
): T.IO {
return T.pure(false);
}
unsafeOnQueueEmptySpace(_queue: MutableQueue) {
//
}
get shutdown(): T.IO {
return T.unit();
}
get surplusSize(): number {
return 0;
}
}
export class SlidingStrategy implements Strategy {
handleSurplus(
as: readonly A[],
queue: MutableQueue,
takers: MutableQueue>,
_isShutdown: AtomicBoolean
): T.IO {
return T.total(() => {
this.unsafeSlidingOffer(queue, as);
unsafeCompleteTakers(this, queue, takers);
return true;
});
}
unsafeOnQueueEmptySpace(_queue: MutableQueue) {
//
}
get shutdown(): T.IO {
return T.unit();
}
get surplusSize(): number {
return 0;
}
private unsafeSlidingOffer(queue: MutableQueue, as: readonly A[]) {
const bs = Array.from(as);
while (bs.length > 0) {
if (queue.capacity === 0) {
return;
}
// poll 1 and retry
queue.poll(undefined);
if (queue.offer(bs[0])) {
bs.shift();
}
}
}
}
export const unsafeCreate = (
queue: MutableQueue,
takers: MutableQueue>,
shutdownHook: XPromise,
shutdownFlag: AtomicBoolean,
strategy: Strategy
): Queue =>
new (class extends XQueue {
awaitShutdown: T.IO = XP.await(shutdownHook);
capacity: number = queue.capacity;
isShutdown: T.IO = T.total(() => shutdownFlag.get);
offer: (a: A) => T.Task = (a) =>
T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
} else {
const taker = takers.poll(undefined);
if (taker != null) {
unsafeCompletePromise(taker, a);
return T.pure(true);
} else {
const succeeded = queue.offer(a);
if (succeeded) {
return T.pure(true);
} else {
return strategy.handleSurplus([a], queue, takers, shutdownFlag);
}
}
}
});
offerAll: (as: Iterable) => T.Task = (as) => {
const arr = Array.from(as);
return T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
} else {
const pTakers = queue.isEmpty ? unsafePollN(takers, arr.length) : [];
const [forTakers, remaining] = A.splitAt(pTakers.length)(arr);
A.zip(forTakers)(pTakers).forEach(([taker, item]) => {
unsafeCompletePromise(taker, item);
});
if (remaining.length === 0) {
return T.pure(true);
}
const surplus = unsafeOfferAll(queue, remaining);
unsafeCompleteTakers(strategy, queue, takers);
if (surplus.length === 0) {
return T.pure(true);
} else {
return strategy.handleSurplus(surplus, queue, takers, shutdownFlag);
}
}
});
};
shutdown: T.IO = T.checkDescriptor((d) =>
T.suspend(() => {
shutdownFlag.set(true);
return T.makeUninterruptible(
T.whenM(XP.succeed(undefined)(shutdownHook))(
T.chain_(T.traverseIPar_(unsafePollAll(takers), XP.interruptAs(d.id)), () => strategy.shutdown)
)
);
})
);
size: T.IO = T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
} else {
return T.pure(queue.size - takers.size + strategy.surplusSize);
}
});
take: T.Task = T.checkDescriptor((d) =>
T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
}
const item = queue.poll(undefined);
if (item != null) {
strategy.unsafeOnQueueEmptySpace(queue);
return T.pure(item);
} else {
const p = XP.unsafeMake(d.id);
return T.onInterrupt_(
T.suspend(() => {
takers.offer(p);
unsafeCompleteTakers(strategy, queue, takers);
if (shutdownFlag.get) {
return T.interrupt;
} else {
return XP.await(p);
}
}),
() => T.total(() => unsafeRemove(takers, p))
);
}
})
);
takeAll: T.Task = T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
} else {
return T.total(() => {
const as = unsafePollAll(queue);
strategy.unsafeOnQueueEmptySpace(queue);
return as;
});
}
});
takeUpTo: (n: number) => T.Task = (max) =>
T.suspend(() => {
if (shutdownFlag.get) {
return T.interrupt;
} else {
return T.total(() => {
const as = unsafePollN(queue, max);
strategy.unsafeOnQueueEmptySpace(queue);
return as;
});
}
});
})();
export const createQueue = (strategy: Strategy) => (queue: MutableQueue) =>
T.map_(XP.make(), (p) => unsafeCreate(queue, new Unbounded(), p, new AtomicBoolean(false), strategy));
export const makeSliding = (capacity: number): T.IO> =>
T.chain_(
T.total(() => new Bounded(capacity)),
createQueue(new SlidingStrategy())
);
export const makeUnbounded = (): T.IO> =>
T.chain_(
T.total(() => new Unbounded()),
createQueue(new DroppingStrategy())
);
export const makeDropping = (capacity: number): T.IO> =>
T.chain_(
T.total(() => new Bounded(capacity)),
createQueue(new DroppingStrategy())
);
export const makeBounded = (capacity: number): T.IO> =>
T.chain_(
T.total(() => new Bounded(capacity)),
createQueue(new BackPressureStrategy())
);