import moment from 'moment';
import * as _ from 'lodash';
import dayjs from 'dayjs';
import crypto from 'crypto';
import axios from 'axios';
import { BulkTypes } from '../bulk-api';
import mapKeysDeep from 'map-keys-deep-lodash';

/**
 * Rewrites every key of `obj` to camelCase, recursing through nested values
 * via `mapKeysDeep`.
 */
export function camel(obj) {
  return mapKeysDeep(obj, (_value, key) => _.camelCase(key));
}

/**
 * Rewrites every key of `obj` to snake_case, recursing through nested values
 * via `mapKeysDeep`.
 */
export function snake(obj) {
  return mapKeysDeep(obj, (_value, key) => _.snakeCase(key));
}

/** Alias of {@link snake}. */
export const toSnake = (item: any) => snake(item);

/** Alias of {@link camel}. */
export const toCamel = (item: any) => camel(item);

const get = _.get;

/**
 * Formats the current local time as `YYYY-MM-DD HH:mm`.
 */
export const formatNow = () => {
  // `mm` is the minute token; `MM` would print the month in the minutes slot.
  return dayjs().format('YYYY-MM-DD HH:mm');
};

/**
 * Returns the hex-encoded MD5 digest of `value`.
 */
export function md5(value) {
  return crypto.createHash('md5').update(value).digest('hex');
}

/**
 * Turns a root-relative `url` (one starting with `/`) into an absolute one by
 * prefixing `host` with its trailing slashes stripped. Any other `url` is
 * returned unchanged.
 */
export function fixUrlXPath(host, url) {
  if (url.startsWith('/')) {
    return `${host.replace(/\/+$/, '')}${url}`;
  }
  return url;
}

/**
 * Deletes every key matching `pattern`, scanning with `redis.scanStream` and
 * deleting each scanned page of keys through a pipeline.
 *
 * @param redis   Client exposing `scanStream` and `pipeline`
 *                (presumably an ioredis instance; verify against caller).
 * @param pattern Match pattern handed to the scan.
 * @returns Promise resolving to `1` once the scan has ended and every issued
 *          pipeline has finished; rejects on a stream error or a failed
 *          pipeline execution.
 */
export const deleteKeysByPattern = (redis, pattern: string) => {
  return new Promise((resolve, reject) => {
    const pending: Promise<unknown>[] = [];
    const stream = redis.scanStream({
      match: pattern,
    });

    stream.on('data', (keys: string[]) => {
      if (keys.length === 0) {
        return;
      }
      const pipeline = redis.pipeline();
      for (const key of keys) {
        pipeline.del(key);
      }
      const execution = Promise.resolve(pipeline.exec());
      // Attach a handler right away so a failed exec rejects the outer promise
      // instead of surfacing as an unhandled rejection.
      execution.catch(reject);
      pending.push(execution);
    });

    stream.on('end', () => {
      // Wait for the deletions themselves, not just for the scan to finish.
      Promise.all(pending).then(() => resolve(1), reject);
    });

    stream.on('error', (e) => {
      reject(e);
    });
  });
};

/**
 * Posts `items` (keys converted to snake_case) to `${API_HOST}/bulk` as a
 * `BulkTypes.pg` bulk request.
 *
 * Reads `API_HOST` and `API_SECRET` from the environment.
 * NOTE(review): a missing `API_HOST` is not checked, so the URL becomes
 * `undefined/bulk` and the request fails into the `null` path below.
 *
 * @returns The axios response, or `null` when the request fails (the failure
 *          is logged).
 * @throws Error when `API_SECRET` is not set.
 */
export async function sendItemsToAwsPg(tableName, key, items) {
  const fmtItems = toSnake(items);
  const api = process.env.API_HOST;
  const bulkApi = `${api}/bulk`;
  const secret = process.env.API_SECRET;
  if (!secret) {
    throw new Error('sendItemsToAwsPg 没有 secret');
  }
  try {
    return await axios.post(bulkApi, {
      tableName,
      key,
      items: fmtItems,
      secret,
      bulk_type: BulkTypes.pg,
    });
  } catch (error: unknown) {
    const msg = getErrorMessage(error);
    console.error('sendItemsToAwsPg error', msg);
    return null;
  }
}

