import { matchTag } from "@principia/prelude/Utils";

import { constVoid, pipe } from "../../../Function";
import type { Option } from "../../../Option";
import { none, some } from "../../../Option";
import * as T from "../../Task";
import * as XP from "../../XPromise";
import * as XR from "../../XRef";

/**
 * The two states a `Handoff` can be in: holding no value, or holding exactly
 * one value of type `A` that is waiting to be taken.
 */
type State<A> = Empty | Full<A>;

// NOTE(review): the `<never, void>` type arguments on `XP.XPromise` / `XP.make`
// below are a reconstruction. In this file the promises are only ever completed
// with `constVoid()` and never failed; confirm the parameter order against the
// XPromise module.

/**
 * State of a handoff that currently holds no value.
 *
 * `notifyConsumer` is the promise that `offer` completes when it stores a
 * value, and that `take` awaits while the handoff is empty.
 */
class Empty {
  readonly _tag = "Empty";
  constructor(readonly notifyConsumer: XP.XPromise<never, void>) {}
}

/**
 * State of a handoff that currently holds the value `a`.
 *
 * `notifyProducer` is the promise that `take` / `poll` complete when they
 * remove the value, and that `offer` awaits while the handoff is full.
 */
class Full<A> {
  readonly _tag = "Full";
  constructor(readonly a: A, readonly notifyProducer: XP.XPromise<never, void>) {}
}

/**
 * A synchronous queue-like abstraction that allows a producer to offer
 * an element and wait for it to be taken, and allows a consumer to wait
 * for an element to be available.
 */
class Handoff<A> {
  readonly _tag = "Handoff";
  constructor(readonly ref: XR.Ref<State<A>>) {}
}

/**
 * Creates a new handoff whose initial state is `Empty`, backed by a freshly
 * made promise.
 */
export function make<A>(): T.IO<Handoff<A>> {
  return pipe(
    XP.make<never, void>(),
    T.chain((p) => XR.makeRef<State<A>>(new Empty(p))),
    T.map((ref) => new Handoff(ref))
  );
}

/**
 * Offers the value `a` to the handoff `h`.
 *
 * - If the handoff is `Empty`, the state becomes `Full(a, p)` for a fresh
 *   promise `p`; the returned task completes the previous `notifyConsumer`
 *   promise and then awaits `p`.
 * - If the handoff is `Full`, the state is left untouched; the returned task
 *   awaits the current `notifyProducer` promise and then retries the offer.
 *
 * @param a The value to hand over.
 * @returns A function taking the handoff and producing the task described above.
 */
export function offer<A>(a: A) {
  return (h: Handoff<A>): T.IO<void> =>
    pipe(
      XP.make<never, void>(),
      T.chain((p) =>
        pipe(
          h.ref,
          // `modify` yields the follow-up task as its result; `T.flatten` below runs it.
          XR.modify<T.IO<void>, State<A>>(
            matchTag({
              Empty: ({ notifyConsumer }) =>
                [
                  pipe(notifyConsumer, XP.succeed(constVoid()), T.apSecond(XP.await(p))),
                  new Full(a, p)
                ] as const,
              Full: (s) =>
                [
                  pipe(
                    XP.await(s.notifyProducer),
                    T.chain(() => offer(a)(h))
                  ),
                  s
                ] as const
            })
          ),
          T.flatten
        )
      )
    );
}

/**
 * Takes the value held by the handoff `h`.
 *
 * - If the handoff is `Empty`, the state is left untouched; the returned task
 *   awaits the current `notifyConsumer` promise and then retries the take.
 * - If the handoff is `Full`, the state becomes `Empty(p)` for a fresh promise
 *   `p`; the returned task completes `notifyProducer` and yields the stored value.
 *
 * @param h The handoff to take from.
 */
export function take<A>(h: Handoff<A>): T.IO<A> {
  return pipe(
    XP.make<never, void>(),
    T.chain((p) =>
      pipe(
        h.ref,
        XR.modify<T.IO<A>, State<A>>(
          matchTag({
            Empty: (s) =>
              [
                pipe(
                  s.notifyConsumer,
                  XP.await,
                  T.chain(() => take(h))
                ),
                s
              ] as const,
            Full: ({ a, notifyProducer }) =>
              [pipe(notifyProducer, XP.succeed(constVoid()), T.as(a)), new Empty(p)] as const
          })
        ),
        T.flatten
      )
    )
  );
}

/**
 * Like `take`, but does not wait for a value when the handoff is empty.
 *
 * - If the handoff is `Empty`, the state is left untouched and the task yields `none()`.
 * - If the handoff is `Full`, the state becomes `Empty(p)` for a fresh promise
 *   `p`; the returned task completes `notifyProducer` and yields `some(a)`.
 *
 * @param h The handoff to poll.
 */
export function poll<A>(h: Handoff<A>): T.IO<Option<A>> {
  return pipe(
    XP.make<never, void>(),
    T.chain((p) =>
      pipe(
        h.ref,
        XR.modify<T.IO<Option<A>>, State<A>>(
          matchTag({
            Empty: (s) => [T.succeed(none()), s] as const,
            Full: ({ a, notifyProducer }) =>
              [pipe(notifyProducer, XP.succeed(constVoid()), T.as(some(a))), new Empty(p)] as const
          })
        ),
        T.flatten
      )
    )
  );
}