import { concreteSink, SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal" /** * Effectfully transforms this sink's input chunks. `f` must preserve * chunking-invariance. * * @tsplus static effect/core/stream/Sink.Aspects contramapChunksEffect * @tsplus pipeable effect/core/stream/Sink contramapChunksEffect */ export function contramapChunksEffect( f: (input: Chunk) => Effect> ) { return (self: Sink): Sink => { const loop: Channel< R | R2, never, Chunk, unknown, E | E2, Chunk, unknown > = Channel.readWith( (chunk: Chunk) => Channel.fromEffect(f(chunk)).flatMap((chunk) => Channel.write(chunk)).flatMap(() => loop), (err) => Channel.fail(err), (done) => Channel.succeed(done) ) concreteSink(self) return new SinkInternal(loop.pipeToOrFail(self.channel)) } }