import { matchTag } from "@principia/prelude/Utils";
import { constVoid, pipe } from "../../../Function";
import type { Option } from "../../../Option";
import { none, some } from "../../../Option";
import * as T from "../../Task";
import * as XP from "../../XPromise";
import * as XR from "../../XRef";
type State = Empty | Full;
class Empty {
readonly _tag = "Empty";
constructor(readonly notifyConsumer: XP.XPromise) {}
}
class Full {
readonly _tag = "Full";
constructor(readonly a: A, readonly notifyProducer: XP.XPromise) {}
}
/**
* A synchronous queue-like abstraction that allows a producer to offer
* an element and wait for it to be taken, and allows a consumer to wait
* for an element to be available.
*/
class Handoff {
readonly _tag = "Handoff";
constructor(readonly ref: XR.Ref>) {}
}
export function make(): T.IO> {
return pipe(
XP.make(),
T.chain((p) => XR.makeRef>(new Empty(p))),
T.map((ref) => new Handoff(ref))
);
}
export function offer(a: A) {
return (h: Handoff): T.IO =>
pipe(
XP.make(),
T.chain((p) =>
pipe(
h.ref,
XR.modify, State>(
matchTag({
Empty: ({ notifyConsumer }) =>
[
pipe(notifyConsumer, XP.succeed(constVoid()), T.apSecond(XP.await(p))),
new Full(a, p)
] as const,
Full: (s) =>
[
pipe(
XP.await(s.notifyProducer),
T.chain(() => offer(a)(h))
),
s
] as const
})
),
T.flatten
)
)
);
}
export function take(h: Handoff): T.IO {
return pipe(
XP.make(),
T.chain((p) =>
pipe(
h.ref,
XR.modify, State>(
matchTag({
Empty: (s) =>
[
pipe(
s.notifyConsumer,
XP.await,
T.chain(() => take(h))
),
s
] as const,
Full: ({ a, notifyProducer }) =>
[pipe(notifyProducer, XP.succeed(constVoid()), T.as(a)), new Empty(p)] as const
})
),
T.flatten
)
)
);
}
export function poll(h: Handoff): T.IO