import { SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * A sink that effectfully folds its inputs with the provided function,
 * termination predicate and initial state.
 *
 * Folding proceeds while `cont` holds for the current state. Inputs of the
 * current chunk that were not consumed when `cont` stops holding are emitted
 * as leftovers.
 *
 * @param z The initial state.
 * @param cont Predicate on the state; folding continues only while it returns `true`.
 * @param f Effectful step function combining the current state with one input.
 *
 * @tsplus static effect/core/stream/Sink.Ops foldEffect
 */
export function foldEffect<R, E, In, S>(
  z: S,
  cont: Predicate<S>,
  f: (s: S, input: In) => Effect<R, E, S>
): Sink<R, E, In, In, S> {
  return Sink.suspend(new SinkInternal(reader(z, cont, f)))
}

/**
 * Builds the channel that drives the fold.
 *
 * - If `cont(z)` is already false, the channel succeeds immediately with `z`
 *   without reading any input.
 * - Otherwise it reads one chunk and folds it with `foldChunkSplitEffect`:
 *   - no leftovers: recurse with the new state to read the next chunk;
 *   - leftovers: write the leftover chunk downstream and finish with the new
 *     state.
 * - An upstream failure is re-raised; upstream completion finishes with the
 *   current state `z`.
 */
function reader<R, E, S, In>(
  z: S,
  cont: Predicate<S>,
  f: (s: S, input: In) => Effect<R, E, S>
): Channel<R, E, Chunk<In>, unknown, E, Chunk<In>, S> {
  if (!cont(z)) {
    return Channel.succeed(z)
  }
  return Channel.readWith(
    (chunk: Chunk<In>) =>
      Channel.fromEffect(foldChunkSplitEffect(z, chunk, cont, f)).flatMap(
        ([nextS, leftovers]) =>
          leftovers.fold(
            reader(nextS, cont, f),
            (leftover) => Channel.write(leftover).as(nextS)
          )
      ),
    (err) => Channel.fail(err),
    () => Channel.succeed(z)
  )
}

/**
 * Folds a whole chunk starting from its first element.
 *
 * Returns the resulting state paired with the unconsumed remainder of the
 * chunk, if folding stopped before the end of the chunk.
 */
function foldChunkSplitEffect<R, E, S, In>(
  z: S,
  chunk: Chunk<In>,
  cont: Predicate<S>,
  f: (s: S, input: In) => Effect<R, E, S>
): Effect<R, E, readonly [S, Maybe<Chunk<In>>]> {
  return foldEffectInternal(z, chunk, cont, f, 0, chunk.length)
}

/**
 * Recursive worker for `foldChunkSplitEffect`.
 *
 * @param z The state accumulated so far.
 * @param chunk The chunk being folded.
 * @param cont Continuation predicate checked after every step.
 * @param f Effectful step function.
 * @param index Position of the next element of `chunk` to consume.
 * @param length Position at which to stop (the caller passes `chunk.length`).
 * @returns `[state, Maybe.none]` when every element up to `length` was
 * consumed, or `[state, Maybe.some(rest)]` when `cont` rejected the state
 * produced at `index`; `rest` is `chunk.drop(index + 1)`, i.e. the elements
 * after the one that ended the fold.
 */
function foldEffectInternal<R, E, S, In>(
  z: S,
  chunk: Chunk<In>,
  cont: Predicate<S>,
  f: (s: S, input: In) => Effect<R, E, S>,
  index: number,
  length: number
): Effect<R, E, readonly [S, Maybe<Chunk<In>>]> {
  if (index === length) {
    return Effect.succeed([z, Maybe.none])
  }
  return f(z, chunk.unsafeGet(index)).flatMap((z1) =>
    cont(z1)
      ? foldEffectInternal(z1, chunk, cont, f, index + 1, length)
      : Effect.succeed([z1, Maybe.some(chunk.drop(index + 1))])
  )
}