import type {NapiWorkerPool as INapiWorkerPool} from '@atlaspack/types'; import {Worker} from 'worker_threads'; import path from 'path'; import process from 'process'; // @ts-expect-error TS2724 import type {Transferable} from '@atlaspack/rust'; import {getAvailableThreads} from '@atlaspack/rust'; import logger from '@atlaspack/logger'; import type {LogEvent} from '@atlaspack/types-internal'; const WORKER_PATH = path.join(__dirname, 'worker', 'index.js'); const ATLASPACK_NAPI_WORKERS = process.env.ATLASPACK_NAPI_WORKERS && parseInt(process.env.ATLASPACK_NAPI_WORKERS, 10); export type NapiWorkerPoolOptions = { workerCount?: number; }; export class NapiWorkerPool implements INapiWorkerPool { #workers: Worker[]; #napiWorkers: Array>; #workerCount: number; constructor({workerCount}: NapiWorkerPoolOptions = {workerCount: undefined}) { // @ts-expect-error TS2322 this.#workerCount = workerCount ?? ATLASPACK_NAPI_WORKERS ?? // Default to a maximum of 4 workers as performance worsens beyond that // point in most cases Math.min(getAvailableThreads(), 4); if (!this.#workerCount) { // TODO use main thread if workerCount is 0 } this.#workers = []; this.#napiWorkers = []; for (let i = 0; i < this.#workerCount; i++) { let worker = new Worker(WORKER_PATH); this.#workers.push(worker); this.#napiWorkers.push( new Promise((res: (result: Promise) => void) => worker.once('message', res), ), ); // Re-emit log events from the worker thread into the main-thread logger // so they reach reporters and are subject to log-level filtering. worker.on('message', (message: unknown) => { if ( message != null && typeof message === 'object' && (message as {type?: unknown}).type === 'logEvent' ) { const event = (message as {type: string; event: LogEvent}).event; switch (event.level) { case 'verbose': logger.verbose(event.diagnostics); break; case 'info': logger.info(event.diagnostics); break; case 'warn': logger.warn(event.diagnostics); break; case 'error': logger.error(event.diagnostics); break; case 'progress': logger.progress(event.message); break; } } }); } } clearAllWorkerState(): Promise { return Promise.all( this.#workers.map( (worker) => new Promise((res) => { worker.postMessage('clearState'); // Set up a message handler that only resolves on 'stateCleared' // and ignores all other messages (like the initial napiWorker Transferable) const messageHandler = (message: unknown) => { if (message === 'stateCleared') { worker.removeListener('message', messageHandler); res(); } else if ( message != null && typeof message === 'object' && (message as {type?: unknown}).type === 'logEvent' ) { // logEvent messages are forwarded asynchronously from the worker // logger bridge and are expected at any time; ignore them here. } else { // Log unexpected messages for debugging // eslint-disable-next-line no-console console.warn( `[NapiWorkerPool] Received unexpected message during clearAllWorkerState: ${JSON.stringify(message)} (type: ${typeof message})`, ); // Keep listening for 'stateCleared' - don't remove the listener } }; worker.on('message', messageHandler); }), ), ); } workerCount(): number { return this.#workerCount; } getWorkers(): Promise> { return Promise.all(this.#napiWorkers); } shutdown(): void { for (const worker of this.#workers) { worker.terminate(); } } }