import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Transduce a stream using a chunk processing function.
 *
 * The scoped `push` effect is evaluated to obtain the processing function.
 * That function is then called with `Maybe.some(chunk)` for every chunk read
 * from the source stream, and once with `Maybe.none` when the source is done.
 * Every chunk it returns is written to the resulting stream. Upstream failures
 * are re-raised unchanged.
 *
 * @param push - A scoped effect producing the chunk processing function.
 * @returns A function that takes the source stream and returns the transduced
 * stream, built by composing the source's channel with the transducer channel.
 *
 * @tsplus static effect/core/stream/Stream.Ops transducePush
 */
export function transducePush<R2, R3, E2, In, Out>(
  push: Effect<
    R2 | Scope,
    never,
    (input: Maybe<Chunk<In>>) => Effect<R3, E2, Chunk<Out>>
  >
) {
  return <R, E>(stream: Stream<R, E, In>): Stream<R | R2 | R3, E | E2, Out> => {
    // Acquire the push function inside the channel's scope, then loop with it.
    const channel: Channel<
      R | R2 | R3,
      E,
      Chunk<In>,
      unknown,
      E | E2,
      Chunk<Out>,
      unknown
    > = Channel.unwrapScoped(push.map((pushFn) => pull<R3, E, E2, In, Out>(pushFn)))
    // NOTE(review): presumably narrows `stream` to its internal representation
    // so that `.channel` is accessible below — confirm against StreamInternal.
    concreteStream(stream)
    return new StreamInternal(stream.channel >> channel)
  }
}

/**
 * Builds the channel that drives `push`.
 *
 * - On an input chunk: calls `push(Maybe.some(chunk))`, writes the produced
 *   chunk, then recurses to read the next input.
 * - On an upstream error: fails with that same error.
 * - On upstream completion: calls `push(Maybe.none)` once, writes the produced
 *   chunk, and does not recurse.
 *
 * @param push - The chunk processing function.
 * @returns A channel reading `Chunk<In>` and writing `Chunk<Out>`.
 */
function pull<R, E, E2, In, Out>(
  push: (input: Maybe<Chunk<In>>) => Effect<R, E2, Chunk<Out>>
): Channel<R, E, Chunk<In>, unknown, E | E2, Chunk<Out>, unknown> {
  return Channel.readWith(
    (input: Chunk<In>) =>
      Channel.fromEffect(push(Maybe.some(input)))
        .flatMap((out) => Channel.write(out))
        .flatMap(() => pull<R, E, E2, In, Out>(push)),
    (err) => Channel.fail(err),
    () => Channel.fromEffect(push(Maybe.none)).flatMap((out) => Channel.write(out))
  )
}