import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" import { TerminationStrategy } from "@effect/core/stream/Stream/TerminationStrategy" /** * Sends all elements emitted by this stream to the specified sink in addition * to emitting them. * * @tsplus static effect/core/stream/Stream.Aspects tapSink * @tsplus pipeable effect/core/stream/Stream tapSink */ export function tapSink(sink: Sink) { return (self: Stream): Stream => Stream.fromEffect(Queue.bounded>(1)).flatMap((queue) => { const right = Stream.fromQueueWithShutdown(queue, 1).flattenTake const loop: Channel< R | R2, E, Chunk, unknown, E2, Chunk, unknown > = Channel.readWithCause( (chunk: Chunk) => Channel.fromEffect(queue.offer(Take.chunk(chunk))) .flatMap(() => Channel.write(chunk)) .flatMap(() => loop), (cause) => Channel.fromEffect(queue.offer(Take.failCause(cause))), () => Channel.fromEffect(queue.shutdown) ) concreteStream(self) return (new StreamInternal(self.channel >> loop) as Stream).merge( Stream.execute(right.run(sink)), TerminationStrategy.Both ) }) }