import { EventEmitter } from "node:events"; import type { WorkerMeta, WorkerMetrics } from "./ipc-types.js"; /** * master.ts — Cluster Master 进程主类 * * Master 进程是纯管理者角色,**不处理任何 HTTP 请求**: * * 1. 计算 Worker 数量(resolveWorkerCount) * 2. Fork Worker 进程(逐个启动,非并行,首个失败 Fail Fast) * 3. 写入 PID 文件 * 4. 监听 Worker 退出事件并决定是否重启(频率保护 + 指数退避) * 5. 处理 OS 信号(SIGTERM / SIGINT / SIGHUP / SIGUSR2) * 6. 执行零停机重启(Rolling Restart) * 7. 运行健康检查(heartbeat monitor) * 8. 所有 Worker 退出后清理 PID 文件并退出 * * 设计要点: * - Worker 逐个启动(避免 DB 连接风暴,首个失败可立即中止) * - 频率保护:窗口内重启次数超限 → 暂停自动重启 * - 指数退避:重启间隔随连续失败次数翻倍(上限 30s) * - Rolling Restart:逐个替换 Worker,始终保持至少 N-1 个 Worker 服务 * - 健康检查:定期检测 Worker 心跳,超时则标记死亡并触发替换 * * @module lib/cluster/master * @see 12a-master.md §3(Master 主类) * @see 12c-lifecycle.md(进程生命周期管理) */ /** * ClusterMaster 配置 * * 从 VextClusterConfig 中提取 Master 需要的字段。 * 通过 constructor 传入,运行时不可变。 */ export interface ClusterMasterConfig { /** Worker 数量配置 */ workers: "auto" | "auto-1" | number; /** Worker 崩溃后是否自动重启 */ autoRestart: boolean; /** 窗口内允许的最大重启次数 */ maxRestarts: number; /** 快速重启检测窗口(毫秒) */ restartWindow: number; /** 重启间隔退避基数(毫秒) */ restartBaseDelay: number; /** 重启间隔上限(毫秒) */ restartMaxDelay: number; /** 健康检查配置 */ healthCheck: { enabled: boolean; /** Master 检查间隔(毫秒) */ interval: number; /** 心跳超时(毫秒) */ timeout: number; }; /** 零停机重启配置 */ reload: { /** 替换下一个 Worker 前的等待时间(毫秒) */ workerDelay: number; /** Worker 就绪超时(毫秒) */ readyTimeout: number; /** Worker 关闭超时(毫秒) */ shutdownTimeout: number; }; /** PID 文件路径 */ pidFile: string; /** 进程标题前缀 */ titlePrefix: string; /** 粘性会话模式 */ sticky: "none" | "ip"; } /** * ClusterMaster 构造函数输入类型(深层 Partial) * * 与 ClusterMasterConfig 相同,但所有字段(包括嵌套对象内部字段)都是可选的。 * constructor 内部会与 DEFAULT_CLUSTER_CONFIG 深合并后生成完整的 ClusterMasterConfig。 */ export type ClusterMasterInput = Partial & { healthCheck: Partial; reload: Partial; }>; /** * ClusterMaster 事件 */ export interface ClusterMasterEvents { /** Worker 就绪 */ "worker-ready": { workerId: number; pid: number; }; /** Worker 退出 */ "worker-exit": { workerId: number; code: number | null; signal: string | null; }; /** 重启被限流 */ "restart-throttled": { workerId: number; code: number | null; signal: string | null; }; /** Rolling restart 开始 */ "reload-start": { trigger: string; workerCount: number; }; /** Rolling restart 完成 */ "reload-complete": { replaced: number; total: number; }; /** 健康检查:Worker 心跳超时 */ "heartbeat-timeout": { workerId: number; lastHeartbeat: number; }; /** 所有 Worker 已退出 */ "all-workers-dead": undefined; } export declare const DEFAULT_CLUSTER_CONFIG: ClusterMasterConfig; export declare class ClusterMaster extends EventEmitter { /** Worker 元数据表(key = cluster.Worker.id) */ private workers; private nextWorkerId; /** 窗口内的重启时间戳(用于频率保护) */ private restartTimestamps; /** 是否正在优雅关闭 */ private isShuttingDown; /** 是否正在执行 Rolling Restart */ private isReloading; /** 健康检查定时器 */ private healthCheckTimer; /** 已注册的信号处理器引用(用于 cleanup 时移除) */ private signalHandlers; /** 最新一次收到的 Worker 指标快照 */ private workerMetrics; /** 运行时配置(只读) */ readonly config: ClusterMasterConfig; /** 实际计算出的 Worker 数量 */ private workerCount; constructor(config?: ClusterMasterInput); /** * start — 启动 Cluster Master * * 完整启动流程: * 1. 计算 Worker 数量 * 2. 设置进程标题 * 3. 写入 PID 文件 * 4. 配置 cluster 调度策略 * 5. 串行 fork Worker(首个失败 Fail Fast) * 6. 注册 Worker exit 事件 * 7. 注册 OS 信号处理 * 8. 启动健康检查(如果启用) * * @throws 首个 Worker 启动失败时抛出错误 */ start(): Promise; /** * gracefulShutdown — 优雅关闭所有 Worker * * 流程: * 1. 标记 isShuttingDown(阻止 auto-restart 和 rolling restart) * 2. 停止健康检查定时器 * 3. 向所有存活 Worker 发送 shutdown 消息 * 4. 等待所有 Worker 退出(超时则 SIGKILL) * 5. 清理 PID 文件 * 6. Master 退出 * * @param trigger 触发源(如 'SIGTERM'、'SIGINT',用于日志输出) */ gracefulShutdown(trigger: string): Promise; /** * rollingRestart — 零停机滚动重启所有 Worker * * 流程(对每个旧 Worker 串行执行): * 1. Fork 新 Worker(加载最新代码) * 2. 等待新 Worker ready(超时 → 杀新 Worker,跳过本轮) * 3. 新 Worker 就绪 → 通知旧 Worker shutdown * 4. 等待旧 Worker 退出(超时 → SIGKILL) * 5. 等待 workerDelay 再处理下一个 * * 关键保证: * - 在替换过程中始终保持至少 N-1 个 Worker 服务 * - 替换失败时旧 Worker 保持运行(不会减少可用 Worker) * - 如果在重启过程中收到 SIGTERM,立即停止替换并转入 gracefulShutdown * * @param trigger 触发源(如 'SIGHUP'、'SIGUSR2',用于日志输出) */ rollingRestart(trigger: string): Promise; /** * broadcast — 向所有存活 Worker 广播消息 * * @param payload 广播内容(必须可 JSON 序列化) */ broadcast(payload: unknown): void; /** * getWorkerCount — 获取当前 Worker 总数 */ getWorkerCount(): number; /** * getReadyWorkerCount — 获取当前处于 ready 状态的 Worker 数量 */ getReadyWorkerCount(): number; /** * getWorkerMetas — 获取所有 Worker 的元数据快照(只读) */ getWorkerMetas(): ReadonlyMap>; /** * getWorkerMetrics — 获取最近一次上报的 Worker 指标 */ getLatestMetrics(): ReadonlyMap>; /** * isRunning — Master 是否正在运行(未处于关闭/重载状态) */ isRunning(): boolean; /** * getTargetWorkerCount — 获取配置计算出的目标 Worker 数量 */ getTargetWorkerCount(): number; /** * forkWorker — Fork 单个 Worker 并等待就绪 * * 流程: * 1. cluster.fork() 创建新 Worker 进程 * 2. 设置 WorkerMeta(state = 'starting') * 3. 发送 set-title 消息 * 4. 注册 IPC 消息监听 * 5. 等待 Worker 发送 'ready' 消息(超时则拒绝) * * 环境变量传递: * - VEXT_WORKER_ID: Worker 编号(从 1 开始递增) * - VEXT_MODE: 'start'(触发 bootstrap 自执行入口) * * @returns fork 出的 cluster.Worker 实例 * @throws Worker 在 readyTimeout 内未就绪或在就绪前退出 */ private forkWorker; /** * waitForWorkerReady — 等待 Worker 发送 'ready' 消息 * * 使用 Promise 封装事件驱动的等待逻辑: * - 收到 'ready' → resolve * - 超时 → reject * - Worker 提前退出 → reject * * @param worker 目标 Worker * @throws 超时或 Worker 提前退出 */ private waitForWorkerReady; /** * handleWorkerMessage — 处理 Worker → Master 的 IPC 消息 * * 消息类型: * - ready: Worker 就绪(由 waitForWorkerReady 处理) * - heartbeat: 更新 lastHeartbeat 时间戳 * - metrics: 存储最新指标快照 * - request-restart: Worker 主动请求重启(如内存超阈值) */ private handleWorkerMessage; /** * replaceWorker — 替换单个 Worker(用于 request-restart 场景) * * 流程: * 1. Fork 新 Worker * 2. 等待新 Worker ready * 3. 通知旧 Worker shutdown * 4. 等待旧 Worker 退出 */ private replaceWorker; /** * handleWorkerExit — Worker 退出事件处理 * * 决策树: * 1. 正在关闭 → 跳过(gracefulShutdown 已处理) * 2. 状态为 draining → 跳过(正常替换流程) * 3. autoRestart=false → 检查是否所有 Worker 都已退出 * 4. 频率保护命中 → 暂停自动重启 + 发射事件 * 5. 正常场景 → 指数退避后重启 */ private handleWorkerExit; /** * isRestartThrottled — 检测是否超过重启频率限制 * * 在 restartWindow 毫秒窗口内,重启次数是否超过 maxRestarts。 * 每次调用时记录当前时间戳并清理过期记录。 */ private isRestartThrottled; /** * calculateRestartDelay — 计算指数退避延迟 * * delay = baseDelay × 2^(consecutiveRestarts - 1) * 上限 = maxDelay(默认 30s) * * 避免崩溃循环时频繁 fork 消耗系统资源。 */ private calculateRestartDelay; /** * registerSignals — 注册 OS 信号处理器 * * | 信号 | 行为 | * |---------|---------------| * | SIGTERM | 优雅关闭 | * | SIGINT | 同 SIGTERM | * | SIGHUP | Rolling Restart | * | SIGUSR2 | 同 SIGHUP | * * Windows 不支持 SIGHUP / SIGUSR2,仅注册 SIGTERM / SIGINT。 */ private registerSignals; /** * removeSignalHandlers — 移除所有已注册的信号处理器 */ private removeSignalHandlers; /** * startHealthCheck — 启动定期健康检查 * * 检查逻辑(每 interval 毫秒执行一次): * 1. 遍历所有 ready 状态的 Worker * 2. 检查 lastHeartbeat 是否超过 timeout * 3. 超时 → 标记 Worker 为死亡,SIGKILL 终止 * (handleWorkerExit 会触发自动重启) */ private startHealthCheck; /** * stopHealthCheck — 停止健康检查定时器 */ private stopHealthCheck; /** * waitForWorkerExit — 等待 Worker 退出(带超时保护) * * 超时后对 Worker 发送 SIGKILL 强制终止。 */ private waitForWorkerExit; /** * checkAllDead — 检查是否所有 Worker 已退出 * * 当所有 Worker 都已退出且不在关闭/重载流程中时, * 发射 'all-workers-dead' 事件。 */ private checkAllDead; /** * cleanup — 清理资源 * * 关闭定时器 + 移除信号处理器 + 删除 PID 文件。 */ private cleanup; }