import * as Cause from "../Cause.js" import type * as Channel from "../Channel.js" import type * as ChildExecutorDecision from "../ChildExecutorDecision.js" import * as Chunk from "../Chunk.js" import type * as Context from "../Context.js" import * as Effect from "../Effect.js" import * as Either from "../Either.js" import type * as Exit from "../Exit.js" import { constVoid, dual, identity } from "../Function.js" import type { LazyArg } from "../Function.js" import * as Option from "../Option.js" import { pipeArguments } from "../Pipeable.js" import { hasProperty } from "../Predicate.js" import type * as SingleProducerAsyncInput from "../SingleProducerAsyncInput.js" import type * as UpstreamPullRequest from "../UpstreamPullRequest.js" import type * as UpstreamPullStrategy from "../UpstreamPullStrategy.js" import * as childExecutorDecision from "./channel/childExecutorDecision.js" import type { ErasedContinuationK } from "./channel/continuation.js" import { ContinuationKImpl } from "./channel/continuation.js" import * as upstreamPullStrategy from "./channel/upstreamPullStrategy.js" import * as OpCodes from "./opCodes/channel.js" /** @internal */ const ChannelSymbolKey = "effect/Channel" /** @internal */ export const ChannelTypeId: Channel.ChannelTypeId = Symbol.for( ChannelSymbolKey ) as Channel.ChannelTypeId const channelVariance = { /* c8 ignore next */ _Env: (_: never) => _, /* c8 ignore next */ _InErr: (_: unknown) => _, /* c8 ignore next */ _InElem: (_: unknown) => _, /* c8 ignore next */ _InDone: (_: unknown) => _, /* c8 ignore next */ _OutErr: (_: never) => _, /* c8 ignore next */ _OutElem: (_: never) => _, /* c8 ignore next */ _OutDone: (_: never) => _ } /** @internal */ const proto = { [ChannelTypeId]: channelVariance, pipe() { return pipeArguments(this, arguments) } } /** @internal */ type ErasedChannel = Channel.Channel /** @internal */ export type Op = & ErasedChannel & Body & { readonly _tag: Tag } export type Primitive = | BracketOut | Bridge | ConcatAll | Emit | Ensuring | Fail | Fold | FromEffect | PipeTo | Provide | Read | Succeed | SucceedNow | Suspend /** @internal */ export interface BracketOut extends Op finalizer(resource: unknown, exit: Exit.Exit): Effect.Effect }> {} /** @internal */ export interface Bridge extends Op readonly channel: ErasedChannel }> {} /** @internal */ export interface ConcatAll extends Op ): UpstreamPullStrategy.UpstreamPullStrategy onEmit(outElem: unknown): ChildExecutorDecision.ChildExecutorDecision value(): ErasedChannel k(outElem: unknown): ErasedChannel }> {} /** @internal */ export interface Emit extends Op {} /** @internal */ export interface Ensuring extends Op): Effect.Effect }> {} /** @internal */ export interface Fail extends Op }> {} /** @internal */ export interface Fold extends Op {} /** @internal */ export interface FromEffect extends Op }> {} /** @internal */ export interface PipeTo extends Op {} /** @internal */ export interface Provide extends Op readonly inner: ErasedChannel }> {} /** @internal */ export interface Read extends Op {} /** @internal */ export interface Succeed extends Op {} /** @internal */ export interface SucceedNow extends Op {} /** @internal */ export interface Suspend extends Op {} /** @internal */ export const isChannel = (u: unknown): u is Channel.Channel< unknown, unknown, unknown, unknown, unknown, unknown, unknown > => hasProperty(u, ChannelTypeId) || Effect.isEffect(u) /** @internal */ export const acquireReleaseOut = dual< ( release: (z: Z, e: Exit.Exit) => Effect.Effect ) => (self: Effect.Effect) => Channel.Channel, ( self: Effect.Effect, release: (z: Z, e: Exit.Exit) => Effect.Effect ) => Channel.Channel >(2, (self, release) => { const op = Object.create(proto) op._tag = OpCodes.OP_BRACKET_OUT op.acquire = () => self op.finalizer = release return op }) /** @internal */ export const catchAllCause = dual< ( f: (cause: Cause.Cause) => Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, f: (cause: Cause.Cause) => Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env > >( 2, ( self: Channel.Channel, f: (cause: Cause.Cause) => Channel.Channel ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr1, InErr & InErr1, OutDone | OutDone1, InDone & InDone1, Env | Env1 > => { const op = Object.create(proto) op._tag = OpCodes.OP_FOLD op.channel = self op.k = new ContinuationKImpl(succeed, f) return op } ) /** @internal */ export const collectElements = ( self: Channel.Channel ): Channel.Channel, OutDone], InDone, Env> => { return suspend(() => { const builder: Array = [] return flatMap( pipeTo(self, collectElementsReader(builder)), (value) => sync(() => [Chunk.fromIterable(builder), value]) ) }) } /** @internal */ const collectElementsReader = ( builder: Array ): Channel.Channel => readWith({ onInput: (outElem) => flatMap( sync(() => { builder.push(outElem) }), () => collectElementsReader(builder) ), onFailure: fail, onDone: succeedNow }) /** @internal */ export const concatAll = ( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, any, InDone, Env > ): Channel.Channel => concatAllWith(channels, constVoid, constVoid) /** @internal */ export const concatAllWith = < OutElem, InElem2, OutErr2, InErr2, OutDone, InDone2, Env2, InElem, OutErr, InErr, OutDone2, InDone, Env, OutDone3 >( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone2, InDone, Env >, f: (o: OutDone, o1: OutDone) => OutDone, g: (o: OutDone, o2: OutDone2) => OutDone3 ): Channel.Channel< OutElem, InElem & InElem2, OutErr | OutErr2, InErr & InErr2, OutDone3, InDone & InDone2, Env | Env2 > => { const op = Object.create(proto) op._tag = OpCodes.OP_CONCAT_ALL op.combineInners = f op.combineAll = g op.onPull = () => upstreamPullStrategy.PullAfterNext(Option.none()) op.onEmit = () => childExecutorDecision.Continue op.value = () => channels op.k = identity return op } /** @internal */ export const concatMapWith = dual< ( f: (o: OutElem) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3 ) => ( self: Channel.Channel ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, OutDone3, InDone & InDone2, Env2 | Env >, < OutElem, InElem, OutErr, InErr, OutDone2, InDone, Env, OutElem2, InElem2, OutErr2, InErr2, OutDone, InDone2, Env2, OutDone3 >( self: Channel.Channel, f: (o: OutElem) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3 ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, OutDone3, InDone & InDone2, Env2 | Env > >(4, < Env, InErr, InElem, InDone, OutErr, OutElem, OutElem2, OutDone, OutDone2, OutDone3, Env2, InErr2, InElem2, InDone2, OutErr2 >( self: Channel.Channel, f: ( o: OutElem ) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3 ): Channel.Channel< OutElem2, InElem & InElem2, OutErr | OutErr2, InErr & InErr2, OutDone3, InDone & InDone2, Env | Env2 > => { const op = Object.create(proto) op._tag = OpCodes.OP_CONCAT_ALL op.combineInners = g op.combineAll = h op.onPull = () => upstreamPullStrategy.PullAfterNext(Option.none()) op.onEmit = () => childExecutorDecision.Continue op.value = () => self op.k = f return op }) /** @internal */ export const concatMapWithCustom = dual< ( f: (o: OutElem) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3, onPull: ( upstreamPullRequest: UpstreamPullRequest.UpstreamPullRequest ) => UpstreamPullStrategy.UpstreamPullStrategy, onEmit: (elem: OutElem2) => ChildExecutorDecision.ChildExecutorDecision ) => ( self: Channel.Channel ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, OutDone3, InDone & InDone2, Env2 | Env >, < OutElem, InElem, OutErr, InErr, OutDone2, InDone, Env, OutElem2, InElem2, OutErr2, InErr2, OutDone, InDone2, Env2, OutDone3 >( self: Channel.Channel, f: (o: OutElem) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3, onPull: ( upstreamPullRequest: UpstreamPullRequest.UpstreamPullRequest ) => UpstreamPullStrategy.UpstreamPullStrategy, onEmit: (elem: OutElem2) => ChildExecutorDecision.ChildExecutorDecision ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, OutDone3, InDone & InDone2, Env2 | Env > >(6, < Env, InErr, InElem, InDone, OutErr, OutElem, OutElem2, OutDone, OutDone2, OutDone3, Env2, InErr2, InElem2, InDone2, OutErr2 >( self: Channel.Channel, f: ( o: OutElem ) => Channel.Channel, g: (o: OutDone, o1: OutDone) => OutDone, h: (o: OutDone, o2: OutDone2) => OutDone3, onPull: ( upstreamPullRequest: UpstreamPullRequest.UpstreamPullRequest ) => UpstreamPullStrategy.UpstreamPullStrategy, onEmit: (elem: OutElem2) => ChildExecutorDecision.ChildExecutorDecision ): Channel.Channel< OutElem2, InElem & InElem2, OutErr | OutErr2, InErr & InErr2, OutDone3, InDone & InDone2, Env | Env2 > => { const op = Object.create(proto) op._tag = OpCodes.OP_CONCAT_ALL op.combineInners = g op.combineAll = h op.onPull = onPull op.onEmit = onEmit op.value = () => self op.k = f return op }) /** @internal */ export const embedInput = dual< ( input: SingleProducerAsyncInput.AsyncInputProducer ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, input: SingleProducerAsyncInput.AsyncInputProducer ) => Channel.Channel >( 2, ( self: Channel.Channel, input: SingleProducerAsyncInput.AsyncInputProducer ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_BRIDGE op.input = input op.channel = self return op } ) /** @internal */ export const ensuringWith = dual< ( finalizer: (e: Exit.Exit) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, finalizer: (e: Exit.Exit) => Effect.Effect ) => Channel.Channel >( 2, ( self: Channel.Channel, finalizer: (e: Exit.Exit) => Effect.Effect ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_ENSURING op.channel = self op.finalizer = finalizer return op } ) /** @internal */ export const fail = (error: E): Channel.Channel => failCause(Cause.fail(error)) /** @internal */ export const failSync = ( evaluate: LazyArg ): Channel.Channel => failCauseSync(() => Cause.fail(evaluate())) /** @internal */ export const failCause = ( cause: Cause.Cause ): Channel.Channel => failCauseSync(() => cause) /** @internal */ export const failCauseSync = ( evaluate: LazyArg> ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_FAIL op.error = evaluate return op } /** @internal */ export const flatMap = dual< ( f: (d: OutDone) => Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone2, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, f: (d: OutDone) => Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone2, InDone & InDone1, Env1 | Env > >( 2, ( self: Channel.Channel, f: (d: OutDone) => Channel.Channel ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone2, InDone & InDone1, Env | Env1 > => { const op = Object.create(proto) op._tag = OpCodes.OP_FOLD op.channel = self op.k = new ContinuationKImpl(f, failCause) return op } ) /** @internal */ export const foldCauseChannel = dual< < OutErr, OutElem1, InElem1, OutErr2, InErr1, OutDone2, InDone1, Env1, OutDone, OutElem2, InElem2, OutErr3, InErr2, OutDone3, InDone2, Env2 >( options: { readonly onFailure: ( c: Cause.Cause ) => Channel.Channel readonly onSuccess: (o: OutDone) => Channel.Channel } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem2 | OutElem, InElem & InElem1 & InElem2, OutErr2 | OutErr3, InErr & InErr1 & InErr2, OutDone2 | OutDone3, InDone & InDone1 & InDone2, Env1 | Env2 | Env >, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr2, InErr1, OutDone2, InDone1, Env1, OutElem2, InElem2, OutErr3, InErr2, OutDone3, InDone2, Env2 >( self: Channel.Channel, options: { readonly onFailure: ( c: Cause.Cause ) => Channel.Channel readonly onSuccess: (o: OutDone) => Channel.Channel } ) => Channel.Channel< OutElem1 | OutElem2 | OutElem, InElem & InElem1 & InElem2, OutErr2 | OutErr3, InErr & InErr1 & InErr2, OutDone2 | OutDone3, InDone & InDone1 & InDone2, Env1 | Env2 | Env > >( 2, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr2, InErr1, OutDone2, InDone1, Env1, OutElem2, InElem2, OutErr3, InErr2, OutDone3, InDone2, Env2 >( self: Channel.Channel, options: { readonly onFailure: ( c: Cause.Cause ) => Channel.Channel readonly onSuccess: (o: OutDone) => Channel.Channel } ): Channel.Channel< OutElem | OutElem1 | OutElem2, InElem & InElem1 & InElem2, OutErr2 | OutErr3, InErr & InErr1 & InErr2, OutDone2 | OutDone3, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => { const op = Object.create(proto) op._tag = OpCodes.OP_FOLD op.channel = self op.k = new ContinuationKImpl(options.onSuccess, options.onFailure as any) return op } ) /** @internal */ export const fromEffect = ( effect: Effect.Effect ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_FROM_EFFECT op.effect = () => effect return op } /** @internal */ export const pipeTo = dual< ( that: Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, that: Channel.Channel ) => Channel.Channel >( 2, ( self: Channel.Channel, that: Channel.Channel ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_PIPE_TO op.left = () => self op.right = () => that return op } ) /** @internal */ export const provideContext = dual< ( env: Context.Context ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, env: Context.Context ) => Channel.Channel >( 2, ( self: Channel.Channel, env: Context.Context ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_PROVIDE op.context = () => env op.inner = self return op } ) /** @internal */ export const readOrFail = ( error: E ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_READ op.more = succeed op.done = new ContinuationKImpl(() => fail(error), () => fail(error)) return op } /** @internal */ export const readWith = < InElem, OutElem, OutErr, InErr, OutDone, InDone, Env, OutElem2, OutErr2, OutDone2, Env2, OutElem3, OutErr3, OutDone3, Env3 >( options: { readonly onInput: (input: InElem) => Channel.Channel readonly onFailure: (error: InErr) => Channel.Channel readonly onDone: (done: InDone) => Channel.Channel } ): Channel.Channel< OutElem | OutElem2 | OutElem3, InElem, OutErr | OutErr2 | OutErr3, InErr, OutDone | OutDone2 | OutDone3, InDone, Env | Env2 | Env3 > => readWithCause({ onInput: options.onInput, onFailure: (cause) => Either.match(Cause.failureOrCause(cause), { onLeft: options.onFailure, onRight: failCause }), onDone: options.onDone }) /** @internal */ export const readWithCause = < InElem, OutElem, OutErr, InErr, OutDone, InDone, Env, OutElem2, OutErr2, OutDone2, Env2, OutElem3, OutErr3, OutDone3, Env3 >( options: { readonly onInput: (input: InElem) => Channel.Channel readonly onFailure: ( cause: Cause.Cause ) => Channel.Channel readonly onDone: (done: InDone) => Channel.Channel } ): Channel.Channel< OutElem | OutElem2 | OutElem3, InElem, OutErr | OutErr2 | OutErr3, InErr, OutDone | OutDone2 | OutDone3, InDone, Env | Env2 | Env3 > => { const op = Object.create(proto) op._tag = OpCodes.OP_READ op.more = options.onInput op.done = new ContinuationKImpl(options.onDone, options.onFailure as any) return op } /** @internal */ export const succeed = ( value: A ): Channel.Channel => sync(() => value) /** @internal */ export const succeedNow = ( result: OutDone ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_SUCCEED_NOW op.terminal = result return op } /** @internal */ export const suspend = ( evaluate: LazyArg> ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_SUSPEND op.channel = evaluate return op } export const sync = ( evaluate: LazyArg ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_SUCCEED op.evaluate = evaluate return op } const void_: Channel.Channel = succeedNow(void 0) export { /** @internal */ void_ as void } /** @internal */ export const write = ( out: OutElem ): Channel.Channel => { const op = Object.create(proto) op._tag = OpCodes.OP_EMIT op.out = out return op }