#!/usr/bin/env ts-node
import fs from 'fs';
import path from 'path';
import { config as setupEnv } from 'dotenv-flow';
import { chain } from 'stream-chain';
import StreamArray from 'stream-json/streamers/StreamArray';
import Batch from 'stream-json/utils/Batch';
import { getCustomRepository } from 'typeorm';
import { transformURL } from '@/repositories/PageRepository/utils';
import { PageRepository } from '@/repositories';
import { ProductSource } from '@/entities/types';
import { Page, Product } from '@/entities';
import connect from '../src/config/db';

setupEnv({ silent: true });

/** Number of dataset records grouped into one batch before processing. */
const BATCH_SIZE = 1000;

/**
 * Shape of one record in `dataset.json`. Only `urlOriginal` and
 * `googleShoppingID` are read by this script.
 */
type JSONPage = {
  urlOriginal: string | null;
  googleShoppingID: string;
  price: number | null;
  annotations: string | null;
  discoverable: boolean | null;
  hasReviews: boolean | null;
  description: string | null;
  img: string | null;
  name: string | null;
  retailer: string | null;
  url: string | null;
  urlQuery: string | null;
};

/**
 * Runs `transformURL` on a dataset URL.
 *
 * @param originalURL - URL as found in the dataset; may be missing or empty.
 * @returns The transformed URL, or `undefined` when the input is falsy or
 *   `transformURL` throws.
 */
const getURL = (originalURL?: string | null) => {
  let url: string | undefined;
  try {
    url = originalURL ? transformURL(originalURL) : undefined;
  } catch {
    // Deliberate best-effort: records whose URL cannot be transformed are skipped.
  }
  return url;
};

/**
 * Prepends `googleShoppingID` to `existingIDs` and de-duplicates the result.
 *
 * @param googleShoppingID - ID to add.
 * @param existingIDs - IDs currently stored on the entity, if any.
 * @returns The merged list when its length differs from the stored list's
 *   length (including when nothing is stored), otherwise `undefined` to
 *   signal that no update is needed.
 */
const mergeGoogleShoppingID = (
  googleShoppingID: string,
  existingIDs?: readonly string[] | null
): string[] | undefined => {
  const merged = Array.from(
    new Set([googleShoppingID, ...(existingIDs ?? [])])
  );
  return merged.length !== existingIDs?.length ? merged : undefined;
};

/**
 * Processes one batch of dataset records: looks up the pages matching the
 * records' transformed URLs, adds each record's Google Shopping ID to the
 * page and to its Google-Shopping-sourced product, and saves the entities
 * whose ID list changed.
 *
 * @param chunk - Batch of items produced by `StreamArray` + `Batch`.
 */
const processBatch = async (chunk: { value: JSONPage }[]): Promise<void> => {
  // When several records transform to the same URL, the last one wins
  // (Map constructor semantics).
  const urlToGoogleShoppingIdMap = new Map(
    chunk
      .map(
        ({
          value: { googleShoppingID, urlOriginal },
        }): [string | undefined, string] => [
          getURL(urlOriginal),
          googleShoppingID,
        ]
      )
      .filter((entry): entry is [string, string] => !!entry[0])
  );

  const repo = getCustomRepository(PageRepository);
  const pages = (
    await repo.getByUrls([...urlToGoogleShoppingIdMap.keys()])
  ).filter((page): page is Page => !!page);

  // Only products whose source is Google Shopping are candidates for update.
  const urlToProductMap = new Map(
    pages
      .map(({ product, url }): [string, Product | undefined] => [url, product])
      .filter(
        (entry): entry is [string, Product] =>
          !!entry[1] && entry[1].source === ProductSource.GOOGLE_SHOPPING
      )
  );

  const productsToUpdate: Product[] = [];
  for (const [url, product] of urlToProductMap) {
    const googleShoppingID = urlToGoogleShoppingIdMap.get(url);
    const merged = googleShoppingID
      ? mergeGoogleShoppingID(googleShoppingID, product.googleShoppingIDs)
      : undefined;
    if (merged) {
      product.googleShoppingIDs = merged;
      productsToUpdate.push(product);
    }
  }

  const pagesToUpdate: Page[] = [];
  for (const page of pages) {
    const googleShoppingID = urlToGoogleShoppingIdMap.get(page.url);
    const merged = googleShoppingID
      ? mergeGoogleShoppingID(googleShoppingID, page.googleShoppingIDs)
      : undefined;
    if (merged) {
      page.googleShoppingIDs = merged;
      pagesToUpdate.push(page);
    }
  }

  // Skip the save call entirely for an empty list; both saves run concurrently.
  const saves: Promise<unknown>[] = [];
  if (pagesToUpdate.length) {
    saves.push(
      Page.save(pagesToUpdate, {
        listeners: false,
        transaction: false,
      })
    );
  }
  if (productsToUpdate.length) {
    saves.push(Product.save(productsToUpdate, { transaction: false }));
  }
  await Promise.all(saves);

  console.log(
    `Saved ${productsToUpdate.length} products and ${pagesToUpdate.length} pages`
  );
};

/**
 * Connects to the database, then streams `dataset.json` (located next to this
 * script) through a JSON array parser in batches of `BATCH_SIZE`, handing each
 * batch to `processBatch`.
 *
 * @returns A promise that resolves when the pipeline emits `finish` and
 *   rejects on a read-stream or pipeline error.
 */
const main = async (): Promise<void> => {
  await connect();
  const stream = fs.createReadStream(path.join(__dirname, 'dataset.json'));
  return new Promise<void>((resolve, reject) => {
    // `pipe` does not forward source errors, so a missing or unreadable
    // dataset file must be routed to `reject` explicitly.
    stream.on('error', reject);
    stream.pipe(
      chain([
        StreamArray.withParser(),
        new Batch({ batchSize: BATCH_SIZE }),
        processBatch,
      ])
        .on('error', (error) => {
          reject(error);
        })
        // NOTE(review): `finish` reports the writable side of the chain;
        // confirm the last batch has been saved by the time it fires.
        .on('finish', () => {
          resolve();
        })
    );
  });
};

console.time('main');
main()
  .then(() => {
    console.timeEnd('main');
  })
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });