import { SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * A sink that executes the provided effectful function for every element fed
 * to it until `f` evaluates to `false`.
 *
 * Input is consumed one chunk at a time and `f` is run on the elements of a
 * chunk in index order. When `f` yields `false`, the rest of the current
 * chunk — starting with the element that produced `false` — is written to
 * the channel's output and processing stops. When `f` fails, the rest of the
 * current chunk — starting with the element whose effect failed — is written
 * to the output before the failure is re-raised.
 *
 * @param f - Effectful predicate run for each input element; `true` means
 *   "keep consuming", `false` means "stop".
 * @returns A sink wrapping the channel described above.
 *
 * @tsplus static effect/core/stream/Sink.Ops forEachWhile
 */
export function forEachWhile<R, E, In>(
  f: (input: In) => Effect<R, E, boolean>
): Sink<R, E, In, In, void> {
  // `process` refers to itself lazily (inside the read callback) so that,
  // once a chunk has been fully consumed, `go` can continue by reading the
  // next chunk.
  const process: Channel<
    R,
    E,
    Chunk<In>,
    unknown,
    E,
    Chunk<In>,
    void
  > = Channel.readWithCause(
    // A chunk was read: run `f` over it starting at index 0.
    (input: Chunk<In>) => go(f, input, 0, input.length, process),
    // Reading produced a cause: propagate it unchanged.
    (cause) => Channel.failCause(cause),
    // Nothing more to read: finish with unit.
    () => Channel.unit
  )
  return new SinkInternal(process)
}

/**
 * Runs `f` on `chunk` from position `index` up to (but excluding) `length`.
 *
 * @param f - Effectful predicate applied to each element.
 * @param chunk - The chunk currently being processed.
 * @param index - Position of the next element to feed to `f`.
 * @param length - Position at which the chunk is considered exhausted; the
 *   caller passes `chunk.length`.
 * @param cont - Channel to continue with once `index` reaches `length`.
 * @returns `cont` when the chunk is exhausted; otherwise a channel that runs
 *   `f` on the element at `index` and then either recurses on `index + 1`
 *   (result `true`) or writes `chunk.drop(index)` and stops (result `false`).
 */
function go<R, E, In>(
  f: (input: In) => Effect<R, E, boolean>,
  chunk: Chunk<In>,
  index: number,
  length: number,
  cont: Channel<R, E, Chunk<In>, unknown, E, Chunk<In>, void>
): Channel<R, E, Chunk<In>, unknown, E, Chunk<In>, void> {
  // NOTE(review): `catchAll` is attached to the result of `flatMap`, so it
  // also covers the recursive `go(...)` call (and `cont`), not only the
  // effect for the element at `index` — confirm this is intended.
  return index === length ?
    cont :
    Channel.fromEffect(f(chunk.unsafeGet(index)))
      .flatMap((b) => b ? go(f, chunk, index + 1, length, cont) : Channel.write(chunk.drop(index)))
      .catchAll((e) => Channel.write(chunk.drop(index)).flatMap(() => Channel.fail(e)))
}