import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Throttles the chunks of this stream according to the given bandwidth * parameters using the token bucket algorithm. Allows for burst in the * processing of elements by allowing the token bucket to accumulate tokens up * to a `units + burst` threshold. Chunks that do not meet the bandwidth * constraints are dropped. The weight of each chunk is determined by the * `costFn` effectful function. * * @tsplus static effect/core/stream/Stream.Aspects throttleEnforceEffect * @tsplus pipeable effect/core/stream/Stream throttleEnforceEffect */ export function throttleEnforceEffect( units: number, duration: Duration, costFn: (input: Chunk) => Effect, burst = 0 ) { return (self: Stream): Stream => { concreteStream(self) return new StreamInternal( Channel.fromEffect(Clock.currentTime) .flatMap((timestamp) => self.channel >> loop(units, duration, costFn, burst, units, timestamp) ) ) } } function loop( units: number, duration: Duration, costFn: (input: Chunk) => Effect, burst: number, tokens: number, timestamp: number ): Channel, unknown, E | E2, Chunk, void> { return Channel.readWith( (input: Chunk) => Channel.unwrap( costFn(input) .zip(Clock.currentTime) .map(([weight, current]) => { const elapsed = current - timestamp const cycles = elapsed / duration.millis const sum = tokens + cycles * units const max = units + burst < 0 ? Number.MAX_SAFE_INTEGER : units + burst const available = sum < 0 ? max : Math.min(sum, max) return weight <= available ? Channel.write(input).flatMap(() => loop( units, duration, costFn, burst, available - weight, current ) ) : loop(units, duration, costFn, burst, available, current) }) ), (err) => Channel.fail(err), () => Channel.unit ) }