import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Creates a pipeline that takes elements while the specified predicate * evaluates to `true`. * * @tsplus static effect/core/stream/Stream.Aspects takeWhile * @tsplus pipeable effect/core/stream/Stream takeWhile */ export function takeWhile(f: Predicate) { return (self: Stream): Stream => { const loop: Channel, unknown, E, Chunk, unknown> = Channel.readWith( (chunk: Chunk) => { const taken = chunk.takeWhile(f) const more = taken.length === chunk.length return more ? Channel.write(taken).flatMap(() => loop) : Channel.write(taken) }, (err) => Channel.fail(err), (done) => Channel.succeed(done) ) concreteStream(self) return new StreamInternal(self.channel >> loop) } }