import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" import { RingBufferNew } from "@effect/core/support/RingBufferNew" /** * Emits a sliding window of `n` elements. * * @tsplus static effect/core/stream/Stream.Aspects sliding * @tsplus pipeable effect/core/stream/Stream sliding */ export function sliding(chunkSize: number, stepSize = 1) { return (self: Stream): Stream> => { if (chunkSize <= 0 || stepSize <= 0) { return Stream.dieSync( new IllegalArgumentException( "Invalid bounds - `chunkSize` and `stepSize` must be greater than 0" ) ) } return Stream.sync(new RingBufferNew(chunkSize)).flatMap((queue) => { concreteStream(self) return new StreamInternal( self.channel >> reader(chunkSize, stepSize, queue, 0) ) }) } } function reader( chunkSize: number, stepSize: number, queue: RingBufferNew, queueSize: number ): Channel, unknown, E, Chunk>, unknown> { return Channel.readWithCause( (input: Chunk) => Channel.write( input.zipWithIndex.collect(([a, index]) => { queue.put(a) const currentIndex = queueSize + index + 1 return currentIndex < chunkSize || (currentIndex - chunkSize) % stepSize > 0 ? Maybe.none : Maybe.some(queue.toChunk()) }) ).flatMap(() => reader(chunkSize, stepSize, queue, queueSize + input.length)), (cause) => emitOnStreamEnd( chunkSize, stepSize, queue, queueSize, Channel.failCause(cause) ), () => emitOnStreamEnd(chunkSize, stepSize, queue, queueSize, Channel.unit) ) } function emitOnStreamEnd( chunkSize: number, stepSize: number, queue: RingBufferNew, queueSize: number, channelEnd: Channel, unknown, E, Chunk>, unknown> ): Channel, unknown, E, Chunk>, unknown> { if (queueSize < chunkSize) { const items = queue.toChunk() const result = items.isEmpty ? Chunk.empty>() : Chunk.single(items) return Channel.write(result).flatMap(() => channelEnd) } const lastEmitIndex = queueSize - ((queueSize - chunkSize) % stepSize) if (lastEmitIndex === queueSize) { return channelEnd } const leftovers = queueSize - (lastEmitIndex - chunkSize + stepSize) const lastItems = queue.toChunk().takeRight(leftovers) const result = lastItems.isEmpty ? Chunk.empty>() : Chunk.single(lastItems) return Channel.write(result).flatMap(() => channelEnd) }