import { concreteSink } from "@effect/core/stream/Sink/operations/_internal/SinkInternal" import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Applies the transducer to the stream and emits its outputs. * * @tsplus static effect/core/stream/Stream.Aspects fromSink * @tsplus pipeable effect/core/stream/Stream transduce */ export function transduce(sink: Sink) { return (self: Stream): Stream => { concreteStream(self) return new StreamInternal( Channel.suspend(() => { const leftovers = new AtomicReference(Chunk.empty>()) const upstreamDone = new AtomicBoolean(false) const upstreamMarker: Channel< never, E, Chunk, unknown, E, Chunk, unknown > = Channel.readWith( (chunk: Chunk) => Channel.write(chunk).flatMap(() => upstreamMarker), (err) => Channel.fail(err), (done) => Channel.sync(upstreamDone.set(true)).flatMap(() => Channel.succeed(done)) ) const buffer: Channel< never, E, Chunk, unknown, E | E2, Chunk, unknown > = Channel.suspend(() => { const leftover = leftovers.get if (leftover.isEmpty) { return Channel.readWith( (chunk: Chunk) => Channel.write(chunk).flatMap(() => buffer), (err) => Channel.fail(err), (done) => Channel.succeed(done) ) } leftovers.set(Chunk.empty()) return Channel.writeChunk(leftover).flatMap(() => buffer) }) concreteSink(sink) const transducer: Channel< R | R2, never, Chunk, unknown, E | E2, Chunk, void > = sink.channel.doneCollect.flatMap(([leftover, z]) => Channel.sync([upstreamDone.get, concatAndGet(leftovers, leftover)] as const).flatMap( ([done, newLeftovers]) => { const nextChannel = done && newLeftovers.isEmpty ? Channel.unit : transducer return Channel.write(Chunk.single(z)).flatMap(() => nextChannel) } ) ) return (self.channel >> upstreamMarker) >> buffer.pipeToOrFail(transducer) }) ) } } function concatAndGet( leftovers: AtomicReference>>, chunk: Chunk> ): Chunk> { const ls = leftovers.get const concat = ls + chunk.filter((c) => c.isNonEmpty) leftovers.set(concat) return concat }