import { SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * Creates a sink that effectfully folds elements of type `In` into a
 * structure of type `S`, until `max` worth of elements (determined by the
 * `costFn`) have been folded.
 *
 * The `decompose` function will be used for decomposing elements that cause
 * an `S` aggregate to cross `max` into smaller elements. Be vigilant with
 * this function, it has to generate "simpler" values or the fold may never
 * end. A value is considered indivisible if `decompose` yields the empty
 * chunk or a single-valued chunk. In these cases, there is no other choice
 * than to yield a value that will cross the threshold.
 *
 * @param z - Initial aggregate state.
 * @param costFn - Effectfully computes the cost of adding `input` to the current aggregate.
 * @param max - Cost threshold the aggregate should not cross.
 * @param decompose - Effectfully splits an element that would cross `max` into smaller elements.
 * @param f - Effectful step function folding one element into the aggregate.
 * @returns A sink that yields the aggregate and emits unconsumed elements as leftovers.
 *
 * @tsplus static effect/core/stream/Sink.Ops foldWeightedDecomposeEffect
 */
export function foldWeightedDecomposeEffect<R, E, R2, E2, R3, E3, S, In>(
  z: S,
  costFn: (s: S, input: In) => Effect<R, E, number>,
  max: number,
  decompose: (input: In) => Effect<R2, E2, Chunk<In>>,
  f: (s: S, input: In) => Effect<R3, E3, S>
): Sink<R | R2 | R3, E | E2 | E3, In, In, S> {
  // Start with a clean (non-dirty) aggregate and zero accumulated cost.
  return Sink.suspend(new SinkInternal(go(z, costFn, max, decompose, f, false, 0)))
}

/**
 * Channel loop behind `foldWeightedDecomposeEffect`: reads one upstream chunk,
 * folds it via `fold`, then either finishes with the aggregate or keeps reading.
 *
 * @param s - Aggregate accumulated so far.
 * @param costFn - Cost function, forwarded to `fold`.
 * @param max - Cost threshold, forwarded to `fold`.
 * @param decompose - Decomposition function, forwarded to `fold`.
 * @param f - Step function, forwarded to `fold`.
 * @param dirty - Whether at least one element has already been folded into `s`.
 * @param cost - Cost accumulated in `s` before the next chunk is read.
 */
function go<R, E, R2, E2, R3, E3, S, In>(
  s: S,
  costFn: (s: S, input: In) => Effect<R, E, number>,
  max: number,
  decompose: (input: In) => Effect<R2, E2, Chunk<In>>,
  f: (s: S, input: In) => Effect<R3, E3, S>,
  dirty: boolean,
  cost: number
): Channel<R | R2 | R3, E | E2 | E3, Chunk<In>, unknown, E | E2 | E3, Chunk<In>, S> {
  return Channel.readWith(
    // Outcome of folding one chunk:
    //  - non-empty leftovers: emit them downstream and finish with the aggregate;
    //  - `cost > max`: finish with the aggregate;
    //  - otherwise: keep reading with the updated state.
    // NOTE(review): the threshold check uses `cost` (the value from before this
    // chunk was folded), not `nextCost`, so a crossing caused by the last element
    // of a chunk is only observed on the following read — confirm this is intended.
    (chunk: Chunk<In>) =>
      Channel.fromEffect(
        fold(chunk, s, costFn, max, decompose, f, dirty, cost, 0)
      ).flatMap(([nextS, nextCost, nextDirty, leftovers]) =>
        leftovers.isNonEmpty
          ? Channel.write(leftovers).flatMap(() => Channel.succeed(nextS))
          : cost > max
          ? Channel.succeed(nextS)
          : go(nextS, costFn, max, decompose, f, nextDirty, nextCost)
      ),
    // Upstream failure: propagate the error unchanged.
    (err) => Channel.fail(err),
    // Upstream end: finish with whatever has been aggregated so far.
    (): Channel<
      R | R2 | R3,
      E | E2 | E3,
      Chunk<In>,
      unknown,
      E | E2 | E3,
      Chunk<In>,
      S
    > => Channel.succeed(s)
  )
}

/**
 * Folds the elements of `input`, starting at `index`, into `s` for as long as
 * the accumulated cost stays within `max`.
 *
 * @param input - Chunk currently being processed.
 * @param s - Aggregate accumulated so far.
 * @param costFn - Effectfully computes the cost of adding an element to `s`.
 * @param max - Cost threshold.
 * @param decompose - Effectfully splits an element that would cross `max`.
 * @param f - Effectful step function folding one element into the aggregate.
 * @param dirty - Whether at least one element has already been folded into `s`.
 * @param cost - Cost accumulated so far.
 * @param index - Position in `input` of the next element to fold.
 * @returns A tuple `[state, cost, dirty, leftovers]`, where `leftovers` holds the
 * elements of `input` that were not folded.
 */
function fold<R, E, R2, E2, R3, E3, S, In>(
  input: Chunk<In>,
  s: S,
  costFn: (s: S, input: In) => Effect<R, E, number>,
  max: number,
  decompose: (input: In) => Effect<R2, E2, Chunk<In>>,
  f: (s: S, input: In) => Effect<R3, E3, S>,
  dirty: boolean,
  cost: number,
  index: number
): Effect<R | R2 | R3, E | E2 | E3, readonly [S, number, boolean, Chunk<In>]> {
  // Whole chunk consumed: nothing is left over.
  if (index === input.length) {
    return Effect.sync([s, cost, dirty, Chunk.empty<In>()])
  }
  const elem = input.unsafeGet(index)
  return costFn(s, elem)
    .map((addedCost) => cost + addedCost)
    .flatMap((total) => {
      // `elem` fits within the threshold: fold it in and continue with the next element.
      if (total <= max) {
        return f(s, elem).flatMap((nextS) =>
          fold(input, nextS, costFn, max, decompose, f, true, total, index + 1)
        )
      }
      return decompose(elem).flatMap((decomposed) => {
        if (decomposed.length <= 1 && !dirty) {
          // If `elem` cannot be decomposed, we need to cross the `max` threshold. To
          // minimize "injury", we only allow this when we haven't added anything else
          // to the aggregate (dirty = false).
          return f(s, elem).map((nextS) => [nextS, total, true, input.drop(index + 1)] as const)
        }
        if (decomposed.length <= 1 && dirty) {
          // If the state is dirty and `elem` cannot be decomposed, we stop folding
          // and include `elem` in the leftovers.
          return Effect.sync([s, cost, dirty, input.drop(index)] as const)
        }
        // `elem` got decomposed, so we will recurse with the decomposed elements pushed
        // into the chunk we're processing and see if we can aggregate further.
        // (`+` on chunks presumably resolves to the project's tsplus concatenation
        // operator — confirm.)
        return fold(
          decomposed + input.drop(index + 1),
          s,
          costFn,
          max,
          decompose,
          f,
          dirty,
          cost,
          0
        )
      })
    })
}