import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Takes all elements of the stream until the specified effectual predicate
 * evaluates to `true`.
 *
 * The element for which the predicate evaluates to `true` is itself emitted;
 * nothing after it is taken. Elements are re-emitted downstream one at a time
 * as single-element chunks.
 *
 * @param f - Effectual predicate run once per element, in stream order.
 * @returns A function that transforms a stream into one that stops after the
 * first element for which `f` evaluates to `true`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects takeUntilEffect
 * @tsplus pipeable effect/core/stream/Stream takeUntilEffect
 */
export function takeUntilEffect<A, R2, E2>(
  f: (a: A) => Effect<R2, E2, boolean>
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R2, E | E2, A> => {
    concreteStream(self)
    return new StreamInternal(
      // Start with an empty iterator so the first step of `loop` immediately
      // reads a chunk from the upstream channel.
      self.channel >> loop<E, A, R2, E2>(Chunk.empty<A>()[Symbol.iterator](), f)
    )
  }
}

/**
 * Channel that walks `chunkIterator`, evaluating `f` on each element and
 * writing that element downstream as a single-element chunk.
 *
 * - When the iterator is exhausted, the next upstream chunk is read and the
 *   walk continues over it; an upstream failure cause is re-raised with
 *   `Channel.failCause` and the upstream done value is passed through with
 *   `Channel.succeed`.
 * - When `f` evaluates to `true`, the current element is written and the
 *   channel does not continue, which ends the take.
 *
 * @param chunkIterator - Iterator over the chunk currently being processed.
 * It is stateful and shared across the recursive steps, so each call consumes
 * at most one element from it.
 * @param f - Effectual predicate deciding whether to stop after an element.
 */
function loop<E, A, R1, E1>(
  chunkIterator: Iterator<A>,
  f: (a: A) => Effect<R1, E1, boolean>
): Channel<R1, E, Chunk<A>, unknown, E | E1, Chunk<A>, unknown> {
  const next = chunkIterator.next()
  if (next.done) {
    // Current chunk is used up: pull the next one from upstream.
    return Channel.readWithCause(
      elem => loop<E, A, R1, E1>(elem[Symbol.iterator](), f),
      err => Channel.failCause(err),
      done => Channel.succeed(done)
    )
  } else {
    return Channel.unwrap(
      f(next.value).map(b =>
        b
          // Predicate satisfied: emit the matching element and stop.
          ? Channel.write(Chunk.single(next.value))
          // Not yet satisfied: emit the element and keep consuming.
          : Channel.write(Chunk.single(next.value)).flatMap(() => loop<E, A, R1, E1>(chunkIterator, f))
      )
    )
  }
}