import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Transforms all elements of the stream for as long as the specified partial
 * function is defined.
 *
 * Each upstream chunk is passed through `Chunk.collectWhile(pf)`. When the
 * result has the same size as the input chunk, it is emitted and the next
 * chunk is read. When the result is shorter, it is emitted and the stream
 * ends without reading any further upstream chunks.
 *
 * Upstream failures are re-raised unchanged, and the upstream completion
 * value is passed through when upstream ends first.
 *
 * @param pf - Partial function applied to each element; the sizes compared
 *   here presume `Chunk.collectWhile` stops at the first element for which
 *   `pf` yields no value (confirm against the `Chunk` implementation).
 * @returns A function that turns a `Stream<R, E, A>` into a
 *   `Stream<R, E, A1>` of the transformed elements.
 *
 * @tsplus static effect/core/stream/Stream.Aspects collectWhile
 * @tsplus pipeable effect/core/stream/Stream collectWhile
 */
export function collectWhile<A, A1>(pf: (a: A) => Maybe<A1>) {
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, A1> => {
    const loop: Channel<
      R,
      E,
      Chunk<A>,
      unknown,
      E,
      Chunk<A1>,
      unknown
    > = Channel.readWith(
      (input: Chunk<A>) => {
        const mapped = input.collectWhile(pf)
        // Equal sizes mean no element of this chunk was dropped, so keep
        // reading. An empty input chunk also takes this branch (0 === 0).
        return mapped.size === input.size
          ? Channel.write(mapped).flatMap(() => loop)
          // A shorter result means the transformation stopped inside this
          // chunk: emit what was collected and finish without recursing.
          : Channel.write(mapped)
      },
      (err) => Channel.fail(err),
      (done) => Channel.succeed(done)
    )
    // NOTE(review): presumably an assertion function that narrows `self` to
    // `StreamInternal` so `self.channel` is accessible — confirm.
    concreteStream(self)
    // `>>` is the operator form used in the original to feed the upstream
    // channel's output into `loop`.
    return new StreamInternal(self.channel >> loop)
  }
}