// ets_tracing: off

import "../../Operator/index.js"

import * as Tp from "../../Collections/Immutable/Tuple/index.js"
import { constVoid, pipe } from "../../Function/index.js"
import type { Option } from "../../Option/index.js"
import { none, some } from "../../Option/index.js"
import * as P from "../../Promise/index.js"
import { matchTag } from "../../Utils/index.js"
import * as T from "../_internal/effect.js"
import * as R from "../_internal/ref.js"

/**
 * Internal state of a `Handoff`: either no element is pending (`Empty`)
 * or exactly one element is waiting to be taken (`Full`).
 */
type State<A> = Empty | Full<A>

/**
 * No element is pending. `notifyConsumer` is completed by `offer_` when it
 * stores an element, which lets a `take` that is awaiting it retry.
 */
class Empty {
  readonly _tag = "Empty"
  constructor(readonly notifyConsumer: P.Promise<never, void>) {}
}

/**
 * An element `a` is pending. `notifyProducer` is completed by `take` / `poll`
 * when they remove the element, which lets the awaiting `offer_` proceed.
 */
class Full<A> {
  readonly _tag = "Full"
  constructor(readonly a: A, readonly notifyProducer: P.Promise<never, void>) {}
}

/**
 * A synchronous queue-like abstraction that allows a producer to offer
 * an element and wait for it to be taken, and allows a consumer to wait
 * for an element to be available.
 */
class Handoff<A> {
  readonly _tag = "Handoff"
  constructor(readonly ref: R.Ref<State<A>>) {}
}

/**
 * Creates a new `Handoff` whose state starts as `Empty`, backed by a freshly
 * made promise.
 */
export function make<A>(): T.UIO<Handoff<A>> {
  return pipe(
    P.make<never, void>(),
    T.chain((p) => R.makeRef<State<A>>(new Empty(p))),
    T.map((ref) => new Handoff(ref))
  )
}

/**
 * Offers the element `a` to the handoff `h`.
 *
 * - When the state is `Empty`: the state becomes `Full(a, p)`, the current
 *   `notifyConsumer` promise is completed, and the returned effect then awaits
 *   `p`, i.e. it finishes only once the element has been removed.
 * - When the state is `Full`: the state is left untouched, the effect awaits
 *   the current `notifyProducer` and then retries the offer.
 *
 * @param h - the handoff to offer to
 * @param a - the element to hand over
 */
export function offer_<A>(h: Handoff<A>, a: A): T.UIO<void> {
  return pipe(
    // `p` is only installed in the state when the handoff is currently empty.
    P.make<never, void>(),
    T.chain((p) =>
      pipe(
        h.ref,
        // `modify` yields the follow-up effect together with the next state;
        // the follow-up effect is executed by `T.flatten` below.
        R.modify<T.UIO<void>, State<A>>(
          matchTag({
            Empty: ({ notifyConsumer }) =>
              Tp.tuple(
                pipe(notifyConsumer, P.succeed(constVoid()), T.zipRight(P.await(p))),
                new Full(a, p)
              ),
            Full: (s) =>
              Tp.tuple(
                pipe(
                  P.await(s.notifyProducer),
                  T.chain(() => offer_(h, a))
                ),
                s
              )
          })
        ),
        T.flatten
      )
    )
  )
}

/**
 * Data-last variant of `offer_`: returns a function that offers `a` to the
 * handoff it is given.
 *
 * @param a - the element to hand over
 */
export function offer<A>(a: A) {
  return (h: Handoff<A>) => offer_(h, a)
}

/**
 * Takes the pending element out of the handoff `h`.
 *
 * - When the state is `Empty`: the state is left untouched, the effect awaits
 *   the current `notifyConsumer` and then retries the take.
 * - When the state is `Full`: the state becomes `Empty(p)`, the stored
 *   `notifyProducer` is completed, and the stored element is returned.
 *
 * @param h - the handoff to take from
 */
export function take<A>(h: Handoff<A>): T.UIO<A> {
  return pipe(
    // `p` is only installed in the state when an element is actually removed.
    P.make<never, void>(),
    T.chain((p) =>
      pipe(
        h.ref,
        R.modify<T.UIO<A>, State<A>>(
          matchTag({
            Empty: (s) =>
              Tp.tuple(
                pipe(
                  s.notifyConsumer,
                  P.await,
                  T.chain(() => take(h))
                ),
                s
              ),
            Full: ({ a, notifyProducer }) =>
              Tp.tuple(
                pipe(notifyProducer, P.succeed(constVoid()), T.as(a)),
                new Empty(p)
              )
          })
        ),
        T.flatten
      )
    )
  )
}

/**
 * Non-waiting variant of `take`.
 *
 * - When the state is `Empty`: returns `none` and leaves the state untouched.
 * - When the state is `Full`: the state becomes `Empty(p)`, the stored
 *   `notifyProducer` is completed, and `some(a)` is returned.
 *
 * @param h - the handoff to poll
 */
export function poll<A>(h: Handoff<A>): T.UIO<Option<A>> {
  return pipe(
    // `p` is only installed in the state when an element is actually removed.
    P.make<never, void>(),
    T.chain((p) =>
      pipe(
        h.ref,
        R.modify<T.UIO<Option<A>>, State<A>>(
          matchTag({
            Empty: (s) => Tp.tuple(T.succeed(none), s),
            Full: ({ a, notifyProducer }) =>
              Tp.tuple(
                pipe(notifyProducer, P.succeed(constVoid()), T.as(some(a))),
                new Empty(p)
              )
          })
        ),
        T.flatten
      )
    )
  )
}