import {
    attempt,
    chunk,
    type MaybePromise,
    type Func,
    type AsyncFunc,
    isFunction,
    assert,
    assertOptional,
} from '../index.ts';

/**
 * A sync or async function accepted by `batch`.
 *
 * NOTE(review): assumes `Func` and `AsyncFunc` are parameterised as
 * `<Args, Return>` — confirm against their declarations in '../index.ts'.
 */
type BatchFunction<A extends unknown[] = unknown[], R = unknown> = (
    AsyncFunc<A, R> |
    Func<A, R>
);

/**
 * Progress snapshot handed to `onChunkStart` and `onChunkEnd`.
 */
export type OnChunkParam<T> = {

    /** Zero-based index of the chunk this callback refers to. */
    index: number;

    /** Total number of chunks in the batch. */
    total: number;

    /** Items belonging to this chunk. */
    items: T[];

    /** Number of items whose processing has finished when the callback fires. */
    processedCount: number;

    /** Number of items that have not finished processing when the callback fires. */
    remainingCount: number;

    /** `processedCount` expressed as a percentage (0-100) of all items. */
    completionPercent: number;
};

/**
 * Options accepted by `batch`.
 */
export type BatchOptions<T, R = unknown> = {

    /** Items to process; each one is passed to `fn` on its own. */
    items: T[];

    /** Chunk size, i.e. how many items are started together. Defaults to 10. */
    concurrency?: number;

    /** `'abort'` (default) rethrows the first item error; `'continue'` records it and carries on. */
    failureMode?: 'abort' | 'continue';

    /** Called, and awaited, for every item whose `fn` call produced an error. */
    onError?: (error: Error, item: T, itemIndex: number) => MaybePromise<void>;

    /** Called once before the first chunk with the total number of chunks. */
    onStart?: (total: number) => MaybePromise<void>;

    /** Called once after the last chunk with every collected result. */
    onEnd?: (results: BatchResult<T, R>[]) => MaybePromise<void>;

    /** Called before each chunk is started. */
    onChunkStart?: (params: OnChunkParam<T>) => MaybePromise<void>;

    /** Called after every item of a chunk has settled. */
    onChunkEnd?: (params: OnChunkParam<T>) => MaybePromise<void>;
};

/**
 * Outcome of processing a single item.
 */
export type BatchResult<T, R = unknown> = {

    /** Value produced by `fn`, or `null`. */
    result: R | null;

    /** Error captured while running `fn`, or `null`. */
    error: Error | null;

    /** The item that was passed to `fn`. */
    item: T;

    /** Zero-based index of the chunk the item was processed in. */
    index: number;

    /** Running position of the item across all chunks (its index in `items`, assuming `chunk` preserves order). */
    itemIndex: number;
};

/**
 * Processes items in batches with configurable concurrency and error handling.
 *
 * Items are split into chunks of `concurrency` items. Chunks run one after
 * another; the items inside a chunk are started together and awaited with
 * `Promise.all`. Lifecycle callbacks report progress and are awaited before
 * processing continues.
 *
 * @param fn sync or async function invoked once per item
 * @param options items to process plus concurrency, failure mode and callbacks
 * @returns one `BatchResult` per processed item, in processing order
 * @throws when an option fails validation, or — with `failureMode: 'abort'` —
 * the first item error (in that case `onEnd` is not called)
 *
 * @example
 *
 * const items = [1, 2, 3, 4, 5]; // Array of arguments
 *
 * const handleOneItem = async (item: number) => {
 *     console.log(item);
 * };
 *
 * await batch(handleOneItem, {
 *     concurrency: 10,
 *     items,
 *     failureMode: 'continue',
 *     onError: (error, item) => {
 *         console.error(error, item);
 *     },
 *     onStart: (total) => {
 *         console.log(`Starting batch with ${total} chunks`);
 *     },
 *     onEnd: (results) => {
 *         console.log(`Batch completed with ${results.length} results`);
 *     },
 *     onChunkStart: ({ index, total, items }) => {
 *         console.log(`Starting chunk ${index + 1}/${total}`);
 *     },
 *     onChunkEnd: ({ index, total, items }) => {
 *         console.log(`Finished chunk ${index + 1}/${total}`);
 *     }
 * });
 */
export const batch = async <T, R>(
    fn: BatchFunction<[T], R>,
    options: BatchOptions<T, R>
): Promise<BatchResult<T, R>[]> => {

    const results: BatchResult<T, R>[] = [];

    const {
        items,
        concurrency = 10,
        failureMode = 'abort',
        onError,
        onStart,
        onChunkStart,
        onChunkEnd,
        onEnd
    } = options;

    assert(isFunction(fn), 'fn must be a function');
    assert(Array.isArray(items), 'items must be an array');
    assert(concurrency > 0, 'concurrency must be greater than 0');
    assert(failureMode === 'abort' || failureMode === 'continue', 'failureMode must be either "abort" or "continue"');

    assertOptional(onError, isFunction(onError), 'onError must be a function');
    assertOptional(onStart, isFunction(onStart), 'onStart must be a function');
    assertOptional(onChunkStart, isFunction(onChunkStart), 'onChunkStart must be a function');
    assertOptional(onChunkEnd, isFunction(onChunkEnd), 'onChunkEnd must be a function');
    assertOptional(onEnd, isFunction(onEnd), 'onEnd must be a function');

    const chunks = chunk(items, concurrency);
    const totalChunks = chunks.length;
    const totalItems = items.length;

    // Runs `fn` for one item and wraps the outcome. The chunk index is passed
    // in explicitly instead of being read from a mutable outer variable.
    const processOne = async (
        item: T,
        index: number,
        itemIndex: number
    ): Promise<BatchResult<T, R>> => {

        const [result, error] = await attempt(() => fn(item) as Promise<R>);

        if (error) {

            // Awaited so an async handler finishes (and its own failure
            // surfaces here) before we decide whether to abort.
            await onError?.(error, item, itemIndex);

            if (failureMode === 'abort') {

                throw error;
            }
        }

        return { result, error, item, index, itemIndex };
    };

    // Builds the progress snapshot from the number of items finished so far.
    // Only called from inside the chunk loop, so `totalItems` is never 0 here.
    const progressOf = (
        index: number,
        chunkItems: T[],
        processedCount: number
    ): OnChunkParam<T> => ({
        index,
        total: totalChunks,
        items: chunkItems,
        processedCount,
        remainingCount: totalItems - processedCount,
        completionPercent: (processedCount / totalItems) * 100
    });

    await onStart?.(totalChunks);

    let processedCount = 0;

    for (const [index, chunkItems] of chunks.entries()) {

        // Fixed before the chunk starts so every item gets a stable itemIndex.
        const offset = processedCount;

        await onChunkStart?.(progressOf(index, chunkItems, processedCount));

        const settled = await Promise.all(
            chunkItems.map((item, position) => processOne(item, index, offset + position))
        );

        results.push(...settled);
        processedCount += chunkItems.length;

        await onChunkEnd?.(progressOf(index, chunkItems, processedCount));
    }

    await onEnd?.(results);

    return results;
};