import type { RateLimiter } from 'bull';
import _ from 'lodash';
import moment from 'moment';
import { mongoUrlAws } from 'nestjs-dynamoose-redis';
import { AScraperQueue } from './AScraperQueue';
import { createRedisClient } from './utils/redis';
import cheerio from 'cheerio';
import type Redis from 'ioredis';
import { doRequest } from './utils/request';
import { AEsApiQueue, IEsApiQueueOptions } from './AEsApiQueue';
import { AEsQueue, IEsQueueOptions } from './AEsQueue';
import { AMongoApiQueue, IMongoApiQueueOptions } from './AMongoApiQueue';
import { AMongoQueue, IMongoQueueOptions } from './AMongoQueue';
import { APgApiQueue, IPgApiQueueOptions } from './APgApiQueue';
import { AUniqCache } from './AUniqCache';
import { AUniqCacheDay } from './UniqCacheDay';
import { messageError, deleteKeysByPattern } from './utils';

/**
 * Base class for a queue-driven HTML scraper.
 *
 * Every task added with {@link AScraper.addTask} is processed by
 * {@link AScraper.runner}, which:
 * 1. fetches the page HTML ({@link AScraper.getHTML});
 * 2. lets the subclass parse it ({@link AScraper.parseItems});
 * 3. stamps the parsed items with scrape timestamps;
 * 4. hands them to {@link AScraper.pipItems};
 * 5. fans them out to every configured sink queue (Elasticsearch,
 *    Elasticsearch API, Mongo, Mongo API, Postgres API).
 *
 * When the task queue drains, every sink queue is flushed with `send()`.
 *
 * Subclasses implement `start`, `loadItems`, `parseItems` and `pipItems`.
 */
export abstract class AScraper {
  queue: AScraperQueue;
  // @ts-ignore
  redis: Redis;
  /** When non-empty, `getHTML` returns this instead of performing a request. */
  demoHtml;
  esQueueList: AEsQueue[] = [];
  mongoQueueList: AMongoQueue[] = [];
  esApiQueueList: AEsApiQueue[] = [];
  mongoApiQueueList: AMongoApiQueue[] = [];
  pgApiQueueList: APgApiQueue[] = [];
  queueName = '';
  /** Handle of the pending `loop()` timeout, if any. */
  timer;
  /** When true, `runner` skips URLs already recorded in `uniqCache`. */
  unique = false;
  uniqCache: AUniqCache | AUniqCacheDay;

