import * as _ from 'lodash';
import moment from 'moment';
import { mongoUrlAws, getCollection, createOrUpdateMongoItems, createIndex } from 'nestjs-dynamoose-redis';

/** Options accepted by the {@link AMongoQueue} constructor. */
export interface IMongoQueueOptions {
  /** Connection string; the constructor falls back to `mongoUrlAws` when omitted. */
  mongoUrl?: string;
  /** Database name; the constructor defaults it to `'data'`. */
  dbName?: string;
  /** Collection the queued items are written to. */
  collectionName: string;
  /** Field name(s) handed to `createIndex` and `createOrUpdateMongoItems`. */
  key: string | string[];
  /** Number of buffered items at which `add` flushes; the constructor defaults it to 1. */
  batch?: number;
  /** Label for this queue; stored on the instance. */
  queueName: string;
}

/**
 * In-memory buffer that stamps items with update times and writes them to a
 * Mongo collection in batches via `createOrUpdateMongoItems`.
 */
export class AMongoQueue {
  /** Items accepted by `add` that have not been written yet. */
  list: any[] = [];
  indexNames;
  batch;
  dbName;
  collectionName;
  key;
  mongoUrl;
  queueName;

  /**
   * Stores the options and requests an index on `key` for the target collection.
   *
   * @throws Error when `mongoUrl` resolves to `undefined` (neither passed nor
   *   provided by the `mongoUrlAws` default).
   */
  constructor({
    dbName = 'data',
    collectionName,
    batch = 1,
    key,
    mongoUrl = mongoUrlAws,
    queueName,
  }: IMongoQueueOptions) {
    this.dbName = dbName;
    this.collectionName = collectionName;
    this.batch = batch;
    this.key = key;
    this.queueName = queueName;

    if (_.isUndefined(mongoUrl)) {
      throw new Error('AMongoQueue mongoUrl undefined!!');
    }
    this.mongoUrl = mongoUrl;

    // Not awaited (a constructor cannot await).
    // NOTE(review): if createIndex returns a promise, a rejection here is unhandled — confirm.
    createIndex({
      dbName,
      colName: collectionName,
      mongoUrl,
      key,
    });
  }

  /**
   * Stamps each item with `scraperUpdatedAt*` fields, appends it to the buffer,
   * and flushes the whole buffer once it holds at least `batch` items.
   *
   * @param items Items to enqueue; they are shallow-copied, not mutated.
   */
  async add(items: any[]) {
    const stamped = items.map((item) => {
      // One clock reading per item so the three fields always describe the same instant.
      const now = moment();
      return {
        ...item,
        scraperUpdatedAt: now.format(),
        scraperUpdatedAtTimestamp: now.valueOf(),
        scraperUpdatedAtDate: now.format('YYYY-MM-DD'),
      };
    });

    this.list = [...this.list, ...stamped];

    if (this.list.length >= this.batch) {
      // Detach the batch before awaiting so items added meanwhile land in a fresh buffer.
      const toSend = [...this.list];
      this.list = [];
      await this.send(toSend);
    }
  }

  /**
   * Writes `items` to the collection and removes them from the buffer.
   *
   * @param items Items to write; defaults to the current buffer, which makes
   *   `send()` a manual flush.
   * @returns `null` when there is nothing to send, otherwise `undefined`.
   */
  async send(items: any[] = this.list) {
    if (items.length === 0) return null;
    // console.info(`\n%c---------MongoQueue send ${this.queueName} ${items.length} --------- \n`, 'background:yellow; color:blue; font-weight:600;\n');

    // Without a key nothing is written and the buffer is left untouched.
    if (!this.key) return undefined;

    const { client, col } = await getCollection(this.dbName, this.collectionName, this.mongoUrl);
    try {
      await createOrUpdateMongoItems(col, items, this.key);
    } finally {
      // Close the connection even when the write fails.
      await client.close();
    }

    // Drop exactly the objects that were written. The previous key-based filter
    // was inverted (it kept the sent items) and could not handle array keys.
    const sent = new Set(items);
    this.list = this.list.filter((queued) => !sent.has(queued));
    return undefined;
  }
}