import { Config } from '@jest/types';
import { SerializableError } from '@jest/test-result';
import chalk from 'chalk';
import exit from 'exit';
import Worker from 'jest-worker';
import throat from 'throat';
import runTest from './runTest';
import { worker } from './testWorker';
import { TEST_WORKER_PATH } from './utils/paths';
import {
  OnTestFailure,
  OnTestStart,
  OnTestSuccess,
  SerializableResolver,
  Test,
  TestRunnerContext,
  TestRunnerOptions,
  TestWatcher,
  WatcherState,
  WorkerInterface
} from './types';

/**
 * Runs a list of tests either one after another in the current process
 * ("in band") or distributed over a pool of worker processes.
 */
export default class TestRunner {
  private readonly _globalConfig: Config.GlobalConfig;
  private readonly _context: TestRunnerContext;

  /**
   * @param globalConfig - Global configuration; `maxWorkers` sizes the pool
   *   used for parallel runs.
   * @param context - Optional runner context; defaults to an empty object.
   */
  constructor(globalConfig: Config.GlobalConfig, context?: TestRunnerContext) {
    this._globalConfig = globalConfig;
    this._context = context || {};
  }

  /**
   * Runs every test in `tests`, reporting progress through the callbacks.
   *
   * @param tests - Tests to execute.
   * @param watcher - Polled (and, for parallel runs, subscribed to) in order
   *   to stop scheduling once the run has been interrupted.
   * @param onStart - Awaited right before a test is started.
   * @param onResult - Called with the result of a test that ran.
   * @param onFailure - Called with the error of a test that could not be run.
   * @param options - `options.serial` selects the in-band strategy; otherwise
   *   tests are run in worker processes.
   * @returns A promise that settles once the selected strategy has finished.
   */
  async runTests(
    tests: Test[],
    watcher: TestWatcher,
    onStart: OnTestStart,
    onResult: OnTestSuccess,
    onFailure: OnTestFailure,
    options: TestRunnerOptions
  ): Promise<void> {
    return await (options.serial
      ? this._createInBandTestRun(tests, watcher, onStart, onResult, onFailure)
      : this._createParallelTestRun(tests, watcher, onStart, onResult, onFailure));
  }

  /**
   * Runs the tests sequentially in the current process.
   *
   * Each test is chained onto the promise of the previous one, so a test only
   * starts after its predecessor has reported a result or a failure. Once the
   * watcher reports an interruption, every remaining test is reported to
   * `onFailure` with a `CancelRun` error instead of being executed.
   */
  private async _createInBandTestRun(
    tests: Test[],
    watcher: TestWatcher,
    onStart: OnTestStart,
    onResult: OnTestSuccess,
    onFailure: OnTestFailure
  ) {
    process.env.JEST_WORKER_ID = '1';
    const mutex = throat(1);

    return tests.reduce(
      (promise, test) =>
        mutex(() =>
          promise
            .then(async () => {
              if (watcher.isInterrupted()) {
                throw new CancelRun();
              }
              await onStart(test);
              return await runTest(
                test,
                this._globalConfig,
                test.context.config,
                test.context.resolver,
                this._context
              );
            })
            .then(result => onResult(test, result))
            .catch(err => onFailure(test, err))
        ),
      Promise.resolve()
    );
  }

  /**
   * Runs the tests in a pool of worker processes.
   *
   * At most `maxWorkers` tests are in flight at a time. The returned promise
   * settles after the pool has been shut down, which happens as soon as either
   * all tests have been processed or the watcher signals an interruption.
   */
  private async _createParallelTestRun(
    tests: Test[],
    watcher: TestWatcher,
    onStart: OnTestStart,
    onResult: OnTestSuccess,
    onFailure: OnTestFailure
  ) {
    // One serializable resolver per distinct project config name; these are
    // handed to the workers once as setup arguments.
    const resolvers: Map<string, SerializableResolver> = new Map();
    for (const test of tests) {
      if (!resolvers.has(test.context.config.name)) {
        resolvers.set(test.context.config.name, {
          config: test.context.config,
          serializableModuleMap: test.context.moduleMap.toJSON()
        });
      }
    }

    // Named `workerFarm` so it does not shadow the imported `worker`.
    const workerFarm = new Worker(TEST_WORKER_PATH, {
      exposedMethods: ['worker'],
      forkOptions: {stdio: 'pipe'},
      maxRetries: 3,
      numWorkers: this._globalConfig.maxWorkers,
      setupArgs: [
        {
          serializableResolvers: Array.from(resolvers.values())
        }
      ]
    }) as WorkerInterface;

    // Forward the workers' piped output to this process, when available.
    const workerStdout = workerFarm.getStdout();
    if (workerStdout) {
      workerStdout.pipe(process.stdout);
    }
    const workerStderr = workerFarm.getStderr();
    if (workerStderr) {
      workerStderr.pipe(process.stderr);
    }

    const mutex = throat(this._globalConfig.maxWorkers);

    // Send test suites to workers continuously instead of all at once to track
    // the start time of individual tests.
    const runTestInWorker = (test: Test) =>
      mutex(async () => {
        if (watcher.isInterrupted()) {
          // Reject with a real error (as the in-band path does) so that
          // `onError` never has to deal with an undefined rejection reason.
          throw new CancelRun();
        }
        await onStart(test);
        return workerFarm.worker({
          config: test.context.config,
          context: {
            ...this._context,
            // Converted to an array before being sent to the worker.
            changedFiles:
              this._context.changedFiles &&
              Array.from(this._context.changedFiles)
          },
          globalConfig: this._globalConfig,
          test,
          path: test.path
        });
      });

    // Reports the failure, then exits the whole process if the error says a
    // worker process terminated.
    const onError = async (err: SerializableError, test: Test) => {
      await onFailure(test, err);
      if (err.type === 'ProcessTerminatedError') {
        console.error(
          'A worker process has quit unexpectedly! ' +
            'Most likely this is an initialization error.'
        );
        exit(1);
      }
    };

    // Never resolves; rejects once the watcher reports an interrupted state.
    const onInterrupt = new Promise<never>((_, reject) => {
      watcher.on('change', (state: WatcherState) => {
        if (state.interrupted) {
          reject(new CancelRun());
        }
      });
    });

    const runAllTests = Promise.all(
      tests.map(test =>
        runTestInWorker(test)
          .then(testResult => onResult(test, testResult))
          .catch(error => onError(error, test))
      )
    );

    // Shuts the pool down and warns if a worker had to be force exited.
    const cleanup = async () => {
      const {forceExited} = await workerFarm.end();
      if (forceExited) {
        // Diagnostics go to stderr, like the other warning in this method, so
        // they do not mix with output written to stdout.
        console.error(
          chalk.yellow(
            'A worker process has failed to exit gracefully and has been force exited. ' +
              'This is likely caused by tests leaking due to improper teardown. ' +
              'Try running with --runInBand --detectOpenHandles to find leaks.'
          )
        );
      }
    };

    // Clean up on both outcomes: normal completion and interruption.
    return Promise.race([runAllTests, onInterrupt]).then(cleanup, cleanup);
  }
}

/** Error used to signal that a test was skipped because the run was interrupted. */
class CancelRun extends Error {
  constructor(message?: string) {
    super(message);
    this.name = 'CancelRun';
  }
}