import * as Deferred from "../../Deferred.js"
import * as Effect from "../../Effect.js"
import { dual, pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import * as Ref from "../../Ref.js"

// NOTE(review): the generic arguments in this file assume the success-type-first
// parameter order (`Effect<A, E, R>`, `Deferred<A, E>`) — confirm against the
// version of the library this module belongs to.

/** @internal */
export const HandoffTypeId = Symbol.for("effect/Stream/Handoff")

/** @internal */
export type HandoffTypeId = typeof HandoffTypeId

/**
 * A synchronous queue-like abstraction that allows a producer to offer an
 * element and wait for it to be taken, and allows a consumer to wait for an
 * element to be available.
 *
 * The entire state of a handoff is the single `ref` below, which holds either
 * an `Empty` or a `Full` state; every operation in this module is expressed as
 * one `Ref.modify` over it.
 *
 * @internal
 */
export interface Handoff<A> extends Handoff.Variance<A> {
  readonly ref: Ref.Ref<Handoff.State<A>>
}

/** @internal */
export const OP_HANDOFF_STATE_EMPTY = "Empty" as const

/** @internal */
export type OP_HANDOFF_STATE_EMPTY = typeof OP_HANDOFF_STATE_EMPTY

/** @internal */
export const OP_HANDOFF_STATE_FULL = "Full" as const

/** @internal */
export type OP_HANDOFF_STATE_FULL = typeof OP_HANDOFF_STATE_FULL

/** @internal */
export declare namespace Handoff {
  /**
   * Type-level marker for the element type. `A` appears in both parameter and
   * return position of `_A`.
   *
   * @internal
   */
  export interface Variance<A> {
    readonly [HandoffTypeId]: {
      readonly _A: (_: A) => A
    }
  }

  /**
   * The two states a handoff can be in, discriminated by `_tag`.
   *
   * @internal
   */
  export type State<A> = Empty | Full<A>

  /**
   * No element is stored. `notifyConsumer` is completed by `offer` when it
   * stores an element.
   *
   * @internal
   */
  export interface Empty {
    readonly _tag: OP_HANDOFF_STATE_EMPTY
    readonly notifyConsumer: Deferred.Deferred<void>
  }

  /**
   * An element is stored. `notifyProducer` is completed by `take` / `poll`
   * when they remove the element.
   *
   * @internal
   */
  export interface Full<A> {
    readonly _tag: OP_HANDOFF_STATE_FULL
    readonly value: A
    readonly notifyProducer: Deferred.Deferred<void>
  }
}

/**
 * Builds the `Empty` state around the deferred that consumers wait on.
 *
 * @internal
 */
const handoffStateEmpty = <A>(notifyConsumer: Deferred.Deferred<void>): Handoff.State<A> => ({
  _tag: OP_HANDOFF_STATE_EMPTY,
  notifyConsumer
})

/**
 * Builds the `Full` state holding `value` and the deferred that the producer
 * waits on.
 *
 * @internal
 */
const handoffStateFull = <A>(value: A, notifyProducer: Deferred.Deferred<void>): Handoff.State<A> => ({
  _tag: OP_HANDOFF_STATE_FULL,
  value,
  notifyProducer
})

/**
 * Curried eliminator for `Handoff.State`: given one handler per state, returns
 * a function that dispatches on `_tag` and passes the state's fields to the
 * matching handler.
 *
 * @internal
 */
const handoffStateMatch = <A, Z>(
  onEmpty: (notifyConsumer: Deferred.Deferred<void>) => Z,
  onFull: (value: A, notifyProducer: Deferred.Deferred<void>) => Z
) => {
  return (self: Handoff.State<A>): Z => {
    switch (self._tag) {
      case OP_HANDOFF_STATE_EMPTY: {
        return onEmpty(self.notifyConsumer)
      }
      case OP_HANDOFF_STATE_FULL: {
        return onFull(self.value, self.notifyProducer)
      }
    }
  }
}

const handoffVariance = {
  /* c8 ignore next */
  _A: (_: any) => _
}

/**
 * Creates a new handoff in the `Empty` state, backed by a fresh `Ref` and a
 * fresh consumer-notification deferred.
 *
 * @internal
 */
export const make = <A>(): Effect.Effect<Handoff<A>> =>
  pipe(
    Deferred.make<void>(),
    Effect.flatMap((deferred) => Ref.make(handoffStateEmpty<A>(deferred))),
    Effect.map((ref): Handoff<A> => ({
      [HandoffTypeId]: handoffVariance,
      ref
    }))
  )

/**
 * Offers `value` to the handoff.
 *
 * - If the handoff is `Empty`, the state becomes `Full` with `value` and a
 *   fresh producer deferred; the returned effect completes the previous
 *   `notifyConsumer` and then waits on that fresh deferred (which `take` /
 *   `poll` complete when they remove the element).
 * - If the handoff is already `Full`, the state is left untouched; the
 *   returned effect waits on the current `notifyProducer` and then retries
 *   the offer.
 *
 * @internal
 */
export const offer = dual<
  <A>(value: A) => (self: Handoff<A>) => Effect.Effect<void>,
  <A>(self: Handoff<A>, value: A) => Effect.Effect<void>
>(2, <A>(self: Handoff<A>, value: A): Effect.Effect<void> =>
  Effect.flatMap(Deferred.make<void>(), (deferred) =>
    Effect.flatten(
      // `modify` yields the follow-up effect and installs the next state in
      // one step; `flatten` then runs that follow-up effect.
      Ref.modify(self.ref, (state) =>
        pipe(
          state,
          handoffStateMatch<A, readonly [Effect.Effect<void>, Handoff.State<A>]>(
            (notifyConsumer) => [
              Effect.zipRight(
                Deferred.succeed(notifyConsumer, void 0),
                Deferred.await(deferred)
              ),
              handoffStateFull(value, deferred)
            ],
            (_, notifyProducer) => [
              Effect.flatMap(
                Deferred.await(notifyProducer),
                () => offer(self, value)
              ),
              state
            ]
          )
        )
      )
    )
  ))

/**
 * Takes the element out of the handoff, waiting if none is stored.
 *
 * - If the handoff is `Full`, the state becomes `Empty` with a fresh consumer
 *   deferred; the returned effect completes `notifyProducer` and yields the
 *   stored value.
 * - If the handoff is `Empty`, the state is left untouched; the returned
 *   effect waits on `notifyConsumer` and then retries the take.
 *
 * @internal
 */
export const take = <A>(self: Handoff<A>): Effect.Effect<A> =>
  Effect.flatMap(Deferred.make<void>(), (deferred) =>
    Effect.flatten(
      Ref.modify(self.ref, (state) =>
        pipe(
          state,
          handoffStateMatch<A, readonly [Effect.Effect<A>, Handoff.State<A>]>(
            (notifyConsumer) => [
              Effect.flatMap(
                Deferred.await(notifyConsumer),
                () => take(self)
              ),
              state
            ],
            (value, notifyProducer) => [
              Effect.as(
                Deferred.succeed(notifyProducer, void 0),
                value
              ),
              handoffStateEmpty<A>(deferred)
            ]
          )
        )
      )
    )
  )

/**
 * Non-waiting variant of `take`.
 *
 * - If the handoff is `Full`, the state becomes `Empty` with a fresh consumer
 *   deferred; the returned effect completes `notifyProducer` and yields
 *   `Option.some(value)`.
 * - If the handoff is `Empty`, the state is left untouched and the returned
 *   effect yields `Option.none()` immediately.
 *
 * @internal
 */
export const poll = <A>(self: Handoff<A>): Effect.Effect<Option.Option<A>> =>
  Effect.flatMap(Deferred.make<void>(), (deferred) =>
    Effect.flatten(
      Ref.modify(self.ref, (state) =>
        pipe(
          state,
          handoffStateMatch<A, readonly [Effect.Effect<Option.Option<A>>, Handoff.State<A>]>(
            () => [
              Effect.succeed(Option.none<A>()),
              state
            ],
            (value, notifyProducer) => [
              Effect.as(
                Deferred.succeed(notifyProducer, void 0),
                Option.some(value)
              ),
              handoffStateEmpty<A>(deferred)
            ]
          )
        )
      )
    )
  )