import {
concreteStream,
StreamInternal
} from "@effect/core/stream/Stream/operations/_internal/StreamInternal"
import { DurationInternal } from "@tsplus/stdlib/data/Duration"
/**
* Delays the chunks of this stream according to the given bandwidth
* parameters using the token bucket algorithm. Allows for burst in the
* processing of elements by allowing the token bucket to accumulate tokens up
* to a `units + burst` threshold. The weight of each chunk is determined by
* the `costFn` effectful function.
*
* @tsplus static effect/core/stream/Stream.Aspects throttleShapeEffect
* @tsplus pipeable effect/core/stream/Stream throttleShapeEffect
*/
export function throttleShapeEffect(
units: number,
duration: Duration,
costFn: (input: Chunk) => Effect,
burst = 0
) {
return (self: Stream): Stream => {
concreteStream(self)
return new StreamInternal(
Channel.fromEffect(Clock.currentTime)
.flatMap(
(timestamp) =>
self.channel >>
loop(units, duration, costFn, burst, units, timestamp)
)
)
}
}
function loop(
units: number,
duration: Duration,
costFn: (input: Chunk) => Effect,
burst: number,
tokens: number,
timestamp: number
): Channel, unknown, E | E2, Chunk, void> {
return Channel.readWith(
(input: Chunk) =>
Channel.unwrap(
costFn(input)
.zip(Clock.currentTime)
.map(([weight, current]) => {
const elapsed = current - timestamp
const cycles = elapsed / duration.millis
const sum = tokens + cycles * units
const max = units + burst < 0 ? Number.MAX_SAFE_INTEGER : units + burst
const available = sum < 0 ? max : Math.min(sum, max)
const remaining = available - weight
const waitCycles = remaining >= 0 ? 0 : -remaining / units
const delay = new DurationInternal(Math.floor(waitCycles * duration.millis))
return delay > (0).millis
? Channel.fromEffect(Clock.sleep(delay))
.flatMap(() => Channel.write(input))
.flatMap(() =>
loop(units, duration, costFn, burst, remaining, current)
)
: Channel.write(input).flatMap(() =>
loop(units, duration, costFn, burst, remaining, current)
)
})
),
(err) => Channel.fail(err),
() => Channel.unit
)
}