import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Creates a stream that groups on adjacent keys, calculated by function f. * * @tsplus static effect/core/stream/Stream.Aspects groupAdjacentBy * @tsplus pipeable effect/core/stream/Stream groupAdjacentBy */ export function groupAdjacentBy(f: (a: A) => K) { return (self: Stream): Stream]> => { concreteStream(self) return new StreamInternal(self.channel >> chunkAdjacent(Maybe.none, f)) } } function chunkAdjacent( buffer: Maybe]>, f: (a: A) => K ): Channel, unknown, E, Chunk]>, unknown> { return Channel.readWithCause( (chunk: Chunk) => { const [outputs, newBuffer] = go(chunk, buffer, f) return Channel.write(outputs).flatMap(() => chunkAdjacent(newBuffer, f)) }, (cause) => Channel.failCause(cause), () => buffer.fold(Channel.unit, (o) => Channel.write(Chunk.single(o))) ) } function go( input: Chunk, state: Maybe]>, f: (a: A) => K ): readonly [Chunk]>, Maybe]>] { return input.reduce( [Chunk.empty]>(), state] as const, ([os, o], a) => o.fold([os, Maybe.some([f(a), Chunk.single(a)] as const)] as const, (agg) => { const k2 = f(a) const [k, aggregated] = agg if (k === k2) { return [os, Maybe.some([k, aggregated.append(a)] as const)] as const } else { return [os.append(agg), Maybe.some([k2, Chunk.single(a)] as const)] as const } }) ) }