import { exec } from 'child_process';
import _ from 'lodash';
import { localEsOptions, bulkCreate, bulkUpdate, findItemsByNames } from './es';

const get = _.get;

declare let process: any;

// Numeric value of the GQL_N environment variable; falls back to 5 when the
// variable is unset, zero, or not a number. Not referenced in the visible part
// of this module.
const N = +process.env.GQL_N || 5;

/** Minimum time between two automatic `docker restart es` attempts (10 minutes). */
const ES_RELOAD_COOLDOWN_MS = 600 * 1000;

/** True from the moment a restart is triggered until the cooldown timer fires. */
let reloadingES = false;

/**
 * Upserts `items` into `index` using the local Elasticsearch client options.
 *
 * Thin wrapper around {@link sendItemsToES} that passes `key` for both the
 * `key` and the `esKey` arguments.
 *
 * @param index - Name of the target index.
 * @param items - Items to create or update.
 * @param key - Property name used to match items against existing documents.
 * @returns A promise that resolves once the write attempt has finished; it
 *   never rejects because of a failed write (see {@link sendItemsToES}).
 */
export async function saveItemsToLocalEs(index, items, key): Promise<void> {
  return await sendItemsToES(localEsOptions, index, items, key, key);
}

/**
 * Creates or updates `uniqItems` in the index `indexName`.
 *
 * Existing documents are fetched with `findItemsByNames`. An item is treated
 * as existing when its `item[key]` strictly equals the `_source[key]` of one
 * of the returned documents; such items are sent to `bulkUpdate` with two
 * extra fields, `esid` (the document's `_id`) and `found: true`. All other
 * items are sent unchanged to `bulkCreate`. Creates are issued before updates.
 *
 * Errors are never propagated to the caller: they are logged, and when the
 * message contains `connect EADDRNOTAVAIL` or `ConnectionError` a
 * `docker restart es` is triggered, at most once per cooldown period.
 *
 * @param clientOptions - Client options forwarded to the `./es` helpers.
 * @param indexName - Name of the target index.
 * @param uniqItems - Items to write (the name suggests they are expected to be
 *   unique by `key` — TODO confirm with callers).
 * @param key - Property name compared between items and `_source`.
 * @param esKey - Forwarded to `findItemsByNames`; presumably the field name on
 *   the Elasticsearch side — TODO confirm against `./es`.
 */
export async function sendItemsToES(clientOptions, indexName, uniqItems, key, esKey): Promise<void> {
  try {
    // Typed as `any` because the fields read below (`_source`, `_id`) are not
    // visible on the type this helper returns. The trailing `false` flag is
    // forwarded as-is; its meaning is defined in `./es`.
    const docs: any = await findItemsByNames(clientOptions, indexName, uniqItems, key, esKey, false);

    // Index the existing documents once instead of scanning them for every
    // item. The first document per key value wins, like `Array.prototype.find`.
    const docByKeyValue = new Map<unknown, any>();
    for (const doc of docs) {
      const keyValue = doc._source[key];
      if (!docByKeyValue.has(keyValue)) {
        docByKeyValue.set(keyValue, doc);
      }
    }

    // Partition on the lookup result itself rather than on a `found` property,
    // so an input item that happens to carry its own `found` field cannot be
    // routed to the update path without an `esid`.
    const foundDocs: any[] = [];
    const newDocs: any[] = [];
    for (const item of uniqItems) {
      const existing = docByKeyValue.get(item[key]);
      if (existing) {
        foundDocs.push({ ...item, esid: existing._id, found: true });
      } else {
        newDocs.push(item);
      }
    }

    if (newDocs.length > 0) {
      await bulkCreate(newDocs, indexName, clientOptions);
    }
    if (foundDocs.length > 0) {
      await bulkUpdate(foundDocs, indexName, clientOptions);
    }
  } catch (error: unknown) {
    // `String(...)` instead of `.toString()` / a bare value: the rejection may
    // be null/undefined or carry a non-string message, and this handler must
    // not throw itself.
    const msg = String(get(error, 'errors[0].message') || error);

    // Best-effort write: report the failure instead of dropping it silently,
    // but do not rethrow.
    console.error(`sendItemsToES failed for index "${indexName}": ${msg}`);

    const isConnectionFailure =
      msg.includes('connect EADDRNOTAVAIL') || msg.includes('ConnectionError');

    if (isConnectionFailure && !reloadingES) {
      reloadingES = true;
      setTimeout(() => {
        reloadingES = false;
      }, ES_RELOAD_COOLDOWN_MS);
      // Fire-and-forget; make sure a failure to spawn the command does not
      // surface as an unhandled rejection.
      reloadES().catch((reloadError) => {
        console.error(reloadError);
      });
    }
  }
}

export const IndexSince = 'since';
export const IndexReposNpm = 'repos_npm';

/**
 * Runs `docker restart es` through `child_process.exec`.
 *
 * The returned promise resolves as soon as the command has been spawned, not
 * when it completes; the command's outcome is only reported through the
 * console (the error on failure, stdout and stderr on success).
 */
export async function reloadES(): Promise<void> {
  console.info('docker restart es');
  exec('docker restart es', (err, stdout, stderr) => {
    if (err) {
      console.error(err);
      return;
    }
    console.info(`reloadES stdout: ${stdout}`);
    console.info(`reloadES \nstderr: ${stderr}`);
  });
}