import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Statefully and effectfully maps over the elements of this stream to produce
 * new elements.
 *
 * The state starts at `s`. For every element, `f` is run with the current
 * state and that element; it yields the next state together with the element
 * to emit downstream.
 *
 * NOTE(review): the generic parameter lists in this file were reconstructed
 * from the identifiers that remained visible (`S`, `A`, `E2`, `A2`) and from
 * how the values are used. `R2`, `R` and `E` are conventional names - confirm
 * against the upstream declaration.
 *
 * @param s - The initial state.
 * @param f - Effectful step function producing `[nextState, output]`.
 * @returns A function that transforms a stream of `A` into a stream of `A2`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects mapAccumEffect
 * @tsplus pipeable effect/core/stream/Stream mapAccumEffect
 */
export function mapAccumEffect<A, R2, E2, A2, S>(
  s: S,
  f: (s: S, a: A) => Effect<R2, E2, readonly [S, A2]>
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R2, E | E2, A2> => {
    concreteStream(self)
    // Pipe the source channel into the stateful accumulator channel.
    return new StreamInternal(self.channel >> accumulator(s, f))
  }
}

/**
 * Builds the channel that reads chunks of `A`, threads the state `s` through
 * `f` element by element, and writes one chunk of `A2` per input chunk.
 *
 * NOTE(review): the first three `Channel` type arguments were lost from the
 * original text. `Chunk<A>` follows from the `readWith` input handler; the
 * environment (`R2`) and input-error (`unknown`) slots are assumptions -
 * confirm against the upstream declaration.
 *
 * @param s - The state to start this round of reading with.
 * @param f - Effectful step function producing `[nextState, output]`.
 */
function accumulator<A, R2, E2, A2, S>(
  s: S,
  f: (s: S, a: A) => Effect<R2, E2, readonly [S, A2]>
): Channel<R2, unknown, Chunk<A>, unknown, E2, Chunk<A2>, unknown> {
  return Channel.readWith(
    // Input handler: process one upstream chunk.
    (chunk: Chunk<A>) =>
      Channel.unwrap(
        Effect.suspendSucceed(() => {
          // The builder is created inside the suspended thunk and is filled
          // as a side effect of each successful step.
          const outputChunk = Chunk.builder<A2>()
          const emit = (out: A2) =>
            Effect.sync(() => {
              outputChunk.append(out)
            })
          return Effect.reduce(chunk, s, (state, a) => f(state, a).flatMap(([next, a2]) => emit(a2).as(next)))
            .fold(
              (failure) => {
                // Outputs produced before the failing element are still
                // written (when there are any) before the channel fails.
                const partialResult = outputChunk.build()
                return partialResult.isNonEmpty
                  ? Channel.write(partialResult).flatMap(() => Channel.fail(failure))
                  : Channel.fail(failure)
              },
              // Whole chunk succeeded: write it and keep reading with the
              // state left over after the last element.
              (out) => Channel.write(outputChunk.build()).flatMap(() => accumulator(out, f))
            )
        })
      ),
    // Error handler: forward the upstream failure unchanged.
    (err) => Channel.fail(err),
    // Done handler: upstream finished, nothing more to emit.
    () => Channel.unit
  ) as Channel<R2, unknown, Chunk<A>, unknown, E2, Chunk<A2>, unknown>
}