import { concreteStream } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Like `Stream.runIntoQueue`, but provides the result as a scoped effect to
 * allow for scope composition.
 *
 * Every element of the stream is offered to `queue` individually, wrapped in
 * `Exit.succeed`. A stream failure is offered as `Exit.fail(Maybe.some(err))`
 * and the remaining case is offered as `Exit.fail(Maybe.none)`; nothing
 * further is written after either of those two values.
 *
 * NOTE(review): the generic type-argument lists in this file look like they
 * were stripped (e.g. `Enqueue, A>>`, `Stream`, `Effect`, `Exit, A>`), so the
 * annotations below do not parse as written — restore them from the upstream
 * source before relying on this file.
 *
 * @param queue - The queue that receives one `Exit` per element, followed by
 *   a single failing `Exit` that marks error or termination.
 * @returns A function that takes the stream `self` and returns the effect
 *   which drains it into `queue`; the effect's own result is discarded via
 *   `.unit`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects runIntoQueueElementsScoped
 * @tsplus pipeable effect/core/stream/Stream runIntoQueueElementsScoped
 */
export function runIntoQueueElementsScoped(queue: Enqueue, A>>) {
  return (self: Stream): Effect => {
    // Self-referential channel: it reads one chunk, re-emits its elements, and
    // then continues with itself. Only the chunk handler loops back to
    // `writer`; the other two handlers write a single value and stop.
    const writer: Channel<
      R,
      E,
      Chunk,
      unknown,
      E,
      Exit,
      A>,
      unknown
    > = Channel.readWith(
      // Chunk handler: starting from `Channel.unit`, chain one
      // `Channel.write(Exit.succeed(a))` per element so that elements are
      // written in chunk order, then recurse to read the next chunk.
      (input: Chunk) =>
        input.reduce(
          Channel.unit as Channel<
            R,
            E,
            Chunk,
            unknown,
            E,
            Exit,
            A>,
            unknown
          >,
          (channel, a) => channel.flatMap(() => Channel.write(Exit.succeed(a)))
        ).flatMap(() => writer),
      // Error handler: forward the failure to the consumer as `Maybe.some(err)`.
      (err) => Channel.write(Exit.fail(Maybe.some(err))),
      // Final handler (presumably upstream completion — confirm against
      // `Channel.readWith`): signalled with `Maybe.none`.
      () => Channel.write(Exit.fail(Maybe.none))
    )
    // Presumably narrows `self` to the internal stream representation so that
    // `self.channel` is accessible below — confirm in StreamInternal.
    concreteStream(self)
    // `>>` feeds the stream's channel into `writer` (presumably the tsplus
    // operator for piping channels — confirm). Each value written by `writer`
    // is offered to `queue`; outputs are then drained, the channel is run as a
    // scoped effect, and its result is replaced by unit.
    return (self.channel >> writer)
      .mapOutEffect((take) => queue.offer(take))
      .drain
      .runScoped
      .unit
  }
}