import { Transform } from 'node:stream'
import { createCommonLoggerAtLevel } from '@naturalcycles/js-lib/log'
import type { DeferredPromise } from '@naturalcycles/js-lib/promise'
import { pDefer } from '@naturalcycles/js-lib/promise/pDefer.js'
import type { NumberOfSeconds, PositiveInteger } from '@naturalcycles/js-lib/types'
import type { TransformOptions, TransformTyped } from '../stream.model.js'

export interface TransformWarmupOptions extends TransformOptions {
  /**
   * Target concurrency after warmup completes.
   */
  concurrency: PositiveInteger

  /**
   * Time in seconds to gradually increase concurrency from 1 to `concurrency`.
   * Set to 0 to disable warmup (pass-through mode from the start).
   */
  warmupSeconds: NumberOfSeconds
}

/**
 * Transform that gradually increases concurrency from 1 to the configured maximum
 * over a warmup period. Useful for scenarios where you want to avoid overwhelming
 * a system at startup (e.g., database connections, API rate limits).
 *
 * During warmup: limits concurrent items based on elapsed time.
 * After warmup: passes items through immediately with zero overhead.
 *
 * The warmup clock starts when the first item arrives, not when the transform
 * is created. Items are emitted unchanged.
 *
 * @typeParam T - type of the items flowing through the stream (input === output).
 * @param opt - `concurrency` and `warmupSeconds` control the ramp-up;
 *   `objectMode` defaults to `true` and `highWaterMark` defaults to `1`.
 *   `opt.logger` / `opt.logLevel` are forwarded to `createCommonLoggerAtLevel`.
 * @returns a Transform stream that passes every item through as-is.
 *
 * @experimental
 */
// NOTE(review): the `<T, T>` arguments assume `TransformTyped` is declared as
// `TransformTyped<IN, OUT>` in ../stream.model.js — confirm.
export function transformWarmup<T>(opt: TransformWarmupOptions): TransformTyped<T, T> {
  const { concurrency, warmupSeconds, objectMode = true, highWaterMark = 1 } = opt
  const warmupMs = warmupSeconds * 1000
  const logger = createCommonLoggerAtLevel(opt.logger, opt.logLevel)

  // 0 means "no item seen yet"; set to Date.now() when the first item arrives.
  let startTime = 0

  // Warmup is skipped when there is nothing to ramp up.
  // Written as negated `>` comparisons (instead of `<=`) so that NaN/undefined
  // inputs also fall into pass-through mode. Otherwise the interpolation in
  // getCurrentConcurrency() would yield NaN, `inFlight < NaN` would never be
  // true, and the very first item would wait forever.
  let warmupComplete = !(warmupMs > 0) || !(concurrency > 1)

  // Number of slots currently taken.
  let inFlight = 0
  // Items that arrived while no slot was available, in arrival (FIFO) order.
  const waiters: DeferredPromise[] = []

  return new Transform({
    objectMode,
    highWaterMark,
    async transform(item: T, _, cb) {
      // Initialize start time on first item
      if (startTime === 0) {
        startTime = Date.now()
      }

      // Fast-path: after warmup, just pass through with zero overhead
      if (warmupComplete) {
        cb(null, item)
        return
      }

      const currentConcurrency = getCurrentConcurrency()

      if (inFlight < currentConcurrency) {
        // Have room, proceed immediately
        inFlight++
        logger.debug(`inFlight++ ${inFlight}/${currentConcurrency}, waiters ${waiters.length}`)
      } else {
        // Wait for a slot.
        // release() increments `inFlight` on our behalf before resolving the waiter,
        // so there is no `inFlight++` on this branch.
        const waiter = pDefer()
        waiters.push(waiter)
        logger.debug(`inFlight ${inFlight}/${currentConcurrency}, waiters++ ${waiters.length}`)
        await waiter
        logger.debug(`waiter resolved, inFlight ${inFlight}/${getCurrentConcurrency()}`)
      }

      // Push the item
      cb(null, item)

      // Release slot on next microtask - essential for concurrency control.
      // Without this, the slot would be freed immediately and items would
      // flow through without any limiting effect.
      queueMicrotask(release)
    },
  })

  /**
   * Returns the concurrency allowed right now.
   *
   * Side effect: flips `warmupComplete` to `true` (and logs once) the first time
   * it observes that `warmupMs` has elapsed since `startTime`.
   */
  function getCurrentConcurrency(): number {
    if (warmupComplete) return concurrency

    const elapsed = Date.now() - startTime
    if (elapsed >= warmupMs) {
      warmupComplete = true
      logger.debug('warmup complete')
      return concurrency
    }

    // Linear interpolation from 1 to concurrency
    const progress = elapsed / warmupMs
    return Math.max(1, Math.floor(1 + (concurrency - 1) * progress))
  }

  /**
   * Frees one slot and hands free slots to queued waiters (oldest first).
   */
  function release(): void {
    inFlight--

    // Wake up waiters based on current concurrency (may have increased)
    const currentConcurrency = getCurrentConcurrency()
    while (inFlight < currentConcurrency) {
      const waiter = waiters.shift()
      if (!waiter) break
      // Take the slot for the waiter before resuming it.
      inFlight++
      waiter.resolve()
    }
  }
}