import { Handoff } from "@effect/core/stream/Stream/operations/_internal/Handoff" import { HandoffSignal } from "@effect/core/stream/Stream/operations/_internal/HandoffSignal" import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" import { SinkEndReason } from "@effect/core/stream/Stream/SinkEndReason" type DebounceState = NotStarted | Previous | Current class NotStarted { readonly _tag = "NotStarted" } class Previous { readonly _tag = "Previous" constructor(readonly fiber: Fiber>) {} } class Current { readonly _tag = "Current" constructor(readonly fiber: Fiber>) {} } /** * Delays the emission of values by holding new values for a set duration. If * no new values arrive during that time the value is emitted, however if a * new value is received during the holding period the previous value is * discarded and the process is repeated with the new value. * * This operator is useful if you have a stream of "bursty" events which * eventually settle down and you only need the final event of the burst. * * @example A search engine may only want to initiate a search after a user has * paused typing so as to not prematurely recommend results. * * @tsplus static effect/core/stream/Stream.Aspects debounce * @tsplus pipeable effect/core/stream/Stream debounce */ export function debounce(duration: Duration) { return (self: Stream): Stream => Stream.unwrap( Effect.transplant((grafter) => Do(($) => { const handoff = $(Handoff.make>()) function enqueue(last: Chunk) { return grafter(Clock.sleep(duration).as(last).fork).map((fiber) => consumer(new Previous(fiber)) ) } const producer: Channel< R, E, Chunk, unknown, E, never, unknown > = Channel.readWithCause( (input: Chunk) => input.last.fold( producer, (last) => Channel.fromEffect( handoff.offer(HandoffSignal.Emit(Chunk.single(last))) ).flatMap(() => producer) ), (cause) => Channel.fromEffect(handoff.offer(HandoffSignal.Halt(cause))), () => Channel.fromEffect( handoff.offer(HandoffSignal.End(SinkEndReason.UpstreamEnd)) ) ) function consumer( state: DebounceState ): Channel, unknown> { return Channel.unwrap((() => { switch (state._tag) { case "NotStarted": { return handoff.take.map((signal) => { switch (signal._tag) { case "Emit": { return Channel.unwrap(enqueue(signal.elements)) } case "Halt": { return Channel.failCause(signal.error) } case "End": { return Channel.unit } } }) } case "Current": { return state.fiber.join.map((signal) => { switch (signal._tag) { case "Emit": { return Channel.unwrap(enqueue(signal.elements)) } case "Halt": { return Channel.failCause(signal.error) } case "End": { return Channel.unit } } }) } case "Previous": { return state.fiber.join.raceWith( handoff.take, (exit, current) => exit.fold( (cause) => current.interrupt.as(Channel.failCause(cause)), (chunk) => Effect.succeed( Channel.write(chunk).flatMap(() => consumer(new Current(current))) ) ), (exit, previous) => exit.fold( (cause) => previous.interrupt.as(Channel.failCause(cause)), (signal) => { switch (signal._tag) { case "Emit": { return previous.interrupt.flatMap(() => enqueue(signal.elements)) } case "Halt": { return previous.interrupt.as(Channel.failCause(signal.error)) } case "End": { return previous .join .map((chunk) => Channel.write(chunk).flatMap(() => Channel.unit)) } } } ) ) } } })()) } concreteStream(self) return ( Stream.scoped((self.channel >> producer).runScoped.forkScoped) > new StreamInternal(consumer(new NotStarted())) ) }) ) ) }