// ets_tracing: off
import "../../Operator/index.js"
import * as Tp from "../../Collections/Immutable/Tuple/index.js"
import { constVoid, pipe } from "../../Function/index.js"
import type { Option } from "../../Option/index.js"
import { none, some } from "../../Option/index.js"
import * as P from "../../Promise/index.js"
import { matchTag } from "../../Utils/index.js"
import * as T from "../_internal/effect.js"
import * as R from "../_internal/ref.js"
type State = Empty | Full
class Empty {
readonly _tag = "Empty"
constructor(readonly notifyConsumer: P.Promise) {}
}
class Full {
readonly _tag = "Full"
constructor(readonly a: A, readonly notifyProducer: P.Promise) {}
}
/**
* A synchronous queue-like abstraction that allows a producer to offer
* an element and wait for it to be taken, and allows a consumer to wait
* for an element to be available.
*/
class Handoff {
readonly _tag = "Handoff"
constructor(readonly ref: R.Ref>) {}
}
export function make(): T.UIO> {
return pipe(
P.make(),
T.chain((p) => R.makeRef>(new Empty(p))),
T.map((ref) => new Handoff(ref))
)
}
export function offer_(h: Handoff, a: A): T.UIO {
return pipe(
P.make(),
T.chain((p) =>
pipe(
h.ref,
R.modify, State>(
matchTag({
Empty: ({ notifyConsumer }) =>
Tp.tuple(
pipe(notifyConsumer, P.succeed(constVoid()), T.zipRight(P.await(p))),
new Full(a, p)
),
Full: (s) =>
Tp.tuple(
pipe(
P.await(s.notifyProducer),
T.chain(() => offer_(h, a))
),
s
)
})
),
T.flatten
)
)
)
}
export function offer(a: A) {
return (h: Handoff) => offer_(h, a)
}
export function take(h: Handoff): T.UIO {
return pipe(
P.make(),
T.chain((p) =>
pipe(
h.ref,
R.modify, State>(
matchTag({
Empty: (s) =>
Tp.tuple(
pipe(
s.notifyConsumer,
P.await,
T.chain(() => take(h))
),
s
),
Full: ({ a, notifyProducer }) =>
Tp.tuple(
pipe(notifyProducer, P.succeed(constVoid()), T.as(a)),
new Empty(p)
)
})
),
T.flatten
)
)
)
}
export function poll(h: Handoff): T.UIO