// NOTE(review): the generic parameter/argument lists in this module were
// reconstructed from how each value is used below. In particular the `unknown`
// success type of the producer-side effects and the `Deferred<never, void>`
// used for producer notification should be confirmed against upstream.

/**
 * Producer-side view of `SingleProducerAsyncInput` for variance purposes.
 */
export interface AsyncInputProducer<Err, Elem, Done> {
  readonly emit: (el: Elem) => Effect<never, never, unknown>
  readonly done: (a: Done) => Effect<never, never, unknown>
  readonly error: (cause: Cause<Err>) => Effect<never, never, unknown>
  readonly awaitRead: Effect<never, never, unknown>
}

/**
 * Consumer-side view of `SingleProducerAsyncInput` for variance purposes.
 */
export interface AsyncInputConsumer<Err, Elem, Done> {
  readonly takeWith: <A>(
    onError: (cause: Cause<Err>) => A,
    onElement: (element: Elem) => A,
    onDone: (done: Done) => A
  ) => Effect<never, never, A>
}

/**
 * The possible contents of the single-slot buffer, discriminated by `_typeId`.
 */
export type State<Err, Elem, Done> =
  | StateEmpty
  | StateEmit<Err, Elem, Done>
  | StateError<Err>
  | StateDone<Done>

export const DoneTypeId = Symbol.for("@effect/core/stream/Channel/Producer/Done")
export type DoneTypeId = typeof DoneTypeId

/**
 * State holding the value passed to `done`.
 */
export class StateDone<Elem> {
  readonly _typeId: DoneTypeId = DoneTypeId
  constructor(readonly a: Elem) {}
}

export const ErrorTypeId = Symbol.for("@effect/core/stream/Channel/Producer/Error")
export type ErrorTypeId = typeof ErrorTypeId

/**
 * State holding the cause passed to `error`.
 */
export class StateError<Err> {
  readonly _typeId: ErrorTypeId = ErrorTypeId
  constructor(readonly cause: Cause<Err>) {}
}

export const EmptyTypeId = Symbol.for("@effect/core/stream/Channel/Producer/Empty")
export type EmptyTypeId = typeof EmptyTypeId

/**
 * State with no registered consumers. `notifyProducer` is the deferred that
 * `takeWith` completes when a consumer registers, and that the producer-side
 * operations await while in this state.
 */
export class StateEmpty {
  readonly _typeId: EmptyTypeId = EmptyTypeId
  constructor(readonly notifyProducer: Deferred<never, void>) {}
}

export const EmitTypeId = Symbol.for("@effect/core/stream/Channel/Producer/Emit")
export type EmitTypeId = typeof EmitTypeId

/**
 * State holding the queue of consumers waiting for a value. Each deferred is
 * completed with `Either.right(element)` by `emit`, with `Either.left(done)`
 * by `done`, or failed with the cause by `error`.
 */
export class StateEmit<Err, Elem, Done> {
  readonly _typeId: EmitTypeId = EmitTypeId
  constructor(
    readonly notifyConsumers: ImmutableQueue<Deferred<Err, Either<Done, Elem>>>
  ) {}
}

/**
 * An MVar-like abstraction for sending data to channels asynchronously.
 * Designed for one producer and multiple consumers.
 *
 * Features the following semantics:
 * - Buffer of size 1.
 * - When emitting, the producer waits for a consumer to pick up the value to
 *   prevent "reading ahead" too much.
 * - Once an emitted element is read by a consumer, it is cleared from the
 *   buffer, so that at most one consumer sees every emitted element.
 * - When sending a done or error signal, the producer does not wait for a
 *   consumer to pick up the signal. The signal stays in the buffer after
 *   being read by a consumer, so it can be propagated to multiple consumers.
 * - Trying to publish another emit/error/done after an error/done has
 *   already been published results in an interruption.
 *
 * @tsplus type effect/core/stream/Channel/SingleProducerAsyncInput
 * @tsplus companion effect/core/stream/Channel/SingleProducerAsyncInput.Ops
 */
