import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Returns a new stream that only emits elements that are not equal to the
 * previous element emitted, using the specified effectual function to
 * determine whether two elements are equal.
 *
 * The comparison function receives the previously seen element as `x` and the
 * current element as `y`; when its effect yields `true` the current element is
 * dropped.
 *
 * @param f - Effectual equality check between the previous and the current element.
 * @returns A function that transforms a stream by piping its channel through
 * the de-duplicating `writer` channel.
 *
 * @tsplus static effect/core/stream/Stream.Aspects changesWithEffect
 * @tsplus pipeable effect/core/stream/Stream changesWithEffect
 */
export function changesWithEffect<A, R2, E2>(
  f: (x: A, y: A) => Effect<R2, E2, boolean>
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R2, E | E2, A> => {
    concreteStream(self)
    // Start with no "previous" element so the very first element is always emitted.
    return new StreamInternal(self.channel >> writer<A, R2, E, E2>(Maybe.none, f))
  }
}

/**
 * Channel that reads chunks from upstream, removes elements judged equal to
 * their predecessor by `f`, writes the filtered chunk downstream and then
 * continues with the updated "previous" element.
 *
 * @param last - The previously seen element, or `Maybe.none` if nothing has been read yet.
 * @param f - Effectual equality check between the previous and the current element.
 * @returns A channel that re-emits the upstream failure cause unchanged and
 * ends with `Channel.unit` when upstream is done.
 */
function writer<A, R2, E, E2>(
  last: Maybe<A>,
  f: (x: A, y: A) => Effect<R2, E2, boolean>
): Channel<R2, E, Chunk<A>, unknown, E | E2, Chunk<A>, void> {
  return Channel.readWithCause(
    (chunk: Chunk<A>) =>
      Channel.fromEffect(
        // Fold over the chunk carrying [previous element, elements kept so far].
        Effect.reduce(chunk, [last, Chunk.empty<A>()] as const, ([option, as], a) =>
          option.fold(
            // No previous element yet: always keep the current one.
            Effect.succeed([Maybe.some(a), as.append(a)] as const),
            (value) =>
              f(value, a).map((b) =>
                // The current element becomes the new "previous" in both
                // branches, even when it is dropped as a duplicate.
                b
                  ? [Maybe.some(a), as] as const
                  : [Maybe.some(a), as.append(a)] as const
              )
          ))
      ).flatMap(
        // Emit the filtered chunk, then keep reading with the updated state.
        ([newLast, newChunk]) => Channel.write(newChunk).flatMap(() => writer(newLast, f))
      ),
    (cause) => Channel.failCause(cause),
    () => Channel.unit
  )
}