import Queue, { RateLimiter } from 'bull';
import * as _ from 'lodash';

/**
 * Function that executes a single queued task.
 *
 * It receives the job payload and the Bull `done` callback. The queue awaits
 * the return value (so it may be a promise) and forwards the resolved value to
 * `onFinish`.
 */
// Parameter/return types stay wide so that existing runners keep type-checking.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type TaskRunnder = (task: any, callback: Queue.DoneCallback) => any | void;

/** Shape of the optional `onDrain` / `onFinish` hooks. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type QueueHook = (...args: any[]) => void;

/**
 * Thin wrapper around a Bull queue that runs every job through one `runner`
 * and keeps a cached copy of the queue length.
 */
export class AScraperQueue {
  /** Underlying Bull queue. */
  Q: Queue.Queue;
  /** Not read or written inside this class. */
  uniqueCache: Record<string, unknown> = {};
  /** Called as `onFinish(null, result, task)` after the runner resolves. */
  onFinish?: QueueHook;
  /** Concurrency passed to `Q.process`. */
  N: number;
  /** Last value obtained from `length()`; refreshed by `updateQueueLength()`. */
  queueLength = 0;
  /** True while a `length()` request started by `updateQueueLength()` is pending. */
  queueLengtnLoading = false;
  queueName = '';
  /** Called (without arguments) whenever `tryOnDrain()` sees `queueLength === 0`. */
  onDrain?: QueueHook;
  /** When true, a line is logged each time a job starts processing. */
  logging = true;

  /**
   * Creates the Bull queue, registers the job processor and the
   * `drained` / `completed` listeners.
   *
   * @param queueName Name handed to Bull.
   * @param runner    Executes one task; its awaited result is passed to `onFinish`.
   * @param onDrain   Optional hook fired when the cached queue length is 0.
   * @param onFinish  Optional hook fired as `(null, result, task)` on success.
   * @param limiter   Bull rate limiter; defaults to 1 job per 300 ms.
   * @param N         Processor concurrency (default 2).
   * @param logging   Enables the per-job "process" log line (default true).
   */
  constructor({
    queueName,
    runner,
    onDrain,
    onFinish,
    limiter,
    N = 2,
    logging = true,
  }: {
    queueName: string;
    runner: TaskRunnder;
    onDrain?: QueueHook;
    onFinish?: QueueHook;
    N?: number;
    limiter?: RateLimiter;
    logging?: boolean;
  }) {
    this.onDrain = onDrain;
    this.onFinish = onFinish;
    this.N = N;
    this.queueName = queueName;
    console.info('queueName', queueName);
    this.logging = logging;

    // Replace the method with a throttled version so bursts of add/completed
    // events trigger at most one length lookup per 3 seconds.
    this.updateQueueLength = _.throttle(this.updateQueueLength, 3000, {
      trailing: true,
    });

    // NOTE(review): this prints the full Redis URL, which may embed credentials.
    console.log('process.env.REDIS_FULL_URL', process.env.REDIS_FULL_URL);

    this.Q = new Queue(
      queueName,
      process.env.REDIS_FULL_URL || 'redis://127.0.0.1:6379',
      {
        limiter: limiter ?? {
          max: 1,
          duration: 300,
          bounceBack: false,
        },
      },
    );

    // Arrow function on purpose: a plain `function` here would not see the
    // AScraperQueue instance as `this`, so `this.logging` would never be read
    // from this object. The handler keeps two parameters (job, done).
    this.Q.process(N, async (job: Queue.Job, done: Queue.DoneCallback) => {
      if (this.logging) {
        console.info(`\n%c--------- process ${queueName}--------- `, 'background:yellow; color:blue; font-weight:600;');
      }

      const task = job.data;
      try {
        const result = await runner(task, done);
        this.onFinish?.(null, result, task);
      } catch (error: unknown) {
        // NOTE(review): failures (from the runner or from onFinish) are only
        // logged; the job is still completed via done() below.
        console.error('error', error);
      }
      done();
    });

    // NOTE(review): updateQueueLength() resolves asynchronously (and is
    // throttled), so the tryOnDrain() calls right after it evaluate the
    // previously cached queueLength.
    this.Q.on('drained', () => {
      this.updateQueueLength();
      this.tryOnDrain();
    });

    this.Q.on('completed', () => {
      this.updateQueueLength();
      this.tryOnDrain();
    });
  }

  /** Invokes `onDrain` when the cached queue length is 0. */
  tryOnDrain(): void {
    if (this.queueLength === 0) {
      this.onDrain?.();
    }
  }

  /**
   * Refreshes `queueLength` from `length()` and then re-checks the drain
   * condition. The constructor swaps this method for a throttled wrapper.
   */
  updateQueueLength(): void {
    this.queueLengtnLoading = true;
    this.length()
      .then((len) => {
        this.queueLength = len;
        console.info('this.queueLength', this.queueName, this.queueLength);
        this.queueLengtnLoading = false;
        this.tryOnDrain();
      })
      .catch((error: unknown) => {
        // Without this handler a failed lookup would leave the loading flag
        // stuck at true and surface as an unhandled rejection.
        this.queueLengtnLoading = false;
        console.error('error', error);
      });
  }

  /** Clears everything by delegating to `Q.empty()`. */
  clearQueue(): Promise<void> {
    console.info('\n%c--------- clearQueue --------- \n', 'background:yellow; color:blue; font-weight:600;');
    return this.Q.empty();
  }

  /** Delegates to `Q.empty()`. */
  async empty(): Promise<void> {
    console.info('\n%c--------- empty --------- \n', 'background:yellow; color:blue; font-weight:600;');
    return this.Q.empty();
  }

  /** Delegates to `Q.obliterate({ force: true })`. */
  async obliterate(): Promise<void> {
    console.info('\n%c--------- obliterate --------- \n', 'background:yellow; color:blue; font-weight:600;');
    return this.Q.obliterate({ force: true });
  }

  /**
   * Enqueues a task and schedules a refresh of the cached queue length.
   *
   * @param task Job payload; it is also serialised with `JSON.stringify` for the log line.
   * @returns Always `true`; the enqueue itself completes asynchronously.
   */
  add(task: unknown): boolean {
    // Fire-and-forget, but never let a failed enqueue become an unhandled rejection.
    this.Q.add(task).catch((error: unknown) => {
      console.error('error', error);
    });
    console.info(`\n%c--------- add task ${JSON.stringify(task)}--------- \n`, 'background:yellow; color:blue; font-weight:600;');
    this.updateQueueLength();
    return true;
  }

  /** Resolves to `Q.count()`. */
  async length(): Promise<number> {
    return this.Q.count();
  }

  /** Resolves to `Q.getActiveCount()`. */
  async running(): Promise<number> {
    return this.Q.getActiveCount();
  }
}