import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

// NOTE(review): the generic parameter lists in this file were reconstructed
// from the surviving type fragments (`A`, `E | E1`, `Chunk`, `unknown`).
// Confirm the names and ordering against the upstream definitions of
// `Stream`, `Effect` and `Channel`.

/**
 * Effectfully transforms all elements of the stream for as long as the
 * specified partial function is defined.
 *
 * Each element is passed to `pf`. While `pf` returns a defined value, the
 * contained effect is run and its result is emitted. At the first element for
 * which `pf` is not defined, the transformation loop is not continued.
 *
 * @param pf - Partial function mapping an element to an optional effect.
 * @returns A function taking the source stream and producing the transformed
 * stream.
 *
 * @tsplus static effect/core/stream/Stream.Aspects collectWhileEffect
 * @tsplus pipeable effect/core/stream/Stream collectWhileEffect
 */
export function collectWhileEffect<A, R1, E1, A1>(
  pf: (a: A) => Maybe<Effect<R1, E1, A1>>
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R1, E | E1, A1> => {
    concreteStream(self)
    return new StreamInternal(
      // Start from an empty chunk's iterator, so the first step of `loop`
      // immediately reads a chunk from the upstream channel.
      // `>>` is presumably the tsplus "pipe to" operator on channels — confirm.
      self.channel >> loop<E, A, R1, E1, A1>(Chunk.empty<A>()[Symbol.iterator](), pf)
    )
  }
}

/**
 * Builds the channel that drives `collectWhileEffect`.
 *
 * Takes the next element from `chunkIterator`:
 * - if the iterator is exhausted, reads the next upstream chunk and continues
 *   with that chunk's iterator; an upstream failure cause is re-raised and the
 *   upstream done value is passed through;
 * - otherwise applies `pf` to the element. When `pf` is undefined for it, the
 *   result is `Channel.unit` and no further recursion happens. When it is
 *   defined, the effect is run, its result is written as a single-element
 *   chunk, and the loop continues with the same iterator.
 *
 * @param chunkIterator - Iterator over the chunk currently being processed.
 * This function advances it.
 * @param pf - Partial function mapping an element to an optional effect.
 */
function loop<E, A, R1, E1, A1>(
  chunkIterator: Iterator<A>,
  pf: (a: A) => Maybe<Effect<R1, E1, A1>>
): Channel<R1, E, Chunk<A>, unknown, E | E1, Chunk<A1>, unknown> {
  const next = chunkIterator.next()
  if (next.done) {
    // Current chunk consumed: read the next input and restart on it.
    return Channel.readWithCause(
      elem => loop<E, A, R1, E1, A1>(elem[Symbol.iterator](), pf),
      err => Channel.failCause(err),
      done => Channel.succeed(done)
    )
  } else {
    return Channel.unwrap(
      pf(next.value).fold(
        // `pf` is not defined for this element: stop here.
        () => Effect.succeed(Channel.unit),
        effect =>
          effect.map(a1 =>
            Channel.write(Chunk.single(a1)).flatMap(() => loop<E, A, R1, E1, A1>(chunkIterator, pf))
          )
      )
    )
  }
}