import { Client } from '@elastic/elasticsearch';
import * as _ from 'lodash';

// NOTE(review): `get` and `client` are not referenced by any function below;
// they are kept in case other code in this file relies on them.
const get = _.get;
let client;

/** Connection options read from ES_HOST / ES_USERNAME / ES_PASSWORD. */
export const localEsOptions = {
  node: process.env.ES_HOST,
  auth: {
    username: process.env.ES_USERNAME,
    password: process.env.ES_PASSWORD,
  },
};

/** Connection options read from ES_HOST_AWS / ES_USERNAME / ES_PASSWORD. */
export const awsEsOptions = {
  node: process.env.ES_HOST_AWS,
  auth: {
    username: process.env.ES_USERNAME,
    password: process.env.ES_PASSWORD,
  },
};

/**
 * Deletes the document whose `_id` equals `id` from `index` using a
 * delete-by-query request with a `term` query on `_id`.
 */
async function deleteDocById(esClient, index, id) {
  return esClient.deleteByQuery({
    index,
    body: {
      query: {
        term: { _id: id },
      },
    },
  });
}

/**
 * Returns every hit whose key (as computed by `keyOf`) was already produced by
 * an earlier hit in the list. The first hit for each key is never returned.
 *
 * Hits whose key is null/undefined are skipped so that documents are not
 * treated as duplicates of each other merely because they lack the field.
 */
function findDuplicateHits(hits, keyOf) {
  const seen = new Set();
  const duplicates = [];
  for (const hit of hits) {
    const value = keyOf(hit);
    if (value == null) continue;
    if (seen.has(value)) {
      duplicates.push(hit);
    } else {
      seen.add(value);
    }
  }
  return duplicates;
}

/**
 * Builds an Elasticsearch client for the given options.
 *
 * A new `Client` is constructed on every call; callers that issue several
 * requests should obtain one client and reuse it.
 *
 * @param clientOptions Options passed straight to the `Client` constructor.
 * @throws Error when `clientOptions` is falsy.
 */
export async function getSearchClient(clientOptions) {
  if (!clientOptions) throw new Error('clientOptions缺失');
  return new Client(clientOptions);
}

/**
 * Runs `client.search(options)` on a client built from `clientOptions`.
 *
 * @returns The raw search response.
 * @throws Error when `clientOptions` is falsy.
 */
export async function searchESWith(options, clientOptions) {
  const esClient = await getSearchClient(clientOptions);
  return esClient.search(options);
}

/** Runs a search using `localEsOptions`. */
export async function searchESLocal(options) {
  return searchESWith(options, localEsOptions);
}

/** Runs a search using `awsEsOptions`. */
export async function searchESAws(options) {
  return searchESWith(options, awsEsOptions);
}

/**
 * Searches `index` with `query` (sent as the complete request body) and splits
 * `items` by whether a hit with the same `key` property already exists:
 * existing ones are re-indexed under the hit's `_id`, the rest are created.
 *
 * @param clientOptions Options for the Elasticsearch client.
 * @param index Target index name.
 * @param items Items to write; each is matched through its `key` property.
 * @param query Full search request body used to look up existing documents.
 * @param key NOTE(review): currently unused — matching always reads the
 *   literal `key` property of items and of `_source`. Confirm whether this
 *   was meant to select the property name.
 */
export async function createOrUpdateEsItems(clientOptions, index, items, query, key) {
  console.log('createOrUpdateEsItems');
  const esClient = await getSearchClient(clientOptions);
  const resp: any = await esClient.search({
    index,
    body: query,
  });
  const hits: any = resp.body.hits.hits;

  // `_id` lives on the hit itself, not inside `_source`, so remember it per
  // key here. The first hit for a key wins.
  const idByKey = new Map<unknown, string>();
  for (const hit of hits) {
    const hitKey = hit._source.key;
    if (!idByKey.has(hitKey)) idByKey.set(hitKey, hit._id);
  }

  const existItems = items.filter((i) => idByKey.has(i.key));
  const newItems = items.filter((i) => !idByKey.has(i.key));

  if (existItems.length > 0) {
    // bulkUpdate addresses documents through `esid`; without it the bulk
    // request would carry an undefined `_id`.
    const itemsToUpdate = existItems.map((item) => ({
      ...item,
      esid: idByKey.get(item.key),
    }));
    await bulkUpdate(itemsToUpdate, index, clientOptions);
  }
  if (newItems.length > 0) {
    await bulkCreate(newItems, index, clientOptions);
  }
}

