///////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2019 AcceleratXR, Inc. All rights reserved.
///////////////////////////////////////////////////////////////////////////////
import { JobRunner, JobRunnerStatus } from "./JobRunner";
import { Job, JobStatus } from "./Job";
import { ThreadPool } from "@acceleratxr/utilities";
import { JobCoordinator } from "./JobCoordinator";
import * as path from "path";
const uuid = require("uuid");

/**
 * A `JobRunner` that hands jobs to a `ThreadPool` and relays the
 * `JobStatus` messages coming back from the pool to a `JobCoordinator`.
 */
export default class LocalJobRunner implements JobRunner {
    /** Identifier of this runner, generated once with `uuid.v4()`. */
    private _uid: string = uuid.v4();
    /** Receives job status and runner status updates. */
    private coordinator: JobCoordinator;
    /** Jobs that have been queued on this runner and not yet stopped. */
    public readonly jobs: Array<Job>;
    /** Used to report failures that happen while handling pool messages. */
    private logger: any;
    /** Pool that all job messages are broadcast to. */
    private pool: ThreadPool;

    /**
     * Stores the collaborators and subscribes to the pool's "message" event.
     *
     * @param coordinator Receives the status updates produced by this runner.
     * @param logger Logger object; only its `error` function (if any) is used.
     * @param pool Pool to broadcast job messages to and to listen on.
     */
    constructor(coordinator: JobCoordinator, logger: any, pool: ThreadPool) {
        this.coordinator = coordinator;
        this.jobs = new Array<Job>();
        this.logger = logger;
        this.pool = pool;

        // The handler is passed to the pool as a bare function reference, so
        // it has to be bound to keep `this` pointing at the runner.
        this.onMessage = this.onMessage.bind(this);
        this.pool.on("message", this.onMessage);
    }

    /**
     * @returns The identifier generated for this runner at construction.
     */
    public uid(): string {
        return this._uid;
    }

    /**
     * Handles a message emitted by the pool. Messages whose `type` is
     * "JobStatus" are converted to a `JobStatus` and forwarded to the
     * coordinator; every other message is ignored.
     *
     * @param msg The raw message received from the pool.
     */
    private onMessage(msg: any): void {
        // Ignore empty payloads instead of throwing inside the event handler.
        if (!msg || msg.type !== "JobStatus") {
            return;
        }

        const status: JobStatus = new JobStatus(msg);

        // Did the job experience an unrecoverable failure?
        if (status.state === "FAILED") {
            // `stop` is async and this handler cannot await it; attach a
            // rejection handler so a failure there is reported rather than
            // surfacing as an unhandled promise rejection.
            this.stop(status.uid).catch((err: unknown) => {
                if (this.logger && typeof this.logger.error === "function") {
                    this.logger.error(`Failed to stop job ${status.uid}: ${String(err)}`);
                }
            });
        }

        this.coordinator.updateJobStatus(status.uid, status);
    }

    /**
     * Records the job in `jobs` and broadcasts it to the pool as a "NewJob"
     * message. The message is a shallow copy of the job with `type` set.
     *
     * @param job The job to queue.
     */
    public async queue(job: Job): Promise<void> {
        this.jobs.push(job);
        this.pool.sendAll({
            ...job,
            type: "NewJob",
        });
    }

    /**
     * Starts the pool with the `ThreadWorker.js` script located next to this
     * module, then reports this runner's status to the coordinator.
     */
    public async start(): Promise<void> {
        await this.pool.start(path.join(__dirname, "ThreadWorker.js"), {});
        this.coordinator.updateRunnerStatus(this._uid, this.status());
    }

    /**
     * Removes the job with the given uid from `jobs` (if present) and
     * broadcasts a "StopJob" message for it to the pool. The message is sent
     * even when the job is not in the local list.
     *
     * @param id The uid of the job to stop.
     */
    public async stop(id: string): Promise<void> {
        const idx: number = this.jobs.findIndex((job: Job) => job.uid === id);

        // Remove the job from our active list
        if (idx >= 0) {
            this.jobs.splice(idx, 1);
        }

        this.pool.sendAll({
            type: "StopJob",
            id,
        });
    }

    /**
     * Stops the pool.
     *
     * @returns The promise returned by the pool's `stop`.
     */
    public async shutdown(): Promise<void> {
        return this.pool.stop();
    }

    /**
     * Builds a snapshot of this runner's state.
     *
     * @returns The runner uid, the live `jobs` array (not a copy) and the
     * pool's `size`.
     */
    public status(): JobRunnerStatus {
        return {
            uid: this._uid,
            jobs: this.jobs,
            threads: this.pool.size,
        };
    }
}