import {
concreteStream,
StreamInternal
} from "@effect/core/stream/Stream/operations/_internal/StreamInternal"
/**
* Creates a pipeline that takes elements while the specified predicate
* evaluates to `true`.
*
* @tsplus static effect/core/stream/Stream.Aspects takeWhile
* @tsplus pipeable effect/core/stream/Stream takeWhile
*/
export function takeWhile(f: Predicate) {
return (self: Stream): Stream => {
const loop: Channel, unknown, E, Chunk, unknown> = Channel.readWith(
(chunk: Chunk) => {
const taken = chunk.takeWhile(f)
const more = taken.length === chunk.length
return more ? Channel.write(taken).flatMap(() => loop) : Channel.write(taken)
},
(err) => Channel.fail(err),
(done) => Channel.succeed(done)
)
concreteStream(self)
return new StreamInternal(self.channel >> loop)
}
}