import { StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Allows a faster producer to progress independently of a slower consumer by * buffering chunks into an unbounded queue. * * @tsplus getter effect/core/stream/Stream bufferUnbounded */ export function bufferUnbounded( self: Stream ): Stream { const queue = self.toQueueUnbounded return new StreamInternal( Channel.unwrapScoped(queue.map((queue) => { const process: Channel< never, unknown, unknown, unknown, E, Chunk, void > = Channel.fromEffect(queue.take).flatMap((take) => take.fold( Channel.unit, (cause) => Channel.failCause(cause), (a) => Channel.write(a).flatMap(() => process) ) ) return process })) ) }