import { bufferSignal } from "@effect/core/stream/Stream/operations/_internal/bufferSignal"
import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Allows a faster producer to progress independently of a slower consumer by
 * buffering up to `capacity` elements in a sliding queue.
 *
 * This combinator destroys the chunking structure: the source stream is
 * re-chunked into chunks of size 1 before it is buffered. It's recommended to
 * use `rechunk` afterwards.
 *
 * Note: prefer capacities that are powers of 2 for better performance.
 *
 * @param capacity - The capacity passed to the sliding queue that backs the
 * buffer.
 * @returns A function that takes a stream and returns the buffered stream
 * with the same environment, error and element types.
 *
 * @tsplus static effect/core/stream/Stream.Aspects bufferSliding
 * @tsplus pipeable effect/core/stream/Stream bufferSliding
 */
export function bufferSliding(capacity: number) {
  return <R, E, A>(self: Stream<R, E, A>): Stream<R, E, A> => {
    // The queue is acquired as a managed resource: its release action shuts
    // the queue down.
    // NOTE(review): the element type (a `Take` paired with a `Deferred`) was
    // reconstructed from a garbled type-argument list; confirm it against the
    // signature of `bufferSignal`.
    const queue = Effect.acquireRelease(
      Queue.sliding<readonly [Take<E, A>, Deferred<never, void>]>(capacity),
      (queue) => queue.shutdown
    )
    // Chunks of size 1, so the buffer is fed one element at a time.
    const stream = self.rechunk(1)
    // Presumably an assertion function that narrows `stream` to its concrete
    // `StreamInternal` form so that `channel` is accessible; verify.
    concreteStream(stream)
    return new StreamInternal(bufferSignal(queue, stream.channel))
  }
}