import type { IDisposable } from '@idlebox/common';
import type { IMyLogger } from '@idlebox/logger';
import { AbstractBaseGraph, type ISummary } from './base-graph.js';
import { Pauser } from './job-graph.graph.pause.js';
import { Starter } from './job-graph.graph.pre.js';
import type { Job } from './job-graph.job.js';

/**
 * A graph of jobs that is started through a `Starter`, can be stopped as a
 * whole, and reports a per-state summary of its nodes.
 *
 * The graph subscribes to every node's state-change event; a node reporting a
 * fatal error triggers {@link JobGraph.stop} for the whole graph.
 */
// NOTE(review): the generic parameters are reconstructed as `<T extends Job<any>>`
// with `AbstractBaseGraph<T>` / `Iterable<T>` — confirm against the declarations
// in base-graph.js and job-graph.job.js.
export class JobGraph<T extends Job<any>> extends AbstractBaseGraph<T> {
	/** @internal */
	public concurrency: number = 4;

	// Both are assigned in the constructor; their types are inferred from there.
	private readonly freezer;
	private readonly bootstrap;

	/** @internal */
	public declare nodes;

	/** Subscriptions to every node's state-change event; released by `stop()`. */
	private readonly onchangeDisposables: IDisposable[] = [];

	/**
	 * @param nodesIt nodes that make up the graph (forwarded to the base class)
	 * @param logger logger to use; when it has no tag it is extended with the tag `graph`
	 */
	constructor(nodesIt: Iterable<T>, logger: IMyLogger) {
		if (!logger.tag) {
			logger = logger.extend('graph');
		}
		super(nodesIt, logger);

		for (const node of this.nodes) {
			this.onchangeDisposables.push(
				node.onStateChange(() => {
					this.onNodeChange(node);
				}),
			);
		}

		this.bootstrap = Starter.make(this, logger);
		this.freezer = new Pauser(this, this.logger);
	}