/**
 * Searches `index` with `{ query }`; when at least one hit is returned the
 * item is re-indexed under the first hit's `_id`, otherwise it is created.
 *
 * @param index Target index name.
 * @param item Document to write.
 * @param query Query clause, wrapped as `{ query }` for the search body.
 * @param clientOptions Options for the Elasticsearch client.
 */
export async function createOrUpdateEsItem(index, item, query, clientOptions) {
  const esClient = await getSearchClient(clientOptions);
  const resp: any = await esClient.search({
    index,
    body: { query },
  });
  const hits: any = resp.body.hits.hits;
  if (hits.length > 0) {
    const toSave = { ...item, esid: hits[0]._id };
    await bulkUpdate([toSave], index, clientOptions);
  } else {
    await bulkCreate([item], index, clientOptions);
  }
}

/** Same as `createOrUpdateEsItem`, using `awsEsOptions`. */
export async function createOrUpdateEsItemAws(index, item, query) {
  return createOrUpdateEsItem(index, item, query, awsEsOptions);
}

/**
 * Bulk-indexes `items` into `index` without explicit ids. Any `_id` property
 * on an item is dropped from the document body before sending.
 *
 * @returns The bulk response, filtered to `took` and `errors`.
 * @throws Error when `clientOptions` is falsy.
 */
export async function bulkCreate(items, index, clientOptions) {
  if (!clientOptions) throw new Error('clientOptions缺失');
  const esClient = await getSearchClient(clientOptions);
  const body = _.flatMap(items, (item) => {
    const { _id, ...doc } = item;
    return [{ index: { _index: index } }, doc];
  });
  return esClient.bulk({ refresh: true, body, filter_path: 'took,errors' });
}

/**
 * Bulk-indexes `items` into `index`, each under the `_id` given by its `esid`
 * property. The bulk `index` action is used, and the item is sent as the
 * document body unchanged (so `esid` is part of the stored document).
 *
 * @returns The bulk response, filtered to `took` and `errors`.
 * @throws Error when `clientOptions` is falsy.
 */
export async function bulkUpdate(
  items: Array<{ esid: string; [key: string]: any }>,
  index,
  clientOptions,
) {
  if (!clientOptions) throw new Error('clientOptions缺失');
  const esClient = await getSearchClient(clientOptions);
  const body = _.flatMap(items, (doc) => [
    { index: { _id: doc.esid, _index: index } },
    doc,
  ]);
  return esClient.bulk({ refresh: true, body, filter_path: 'took,errors' });
}

/**
 * Returns the number of documents in `index` as reported by a `match_all`
 * search through `awsEsOptions`.
 *
 * `track_total_hits` is enabled so the reported total is exact rather than
 * limited by the default total-hit tracking threshold; `size: 0` avoids
 * fetching hits that are never read.
 *
 * @returns `hits.total.value`, or `undefined` when the response lacks it.
 */
export async function esCountAll(index: string) {
  const resp: any = await searchESAws({
    index,
    body: {
      query: { match_all: {} },
      _source: false,
      size: 0,
      track_total_hits: true,
    },
  });
  return resp?.body?.hits?.total?.value;
}

/**
 * Searches the `since` index for documents whose `full_name` is one of the
 * given names (`name`, falling back to `full_name`, of each repo) and deletes
 * every returned hit by `_id`.
 *
 * Only the hits returned by that single search are deleted; no `size` is set
 * on the request.
 */
export async function removeByNames(repos) {
  const names = repos.map((i) => i.name || i.full_name);
  const esClient = await getSearchClient(awsEsOptions);
  const resp: any = await esClient.search({
    index: 'since',
    body: {
      query: {
        terms: { full_name: names },
      },
    },
  });
  const hits: any = resp.body.hits.hits;
  for (const hit of hits) {
    await deleteDocById(esClient, 'since', hit._id);
  }
}