  /**
   * @param demoHtml - fixed HTML returned by `getHTML` instead of fetching (for testing).
   * @param esQueueOptions - one Elasticsearch sink queue per entry.
   * @param esApiQueueOptions - one Elasticsearch-API sink queue per entry.
   * @param mongoQueueOptions - one Mongo sink queue per entry; takes precedence over `collectionName`.
   * @param mongoApiQueueOptions - one Mongo-API sink queue per entry.
   * @param pgApiQueueOptions - one Postgres-API sink queue per entry.
   * @param N - forwarded to `AScraperQueue` unchanged.
   * @param limiter - Bull rate limiter forwarded to the task queue.
   * @param queueName - task queue name; the queue falls back to the subclass name when omitted.
   * @param logging - forwarded to `AScraperQueue`.
   * @param collectionName - when `mongoQueueOptions` is empty, creates a single Mongo sink
   *   for this collection keyed on `url` with `batch: 1`.
   * @param unique - enable URL de-duplication in `runner`.
   * @param mongoUrl - Mongo URL for the `collectionName` shorthand sink.
   */
  constructor({
    demoHtml = '',
    esQueueOptions,
    esApiQueueOptions,
    mongoQueueOptions,
    mongoApiQueueOptions,
    pgApiQueueOptions,
    N,
    limiter,
    queueName,
    logging = true,
    collectionName,
    unique = false,
    mongoUrl = mongoUrlAws,
  }: {
    demoHtml?: string,
    mongoQueueOptions?: IMongoQueueOptions[],
    esQueueOptions?: IEsQueueOptions[],
    esApiQueueOptions?: IEsApiQueueOptions[],
    mongoApiQueueOptions?: IMongoApiQueueOptions[],
    pgApiQueueOptions?: IPgApiQueueOptions[],
    N?: number,
    limiter?: RateLimiter,
    queueName?,
    logging?,
    collectionName?: string,
    unique?: boolean,
    mongoUrl?: string,
  } = {}) {
    this.queueName = queueName;
    this.demoHtml = demoHtml;
    this.unique = unique;

    this.initEsQueue(esQueueOptions);
    this.initEsApiQueue(esApiQueueOptions);
    this.initMongoApiQueue(mongoApiQueueOptions);
    this.initPgApiQueue(pgApiQueueOptions);
    if (mongoQueueOptions && mongoQueueOptions.length > 0) {
      this.initMongoQueue(mongoQueueOptions);
    } else if (collectionName) {
      // Shorthand: a single Mongo sink keyed on `url` with batch size 1.
      this.initMongoQueue([{
        collectionName,
        key: 'url',
        batch: 1,
        queueName,
        mongoUrl,
      }]);
    }

    this.redis = createRedisClient();

    // Drain events can fire in bursts; collapse them to at most one call per second
    // (leading + trailing so the final drain is never lost).
    this.onDrainQueue = _.throttle(this.onDrainQueue.bind(this), 1000, {
      leading: true,
      trailing: true,
    });
    this.onDrain = _.throttle(this.onDrain.bind(this), 1000, {
      leading: true,
      trailing: true,
    });

    this.queue = new AScraperQueue({
      queueName: queueName || this.constructor.name,
      limiter,
      N,
      // `runner` must be bound: the queue invokes it detached, and without the bind
      // `this` is undefined inside it (e.g. `undefined.runCallback`).
      runner: this.runner.bind(this),
      onDrain: () => {
        this.onDrainQueue();
        this.onDrain();
      },
      onFinish: async (error, result, task) => {
        this.onFinish(error, result, task);
      },
      logging,
    });

    this.uniqCache = new AUniqCache();
  }

  /** Hook invoked by the task queue's `onFinish`; no-op by default. */
  onFinish(error?, result?, task?) {
    //
  }

  setQueueName(name) {
    this.queueName = name;
  }

  /** Hook invoked (throttled to once per second) when the task queue drains; no-op by default. */
  onDrain() {
    //
  }

  get queueLenLoading() {
    return this.queue.queueLengtnLoading;
  }

  /**
   * Queue worker: processes a single task end to end.
   *
   * The completion `callback` is invoked exactly once on every path, via `runCallback`.
   * Errors are passed to it rather than thrown.
   *
   * @param task - task object; its `url` field is the page to scrape.
   * @param callback - completion callback for the task queue.
   * @returns
   *   - `[]` when the URL is skipped as a duplicate;
   *   - the timestamp-stamped items on success (empty when nothing was parsed);
   *   - `null` when fetching or parsing failed.
   */
  async runner(task, callback) {
    const { url } = task;

    if (this.unique) {
      if (this.uniqCache.has(url)) {
        this.runCallback(null, callback);
        return [];
      }
      // NOTE(review): the URL is recorded before processing, so a URL whose
      // scrape fails is not retried while it stays in the cache.
      this.uniqCache.add(url);
    }

    try {
      const html = await this.getHTML(url, task);
      const $ = cheerio.load(html);
      const data = await this.parseItems(html, task, $);
      // NOTE(review): pipResult's return value is ignored; overriding it to
      // transform `data` has no effect on what is stored below.
      await this.pipResult(data, task);

      let fmtItems: any[] = [];
      const items = data?.items;
      if (items && items.length > 0) {
        // One timestamp for the whole batch so every field and every item agrees.
        const now = moment();
        const stamp = {
          scraperUpdatedAt: now.format(),
          scraperUpdatedAtTimestamp: now.valueOf(),
          scraperUpdatedAtDate: now.format('YYYY-MM-DD'),
        };
        fmtItems = items.map((item) => ({ ...item, ...stamp }));

        await this.pipItems(fmtItems, task);
        // Sink failures are reported but do not fail the task.
        try {
          await this.addToAllPipline(fmtItems);
        } catch (error: any) {
          messageError(error);
        }
      }

      this.runCallback(null, callback);
      return fmtItems;
    } catch (error) {
      console.error('error', error);
      this.runCallback(error, callback);
      return null;
    }
  }

