import { StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Allows a faster producer to progress independently of a slower consumer by
 * buffering up to `capacity` chunks in a queue.
 *
 * Note: prefer capacities that are powers of 2 for better performance.
 *
 * @param capacity - Capacity passed to `toQueue` for the intermediate queue.
 * @returns A function that takes a stream and returns a stream which re-emits
 * the chunks read back from that queue.
 *
 * @tsplus static effect/core/stream/Stream.Aspects bufferChunks
 * @tsplus pipeable effect/core/stream/Stream bufferChunks
 */
export function bufferChunks(capacity: number) {
  return <R, E, A>(self: Stream<R, E, A>): Stream<R, E, A> => {
    // Scoped effect that yields the queue the source stream is drained into.
    const scopedQueue = self.toQueue(capacity)
    return new StreamInternal(
      Channel.unwrapScoped(scopedQueue.map((queue) => {
        // Self-referential channel: take one item from the queue, then either
        // stop, fail, or emit the chunk and loop. The explicit annotation is
        // required because `process` refers to itself in its own initializer.
        const process: Channel<
          never,
          unknown,
          unknown,
          unknown,
          E,
          Chunk<A>,
          void
        > = Channel.fromEffect(queue.take).flatMap((take) =>
          take.fold(
            // First arm: finish with `Channel.unit` (presumably the
            // end-of-stream signal — confirm against `Take.fold`).
            Channel.unit,
            // Second arm: propagate the failure cause downstream.
            (cause) => Channel.failCause(cause),
            // Third arm: emit the chunk and continue reading from the queue.
            (chunk) => Channel.write(chunk).flatMap(() => process)
          )
        )
        return process
      }))
    )
  }
}