import { concreteSink, SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal" /** * Creates a sink that produces values until one verifies the predicate `f`. * * @tsplus static effect/core/stream/Sink.Aspects untilOutputEffect * @tsplus pipeable effect/core/stream/Sink untilOutputEffect */ export function untilOutputEffect( f: (z: Z) => Effect ) { return ( self: Sink ): Sink> => { concreteSink(self) return new SinkInternal( Channel.fromEffect(Ref.make(Chunk.empty()).zip(Ref.make(false))).flatMap( ([leftoversRef, upstreamDoneRef]) => { const upstreamMarker: Channel< never, never, Chunk, unknown, never, Chunk, unknown > = Channel.readWith( (chunk: Chunk) => Channel.write(chunk).flatMap(() => upstreamMarker), (err) => Channel.fail(err), (done) => Channel.fromEffect(upstreamDoneRef.set(true)).as(done) ) const loop: Channel< R | R2, never, Chunk, unknown, E | E2, Chunk, Maybe > = self.channel.doneCollect.foldChannel( (err) => Channel.fail(err), ([leftovers, doneValue]) => Channel.fromEffect(f(doneValue)).flatMap( (satisfied) => Channel.fromEffect(leftoversRef.set(leftovers.flatten)).flatMap(() => Channel.fromEffect(upstreamDoneRef.get).flatMap((upstreamDone) => satisfied ? Channel.write(leftovers.flatten).as(Maybe.some(doneValue)) : upstreamDone ? Channel.write(leftovers.flatten).as(Maybe.none) : loop ) ) ) ) return ( (upstreamMarker >> Channel.bufferChunk(leftoversRef)) >> loop ) } ) ) } }