/**
 * Thin high-level helpers layered on top of the explicit worker primitives.
 *
 * @module bquery/concurrency
 */
import { createTaskPool } from './pool';
import { runTask } from './task';
import { validateTaskHandler } from './internal';
import type {
  ParallelCollectionOptions,
  ParallelMapHandler,
  ParallelMapOptions,
  ParallelOptions,
  ParallelPredicateHandler,
  ParallelReduceHandler,
  ParallelResults,
  ParallelTask,
  TaskPool,
  TaskRunOptions,
  WorkerTaskHandler,
} from './types';

/** Wire format for one standalone task: handler as source text plus its input. */
interface SerializedParallelTask {
  handlerSource: string;
  input: unknown;
}

/** Wire format for one chunk of a collection operation. */
interface SerializedChunk<TInput> {
  items: Array<{
    index: number;
    value: TInput;
  }>;
  handlerSource: string;
}

/** Pairs a result with its original index so chunked output can be re-ordered. */
interface IndexedMapResult<TResult> {
  index: number;
  value: TResult;
}

/**
 * Worker-side executor: revives the stringified handler and applies it to the input.
 *
 * NOTE(review): `new Function` evaluates caller-supplied source; handler sources
 * must never originate from untrusted input.
 *
 * @throws TypeError when the serialized source does not revive as a function.
 */
const executeSerializedTask = async (job: SerializedParallelTask): Promise<unknown> => {
  const revive = new Function(`return (${job.handlerSource});`);
  const handler = revive() as ((input: unknown) => unknown | Promise<unknown>) | undefined;
  if (typeof handler !== 'function') {
    throw new TypeError('The serialized task handler did not revive as a function.');
  }
  return await handler(job.input);
};

/** Wire format for an isolated reduce job executed inside a single worker. */
interface SerializedReduceJob<TInput, TAccumulator> {
  initialValue: TAccumulator;
  reducerSource: string;
  values: readonly TInput[];
}

/**
 * Worker-side executor for one chunk: revives the collection handler and maps
 * every item sequentially, tagging each result with its original array index.
 *
 * @throws TypeError when the serialized source does not revive as a function.
 */
const executeSerializedChunk = async (
  job: SerializedChunk<unknown>
): Promise<Array<IndexedMapResult<unknown>>> => {
  const revive = new Function(`return (${job.handlerSource});`);
  const handler = revive() as
    | ((value: unknown, index: number) => unknown | Promise<unknown>)
    | undefined;
  if (typeof handler !== 'function') {
    throw new TypeError('The serialized collection handler did not revive as a function.');
  }
  const results: Array<IndexedMapResult<unknown>> = [];
  for (const item of job.items) {
    results.push({
      index: item.index,
      value: await handler(item.value, item.index),
    });
  }
  return results;
};

/**
 * Worker-side executor for a reduce job: awaits each step before the next so
 * strict left-to-right accumulator semantics are preserved.
 *
 * @throws TypeError when the serialized source does not revive as a function.
 */
const executeSerializedReduce = async (
  job: SerializedReduceJob<unknown, unknown>
): Promise<unknown> => {
  const revive = new Function(`return (${job.reducerSource});`);
  const reducer = revive() as
    | ((accumulator: unknown, value: unknown, index: number) => unknown | Promise<unknown>)
    | undefined;
  if (typeof reducer !== 'function') {
    throw new TypeError('The serialized reducer did not revive as a function.');
  }
  let accumulator = job.initialValue;
  for (let index = 0; index < job.values.length; index++) {
    accumulator = await reducer(accumulator, job.values[index], index);
  }
  return accumulator;
};

/**
 * Validates an optional batch size, defaulting to one item per batch.
 *
 * @throws RangeError when `batchSize` is defined but not a positive integer.
 */
const normalizeBatchSize = (batchSize: number | undefined, label: string): number => {
  if (batchSize === undefined) {
    return 1;
  }
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`${label} batchSize must be a positive integer.`);
  }
  return batchSize;
};

