import { concreteStream } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Like `Stream.runIntoQueue`, but provides the result as a scoped effect to
 * allow for scope composition.
 *
 * Every chunk emitted by the stream is offered to `queue` as `Take.chunk`. A
 * failure is offered as `Take.failCause` and normal completion as `Take.end`;
 * in both cases nothing further is offered afterwards.
 *
 * @param queue - The queue that receives one `Take` per stream event.
 * @returns A function that, given the stream to run, produces a scoped effect
 * which drains the stream into `queue` and discards the result.
 *
 * @tsplus static effect/core/stream/Stream.Aspects runIntoQueueScoped
 * @tsplus pipeable effect/core/stream/Stream runIntoQueueScoped
 */
export function runIntoQueueScoped<E1, A>(queue: Enqueue<Take<E1, A>>) {
  return <R, E extends E1>(self: Stream<R, E, A>): Effect<R | Scope, E | E1, void> => {
    // Translates the upstream channel protocol into `Take` values. Only the
    // chunk branch recurses into `writer`; the failure and done branches write
    // a single terminal `Take` and then finish.
    const writer: Channel<
      R,
      E,
      Chunk<A>,
      unknown,
      E,
      Take<E | E1, A>,
      unknown
    > = Channel.readWithCause(
      (input: Chunk<A>) => Channel.write(Take.chunk(input)).flatMap(() => writer),
      (cause) => Channel.write(Take.failCause(cause)),
      () => Channel.write(Take.end)
    )
    // NOTE(review): presumably an assertion function that narrows `self` to its
    // internal representation so `channel` can be read below — confirm against
    // StreamInternal.
    concreteStream(self)
    // `>>` is the operator form used here to feed the stream's channel into
    // `writer`; each resulting `Take` is then offered to the queue.
    return (self.channel >> writer)
      .mapOutEffect((take) => queue.offer(take))
      .drain
      .runScoped
      .unit
  }
}