import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Re-chunks the elements of the stream into chunks of `n` elements each. The * last chunk might contain less than `n` elements. * * @tsplus static effect/core/stream/Stream.Aspects rechunk * @tsplus pipeable effect/core/stream/Stream rechunk */ export function rechunk(n: number) { return (self: Stream): Stream => { concreteStream(self) return new StreamInternal(self.channel >> process(new Rechunker(n), n)) } } function process( rechunker: Rechunker, target: number ): Channel, unknown, E, Chunk, unknown> { return Channel.readWithCause( (chunk: Chunk) => { if (chunk.size === target && rechunker.isEmpty()) { return Channel.write(chunk).flatMap(() => process(rechunker, target)) } if (chunk.size > 0) { let chunks = List.empty>() let result: Chunk | undefined = undefined let i = 0 while (i < chunk.size) { while (i < chunk.size && result == null) { result = rechunker.write(chunk.unsafeGet(i)) i = i + 1 } if (result != null) { chunks = chunks.prepend(result) result = undefined } } return ( Channel.writeAll(...chunks.reverse).flatMap(() => process(rechunker, target)) ) } return process(rechunker, target) }, (cause) => rechunker.emitIfNotEmpty().flatMap(() => Channel.failCause(cause)), () => rechunker.emitIfNotEmpty() ) } class Rechunker { private builder = Chunk.builder() private pos = 0 constructor(readonly n: number) {} isEmpty(): boolean { return this.pos === 0 } write(elem: A): Chunk | undefined { this.builder.append(elem) this.pos += 1 if (this.pos === this.n) { const result = this.builder.build() this.builder = Chunk.builder() this.pos = 0 return result } return undefined } emitIfNotEmpty(): Channel, void> { if (this.pos !== 0) { return Channel.write(this.builder.build()) } else { return Channel.unit } } }