import { ChannelExecutor, readUpstream } from "@effect/core/stream/Channel/ChannelExecutor"
import type { ChannelState } from "@effect/core/stream/Channel/ChannelState"
import { concreteChannelState } from "@effect/core/stream/Channel/ChannelState"

// NOTE(review): the generic type-argument lists in this file had been stripped
// (e.g. `Effect>>`), leaving it unparseable. They are reconstructed below from
// the shapes the code itself requires (a three-parameter `Effect`, a
// seven-parameter `Channel`/`ChannelExecutor`, a two-parameter `ChannelState`)
// - confirm the exact parameter names and the `Env | Scope` environment against
// the upstream library before merging.

/**
 * Interpret a `Channel` to a managed pull.
 *
 * A `ChannelExecutor` is created for the channel inside
 * `Effect.acquireReleaseExit`; when the resource is released, `exec.close` is
 * called with the exit value and, if it returns a finalizer, that finalizer is
 * run. The acquired executor is mapped to a "pull" effect: each evaluation of
 * the pull calls `exec.run()` and interprets the resulting state until the
 * channel emits an element, completes, or fails.
 *
 * @param self - The channel to interpret.
 * @returns An effect that yields the pull effect. The pull succeeds with
 * `Either.right(elem)` for an emitted element and `Either.left(done)` when the
 * channel is done, and fails with the channel's failure cause.
 *
 * @tsplus getter effect/core/stream/Channel toPull
 * @tsplus static effect/core/stream/Channel.Ops toPull
 */
export function toPull<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>(
  self: Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>
): Effect<Env | Scope, never, Effect<Env, OutErr, Either<OutDone, OutElem>>> {
  // NOTE(review): `Effect.sync` / `Effect.suspendSucceed` receive plain
  // expressions rather than thunks here; this presumably relies on the tsplus
  // compiler's lazy-argument rewriting - confirm before porting this file to
  // plain TypeScript.
  return Effect.acquireReleaseExit(
    Effect.sync(new ChannelExecutor(() => self, undefined, identity)),
    (exec, exit) => {
      // `close` may return no finalizer at all; fall back to a no-op effect.
      const finalize = exec.close(exit)
      return finalize == null ? Effect.unit : finalize
    }
  ).map((exec) =>
    Effect.suspendSucceed(interpret(exec.run() as ChannelState<Env, OutErr>, exec))
  )
}

/**
 * Turns one executor state into an effect producing the next pull result.
 *
 * - `Done`: folds `exec.getDone()` - a failure becomes `Effect.failCause`, a
 *   success becomes `Either.left(done)`.
 * - `Emit`: succeeds with `Either.right(exec.getEmit())`.
 * - `Effect`: runs the state's effect, then steps the executor again and
 *   interprets the new state.
 * - `Read`: delegates to `readUpstream`, passing the "step the executor again"
 *   continuation.
 *
 * @param channelState - The state returned by the latest `exec.run()`.
 * @param exec - The executor driving the channel.
 * @returns An effect yielding the next emitted element (`Right`) or the
 * channel's terminal value (`Left`).
 */
function interpret<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>(
  channelState: ChannelState<Env, OutErr>,
  exec: ChannelExecutor<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>
): Effect<Env, OutErr, Either<OutDone, OutElem>> {
  // Narrows `channelState` so that `_tag` (and `effect` below) can be accessed.
  concreteChannelState(channelState)
  switch (channelState._tag) {
    case "Done": {
      return exec.getDone().fold(
        (cause) => Effect.failCause(cause),
        (done): Effect<Env, OutErr, Either<OutDone, OutElem>> => Effect.sync(Either.left(done))
      )
    }
    case "Emit": {
      return Effect.sync(Either.right(exec.getEmit()))
    }
    case "Effect": {
      return channelState.effect.flatMap(() =>
        interpret(exec.run() as ChannelState<Env, OutErr>, exec)
      )
    }
    case "Read": {
      // NOTE(review): the continuation is passed as an expression, not a thunk;
      // presumably evaluated lazily via tsplus lazy-argument rewriting - confirm.
      return readUpstream(
        channelState,
        interpret(exec.run() as ChannelState<Env, OutErr>, exec)
      )
    }
  }
}