// /** // * Executes the specified effect, acquiring the specified number of permits // * immediately before the effect begins execution and releasing them // * delayed by duration after the effect completes execution, whether by success, // * failure, or interruption. // */ // export function withPermitsDuration(permits: number, duration: Duration) { // return (self: TSemaphore): (effect: Effect.Effect) => Effect.Effect => { // return effect => // Effect.uninterruptibleMask( // restore => // restore(self.acquireN(permits).commit) // > restore(effect) // .ensuring( // self.releaseN(permits) // .commit // .delay(duration) // ) // ) // } // } import { Array, type Duration, Effect, type NonEmptyArray } from "effect-app" import { dual } from "effect-app/Function" import type { Semaphore } from "effect/Semaphore" import type { Concurrency } from "effect/Types" /** * Executes the specified effect, acquiring the specified number of permits * immediately before the effect begins execution and releasing them * delayed by duration after the effect completes execution, whether by success, * failure, or interruption. */ export function SEM_withPermitsDuration(permits: number, duration: Duration.Duration) { return (self: Semaphore): (effect: Effect.Effect) => Effect.Effect => { return (effect: Effect.Effect) => Effect.uninterruptibleMask( (restore) => restore(self.take(permits)) .pipe(Effect.andThen( restore(effect) .pipe(Effect.ensuring( Effect.delay(self.release(permits), duration) )) )) ) } } export interface BatchOptions { readonly concurrency?: Concurrency | undefined } export const batch: { ( n: number, forEachItem: (item: T, iWithinBatch: number, batchI: number) => Effect.Effect, forEachBatch: (a: NonEmptyArray, i: number) => Effect.Effect, options?: BatchOptions ): (items: Iterable) => Effect.Effect, E | E2, R | R2> ( items: Iterable, n: number, forEachItem: (item: T, iWithinBatch: number, batchI: number) => Effect.Effect, forEachBatch: (a: NonEmptyArray, i: number) => Effect.Effect, options?: BatchOptions ): Effect.Effect, E | E2, R | R2> } = dual( (args) => typeof args[0] !== "number", ( items: Iterable, n: number, forEachItem: (item: T, iWithinBatch: number, batchI: number) => Effect.Effect, forEachBatch: (a: NonEmptyArray, i: number) => Effect.Effect, options?: BatchOptions ) => Effect.forEach( Array.chunksOf(items, n), (_, i) => Effect .forEach(_, (_, j) => forEachItem(_, j, i), { concurrency: "inherit" }) .pipe(Effect.flatMap((_) => forEachBatch(_, i))), { concurrency: options?.concurrency } ) ) // export function rateLimit( // n: number, // d: DUR // ) { // return (items: Iterable) => // ( // forEachItem: (i: T) => Effect.Effect, // forEachBatch: (a: Chunk) => Effect.Effect // ) => // Stream.fromCollection(items) // .rechunk(n) // .throttleShape(n, d, () => n) // .mapChunksEffect(_ => _.forEachEffectPar(forEachItem).tap(forEachBatch)) // .runCollect // } export function naiveRateLimit( n: number, d: Duration.Duration ) { return (items: Iterable) => (( forEachItem: (i: T) => Effect.Effect, forEachBatch: (a: A[]) => Effect.Effect ) => Effect.forEach( Array.chunksOf(items, n), (batch, i) => ((i === 0) ? Effect.void : Effect.sleep(d)) .pipe(Effect.andThen( Effect .forEach(batch, forEachItem, { concurrency: n }) .pipe(Effect.flatMap(forEachBatch)) )) )) }