import {
concreteStream,
StreamInternal
} from "@effect/core/stream/Stream/operations/_internal/StreamInternal"
/**
 * Takes all elements of the stream until the specified predicate evaluates to
 * `true`. The first element for which the predicate holds is itself emitted;
 * no further input is read after it.
 *
 * @param f - Predicate tested against each element, in stream order.
 * @returns A function that maps a stream to its truncated counterpart.
 *
 * @tsplus static effect/core/stream/Stream.Aspects takeUntil
 * @tsplus pipeable effect/core/stream/Stream takeUntil
 */
export function takeUntil<A>(f: Predicate<A>) {
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, A> => {
    // Recursive channel: consumes one upstream chunk at a time and re-enters
    // itself only while no element of the chunk has satisfied `f`.
    const loop: Channel<R, E, Chunk<A>, unknown, E, Chunk<A>, unknown> = Channel.readWith(
      (chunk: Chunk<A>) => {
        // Longest prefix of the chunk whose elements all fail the predicate.
        const taken = chunk.takeWhile((a) => !f(a))
        // The element right after that prefix, i.e. the first one satisfying
        // `f`; empty when the whole chunk failed the predicate.
        const last = chunk.drop(taken.length).take(1)
        // NOTE(review): `+` on chunks is presumably the tsplus concatenation
        // operator — confirm against the Chunk operator declarations.
        return last.isEmpty ? Channel.write(taken).flatMap(() => loop) : Channel.write(taken + last)
      },
      // Upstream failures and the upstream done value are passed through as-is.
      (err) => Channel.fail(err),
      (done) => Channel.succeed(done)
    )
    // NOTE(review): presumably an assertion function narrowing `self` to
    // `StreamInternal` so that `self.channel` is accessible — confirm.
    concreteStream(self)
    // NOTE(review): `>>` is presumably the tsplus operator piping the source
    // channel's output into `loop` — confirm.
    return new StreamInternal(self.channel >> loop)
  }
}