///////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2019 AcceleratXR, Inc. All rights reserved.
///////////////////////////////////////////////////////////////////////////////
import { parentPort, workerData, threadId } from "worker_threads";
import { Logger } from "@acceleratxr/service_utilities";
import { Job, JobStatus } from "./Job";
import * as ms from "ms";
const requireFromString = require("require-from-string");

// This enables TypeScript script compilation
require("ts-node").register();

console.log("Initializing local thread worker: " + threadId);

/** Signature of the default export that a job script must provide. */
type JobScript = (args: Job["args"]) => unknown;

/**
 * Number of script executions started so far for each active job, keyed by job uid.
 * An entry exists only while the job is active; removing the entry is how a job is stopped.
 */
const jobs: Map<Job["uid"], number> = new Map();

/** Compiled default function of each job's script, keyed by job uid. */
const scripts: Map<Job["uid"], JobScript> = new Map();

/**
 * Posts a status update for the given job to the parent thread.
 *
 * @param job The job that the status belongs to.
 * @param status The status values to report. They are merged with the job's uid, name and vus.
 */
const sendStatus = function(job: Job, status: JobStatus): void {
    parentPort.postMessage({
        ...status,
        type: "JobStatus",
        uid: job.uid,
        name: job.name,
        vus: job.vus,
    });
};

/**
 * Executes the script of the given job once and reports the status to the job runner.
 *
 * If the job is no longer active (it completed or was stopped while this run was queued) the script
 * is not executed; control goes straight to `scheduleRun` so that the completion status is reported.
 *
 * @param job The job to execute.
 */
const runJob = function(job: Job): void {
    const executions = jobs.get(job.uid);
    if (executions === undefined) {
        // The job was removed while this run was pending. Do not execute the script again.
        scheduleRun(job);
        return;
    }

    // Count this execution so that jobs without a duration can finish once `vus` runs have started.
    jobs.set(job.uid, executions + 1);

    // Notifies the job runner of a successful run and schedules the next one.
    const reportSuccess = (): void => {
        sendStatus(
            job,
            new JobStatus({
                successes: 1,
                state: "RUNNING",
            })
        );
        scheduleRun(job);
    };

    // Notifies the job runner of a failed run and schedules the next one.
    // The error is forwarded verbatim to JobStatus, whose accepted type is declared elsewhere.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const reportFailure = (error: any): void => {
        console.error("Failed to execute job script. Error=" + error);
        sendStatus(
            job,
            new JobStatus({
                failures: 1,
                state: "RUNNING",
                error,
            })
        );
        scheduleRun(job);
    };

    let result: unknown;
    try {
        // Execute the job's script
        const test = scripts.get(job.uid);
        if (!test) {
            throw new Error("No compiled script is registered for job: " + job.uid);
        }
        result = test(job.args);
    } catch (error) {
        reportFailure(error);
        return;
    }

    if (result instanceof Promise) {
        // Register both handlers on the same promise. Attaching `.then()` and `.catch()` as two
        // separate chains leaves the `.then()` branch without a rejection handler, which surfaces
        // as an unhandled promise rejection whenever the script fails.
        result.then(reportSuccess, reportFailure);
    } else {
        reportSuccess();
    }
};

/**
 * Schedules the job to be run at the next earliest interval, or reports the job as complete.
 *
 * A job is complete when it is no longer registered (stopped), when it has no duration and `vus`
 * executions have been started, or when it has a duration and that duration has elapsed.
 *
 * @param job The job to schedule.
 */
const scheduleRun = function(job: Job): void {
    const executions = jobs.get(job.uid);
    const stopped = executions === undefined;
    const runsExhausted = !job.duration && executions !== undefined && executions >= job.vus;
    const expired = Boolean(job.duration) && Date.now() >= job.startTime + (ms(job.duration) as number);

    if (stopped || runsExhausted || expired) {
        // Job is complete
        sendStatus(
            job,
            new JobStatus({
                state: "COMPLETE",
            })
        );
        jobs.delete(job.uid);
        // Release the compiled script so that finished jobs do not accumulate in memory.
        scripts.delete(job.uid);
        return;
    }

    // Schedule the next execution
    setTimeout(() => runJob(job), 10);
};

/**
 * Compiles the job's script and starts execution of the job at the next available update interval.
 *
 * Reports a FAILED status to the job runner when the script cannot be compiled or does not export a
 * default function. Jobs with fewer than one vus are rejected with a console error only.
 *
 * @param job The job that will begin execution.
 */
const startJob = function(job: Job): void {
    if (job.vus >= 1) {
        // Attempt to compile the script
        try {
            const script: any = requireFromString(job.script);
            if (script && script.default instanceof Function) {
                // Store the default function for future job execution
                scripts.set(job.uid, script.default);
            } else {
                throw new Error("Script does not have a default function exported.");
            }
        } catch (error) {
            console.error(error instanceof Error ? error.message : error);
            sendStatus(
                job,
                new JobStatus({
                    state: "FAILED",
                    error,
                })
            );
            return;
        }

        console.log(`Starting job: name=${job.name}, vus=${job.vus}, duration=${job.duration}`);
        job.startTime = Date.now();
        jobs.set(job.uid, 0);

        // Queue the job for execution for each vus
        setTimeout(() => {
            for (let i = 0; i < job.vus; i++) {
                runJob(job);
            }
        }, 10);
    } else {
        console.error("Can't start new job with no vus set.");
    }
};

/**
 * Stops execution of the given job and reports the final status to the job runner.
 *
 * @param job The job to stop.
 * @param status The final status to notify the job runner with. Defaults to a COMPLETE status.
 */
const stopJob = function(job: Job, status?: JobStatus): void {
    jobs.delete(job.uid);
    scripts.delete(job.uid);

    const finalStatus: JobStatus = status
        ? status
        : new JobStatus({
              state: "COMPLETE",
          });

    sendStatus(
        job,
        new JobStatus({
            ...finalStatus,
        })
    );
};

// Set a dummy interval to keep the thread alive at all times
const interval: NodeJS.Timeout = setInterval(() => {}, 1000);

// Register for parent messages
parentPort.on("message", msg => {
    if (msg.type === "NewJob") {
        const job: Job = new Job(msg);
        startJob(job);
    } else if (msg.type === "StopJob") {
        // Removing the entry makes the job's pending runs report completion instead of executing.
        // NOTE(review): jobs are registered under `job.uid` but removed here by `msg.id` - confirm
        // that the job runner sends the job's uid in the `id` field of StopJob messages.
        jobs.delete(msg.id);
    }
});

parentPort.on("close", () => {
    console.log("Shutting down local thread worker: " + threadId);
    clearInterval(interval);
});