///////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2019 AcceleratXR, Inc. All rights reserved.
///////////////////////////////////////////////////////////////////////////////
import { JobRunner, JobRunnerStatus } from "./JobRunner";
import { Job, JobStatus } from "./Job";
import { ThreadPool } from "@acceleratxr/utilities";
import LocalJobRunner from "./LocalJobRunner";
import * as ms from "ms";

/**
 * The `JobCoordinator` is responsible for coordinating all work across all available runners within a cluster.
 *
 * It keeps four maps keyed by unique identifier: the jobs that are still active, the aggregated status of every
 * job it has seen, the registered runners, and the last status reported by each runner. The sum of the `threads`
 * value of every reported runner status is cached and used to split a job's VUs when it is queued.
 */
export class JobCoordinator {
    /** Active jobs keyed by job uid. A job is removed once `updateJobStatus` decides it has finished. */
    public readonly jobs: Map<string, Job> = new Map();
    /** Aggregated status per job uid. Entries are kept after the job itself has been removed from `jobs`. */
    public readonly jobStatus: Map<string, JobStatus> = new Map();
    /** Registered runners keyed by runner uid. */
    public readonly runners: Map<string, JobRunner> = new Map();
    /** Most recent status reported by each runner, keyed by runner uid. */
    public readonly runnerStatus: Map<string, JobRunnerStatus> = new Map();

    private logger: any;
    /** Sum of `threads` across every entry in `runnerStatus`. */
    private totalThreads: number = 0;

    /**
     * Initializes the instance of `JobCoordinator` with any defaults.
     *
     * @param logger The logger used for all informational output. Only `info()` is called on it by this class.
     * @param localRunner Set to `true` to run jobs locally as well as remotely, otherwise set to `false` to only
     *                    coordinate remote jobs.
     */
    public async init(logger: any, localRunner: boolean = true): Promise<void> {
        logger.info("Initializing job coordinator...");
        this.logger = logger;

        if (localRunner) {
            logger.info("Creating local job runner...");
            const pool: ThreadPool = new ThreadPool();
            const runner: LocalJobRunner = new LocalJobRunner(this, logger, pool);
            await this.register(runner);
        }
    }

    /**
     * Queues the given job with any available runners.
     *
     * The job's `startTime` is set to the current time, the original job is recorded in `jobs` and a fresh
     * `RUNNING` status is recorded in `jobStatus`. Every registered runner then receives a copy of the job whose
     * `vus` has been divided by the total number of reported threads.
     *
     * @param job The job to queue for execution.
     * @throws Error if no runner has reported any threads yet.
     */
    public queue(job: Job): void {
        if (this.totalThreads === 0) {
            throw new Error("No available runners to execute this job.");
        }

        this.logger.info("New job queued: " + job.name);

        job.startTime = Date.now();
        this.jobs.set(job.uid, job);
        this.jobStatus.set(
            job.uid,
            new JobStatus({
                uid: job.uid,
                name: job.name,
                successes: 0,
                failures: 0,
                vus: job.vus,
                state: "RUNNING",
            })
        );

        // We need to compute the number of vus per thread before we pass in the job definition.
        // NOTE(review): the division is not rounded, so the per-thread value may be fractional.
        const perThreadJob: Job = new Job({
            ...job,
            vus: job.vus / this.totalThreads,
        });

        // Queue the job with every available runner
        for (const runner of this.runners.values()) {
            runner.queue(perThreadJob);
        }
    }

    /**
     * Stops the active job with the given unique identifier by forwarding the request to every registered runner.
     * The coordinator's own `jobs` and `jobStatus` maps are not modified here.
     *
     * @param id The unique identifier of the job to stop.
     */
    public stop(id: string): void {
        for (const runner of this.runners.values()) {
            runner.stop(id);
        }
    }

    /**
     * Stops all active jobs and shuts down the coordinator by calling `shutdown()` on every registered runner.
     */
    public shutdown(): void {
        for (const runner of this.runners.values()) {
            runner.shutdown();
        }
    }

    /**
     * Registers the given runner as available to perform work, starts it, and then queues every currently
     * active job with it.
     *
     * NOTE(review): the jobs queued here are the original definitions stored in `jobs`, i.e. with the full `vus`
     * value rather than the per-thread value computed in `queue()`. Confirm this is intended.
     *
     * @param runner The runner to register.
     */
    public async register(runner: JobRunner): Promise<void> {
        this.logger.info("Registering new job runner: " + runner.uid());
        this.runners.set(runner.uid(), runner);

        this.logger.info("Starting job runner...");
        await runner.start();

        // Queue all existing jobs with the new runner
        for (const job of this.jobs.values()) {
            runner.queue(job);
        }
    }

    /**
     * Removes the given runner from being available to perform work.
     *
     * The runner's last reported status is discarded as well, and the cached thread total is recomputed so that
     * `queue()` no longer counts threads belonging to a runner that has left.
     *
     * @param runner The runner to remove.
     */
    public unregister(runner: JobRunner): void {
        this.runners.delete(runner.uid());
        this.runnerStatus.delete(runner.uid());
        this.recomputeTotalThreads();
    }

    /**
     * Updates the status of a given job.
     *
     * The incoming status is treated as a delta: its `successes` and `failures` are added to the stored totals and
     * its `state` replaces the stored state. Updates for jobs that are not (or no longer) in `jobs` are ignored.
     *
     * @param uid The unique identifier of the job being reported on.
     * @param status The delta to merge into the stored status.
     */
    public updateJobStatus(uid: string, status: JobStatus): void {
        const job: Job | undefined = this.jobs.get(uid);
        if (!job) {
            return;
        }

        // The status is a delta and must be merged with the existing copy. Missing counters on either side are
        // treated as zero so that a partial delta can never turn the running totals into NaN.
        let current: JobStatus | undefined = this.jobStatus.get(uid);
        if (current) {
            current.successes = (current.successes || 0) + (status.successes || 0);
            current.failures = (current.failures || 0) + (status.failures || 0);
            current.state = status.state;
        } else {
            current = status;
        }

        // If the job failed we consider it unrecoverable. When a job has completed all its target VU executions then
        // the job is considered complete. Similarly, when the job completes its duration we end as well.
        const completeTime: number = job.duration ? job.startTime + (ms(job.duration) as number) : 0;
        const executions: number = (current.successes || 0) + (current.failures || 0);
        if (
            current.state === "FAILED" ||
            (!job.duration && executions >= job.vus) ||
            (job.duration && Date.now() >= completeTime)
        ) {
            this.logger.info("Job completed: " + uid);
            this.jobs.delete(uid);
            // Only a job that was still running is promoted to COMPLETE; any other state is preserved.
            current.state = current.state === "RUNNING" ? "COMPLETE" : current.state;
        }

        this.jobStatus.set(uid, current);
    }

    /**
     * Updates the status of a given job runner and refreshes the cached total thread count.
     *
     * @param uid The unique identifier of the runner being reported on.
     * @param status The runner's latest status; replaces any previously stored status.
     */
    public updateRunnerStatus(uid: string, status: JobRunnerStatus): void {
        this.runnerStatus.set(uid, status);
        this.recomputeTotalThreads();
    }

    /** Recalculates `totalThreads` as the sum of `threads` over every stored runner status. */
    private recomputeTotalThreads(): void {
        let sum: number = 0;
        for (const stat of this.runnerStatus.values()) {
            sum += stat.threads;
        }
        this.totalThreads = sum;
    }
}

const coordinator: JobCoordinator = new JobCoordinator();
export default coordinator;