/** Creates a pool whose workers revive and run serialized standalone tasks. */
const createSerializedTaskPool = (
  options: ParallelOptions
): TaskPool => {
  return createTaskPool(executeSerializedTask, options);
};

/** Converts a typed task into its transferable wire form. */
const serializeTask = (
  // NOTE(review): type arguments were stripped from this file; ParallelTask is
  // assumed to accept <input, result> parameters — confirm against ./types.
  task: ParallelTask<unknown, unknown>
): SerializedParallelTask => ({
  handlerSource: validateTaskHandler(task.handler),
  input: task.input,
});

/**
 * Shared driver for the collection helpers (`map`, `filter`, `some`, `every`,
 * `find`): chunks the input, fans the chunks out to a dedicated pool, then
 * reassembles the results in original order. The pool is always terminated,
 * even when a chunk rejects.
 *
 * @param values  Source array (never mutated).
 * @param handler Per-item callback serialized and revived inside workers.
 * @param options Pool options plus optional `batchSize` and abort `signal`.
 * @param label   Helper name used in error messages (e.g. `'map'`).
 */
const runChunkedHandler = async <TInput, TResult>(
  values: readonly TInput[],
  handler: (value: TInput, index: number) => TResult | Promise<TResult>,
  options: ParallelCollectionOptions = {},
  label: string
): Promise<TResult[]> => {
  if (values.length === 0) {
    return [];
  }
  const handlerSource = validateTaskHandler(
    handler as unknown as WorkerTaskHandler
  );
  const { batchSize, signal, ...poolOptions } = options;
  const normalizedBatchSize = normalizeBatchSize(batchSize, label);
  const pool = createTaskPool(executeSerializedChunk, poolOptions);
  const chunks: Array<SerializedChunk<TInput>> = [];
  for (let index = 0; index < values.length; index += normalizedBatchSize) {
    const end = Math.min(index + normalizedBatchSize, values.length);
    const items: SerializedChunk<TInput>['items'] = [];
    for (let itemIndex = index; itemIndex < end; itemIndex += 1) {
      items.push({
        index: itemIndex,
        value: values[itemIndex],
      });
    }
    chunks.push({ items, handlerSource });
  }
  try {
    const chunkResults = await Promise.all(
      chunks.map((chunk) => pool.run(chunk, signal ? { signal } : undefined))
    );
    // Re-assemble by original index so output order matches input order.
    const mapped = new Array<TResult>(values.length);
    for (const chunk of chunkResults) {
      for (const item of chunk) {
        mapped[item.index] = item.value as TResult;
      }
    }
    return mapped;
  } finally {
    pool.terminate();
  }
};

/**
 * Executes multiple standalone tasks in parallel using a bounded worker pool.
 *
 * @example
 * ```ts
 * import { parallel } from '@bquery/bquery/concurrency';
 *
 * const results = await parallel([
 *   { handler: (value: number) => value * 2, input: 5 },
 *   { handler: ({ a, b }: { a: number; b: number }) => a + b, input: { a: 1, b: 2 } },
 * ]);
 * ```
 */
// NOTE(review): `any` is required for heterogeneous task tuples (handler
// parameters are contravariant, so `unknown` would reject typed handlers).
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function parallel<TTasks extends readonly ParallelTask<any, any>[]>(
  tasks: TTasks,
  options: ParallelOptions = {}
): Promise<ParallelResults<TTasks>> {
  if (tasks.length === 0) {
    return [] as unknown as ParallelResults<TTasks>;
  }
  const pool = createSerializedTaskPool(options);
  try {
    const results = await Promise.all(
      tasks.map((task) => pool.run(serializeTask(task), task.options))
    );
    return results as ParallelResults<TTasks>;
  } finally {
    pool.terminate();
  }
}

