// ets_tracing: off

import * as CK from "../../../../Collections/Immutable/Chunk/index.js"
import * as T from "../../../../Effect/index.js"
import * as CH from "../../Channel/index.js"
import type * as C from "../core.js"
import * as LoopOnChunks from "./loopOnChunks.js"

// NOTE(review): the generic parameter lists below (`<R, E, A, R1, E1, A1>`,
// `T.UIO<void>`, `T.Effect<R1, E1, boolean>`, ...) were reconstructed because
// the previous text referenced `A1` without declaring it. Confirm them against
// the signatures in `../core.js` and `./loopOnChunks.js`.

/**
 * Loops on chunks emitting partially.
 *
 * For every input chunk, `f` is run with the chunk and an `emit` callback.
 * Each value passed to `emit` is appended to an output builder that is local
 * to that chunk.
 *
 * - When `f` succeeds, the values collected so far are written downstream and
 *   the channel ends with the value `f` produced (presumably the "keep
 *   looping" flag consumed by `loopOnChunks_` — confirm there).
 * - When `f` fails, any values emitted before the failure are still written
 *   downstream before the failure is propagated; if nothing was emitted the
 *   failure is propagated directly.
 *
 * @param self - the stream whose chunks are processed
 * @param f - effectful handler receiving a chunk and the `emit` callback
 */
export function loopOnPartialChunks_<R, E, A, R1, E1, A1>(
  self: C.Stream<R, E, A>,
  f: (a: CK.Chunk<A>, emit: (a: A1) => T.UIO<void>) => T.Effect<R1, E1, boolean>
): C.Stream<R & R1, E | E1, A1> {
  return LoopOnChunks.loopOnChunks_(self, (chunk) =>
    CH.unwrap(
      // Suspended so that a fresh builder is allocated for every chunk.
      T.suspend(() => {
        const outputChunk = CK.builder<A1>()

        // Appending is deferred inside an effect, so nothing is recorded until
        // the effect returned by `emit` is actually run.
        const emit = (a: A1) =>
          T.succeedWith(() => {
            outputChunk.append(a)
          })

        return T.catchAll_(
          // Success path: flush everything emitted, then end with `cont`.
          T.map_(f(chunk, emit), (cont) =>
            CH.chain_(CH.write(outputChunk.build()), () => CH.end(cont))
          ),
          // Failure path: flush whatever was emitted before failing.
          (failure) =>
            T.succeedWith(() => {
              const partialResult = outputChunk.build()

              if (CK.isEmpty(partialResult)) {
                return CH.fail(failure)
              } else {
                return CH.zipRight_(CH.write(partialResult), CH.fail(failure))
              }
            })
        )
      })
    )
  )
}

/**
 * Loops on chunks emitting partially.
 *
 * Curried form of `loopOnPartialChunks_`: takes the handler first and returns
 * a function that expects the stream.
 *
 * @param f - effectful handler receiving a chunk and the `emit` callback
 *
 * @ets_data_first loopOnPartialChunks_
 */
export function loopOnPartialChunks<A, R1, E1, A1>(
  f: (a: CK.Chunk<A>, emit: (a: A1) => T.UIO<void>) => T.Effect<R1, E1, boolean>
) {
  return <R, E>(self: C.Stream<R, E, A>) => loopOnPartialChunks_(self, f)
}