import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Zips each element with the next element if present. * * @tsplus getter effect/core/stream/Stream zipWithNext */ export function zipWithNext(self: Stream): Stream]> { concreteStream(self) return new StreamInternal(self.channel >> process(Maybe.none)) } function process( last: Maybe ): Channel, unknown, E, Chunk]>, void> { return Channel.readWith( (input: Chunk) => { const [newLast, chunk] = input.mapAccum( last, (prev, curr) => [Maybe.some(curr), prev.map((a) => [a, curr] as const)] as const ) const out = chunk.collect((option) => option.isSome() ? Maybe.some([option.value[0], Maybe.some(option.value[1])] as const) : Maybe.none ) return Channel.write(out).flatMap(() => process(newLast)) }, (err: E) => Channel.fail(err), () => last.fold( Channel.unit, (a) => Channel.write(Chunk.single([a, Maybe.none] as const)).flatMap(() => Channel.unit) ) ) }