import * as _ from 'lodash'; import moment from 'moment'; import { sendItemsToES } from './utils/send-es'; export interface IEsQueueOptions { indexName: string; esOptions: any; batch?: number; key: string; esKey: string; queueName?: string; } export class AEsQueue { list: any = []; indexName; batch; esOptions; key; esKey; queueName; constructor ({ esOptions, indexName, batch = 1, key = 'url', esKey, queueName = '', }: IEsQueueOptions) { this.indexName = indexName; this.batch = batch; this.esOptions = esOptions; this.key = key; this.esKey = esKey; this.queueName = queueName; if (_.isUndefined(esOptions.node)) { throw new Error('AEsQueue esOptions.node undefined!!'); } } async add(items) { const fmtItems = items.map((item) => { return { ...item, scraperUpdatedAt: moment().format(), scraperUpdatedAtTimestamp: Date.now(), scraperUpdatedAtDate: moment().format('YYYY-MM-DD'), }; }); this.list = [...this.list, ...fmtItems]; if (this.list.length >= this.batch) { const toSend = [...this.list]; this.list = []; await this.send(toSend); } } async send(items = this.list) { if (items.length === 0) return null; // console.info(`\n%c---------EsQueue send ${items.length} ${this.queueName} --------- \n`, 'background:yellow; color:blue; font-weight:600;\n'); if (this.esOptions) { await sendItemsToES(this.esOptions, this.indexName, items, this.key, this.esKey,); // onDrain 会反复调用,所以每次发送完成后,要去重,否则会重复发送 const keys = items.map((item) => item[this.key]); this.list = this.list.filter(i => keys.includes(i[this.key])); } } }