import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Maps over elements of the stream with the specified effectful function.
 *
 * The returned stream emits one single-element chunk per input element,
 * because each result of `f` is written downstream individually.
 *
 * NOTE(review): the generic parameter lists in this file were reconstructed
 * assuming the `Effect<R, E, A>` / `Stream<R, E, A>` parameter order — confirm
 * against the library version in use.
 *
 * @param f - Effectful function applied to every element of the stream.
 * @returns A function that takes a stream and returns the mapped stream.
 *
 * @tsplus static effect/core/stream/Stream.Aspects mapEffect
 * @tsplus pipeable effect/core/stream/Stream mapEffect
 */
export function mapEffect<A, R1, E1, A1>(
  f: (a: A) => Effect<R1, E1, A1>
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R1, E | E1, A1> => {
    // Presumably narrows `self` to its concrete representation so that
    // `.channel` is accessible — confirm against `concreteStream`.
    concreteStream(self)
    return new StreamInternal(
      // Start with an exhausted iterator so the first step of `loop`
      // immediately reads a chunk from the upstream channel.
      // (`>>` is a tsplus operator; presumably pipes `self.channel` into
      // the looping channel — confirm.)
      self.channel >> loop(Chunk.empty<A>()[Symbol.iterator](), f)
    )
  }
}

/**
 * Builds the channel that applies `f` to the elements of the upstream chunks,
 * one element at a time.
 *
 * - When `chunkIterator` is exhausted, the next upstream chunk is read: a new
 *   chunk restarts the loop on that chunk's iterator, a failure cause is
 *   re-emitted via `Channel.failCause`, and upstream completion ends the
 *   channel with the upstream done value.
 * - Otherwise `f` is run on the current element, its result is written as a
 *   single-element chunk, and the loop continues with the same iterator.
 *
 * @param chunkIterator - Iterator over the chunk currently being processed;
 * it is advanced (mutated) by this function.
 * @param f - Effectful function applied to every element.
 */
function loop<E, A, R1, E1, A1>(
  chunkIterator: Iterator<A>,
  f: (a: A) => Effect<R1, E1, A1>
): Channel<R1, E, Chunk<A>, unknown, E | E1, Chunk<A1>, unknown> {
  const next = chunkIterator.next()
  if (next.done) {
    // Current chunk is used up: pull the next one from upstream.
    return Channel.readWithCause(
      (elem) => loop(elem[Symbol.iterator](), f),
      (err) => Channel.failCause(err),
      (done) => Channel.succeed(done)
    )
  } else {
    return Channel.unwrap(
      Effect.suspendSucceed(f(next.value)).map(
        a1 => Channel.write(Chunk.single(a1)).flatMap(() => loop(chunkIterator, f))
      )
    )
  }
}