/**
 * Runs `func` over every item (keys converted to snake_case first), at most
 * `batch` calls in flight at a time.
 *
 * @param items Array of items to process.
 * @param func  Callback invoked once per converted item; may return a promise.
 * @param batch Maximum number of concurrent `func` calls (default 20).
 * @returns Results in input order; `null` when `items` is empty or when any
 *          call fails (the error is logged and later chunks are not started).
 */
export async function batchProcess(items, func, batch = 20) {
  if (items.length === 0) return null;
  const fmtItems = toSnake(items);
  try {
    const results: unknown[] = [];
    for (const chunk of _.chunk(fmtItems, batch)) {
      // Finish the current chunk before starting the next so `batch` really
      // bounds concurrency.
      const chunkResults = await Promise.all(chunk.map((item) => func(item)));
      results.push(...chunkResults);
    }
    return results;
  } catch (error: unknown) {
    console.error('error', error);
    return null;
  }
}

/**
 * Repeatedly fetches up to `batch` documents matching `where` from `col` and
 * passes each page to `processItems`, until a fetch returns nothing.
 *
 * NOTE(review): every pass issues the same `find(where).limit(batch)` query
 * with no offset, so the loop only terminates if `processItems` changes the
 * documents so that they stop matching `where` (or removes them).
 *
 * @param col          Object exposing `find(where).limit(n).toArray()`.
 * @param where        Query passed to `find`.
 * @param batch        Page size (default 50).
 * @param processItems Async callback receiving each fetched page.
 */
export async function scrollCollection(col, where, batch = 50, processItems) {
  const getItems = async () => col.find(where).limit(batch).toArray();

  let processed = 0;
  let items = await getItems();
  while (items && items.length > 0) {
    await processItems(items);
    processed += items.length;
    console.log('cc', processed);
    items = await getItems();
  }
}

/**
 * Logs `error` itself, then the message extracted by {@link getErrorMessage}.
 */
export async function messageError(error) {
  console.error(error);
  const msg = getErrorMessage(error);
  console.error(msg);
}

/**
 * Misspelled alias of {@link messageError}, kept so existing callers keep
 * working.
 *
 * @deprecated Use `messageError`.
 */
export async function erororMessage(error) {
  return messageError(error);
}

/**
 * Extracts a message from an error-like value, returning the first truthy of:
 * `message`, `response.error`, `data.message`, `messages[0]`,
 * `data.messages[0]`. Yields a falsy value when none is present.
 */
export function getErrorMessage(error) {
  return (
    error?.message ||
    error?.response?.error ||
    error?.data?.message ||
    _.get(error, 'messages[0]') ||
    _.get(error, 'data.messages[0]')
  );
}

/** Intentionally empty placeholder; kept because it is exported. */
export function functionName() {}

/**
 * Renders `num` as concatenated magnitude groups: billions (`b`), millions
 * (`m`), ten-thousands (`w`), followed by the remainder below 10000. Groups
 * whose count is zero are omitted; the remainder is always appended.
 *
 * @example
 * readableNumber(1234567); // '1m23w4567'
 * readableNumber(1230000); // '1m23w0'
 */
export function readableNumber(num) {
  const billion = 1000000000;
  const million = 1000000;
  const wan = 10000;

  const b = Math.floor(num / billion);
  const m = Math.floor((num % billion) / million);
  const w = Math.floor((num % million) / wan);

  let ret = '';
  if (b >= 1) {
    ret += `${b}b`;
  }
  if (m >= 1) {
    ret += `${m}m`;
  }
  if (w >= 1) {
    ret += `${w}w`;
  }
  return `${ret}${num % wan}`;
}

/**
 * Formats `date` (default: now) as `YYYY-MM-DD`.
 */
export function formatDate(date = new Date()) {
  return moment(date).format('YYYY-MM-DD');
}

/**
 * Resolves with `1` after `time` milliseconds.
 */
export function sleep(time) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(1);
    }, time);
  });
}