import os from 'node:os'; import { type Options, type Pool, createPool } from 'generic-pool'; import { Env, Util, AsyncQueue } from '@travetto/runtime'; type IterableSource = Iterable | AsyncIterable; type WorkerExecutor = (input: I, idx: number) => Promise; /** * Worker definition */ export interface Worker { active: boolean; id: unknown; init?(): Promise; execute: WorkerExecutor; destroy?(): Promise; release?(): unknown; } type WorkerFactoryInput = Partial> & { execute: WorkerExecutor }; type WorkerInput = (() => WorkerFactoryInput) | WorkerExecutor; type WorkPoolConfig = Options & { onComplete?: (output: O, input: I, finishIdx: number) => void; onError?(event: Error, input: I, finishIdx: number): (unknown | Promise); shutdown?: AbortSignal; }; const isWorkerFactory = (value: WorkerInput): value is (() => WorkerFactoryInput) => value.length === 0; /** * Work pool support */ export class WorkPool { static MAX_SIZE = os.availableParallelism(); static DEFAULT_SIZE = Math.max(Math.trunc(WorkPool.MAX_SIZE * .75), 4); /** Build worker pool */ static #buildPool(input: WorkerInput, options?: WorkPoolConfig): Pool> { let pendingAcquires = 0; const trace = /@travetto\/worker/.test(Env.DEBUG.value ?? ''); // Create the pool const pool = createPool({ async create() { try { pendingAcquires += 1; const worker: Worker = { id: Util.uuid(), active: true, ...isWorkerFactory(input) ? await input() : { execute: input } }; await worker.init?.(); return worker; } finally { pendingAcquires -= 1; } }, async destroy(worker) { if (trace) { console.debug('Destroying', { pid: process.pid, worker: worker.id }); } return worker.destroy?.(); }, validate: async (worker: Worker) => worker.active }, { evictionRunIntervalMillis: 5000, ...(options ?? {}), max: options?.max ?? WorkPool.DEFAULT_SIZE, min: options?.min ?? 1, }); // Listen for shutdown options?.shutdown?.addEventListener('abort', async () => { while (pendingAcquires) { await Util.nonBlockingTimeout(10); } await pool.drain(); await pool.clear(); }); return pool; } /** * Process a given input source and worker, and fire on completion */ static async run(workerFactory: WorkerInput, source: IterableSource, options: WorkPoolConfig = {}): Promise { const trace = /@travetto\/worker/.test(Env.DEBUG.value ?? ''); const pending = new Set>(); const errors: Error[] = []; let inputIdx = 0; let finishIdx = 0; const pool = this.#buildPool(workerFactory, options); for await (const nextInput of source) { const worker = await pool.acquire()!; if (trace) { console.debug('Acquired', { pid: process.pid, worker: worker.id }); } const completion = worker.execute(nextInput, inputIdx += 1) .then(output => options.onComplete?.(output, nextInput, finishIdx += 1)) .catch(error => { errors.push(error); options?.onError?.(error, nextInput, finishIdx += 1); }) // Catch error .finally(async () => { if (trace) { console.debug('Releasing', { pid: process.pid, worker: worker.id }); } try { if (worker.active) { try { await worker.release?.(); } catch { } await pool.release(worker); } else { await pool.destroy(worker); } } catch { } }); completion.finally(() => pending.delete(completion)); pending.add(completion); } await Promise.all(Array.from(pending)); if (errors.length) { throw errors[0]; } } /** * Process a given input source as an async iterable */ static runStream(worker: WorkerInput, input: IterableSource, options?: WorkPoolConfig): AsyncIterable { const queue = new AsyncQueue(); const result = this.run(worker, input, { ...options, onComplete: (event, value, finishIdx) => { queue.add(event); options?.onComplete?.(event, value, finishIdx); } }); result.finally(() => queue.close()); return queue; } /** * Process a given input source as an async iterable with progress information */ static runStreamProgress(worker: WorkerInput, input: IterableSource, total: number, options?: WorkPoolConfig): AsyncIterable<{ idx: number; value: O; total: number; }> { const queue = new AsyncQueue<{ idx: number, value: O, total: number }>(); const result = this.run(worker, input, { ...options, onComplete: (event, value, finishIdx) => { queue.add({ value: event, idx: finishIdx, total }); options?.onComplete?.(event, value, finishIdx); } }); result.finally(() => queue.close()); return queue; } }