import { concreteSink, SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * Splits the sink on the specified predicate, returning a new sink that
 * consumes elements until an element after the first satisfies the specified
 * predicate.
 *
 * The element that satisfies the predicate is not fed to the wrapped sink: it
 * and everything after it in the same chunk are stored and emitted as
 * leftovers once the wrapped sink's channel completes.
 *
 * @param f - Predicate marking the element at which the input is split.
 * @returns A function that wraps a sink so that it stops at the split point.
 *
 * @tsplus static effect/core/stream/Sink.Aspects splitWhere
 * @tsplus pipeable effect/core/stream/Sink splitWhere
 */
export function splitWhere<In1>(f: Predicate<In1>) {
  // NOTE(review): the generic parameter lists in this file were reconstructed;
  // confirm them against the declarations of `Sink` and `Channel`.
  return <R, E, In, L extends In1, Z>(self: Sink<R, E, In, L, Z>): Sink<R, E, In & In1, In1, Z> => {
    // Presumably narrows `self` so that `self.channel` is accessible below.
    concreteSink(self)
    return new SinkInternal(
      // The Ref holds whatever part of the input chunk follows the split point.
      Channel.fromEffect(Ref.make(Chunk.empty<In1>())).flatMap((ref) =>
        splitter<E, In & In1>(false, ref, f)
          .pipeToOrFail(self.channel)
          .doneCollect
          .flatMap(([leftovers, z]) =>
            Channel.fromEffect(ref.get).flatMap((leftover) =>
              // `+` on chunks is presumably the TS+ concatenation operator: the
              // stored remainder is emitted ahead of the wrapped sink's own
              // leftovers, then the wrapped sink's result `z` is returned.
              Channel.write(leftover + leftovers.flatten).flatMap(() => Channel.succeed(z))
            )
          )
      )
    )
  }
}

/**
 * Channel that forwards input chunks downstream until it meets an element
 * satisfying `f`, then writes the elements before it, stores the rest of that
 * chunk in `leftovers`, and stops reading.
 *
 * @param written - `true` once at least one non-empty chunk has been written
 *   downstream. While `false`, the search starts at index 1, so the very
 *   first element is never treated as a split point.
 * @param leftovers - Ref that receives the part of the chunk from the split
 *   point onwards.
 * @param f - Predicate marking the element at which the input is split.
 */
function splitter<E, A>(
  written: boolean,
  leftovers: Ref<Chunk<A>>,
  f: Predicate<A>
): Channel<never, never, Chunk<A>, unknown, E, Chunk<A>, unknown> {
  return Channel.readWithCause(
    (input: Chunk<A>) => {
      // Empty chunks carry no elements: keep reading with unchanged state.
      if (input.isEmpty) {
        return splitter(written, leftovers, f)
      }
      const index = written ? input.indexWhere(f) : input.indexWhereFrom(1, f)
      if (index === -1) {
        // No split point in this chunk: pass it through whole and continue.
        return Channel.write(input).flatMap(() => splitter(true, leftovers, f))
      }
      // Before the first write the split point is clamped to at least 1 so
      // that the first element always reaches the wrapped sink.
      const splitIndex = written ? index : Math.max(index, 1)
      const [left, right] = input.splitAt(splitIndex)
      return Channel.write(left).flatMap(() => Channel.fromEffect(leftovers.set(right)))
    },
    (cause) => Channel.failCause(cause),
    (done) => Channel.succeed(done)
  )
}