import mitt, {Emitter} from "mitt"; import {Task, TASK_ERRORS, TASK_STATUS} from "./task"; import {debounce, throttle} from "throttle-debounce"; export declare interface TaskQueueConfig { //执行任务的单位时间 ExecuteTaskTime?: number, //最大可执行的任务数量 MaxExecuteTask?: number, //最大重试次数 MaxRetry?: number, //延迟重试时间 单位:毫秒 DelayRetryTime?: number, //最大保存的执行成功任务数量 MaxKeepLiveSuccessTask?: number, //默认忽略的系统错误 会跳过错误 并且返回错误 AllowedRetrySystemErrorNames?: Array AllowedRetrySystemErrorMessages?: Array AllowedRetrySystemErrorCodes?: Array } export declare interface CreateTaskParams { level?: number, isPreCondition?: boolean, parentTaskId?: string, asyncFunction: (Task) => Promise } /** * ## TaskQueue **任务队列管理** * **/ export class TaskQueue { public readonly emitter: Emitter = mitt() /** 已经准备好的任务队列 **/ public readonly readyTaskQueue: Map = new Map() /** 处理中的任务队列 **/ public readonly processingTaskQueue: Map = new Map() /** 执行成功的任务队列 **/ public readonly successTaskQueue: Map = new Map() /** 失败的任务队列 **/ public readonly failTaskQueue: Map = new Map() /** 执行任务的单位时间 **/ public readonly ExecuteTaskTime: number = 888 /** 最大可执行的任务数量 **/ public readonly MaxExecuteTask: number = 8 /** 最大重试次数 **/ public readonly MaxRetry: number = 8 /** 延迟重试时间 单位:毫秒 **/ public readonly DelayRetryTime: number = 1888 /** 最大保存的执行成功任务数量 **/ public readonly MaxKeepLiveSuccessTask: number = 0 /** 需要处理的系统错误 当错误信息匹配的时候 才会进入重试task流程 否则任务不会重试 即任务重试需要满足两个条件:报错+报错信息匹配 **/ public readonly AllowedRetrySystemErrorNames: Array = [] /** 需要处理的系统错误 当错误信息匹配的时候 才会进入重试task流程 否则任务不会重试 即任务重试需要满足两个条件:报错+报错信息匹配 **/ public readonly AllowedRetrySystemErrorMessages: Array = [] /** 需要处理的系统错误 当错误信息匹配的时候 才会进入重试task流程 否则任务不会重试 即任务重试需要满足两个条件:报错+报错信息匹配 **/ public readonly AllowedRetrySystemErrorCodes: Array = [] constructor(config?: TaskQueueConfig) { if (config) { const { ExecuteTaskTime, MaxExecuteTask, MaxRetry, DelayRetryTime, MaxKeepLiveSuccessTask, AllowedRetrySystemErrorNames, AllowedRetrySystemErrorMessages, AllowedRetrySystemErrorCodes } = config this.ExecuteTaskTime = ExecuteTaskTime ?? this.ExecuteTaskTime this.MaxExecuteTask = MaxExecuteTask ?? this.MaxExecuteTask this.MaxRetry = MaxRetry ?? this.MaxRetry this.DelayRetryTime = DelayRetryTime ?? this.DelayRetryTime this.MaxKeepLiveSuccessTask = MaxKeepLiveSuccessTask ?? this.MaxKeepLiveSuccessTask this.AllowedRetrySystemErrorNames = AllowedRetrySystemErrorNames ?? this.AllowedRetrySystemErrorNames this.AllowedRetrySystemErrorMessages = AllowedRetrySystemErrorMessages ?? this.AllowedRetrySystemErrorMessages this.AllowedRetrySystemErrorCodes = AllowedRetrySystemErrorCodes ?? this.AllowedRetrySystemErrorCodes } this.init() } /** 队列初始化 **/ private init() { this.emitter.on("createTask", async (task: Task) => { this.startReadyTaskQueue() }) this.emitter.on("executeTask", throttle(this.ExecuteTaskTime, async (task: Task) => { this.startReadyTaskQueue() }, {})) this.emitter.on("appendToProcessingTaskQueue", async (taskList: Array) => { this.startProcessingTaskQueue() }) this.emitter.on("appendToFailTaskQueue", async (task: Task) => { this.startFailTaskQueue(task) }) this.emitter.on("appendToSuccessTaskQueue", async (task: Task) => { this.startSuccessTaskQueue(task) }) } /** 开始处理成功的队列 **/ private async startSuccessTaskQueue(task: Task) { if (this.successTaskQueue.size > this.MaxKeepLiveSuccessTask) { let list = Array.from(this.successTaskQueue.values()) const oldTask = list.slice(0, list.length - this.MaxKeepLiveSuccessTask) for (let _task of oldTask) { this.successTaskQueue.delete(_task.id) } } } /** 开始处理失败的队列 **/ private async startFailTaskQueue(task: Task): Promise { //任务重试次数超过了最大允许重试次数 丢弃任务 if (task.failTimes >= this.MaxRetry) { this.failTaskQueue.delete(task.id) } else { //定时重试任务 setTimeout(async () => { this.failTaskQueue.delete(task.id) await task.updateStatus(TASK_STATUS.READY) this.readyTaskQueue.set(task.id, task) this.emitter.emit("createTask", task) }, this.DelayRetryTime) } } /** 返回状态为ready的即将执行的任务(执行队列中) **/ private async getProcessingTaskQueueExecutableTasks(): Promise> { let list = Array.from(this.processingTaskQueue.values()) return list.filter(value => value.status === TASK_STATUS.READY) } /** 开始执行任务 **/ private async startProcessingTaskQueue(): Promise { const list = await this.getProcessingTaskQueueExecutableTasks() for (let task of list) { task .execute() .then(async () => { await task.updateStatus(TASK_STATUS.MOVING) this.processingTaskQueue.delete(task.id) this.successTaskQueue.set(task.id, task) await task.updateStatus(TASK_STATUS.SUCCESS) this.emitter.emit("appendToSuccessTaskQueue", task) }) .catch(async (error: Error) => { if (error.message !== TASK_ERRORS.INVALID_STATUS) { if (this.AllowedRetrySystemErrorNames.includes(error?.name) || this.AllowedRetrySystemErrorMessages.includes(error?.message) // @ts-ignore || this.AllowedRetrySystemErrorCodes.includes(error?.code) ) { await task.updateStatus(TASK_STATUS.MOVING) this.processingTaskQueue.delete(task.id) this.failTaskQueue.set(task.id, task) await task.updateStatus(TASK_STATUS.FAIL, error) this.emitter.emit("appendToFailTaskQueue", task) } else { await task.updateStatus(TASK_STATUS.FAIL, error) this.processingTaskQueue.delete(task.id) } } }) .finally(() => { this.emitter.emit("executeTask", task) }) } } /** 返回按照level排序的任务 **/ /** isPreCondition为true的情况下 无视level规则 直接排在普通任务前面 **/ private async getReadyTaskQueueByLevelSort(): Promise> { let readyTaskQueueValues = Array.from(this.readyTaskQueue.values()) readyTaskQueueValues.sort((a, b) => b.level - a.level) readyTaskQueueValues.sort((a, b) => Number(b.isPreCondition) - Number(a.isPreCondition)) return readyTaskQueueValues } private async getPreConditionTaskOnProcessing(): Promise { const taskList = [...this.processingTaskQueue.values()] return taskList.find((value: Task) => value.isPreCondition) } private async findPreConditionTaskChildTasks(parentTaskId: string) { const taskList = [...this.readyTaskQueue.values()] return taskList.filter(value => value.parentTaskId === parentTaskId) } /** 返回状态为ready的准备执行的任务(准备队列中) **/ private async getReadyTaskQueueExecutableTasks(): Promise> { const quantity = await this.getIdleTaskQuantity() const getPreConditionTaskOnProcessing = await this.getPreConditionTaskOnProcessing() let list = await this.getReadyTaskQueueByLevelSort() list = list.filter(value => value.status === TASK_STATUS.READY) const findPreConditionTaskIndex = list.findIndex(value => value.isPreCondition) //如果当前执行的队列里面已经有前置类型的任务 //则允许继续执行该前置任务下面的子级任务 //其他任务继续放在等待队列 直到前置任务完成 if (getPreConditionTaskOnProcessing) { const parentTaskId: string = getPreConditionTaskOnProcessing.id const childTasks = await this.findPreConditionTaskChildTasks(parentTaskId) //默认子级任务可执行数量为总任务最大数量 //避免嵌套任务的时候 会出现子级任务无法执行 例如:普通任务里面包括了前置任务 但是前置任务又包括了子级任务 return childTasks.slice(0, this.MaxExecuteTask) } //没有优先执行的任务 则按照原流程 if (findPreConditionTaskIndex === -1) { return list.slice(0, quantity) } //有且只有一个前置任务可执行 return [list[findPreConditionTaskIndex]] } /** 处理队列空闲任务数量 **/ private async getIdleTaskQuantity(): Promise { return this.MaxExecuteTask - this.processingTaskQueue.size } /** 开始处理准备阶段的任务队列 **/ private async startReadyTaskQueue(): Promise { const list = await this.getReadyTaskQueueExecutableTasks() for (let task of list) { await task.updateStatus(TASK_STATUS.MOVING) this.processingTaskQueue.set(task.id, task) this.readyTaskQueue.delete(task.id) await task.updateStatus(TASK_STATUS.READY) } this.emitter.emit("appendToProcessingTaskQueue", list) } /** 创建任务 **/ public createTask(data: CreateTaskParams): Promise { return new Promise(async (resolve, reject) => { const {asyncFunction, level, isPreCondition, parentTaskId} = data const task = new Task({ level, asyncFunction, isPreCondition, parentTaskId, onSuccess: async (task: Task, result: any) => { resolve(result) }, onFail: async (task: Task, error: Error) => { if (task.status === TASK_STATUS.FAIL) { if ( this.AllowedRetrySystemErrorNames.includes(error?.name) || this.AllowedRetrySystemErrorMessages.includes(error?.message) // @ts-ignore || this.AllowedRetrySystemErrorCodes.includes(error?.code) ) { if (task.failTimes >= this.MaxRetry) { reject(error) } } else { reject(error) } } } }) this.readyTaskQueue.set(task.id, task) this.emitter.emit("createTask", task) }) } }