/**
 * Searches the `since` index for documents whose `full_name` is one of the
 * given names and returns the hits de-duplicated by `_source.name`.
 *
 * @param repos Objects providing `name` or `full_name`.
 * @param removeDup When true, hits repeating the `name || full_name` of an
 *   earlier hit are also deleted from the index before returning.
 * @returns Hits unique by `_source.name`.
 *   NOTE(review): deletion keys on `name || full_name` while the returned
 *   list keys on `name` only — confirm this difference is intended.
 */
export async function findReposByNames(repos, removeDup = false) {
  const names = repos.map((i) => i.name || i.full_name);
  const esClient = await getSearchClient(awsEsOptions);
  const resp: any = await esClient.search({
    index: 'since',
    body: {
      query: {
        terms: { full_name: names },
      },
    },
  });

  if (removeDup) {
    const nameOf = (hit) => hit._source.name || hit._source.full_name;
    const duplicates = findDuplicateHits(resp.body.hits.hits, nameOf);
    for (const hit of duplicates) {
      console.log('deleteByQuery', nameOf(hit));
      // Awaited so failures surface to the caller instead of becoming
      // unhandled rejections.
      await deleteDocById(esClient, 'since', hit._id);
    }
  }

  return _.uniqBy(resp.body.hits.hits, '_source.name');
}

/**
 * Endlessly samples random `id` values below a fixed bound, looks them up in
 * the `since` index and, when several documents share the id, deletes the
 * ones whose `name`/`full_name` equals that of the first document after
 * sorting by `full_name`.
 *
 * This function loops forever and only ends by throwing.
 */
export async function removeEsDup() {
  const max = 421782901;
  // One client for the whole run rather than a fresh one per iteration.
  const esClient = await getSearchClient(awsEsOptions);
  const byFullName = (a, b) => {
    const left = String(a._source.full_name ?? '');
    const right = String(b._source.full_name ?? '');
    if (left < right) return -1;
    if (left > right) return 1;
    return 0;
  };

  while (true) {
    const id = Math.round(Math.random() * max);
    const resp: any = await esClient.search({
      index: 'since',
      body: {
        query: {
          term: { id },
        },
      },
    });
    const hits: any[] = resp?.body?.hits?.hits ?? [];
    if (hits.length <= 1) continue;

    const sorted = [...hits].sort(byFullName);
    const currentName = sorted[0]._source.name || sorted[0]._source.full_name;
    for (const hit of sorted.slice(1)) {
      if (hit._source.name === currentName || hit._source.full_name === currentName) {
        await deleteDocById(esClient, 'since', hit._id);
      }
    }
  }
}

/**
 * Searches `indexName` for documents whose `eskey` field is one of the
 * `items[*][key]` values (up to 10000 hits, `_source` limited to `key`) and
 * returns the hits de-duplicated by `_source[key]`.
 *
 * @param clientOptions Options for the Elasticsearch client.
 * @param indexName Index to search (and to delete duplicates from).
 * @param items Objects providing the lookup value under `key`.
 * @param key Property name read from each item and from each hit's `_source`.
 * @param eskey Field name used in the `terms` query.
 * @param removeDup When true, hits repeating the `_source[key]` of an earlier
 *   hit are also deleted from `indexName` before returning.
 * @throws Error when `clientOptions` is falsy.
 */
export async function findItemsByNames(clientOptions, indexName, items, key, eskey, removeDup = false) {
  const names = items.map((i) => i[key]);
  const esClient = await getSearchClient(clientOptions);
  const resp: any = await esClient.search({
    index: indexName,
    body: {
      query: {
        terms: { [eskey]: names },
      },
      _source: [key],
      size: 10000,
    },
  });

  if (removeDup) {
    const duplicates = findDuplicateHits(resp.body.hits.hits, (hit) => hit._source[key]);
    for (const hit of duplicates) {
      // Delete from the index that was searched, not a fixed one.
      await deleteDocById(esClient, indexName, hit._id);
    }
  }

  return _.uniqBy(resp.body.hits.hits, `_source[${key}]`);
}