import {
attempt,
chunk,
type MaybePromise,
type Func,
type AsyncFunc,
isFunction,
assert,
assertOptional,
} from '../index.ts';
type BatchFunction = (
AsyncFunc |
Func
);
export type OnChunkParam = {
index: number;
total: number;
items: T[];
processedCount: number;
remainingCount: number;
completionPercent: number;
};
export type BatchOptions = {
items: T[];
concurrency?: number;
failureMode?: 'abort' | 'continue';
onError?: (error: Error, item: T, itemIndex: number) => MaybePromise;
onStart?: (total: number) => MaybePromise;
onEnd?: (results: BatchResult[]) => MaybePromise;
onChunkStart?: (params: OnChunkParam) => MaybePromise;
onChunkEnd?: (params: OnChunkParam) => MaybePromise;
};
export type BatchResult = {
result: R | null;
error: Error | null;
item: T;
index: number;
itemIndex: number;
};
/**
* Processes items in batches with configurable concurrency and error handling.
*
* Provides progress tracking through lifecycle callbacks and supports
* different failure modes for robust batch processing.
*
* @example
*
* const items = [1, 2, 3, 4, 5]; // Array of arguments
*
* const handleOneItem = async (item: number) => {
* console.log(item);
* };
*
* await batch(handleOneItem, {
* concurrency: 10,
* items,
* failureMode: 'continue',
* onError: (error, item) => {
* console.error(error, item);
* },
* onStart: (total) => {
* console.log(`Starting batch with ${total} chunks`);
* },
* onEnd: (results) => {
* console.log(`Batch completed with ${results.length} results`);
* },
* onChunkStart: ({ index, total, items }) => {
* console.log(`Starting chunk ${index + 1}/${total}`);
* },
* onChunkEnd: ({ index, total, items }) => {
* console.log(`Finished chunk ${index + 1}/${total}`);
* }
* });
*/
export const batch = async (
fn: BatchFunction<[T], R>,
options: BatchOptions
): Promise[]> => {
const results: BatchResult[] = [];
const {
items,
concurrency = 10,
failureMode = 'abort',
onError,
onStart,
onChunkStart,
onChunkEnd,
onEnd
} = options;
assert(isFunction(fn), 'fn must be a function');
assert(Array.isArray(items), 'items must be an array');
assert(concurrency > 0, 'concurrency must be greater than 0');
assert(failureMode === 'abort' || failureMode === 'continue', 'failureMode must be either "abort" or "continue"');
assertOptional(onError, isFunction(onError), 'onError must be a function');
assertOptional(onStart, isFunction(onStart), 'onStart must be a function');
assertOptional(onChunkStart, isFunction(onChunkStart), 'onChunkStart must be a function');
assertOptional(onChunkEnd, isFunction(onChunkEnd), 'onChunkEnd must be a function');
assertOptional(onEnd, isFunction(onEnd), 'onEnd must be a function');
const chunks = chunk(items, concurrency);
const totalChunks = chunks.length;
let chunkIndex = 0;
const processOne = async (item: T, itemIndex: number): Promise> => {
const [result, error] = await attempt(() => fn(item) as Promise);
if (error) {
onError?.(error, item, itemIndex);
if (failureMode === 'abort') {
throw error;
}
}
return {
result,
error,
item,
index: chunkIndex,
itemIndex
};
};
const batchExec = async (argsList: T[], chunkOffset: number): Promise => {
const all = await Promise.all(
argsList.map((item, index) => processOne(item, chunkOffset + index))
);
results.push(...all);
};
await onStart?.(totalChunks);
for (chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
const chunk = chunks[chunkIndex];
await onChunkStart?.({
index: chunkIndex,
total: totalChunks,
items: chunk ?? [],
processedCount: chunkIndex * concurrency,
remainingCount: (totalChunks - chunkIndex - 1) * concurrency,
completionPercent: ((chunkIndex + 1) / totalChunks) * 100
});
await batchExec(chunk ?? [], chunkIndex * concurrency);
await onChunkEnd?.({
index: chunkIndex,
total: totalChunks,
items: chunk ?? [],
processedCount: chunkIndex * concurrency,
remainingCount: (totalChunks - chunkIndex - 1) * concurrency,
completionPercent: ((chunkIndex + 1) / totalChunks) * 100
});
}
await onEnd?.(results);
return results;
};