	/** `true` when every node reports `isStarted()` (also `true` for an empty graph). */
	get allStarted(): boolean {
		for (const node of this.nodes) {
			if (!node.isStarted()) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Lists the dependencies of `parent` whose node currently reports `isBlocking()`.
	 *
	 * @param parent name of the node whose dependencies are inspected
	 * @returns names of the blocking dependencies, in the order `dependenciesOf` yields them
	 */
	public getBlockingDependencies(parent: string): string[] {
		const r: string[] = [];
		for (const dep of this.dependenciesOf(parent, true)) {
			const node = this.getNodeByName(dep);
			if (node.isBlocking()) {
				r.push(dep);
			}
		}
		return r;
	}

	/**
	 * Pumps the starter and waits for the bootstrap promise.
	 *
	 * On failure the already started nodes are joined first, then the original
	 * error is rethrown.
	 */
	async startup(): Promise<void> {
		this.bootstrap.starter.pump();
		try {
			await this.bootstrap.promise;
			this.logger.verbose`startup success!`;
		} catch (e) {
			this.logger.verbose`startup failed, wait nodes to finish before resolve or reject...`;
			await this.joinAll();
			this.logger.verbose`startup failed, reject now...`;
			throw e;
		}
	}

	/**
	 * Waits, one node after another, for every node that is started and not yet
	 * stopped. A failing `join()` is logged and does not abort the loop.
	 */
	async joinAll(): Promise<void> {
		for (const node of this.nodes) {
			if (!node.isStarted() || node.isStopped()) {
				continue;
			}
			await node.join().catch((e: unknown) => {
				// A rejection reason is not guaranteed to be an Error instance.
				const message = e instanceof Error ? e.message : String(e);
				this.logger.error`error joining node ${node.name}: ${message}`;
			});
		}
	}

	/**
	 * Updates `concurrency`; when the starter is currently running it is pumped
	 * again. Does nothing once the starter is disposed or when the value is unchanged.
	 */
	changeConcurrency(c: number): void {
		if (this.bootstrap.starter.disposed) return;
		if (this.concurrency === c) return;

		this.concurrency = c;
		if (this.bootstrap.starter.isRunning) {
			this.bootstrap.starter.pump();
		}
	}

	/** Number of node state changes observed while the starter was disposed. */
	private newEventCounter = 0;

	/**
	 * State-change listener for a single node: a fatal error stops the whole graph.
	 *
	 * Deliberately synchronous: the event emitter ignores the return value, so the
	 * stop promise is handled here instead of being left floating.
	 */
	private onNodeChange(node: T): void {
		this.logger.debug`node has change: ${node.name} -> ${node.state}`;

		if (node.isFatalError()) {
			// TODO: how to send the fatal error to the caller once the starter is disposed?
			this.stop().catch((e: unknown) => {
				const message = e instanceof Error ? e.message : String(e);
				this.logger.error`failed to stop graph after fatal error of ${node.name}: ${message}`;
			});
		}

		// Evaluated after the synchronous part of stop() has run, as before.
		if (this.bootstrap.starter.disposed) {
			this.newEventCounter++;
		}
	}

	/**
	 * Counts nodes per state and picks a colour for the summary: one code when
	 * any node is failing, another when any succeeded, a plain one otherwise.
	 */
	protected override inspectSummary(): ISummary {
		const statistics = { 成功: 0, 失败: 0, 队列: 0, 结束: 0, 运行中: 0, phase: '' as string | number };

		// The checks are independent: one node may be counted in several buckets.
		for (const node of this.nodes) {
			if (node.isSuccess()) {
				statistics.成功++;
			}
			if (node.isFailling()) {
				statistics.失败++;
			}
			if (!node.isStarted()) {
				statistics.队列++;
			}
			if (node.isStopped()) {
				statistics.结束++;
			}
			if (node.isRunning()) {
				statistics.运行中++;
			}
		}

		if (!this.bootstrap.starter.disposed) {
			statistics.phase = ' ';
		} else if (this.stopped === 1) {
			statistics.phase = ' ';
		} else {
			statistics.phase = this.newEventCounter;
		}

		let totalColor = '';
		if (statistics.失败) {
			totalColor = `7;38;5;1`;
		} else if (statistics.成功) {
			totalColor = `7;38;5;2`;
		} else {
			totalColor = `7`;
		}

		return { totalColor, statistics };
	}

	/** 0 = stop not requested, 1 = stop in progress, 2 = stop finished. */
	private stopped = 0;

	/** The single in-flight (or finished) stop sequence, shared by all callers. */
	private stopPromise?: Promise<void>;

	/**
	 * Stops the graph. The stop sequence runs at most once; every caller,
	 * including one arriving while it is still in progress, gets the same promise
	 * and therefore waits until the nodes have actually been joined.
	 */
	stop(): Promise<void> {
		if (!this.stopPromise) {
			this.stopPromise = this.performStop();
		}
		return this.stopPromise;
	}

	/**
	 * The stop sequence: release listeners, dispose the starter and the pauser,
	 * send `stop()` to every node, then join all nodes.
	 */
	private async performStop(): Promise<void> {
		this.logger.debug`stopping jobs...`;
		this.stopped = 1;

		// Release the listeners first so stopping nodes does not re-enter onNodeChange.
		for (const d of this.onchangeDisposables) {
			d.dispose();
		}
		this.onchangeDisposables.length = 0;

		this.bootstrap.starter.dispose();
		this.logger.debug` * starter disposed`;

		await this.freezer.dispose();
		this.logger.debug` * nodes resume`;

		const ps = this.nodes.map((e) => e.stop());
		await Promise.all(ps);
		this.logger.debug` * stop command sent`;

		await this.joinAll();
		this.logger.debug` * all nodes stopped`;

		this.stopped = 2;
	}

	/** Stops the graph (waiting for a stop already in progress), then disposes the base class. */
	override async dispose(): Promise<void> {
		await this.stop();
		await super.dispose();
	}
}