import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Performs an effectful filter and map in a single step. * * @tsplus static effect/core/stream/Stream.Aspects collectEffect * @tsplus pipeable effect/core/stream/Stream collectEffect */ export function collectEffect( pf: (a: A) => Maybe> ) { return (self: Stream): Stream => { concreteStream(self) return new StreamInternal( self.channel >> loop(Chunk.empty()[Symbol.iterator](), pf) ) } } function loop( chunkIterator: Iterator, pf: (a: A) => Maybe> ): Channel, unknown, E | E1, Chunk, unknown> { const next = chunkIterator.next() if (next.done) { return Channel.readWithCause( (elem) => loop(elem[Symbol.iterator](), pf), (err) => Channel.failCause(err), (done) => Channel.succeed(done) ) } else { return Channel.unwrap( pf(next.value).fold( () => Effect.sync(loop(chunkIterator, pf)), effect => effect.map( a1 => Channel.write(Chunk.single(a1)).flatMap(() => loop(chunkIterator, pf) ) ) ) ) } }