import * as fs from 'fs'; import * as path from 'path'; import threadManager from './thread-manager'; import { Worker } from 'worker_threads'; import { generateUUID, generateImports, generateLibraries } from './utils'; import { THREAD_FOLDER_PATH, BASE_THREAD, STOP_MESSAGE } from './environment'; import { ThreadStatus, Processor, ProcessorData, WorkerOptions, Library, StoppedThreadError, } from './typing'; import { Queue } from './queue'; export class Thread< Input extends ProcessorData = ProcessorData, Output extends ProcessorData = ProcessorData > { id = generateUUID(); status: ThreadStatus = ThreadStatus.initialization; #worker: Worker | null = null; #isStopTriggered = false; #messageQueue = new Queue((data) => { this.status = ThreadStatus.running; this.#worker?.postMessage(data); }); get canBeStopped(): boolean { return this.#messageQueue.length === 0 && this.status === ThreadStatus.waiting; } get queueLength(): number { return this.#messageQueue.length; } constructor(processor: Processor, opts?: WorkerOptions) { const filePath = this.createThreadFile(processor, opts?.libraries); this.#worker = new Worker(filePath, { workerData: opts?.workerData, }); this.#worker.on('exit', () => { fs.unlinkSync(filePath); this.#worker = null; this.status = ThreadStatus.stopped; }); this.#worker.on('message', () => { this.status = this.#messageQueue.length > 0 ? ThreadStatus.running : ThreadStatus.waiting; this.#messageQueue.resume(); if (this.canBeStopped && this.#isStopTriggered) { this.#worker?.postMessage(STOP_MESSAGE); threadManager.unregister(this); } }); this.#worker.on('online', () => { this.status = ThreadStatus.waiting; }); threadManager.register(this); } private checkIfActionCanBePerformed(): void { if (this.#isStopTriggered) { throw new StoppedThreadError(); } } private createThreadFile(processor: Processor, libraries?: Library[]): string { const filePath = path.join(THREAD_FOLDER_PATH, `${this.id}.js`); let content = BASE_THREAD.replace('$func', processor.toString()); if (libraries) { content = `${generateImports(libraries)}\n${content}`; content = content.replace('$libraries', generateLibraries(libraries)); } else { content = content.replace('$libraries', '{}'); } fs.writeFileSync(filePath, content); return filePath; } subscribe(onMessage: (data: Output) => void): Thread { this.checkIfActionCanBePerformed(); this.#worker?.on('message', onMessage); return this; } catch(onError: (err: Error) => void): Thread { this.checkIfActionCanBePerformed(); this.#worker?.on('error', onError); return this; } stop(func?: Function, force = false): Thread { this.checkIfActionCanBePerformed(); this.#isStopTriggered = true; if (func) { this.#worker?.on('exit', () => { func(); }); } if (force || this.canBeStopped) { this.#worker?.postMessage(STOP_MESSAGE); threadManager.unregister(this); } return this; } pushData(data: Input): Thread { this.checkIfActionCanBePerformed(); if (this.#isStopTriggered) { throw new StoppedThreadError(); } this.#messageQueue.push(data); return this; } } export default Thread;