import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Takes all elements of the stream until the specified predicate evaluates to * `true`. * * @tsplus static effect/core/stream/Stream.Aspects takeUntil * @tsplus pipeable effect/core/stream/Stream takeUntil */ export function takeUntil(f: Predicate) { return (self: Stream): Stream => { const loop: Channel, unknown, E, Chunk, unknown> = Channel.readWith( (chunk: Chunk) => { const taken = chunk.takeWhile((a) => !f(a)) const last = chunk.drop(taken.length).take(1) return last.isEmpty ? Channel.write(taken).flatMap(() => loop) : Channel.write(taken + last) }, (err) => Channel.fail(err), (done) => Channel.succeed(done) ) concreteStream(self) return new StreamInternal(self.channel >> loop) } }