  /**
   * Fetches `url` and returns it re-serialized through cheerio.
   * Returns `demoHtml` instead when it is set.
   *
   * @throws whatever `doRequest` throws (after logging it).
   */
  async getHTML(url, task?) {
    if (this.demoHtml) return this.demoHtml;
    try {
      const resp = await doRequest(url);
      // @ts-ignore
      const $ = cheerio.load(resp);
      return $.html();
    } catch (error: any) {
      console.error('error', error);
      throw error;
    }
  }

  /** Hook for HTML fetch errors. NOTE(review): not called anywhere in this file. */
  async onGetHtmlError(error, url, task) {
  }

  abstract start();
  abstract loadItems();
  abstract parseItems(html, task, $);
  abstract pipItems(items: any[], task);

  /** Sends `items` to every configured sink family concurrently. */
  async addToAllPipline(items) {
    const ps = [
      this.addToAllEs(items),
      this.addToAllEsApi(items),
      this.addToAllMongo(items),
      this.addToAllMongoApi(items),
      this.addToAllPgApi(items),
    ];
    return await Promise.all(ps);
  }

  /** Post-parse hook; returns `data` unchanged by default. */
  pipResult(data: any, task) {
    return data;
  }

  /** Returns the Redis client, creating it lazily if it was cleared. */
  getRedis() {
    if (this.redis) {
      return this.redis;
    }
    this.redis = createRedisClient();
    return this.redis;
  }

  /**
   * Invokes the queue completion callback.
   * A throwing callback is logged instead of being allowed to crash the worker.
   */
  runCallback(error, callback) {
    try {
      callback(error);
    } catch (callbackError) {
      console.error('runCallback error', callbackError);
    }
  }

  initMongoQueue(mongoQueueOptions) {
    for (const mongoQueueOption of mongoQueueOptions ?? []) {
      this.mongoQueueList.push(new AMongoQueue(mongoQueueOption));
    }
  }

  initEsQueue(esQueueOptions) {
    for (const esQueueOption of esQueueOptions ?? []) {
      const { indexName, esOptions, batch, key, esKey, queueName } = esQueueOption;
      this.esQueueList.push(new AEsQueue({
        esOptions,
        indexName,
        batch,
        queueName: queueName || esOptions?.node,
        key,
        esKey,
      }));
    }
  }

  initEsApiQueue(options) {
    for (const opt of options ?? []) {
      const { indexName, batch, key, esKey, queueName } = opt;
      this.esApiQueueList.push(new AEsApiQueue({
        indexName,
        batch,
        // The original fallback read `.node` off the options *array* (always undefined).
        // Fall back to the index name, mirroring initPgApiQueue's use of tableName.
        queueName: queueName || indexName,
        key,
        esKey,
      }));
    }
  }

  initMongoApiQueue(options) {
    for (const opt of options ?? []) {
      const { dbName, colName, batch, key, queueName } = opt;
      this.mongoApiQueueList.push(new AMongoApiQueue({
        dbName,
        colName,
        batch,
        // The original fallback read `.node` off the options *array* (always undefined).
        // Fall back to the collection name, mirroring initPgApiQueue's use of tableName.
        queueName: queueName || colName,
        key,
      }));
    }
  }

