import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Returns a new stream that only emits elements that are not equal to the
 * previous element emitted, using the specified function to determine whether
 * two elements are equal.
 *
 * @param f - Equality predicate; called as `f(previous, current)`. When it
 * returns `true`, `current` is dropped from the output.
 * @returns A function that transforms a stream into one with consecutive
 * duplicates (as judged by `f`) removed.
 *
 * @tsplus static effect/core/stream/Stream.Aspects changesWith
 * @tsplus pipeable effect/core/stream/Stream changesWith
 */
export function changesWith<A>(f: (x: A, y: A) => boolean) {
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, A> => {
    // NOTE(review): presumably narrows `self` to `StreamInternal` so that
    // `.channel` is accessible - confirm against `concreteStream`.
    concreteStream(self)
    // `>>` is a tsplus operator overload on channels (presumably `pipeTo` -
    // confirm). No element has been seen yet, so the writer starts with
    // `Maybe.none`.
    return new StreamInternal(self.channel >> writer<R, E, A>(Maybe.none, f))
  }
}

/**
 * Channel that reads chunks, removes every element that `f` reports as equal
 * to the element read immediately before it, writes the filtered chunk and
 * then continues recursively with the updated "last seen" element.
 *
 * @param last - The most recently read element, or `Maybe.none` if nothing
 * has been read yet. It is carried across chunk boundaries.
 * @param f - Equality predicate; called as `f(previous, current)`.
 * @returns A channel that forwards upstream failures via `Channel.failCause`
 * and finishes with `Channel.unit` once upstream is done.
 */
function writer<R, E, A>(
  last: Maybe<A>,
  f: (x: A, y: A) => boolean
): Channel<R, E, Chunk<A>, unknown, E, Chunk<A>, void> {
  return Channel.readWithCause(
    (chunk: Chunk<A>) => {
      // Fold over the chunk, tracking the previous element alongside the
      // elements kept so far.
      const [newLast, newChunk] = chunk.reduce(
        [last, Chunk.empty<A>()] as const,
        ([option, as], a) =>
          option.isSome() && f(option.value, a)
            // Equal to the previous element: drop it, but the comparison
            // reference still advances to `a`.
            ? [Maybe.some(a), as]
            // First element overall, or different from the previous one: keep it.
            : [Maybe.some(a), as.append(a)]
      )
      return Channel.write(newChunk).flatMap(() => writer<R, E, A>(newLast, f))
    },
    (cause) => Channel.failCause(cause),
    () => Channel.unit
  )
}