import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Statefully maps over the elements of this stream to produce new elements.
 *
 * The state starts at `s`; for every element, `f` receives the current state
 * and the element and returns the next state together with the element to
 * emit in its place.
 *
 * @param s - The initial state.
 * @param f - Step function returning `[nextState, outputElement]`.
 * @returns A function that transforms a stream of `A` into a stream of `A1`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects mapAccum
 * @tsplus pipeable effect/core/stream/Stream mapAccum
 */
export function mapAccum<A, S, A1>(s: S, f: (s: S, a: A) => readonly [S, A1]) {
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, A1> => {
    // Narrow `self` to its concrete representation so `self.channel` is accessible.
    concreteStream(self)
    // Pipe the stream's underlying channel into the stateful accumulator channel.
    return new StreamInternal(self.channel >> accumulator<E, A, S, A1>(s, f))
  }
}

/**
 * Builds a channel that reads chunks from upstream, runs `mapAccum` over each
 * chunk starting from `current`, writes the mapped chunk downstream, and then
 * continues with the state produced by that chunk.
 *
 * Upstream failures are re-raised via `Channel.fail`; when upstream is done
 * the channel completes with `Channel.unit`.
 *
 * @param current - The state to use for the next chunk.
 * @param f - Step function returning `[nextState, outputElement]`.
 */
function accumulator<E, A, S, A1>(
  current: S,
  f: (s: S, a: A) => readonly [S, A1]
): Channel<never, E, Chunk<A>, unknown, E, Chunk<A1>, void> {
  return Channel.readWith(
    (input: Chunk<A>) => {
      // Thread the state through the whole chunk in one pass.
      const [nextS, a1s] = input.mapAccum(current, f)
      // Emit the mapped chunk, then recurse with the state carried forward.
      return Channel.write(a1s).flatMap(() => accumulator<E, A, S, A1>(nextS, f))
    },
    (err: E) => Channel.fail(err),
    () => Channel.unit
  )
}