import { Transform } from 'node:stream'
import { _ms, _since, localTime } from '@naturalcycles/js-lib/datetime'
import { createCommonLoggerAtLevel } from '@naturalcycles/js-lib/log'
import type { DeferredPromise } from '@naturalcycles/js-lib/promise'
import { pDefer } from '@naturalcycles/js-lib/promise/pDefer.js'
import type {
  NumberOfSeconds,
  PositiveInteger,
  UnixTimestampMillis,
} from '@naturalcycles/js-lib/types'
import type { TransformOptions, TransformTyped } from '../stream.model.js'

export interface TransformThrottleOptions extends TransformOptions {
  /**
   * How many items to allow per `interval` of seconds.
   */
  throughput: PositiveInteger

  /**
   * Length of the interval (in seconds) within which the number of items should not exceed `throughput`.
   */
  interval: NumberOfSeconds
}

/**
 * Throttles the throughput of the stream: at most `throughput` items are passed through
 * per window of `interval` seconds.
 * For example, when you have an API with a rate limit of 5000 requests per minute,
 * `transformThrottle` can help you utilize it most efficiently.
 * You can define it as:
 *
 * _pipeline([
 *   // ...
 *   transformThrottle({
 *     throughput: 5000,
 *     interval: 60,
 *   }),
 *   // ...
 * ])
 *
 * The first window starts when the first item arrives. Once `throughput` items have passed
 * within the current window, the next item waits until the window's timer fires; the counter
 * is then reset and a new window starts. Items are passed through unchanged.
 *
 * Defaults: `objectMode = true`, `highWaterMark = 1`.
 *
 * @experimental
 */
export function transformThrottle<T>(opt: TransformThrottleOptions): TransformTyped<T, T> {
  const { throughput, interval, objectMode = true, highWaterMark = 1 } = opt

  // Number of items passed in the current window.
  let count = 0
  // Start of the current window; unset until the first item arrives.
  let start: UnixTimestampMillis
  // Set while the current window's budget is exhausted; the next incoming item awaits it.
  let lock: DeferredPromise | undefined
  // Timer that ends the current window (see onInterval).
  let timeout: NodeJS.Timeout | undefined

  const logger = createCommonLoggerAtLevel(opt.logger, opt.logLevel)

  return new Transform({
    objectMode,
    highWaterMark,
    async transform(item: T, _, cb) {
      try {
        if (!start) {
          // First item: start the first window and its timer.
          start = localTime.nowUnixMillis()
          timeout = setTimeout(() => onInterval(), interval * 1000)
          logger.log(`${localTime.now().toPretty()} transformThrottle started with`, {
            throughput,
            interval,
            rps: Math.round(throughput / interval),
          })
        }

        if (lock) {
          // Budget for the current window is used up: wait until onInterval releases the lock.
          await lock
        }

        if (++count >= throughput) {
          // This item uses the last slot of the window; block the next one.
          lock = pDefer()
          logger.log(
            `${localTime.now().toPretty()} transformThrottle activated: ${count} items passed in ${_since(start)}, will pause for ${_ms(interval * 1000 - (Date.now() - start))}`,
          )
        }
      } catch (err) {
        // Transform does not observe the promise returned by an async transform(),
        // so a throw here would be an unhandled rejection and the stream would stall.
        // Report it to the stream instead.
        cb(err instanceof Error ? err : new Error(String(err)))
        return
      }

      cb(null, item) // pass the item through
    },
    final(cb) {
      clearTimeout(timeout)
      timeout = undefined
      cb()
    },
    destroy(err, cb) {
      // final() is not called when the stream is destroyed (e.g. on pipeline error/abort),
      // so the self-rescheduling timer must be stopped here as well; otherwise it keeps
      // firing (and keeps the process alive) indefinitely.
      clearTimeout(timeout)
      timeout = undefined
      cb(err)
    },
  })

  /**
   * Ends the current window: releases a pending lock (if any), resets the counter,
   * starts a new window and schedules the next call.
   */
  function onInterval(): void {
    if (lock) {
      logger.log(`${localTime.now().toPretty()} transformThrottle resumed`)
      lock.resolve()
      lock = undefined
    } else {
      logger.log(
        `${localTime.now().toPretty()} transformThrottle passed ${count} (of max ${throughput}) items in ${_since(start)}`,
      )
    }

    // Reset synchronously: the resumed transform() continuation runs later (microtask),
    // so it observes count === 0.
    count = 0
    start = localTime.nowUnixMillis()
    timeout = setTimeout(() => onInterval(), interval * 1000)
  }
}