  initPgApiQueue(options) {
    for (const opt of options ?? []) {
      const { tableName, batch, key, queueName } = opt;
      this.pgApiQueueList.push(new APgApiQueue({
        tableName,
        batch,
        queueName: queueName || tableName,
        key,
      }));
    }
  }

  addTask(item) {
    this.queue.add(item);
  }

  get queueLength() {
    return this.queue.queueLength;
  }

  /** Adds `items` (an array, or a single item which is wrapped) to every Mongo sink, sequentially. */
  async addToAllMongo(items) {
    console.info(`\n%c--------- addToAllMongo ${this.queue.queueName}--------- \n`, 'background:yellow; color:blue; font-weight:600;');
    console.log('items.length', items.length);
    const batch = _.isArray(items) ? items : [items];
    for (const queue of this.mongoQueueList ?? []) {
      await queue.add(batch);
    }
  }

  /** Adds `items` to every Elasticsearch sink, sequentially. */
  async addToAllEs(items) {
    console.info(`\n%c--------- addToAllEs ${this.queue.queueName}--------- \n`, 'background:yellow; color:blue; font-weight:600;');
    console.log('items.length', items.length);
    for (const queue of this.esQueueList ?? []) {
      await queue.add(items);
    }
  }

  /** Adds `items` to every Elasticsearch-API sink, sequentially. */
  async addToAllEsApi(items) {
    console.info(`\n%c--------- addToAllEsApi ${this.queue.queueName}--------- \n`, 'background:yellow; color:blue; font-weight:600;');
    console.log('items.length', items.length);
    for (const queue of this.esApiQueueList ?? []) {
      await queue.add(items);
    }
  }

  /** Adds `items` to every Mongo-API sink, sequentially. */
  async addToAllMongoApi(items) {
    console.info(`\n%c--------- addToAllMongoApi ${this.queue.queueName}--------- \n`, 'background:yellow; color:blue; font-weight:600;');
    console.log('items.length', items.length);
    for (const queue of this.mongoApiQueueList ?? []) {
      await queue.add(items);
    }
  }

  /** Adds `items` to every Postgres-API sink, sequentially. */
  async addToAllPgApi(items) {
    console.info('\n%c--------- addToAllPgApi --------- \n', 'background:yellow; color:blue; font-weight:600;');
    console.log('items.length', items.length);
    for (const queue of this.pgApiQueueList ?? []) {
      await queue.add(items);
    }
  }

  /** Flushes every sink queue; called (throttled) when the task queue drains. */
  onDrainQueue() {
    for (const q of this.esQueueList ?? []) q.send();
    for (const q of this.esApiQueueList ?? []) q.send();
    for (const q of this.mongoQueueList ?? []) q.send();
    for (const q of this.mongoApiQueueList ?? []) q.send();
    for (const q of this.pgApiQueueList ?? []) q.send();
  }

  /** Deletes this queue's Bull keys from Redis, then empties the task queue. */
  async emptyQueue() {
    // `this.queueName` is unset when the constructor fell back to the class name;
    // use the same fallback so the pattern is never `bull:undefined:*`.
    const name = this.queueName || this.constructor.name;
    await deleteKeysByPattern(this.redis, `bull:${name}:*`);
    return await this.queue.empty();
  }

  /** Empties and then obliterates the task queue. */
  async obliterateQueue() {
    // Must finish before obliterate, otherwise the two race on the same keys.
    await this.emptyQueue();
    return await this.queue.obliterate();
  }

  /**
   * Every 60 seconds, calls `loadItems()` if the task queue is empty, then reschedules itself.
   * Errors are reported and never stop the loop.
   */
  loop() {
    if (this.timer) clearTimeout(this.timer);
    this.timer = setTimeout(async () => {
      try {
        if (await this.queue.length() === 0) {
          // Not awaited so the next tick is scheduled on time, but rejections are still reported.
          void Promise.resolve(this.loadItems()).catch((error) => messageError(error));
        }
      } catch (error: any) {
        messageError(error);
      }
      this.loop();
    }, 60 * 1000);
  }
}