/**
 * Executes tasks in sequential batches while each batch still uses parallel workers.
 *
 * This adapts `threadts-universal`'s batch helper to bQuery without colliding
 * with the reactive module's existing `batch()` export.
* * @example * ```ts * import { batchTasks } from '@bquery/bquery/concurrency'; * * const results = await batchTasks( * [ * { handler: (value: number) => value * 2, input: 1 }, * { handler: (value: number) => value * 2, input: 2 }, * { handler: (value: number) => value * 2, input: 3 }, * ], * 2 * ); * ``` */ export async function batchTasks( tasks: TTasks, batchSize?: number, options: ParallelOptions = {} ): Promise> { if (tasks.length === 0) { return [] as unknown as ParallelResults; } const normalizedBatchSize = normalizeBatchSize(batchSize, 'batchTasks'); const pool = createSerializedTaskPool(options); const results: unknown[] = []; try { for (let index = 0; index < tasks.length; index += normalizedBatchSize) { const batch = tasks.slice(index, index + normalizedBatchSize); const batchResults = await Promise.all( batch.map((task) => pool.run(serializeTask(task), task.options)) ); results.push(...batchResults); } return results as ParallelResults; } finally { pool.terminate(); } } /** * Maps an array in parallel using optional chunking on top of `createTaskPool()`. * * @example * ```ts * import { map } from '@bquery/bquery/concurrency'; * * const results = await map([1, 2, 3], (value, index) => value + index, { * batchSize: 2, * concurrency: 2, * }); * ``` */ export async function map( values: readonly TInput[], mapper: ParallelMapHandler, options: ParallelMapOptions = {} ): Promise { return runChunkedHandler(values, mapper, options, 'map'); } /** * Filters an array in parallel using a standalone predicate with optional chunking. 
*/ export async function filter( values: readonly TInput[], predicate: ParallelPredicateHandler, options: ParallelCollectionOptions = {} ): Promise { const matches = await runChunkedHandler(values, predicate, options, 'filter'); const filtered: TInput[] = []; for (let index = 0; index < values.length; index += 1) { if (!matches[index]) { continue; } if (index in values) { filtered.push(values[index] as TInput); } else { filtered.length += 1; } } return filtered; } /** * Returns whether at least one array item matches a standalone predicate. * * The current implementation evaluates predicate chunks explicitly and reduces * the final boolean result on the main thread instead of using hidden globals * or speculative worker cancellation. */ export async function some( values: readonly TInput[], predicate: ParallelPredicateHandler, options: ParallelCollectionOptions = {} ): Promise { if (values.length === 0) { return false; } const matches = await runChunkedHandler(values, predicate, options, 'some'); return matches.some(Boolean); } /** * Returns whether every array item matches a standalone predicate. * * The current implementation evaluates predicate chunks explicitly and reduces * the final boolean result on the main thread instead of using hidden globals * or speculative worker cancellation. */ export async function every( values: readonly TInput[], predicate: ParallelPredicateHandler, options: ParallelCollectionOptions = {} ): Promise { if (values.length === 0) { return true; } const matches = await runChunkedHandler(values, predicate, options, 'every'); return matches.every(Boolean); } /** * Finds the first array item that matches a standalone predicate. 
*/ export async function find( values: readonly TInput[], predicate: ParallelPredicateHandler, options: ParallelCollectionOptions = {} ): Promise { if (values.length === 0) { return undefined; } const matches = await runChunkedHandler(values, predicate, options, 'find'); const index = matches.findIndex(Boolean); return index === -1 ? undefined : values[index]; } /** * Reduces an array inside one isolated worker while preserving standard * left-to-right accumulator semantics. */ export async function reduce( values: readonly TInput[], reducer: ParallelReduceHandler, initialValue: TAccumulator, options: TaskRunOptions = {} ): Promise { if (values.length === 0) { return initialValue; } const reducerSource = validateTaskHandler( reducer as unknown as WorkerTaskHandler ); return runTask( executeSerializedReduce, { initialValue, reducerSource, values, }, options ) as Promise; }