const DefaultTaskQueueOptions = { limit: 10, } /** * 对数据数组进行并发限制的遍历 * * 返回结果中的 result 与 datas 一一对应,如果某个数据的处理失败,对应的 result 为 undefined * 处理函数(handler)中的错误默认会被忽略,不会导致整个函数失败,错误会被记录到 errors 中, * 可以通过参数 throwWithErrors: true 来改变这个行为,遇到错误时会抛出异常 * * @example * let urls = ["http://1.com", "http://2.com", "http://3.com"] * let reuslt = await eachWithLimit(urls, async (url) => {}, options:{ limit: 10 }}) */ export async function eachWithLimit( datas: TData[], handler: (data: TData, index: number) => Promise, options?: { /** 并发限制 */ limit?: number /** 遇到错误时是否抛出异常,终止队列 */ throwWithErrors?: boolean } ): Promise<{ result: (TDataRe | undefined)[]; errors: { [index: string]: Error } }> { let { limit } = Object.assign({}, DefaultTaskQueueOptions, options) let promises: Promise[] = [] let allPromises: Promise[] = [] let results: (TDataRe | undefined)[] = [] let errors: { [index: string]: any } = {} let throwError: any let shouldStop = false let rejectWithError: ((error: any) => void) | null = null const errorPromise = new Promise((_, reject) => { rejectWithError = reject }) let promises_ready_resolve: any let promises_ready: Promise for (let i = 0; i < datas.length; i++) { if (options?.throwWithErrors && shouldStop) { break } let promise = new Promise((resolve, reject) => { handler(datas[i], i) .then((result) => { results[i] = result resolve() }) .catch((error) => { results[i] = undefined errors[i] = error if (options?.throwWithErrors) { throwError = error shouldStop = true if (rejectWithError) rejectWithError(error) if (promises_ready_resolve) promises_ready_resolve() } resolve() }) .finally(() => { let index = promises.indexOf(promise) promises.splice(index, 1) if (promises.length < limit) { if (promises_ready_resolve) promises_ready_resolve() } }) }) if (options?.throwWithErrors && throwError) { throw throwError } promises.push(promise) allPromises.push(promise) if (promises.length >= limit) { await tick() if (options?.throwWithErrors && shouldStop) { break } } } function tick() { promises_ready = new Promise((resolve) => { promises_ready_resolve = resolve }) return promises_ready } if (options?.throwWithErrors) { try { await Promise.race([Promise.all(allPromises), errorPromise]) } catch (error) { throw error } // 如果没有触发 race 的 reject,但仍有错误记录,兜底抛出 if (throwError) throw throwError } else { await Promise.all(allPromises) } return { result: results, errors, } }