// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com). // License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). import { forOwn, isArray, findLast, isFunction } from 'lodash' import check from 'check-types' import { Transform } from 'stream' import { Promise } from 'bluebird' import * as logger from 'winston' import { CounterOptions } from './counterOptions' /** * Counts objects */ export default class Counter extends Transform { static DEFAULT_OPTIONS: { counter: (data: any, count: (...args: any[]) => void) => void logger: (...args: any[]) => void prefixComponents: string[] suffixComponents: string[] thresholds: number[] abortOnError: boolean } private _options: any private _defaultCount: (...args: any[]) => void private readonly _counters: {} /** * @constructor * @param options Processed counter options */ constructor(options?: {} | CounterOptions) { options = Object.assign({}, Counter.DEFAULT_OPTIONS, options) check.assert.like(options, Counter.DEFAULT_OPTIONS) super({ objectMode: true, }) this._options = options this._counters = {} this._defaultCount = (...args) => this._count(...args) } /** * https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback * @override */ _transform( chunk: Buffer | string | any, encoding: string, callback: Function, ) { Promise.try(() => this._options.counter(chunk, this._defaultCount)) .then(() => { this._print() callback(null, chunk) }) .catch(error => { logger.log('error', 'Error in Counter:', error) callback(this._options.abortOnError ? error : null, chunk) }) } /** * https://nodejs.org/api/stream.html#stream_transform_flush_callback * @override */ _flush(callback: Function) { this._print(false) callback() } /** * Prints current counter to log */ _print(checkThresholds = true) { forOwn(this._counters, (value, counter) => { const thresholds = isArray(this._options.thresholds) ? this._options.thresholds : this._options.thresholds[counter] const threshold = findLast( thresholds || [1], threshold => value >= threshold, ) if (checkThresholds && value % threshold !== 0) { return } const prefixComponents = isArray(this._options.prefixComponents) ? this._options.prefixComponents : this._options.prefixComponents[counter] const suffixComponents = isArray(this._options.suffixComponents) ? this._options.suffixComponents : this._options.suffixComponents[counter] const logger = isFunction(this._options.logger) ? this._options.logger : this._options.logger[counter] logger(...(prefixComponents || []), value, ...(suffixComponents || [])) }) } /** * Counts a counter */ _count(counter = '') { check.assert.string(counter) this._counters[counter] = (this._counters[counter] || 0) + 1 } }