export class SingleProducerAsyncInput<Err, Elem, Done>
  implements AsyncInputProducer<Err, Elem, Done>, AsyncInputConsumer<Err, Elem, Done>
{
  constructor(readonly ref: Ref<State<Err, Elem, Done>>) {}

  /**
   * Takes the next signal as an `Exit`: an element becomes a success, a done
   * value becomes a failure with `Either.right(done)`, and an error cause is
   * mapped with `Either.left` and becomes a failure cause.
   */
  get take(): Effect<never, never, Exit<Either<Err, Done>, Elem>> {
    return this.takeWith<Exit<Either<Err, Done>, Elem>>(
      (cause) => Exit.failCause(cause.map(Either.left)),
      (element) => Exit.succeed(element),
      (done) => Exit.fail(Either.right(done))
    )
  }

  /**
   * Publishes an error whose cause is an interruption by the current fiber.
   */
  get close(): Effect<never, never, unknown> {
    return Effect.fiberId.flatMap((fiberId) => this.error(Cause.interrupt(fiberId)))
  }

  /**
   * In the empty state, awaits the state's `notifyProducer` deferred; in any
   * other state completes immediately. The state is never changed.
   */
  get awaitRead(): Effect<never, never, unknown> {
    return this.ref
      .modify((state) =>
        state._typeId === EmptyTypeId
          ? [state.notifyProducer.await, state] as const
          : [Effect.unit, state] as const
      )
      .flatten
  }

  /**
   * Hands `el` to the first waiting consumer.
   *
   * - Emit state: dequeues the first waiting consumer and completes it with
   *   `Either.right(el)`. The state becomes empty (with a fresh deferred) when
   *   no consumers remain, otherwise it keeps the remaining consumers.
   * - Error / Done state: interrupts; the state is unchanged.
   * - Empty state: only awaits `notifyProducer`; `el` is not delivered by this
   *   branch. NOTE(review): callers presumably sequence `awaitRead` before
   *   `emit` so that this branch is not hit — confirm against call sites.
   *
   * @throws Error from inside the state update if the state is Emit but its
   * consumer queue is empty.
   */
  emit(el: Elem): Effect<never, never, unknown> {
    return Deferred.make<never, void>().flatMap((deferred) =>
      this.ref
        .modify((state) => {
          switch (state._typeId) {
            case EmitTypeId: {
              const dequeued = state.notifyConsumers.dequeue
              if (dequeued._tag === "Some") {
                const [notifyConsumer, notifyConsumers] = dequeued.value
                return [
                  notifyConsumer.succeed(Either.right(el)),
                  notifyConsumers.size === 0
                    ? new StateEmpty(deferred)
                    : new StateEmit(notifyConsumers)
                ] as const
              }
              throw new Error("SingleProducerAsyncInput#emit: queue was empty")
            }
            case ErrorTypeId: {
              return [Effect.interrupt, state] as const
            }
            case DoneTypeId: {
              return [Effect.interrupt, state] as const
            }
            case EmptyTypeId: {
              return [state.notifyProducer.await, state] as const
            }
          }
        })
        .flatten
    )
  }

  /**
   * Publishes the done value `a`.
   *
   * - Emit state: completes every waiting consumer with `Either.left(a)` and
   *   moves to the done state holding `a`.
   * - Error / Done state: interrupts; the state is unchanged.
   * - Empty state: only awaits `notifyProducer`; the state is unchanged.
   */
  done(a: Done): Effect<never, never, unknown> {
    return this.ref
      .modify((state) => {
        switch (state._typeId) {
          case EmitTypeId: {
            return [
              Effect.forEachDiscard(
                state.notifyConsumers,
                (promise) => promise.succeed(Either.left(a))
              ),
              new StateDone(a)
            ] as const
          }
          case ErrorTypeId: {
            return [Effect.interrupt, state] as const
          }
          case DoneTypeId: {
            return [Effect.interrupt, state] as const
          }
          case EmptyTypeId: {
            return [state.notifyProducer.await, state] as const
          }
        }
      })
      .flatten
  }

  /**
   * Publishes the failure `cause`.
   *
   * - Emit state: fails every waiting consumer with `cause` and moves to the
   *   error state holding `cause`.
   * - Error / Done state: interrupts; the state is unchanged.
   * - Empty state: only awaits `notifyProducer`; the state is unchanged.
   */
  error(cause: Cause<Err>): Effect<never, never, unknown> {
    return this.ref
      .modify((state) => {
        switch (state._typeId) {
          case EmitTypeId: {
            return [
              Effect.forEachDiscard(state.notifyConsumers, (promise) => promise.failCause(cause)),
              new StateError(cause)
            ] as const
          }
          case ErrorTypeId: {
            return [Effect.interrupt, state] as const
          }
          case DoneTypeId: {
            return [Effect.interrupt, state] as const
          }
          case EmptyTypeId: {
            return [state.notifyProducer.await, state] as const
          }
        }
      })
      .flatten
  }

  /**
   * Registers a consumer and maps whichever signal it receives through the
   * matching callback.
   *
   * - Emit state: appends a fresh deferred to the waiting queue and awaits it.
   * - Error state: applies `onError` to the stored cause; state unchanged.
   * - Done state: applies `onDone` to the stored value; state unchanged.
   * - Empty state: completes `notifyProducer`, then awaits a fresh deferred;
   *   the state becomes Emit with that deferred as its only consumer.
   *
   * @param onError - applied to the cause when the signal is a failure
   * @param onElement - applied to the element when the signal is an emit
   * @param onDone - applied to the done value when the signal is a completion
   */
  takeWith<X>(
    onError: (cause: Cause<Err>) => X,
    onElement: (element: Elem) => X,
    onDone: (done: Done) => X
  ): Effect<never, never, X> {
    return Deferred.make<Err, Either<Done, Elem>>().flatMap((deferred) =>
      this.ref
        .modify((state) => {
          switch (state._typeId) {
            case EmitTypeId: {
              return [
                deferred
                  .await
                  .foldCause(onError, (either) => either.fold(onDone, onElement)),
                new StateEmit(state.notifyConsumers.append(deferred))
              ] as const
            }
            case ErrorTypeId: {
              return [Effect.sync(onError(state.cause)), state] as const
            }
            case DoneTypeId: {
              return [Effect.sync(onDone(state.a)), state] as const
            }
            case EmptyTypeId: {
              return [
                // `>` is an operator on effects here (not a numeric comparison);
                // presumably the tsplus sequencing operator — confirm.
                state.notifyProducer.succeed(undefined) >
                  deferred
                    .await
                    .foldCause(onError, (either) => either.fold(onDone, onElement)),
                new StateEmit(ImmutableQueue.single(deferred))
              ] as const
            }
          }
        })
        .flatten
    )
  }
}

/**
 * Creates a `SingleProducerAsyncInput` whose initial state is empty.
 *
 * @tsplus static effect/core/stream/Channel/SingleProducerAsyncInput.Ops make
 */
export function make<Err, Elem, Done>(): Effect<
  never,
  never,
  SingleProducerAsyncInput<Err, Elem, Done>
> {
  return Deferred.make<never, void>()
    .flatMap((deferred) => Ref.make<State<Err, Elem, Done>>(new StateEmpty(deferred)))
    .map((ref) => new SingleProducerAsyncInput<Err, Elem, Done>(ref))
}