#!/usr/bin/env ts-node
import fs from 'fs';
import path from 'path';
import { config as setupEnv } from 'dotenv-flow';
import pAll from 'p-all';
import { chain } from 'stream-chain';
import StreamArray from 'stream-json/streamers/StreamArray';
import Batch from 'stream-json/utils/Batch';
import { In } from 'typeorm';
import { v4 } from 'uuid';
import { transformURL } from '@/repositories/PageRepository/utils';
import { PageSource, ProductSource } from '@/entities/types';
import { Page, Product } from '@/entities';
import connect from '../src/config/db';

setupEnv();

/** Shape of one element of the JSON array streamed from `inventory.json`. */
type JSONPage = {
  urlOriginal: string | null;
  googleShoppingID: string;
  price: number | null;
  annotations: string | null;
  discoverable: boolean | null;
  hasReviews: boolean | null;
  description: string | null;
  img: string | null;
  name: string | null;
  retailer: string | null;
  url: string | null;
  urlQuery: string | null;
};

/** Row inserted into `Product` for a Google Shopping ID with no matching product. */
type NewProductPayload = {
  id: string;
  firebaseId: string;
  googleShoppingIDs: string[];
  source: ProductSource;
};

/** Row inserted into `Page` for an inventory entry whose URL is not stored yet. */
type NewPagePayload = {
  id: string;
  firebaseId: string;
  img?: string;
  name?: string;
  productFirebaseId: string;
  price?: number;
  retailer?: string;
  url?: string;
  urlQuery?: string;
  urlOriginal?: string;
  googleShoppingIDs: string[];
  annotations?: string;
  description?: string;
  discoverable?: boolean;
  hasReviews?: boolean;
  featured?: boolean;
  source: PageSource;
};

/**
 * Runs `transformURL` on a raw URL.
 *
 * @param originalURL - Raw URL from the inventory file; may be null/undefined/empty.
 * @returns The transformed URL, or `undefined` when the input is empty or
 *   `transformURL` throws.
 */
const getURL = (originalURL?: string | null) => {
  let url: string | undefined;
  try {
    url = originalURL ? transformURL(originalURL) : undefined;
  } catch (error) {
    // Deliberate best-effort: an untransformable URL is reported as `undefined`
    // and the caller decides how to handle it.
  }
  return url;
};

/**
 * Streams `inventory.json` (next to this script) in batches of 100 entries and,
 * for each batch:
 *  1. creates a `Product` for every Google Shopping ID that has none,
 *  2. creates a `Page` for every entry whose transformed URL is not stored yet,
 *  3. updates `lowestPricePageId` on the pages of products whose lowest
 *     price (among pages created during this run) changed.
 *
 * Entries that cannot be turned into a page (no product, no transformable URL,
 * no image or no description) have their Google Shopping ID appended to
 * `error.txt`.
 *
 * @returns A promise that resolves when the stream chain emits `finish` and
 *   rejects on any read-stream or chain error.
 */
const main = async (): Promise<void> => {
  await connect();

  const stream = fs.createReadStream(path.join(__dirname, 'inventory.json'));
  const errorStream = fs.createWriteStream(path.join(__dirname, 'error.txt'));

  // Lowest price seen so far per product, across all batches of this run.
  // `updated` marks entries whose change has not been written to the DB yet.
  const lowestPriceMapByProductFirebaseId = new Map<
    string,
    { price: number; pageFirebaseId: string; updated: boolean }
  >();

  /** Processes one batch of parsed inventory entries. */
  const processBatch = async (chunk: { value: JSONPage }[]) => {
    // Transform every URL once so the DB lookup and the "already exists"
    // check below are guaranteed to use the same value.
    const entries = chunk.map(({ value }) => ({
      value,
      url: getURL(value.url),
    }));

    const googleShoppingIDs = entries.map(
      ({ value }) => value.googleShoppingID
    );

    // Map googleShoppingID -> product firebaseId for products that already
    // exist (array-overlap match on the stored IDs).
    const existingProducts = await Product.createQueryBuilder('Product')
      .where('Product.googleShoppingIDs && ARRAY[:...ids]::varchar[]', {
        ids: googleShoppingIDs,
      })
      .select([
        'Product.id',
        'Product.firebaseId',
        'Product.googleShoppingIDs',
      ])
      .getMany();

    const productsMap = new Map<string, string>();
    for (const product of existingProducts) {
      for (const googleShoppingID of product.googleShoppingIDs || []) {
        productsMap.set(googleShoppingID, product.firebaseId);
      }
    }

    // Create one product per distinct Google Shopping ID that has none yet,
    // registering it in the map so the pages below can reference it.
    const productsToCreate: NewProductPayload[] = [];
    for (const googleShoppingID of Array.from(new Set(googleShoppingIDs))) {
      if (!productsMap.get(googleShoppingID)) {
        const productId = v4();
        productsMap.set(googleShoppingID, productId);
        productsToCreate.push({
          id: productId,
          firebaseId: productId,
          googleShoppingIDs: [googleShoppingID],
          source: ProductSource.GOOGLE_SHOPPING,
        });
      }
    }
    if (productsToCreate.length > 0) {
      await Product.insert(productsToCreate);
    }

    // Only defined URLs go into the IN (...) lookup; skip the query entirely
    // when the batch has none.
    const pageUrls = entries
      .map(({ url }) => url)
      .filter((url): url is string => !!url);
    const existingPages =
      pageUrls.length > 0
        ? await Page.find({
            where: { url: In(pageUrls) },
            select: ['id', 'url'],
          })
        : [];
    const existingPagesByUrlSet = new Set(existingPages.map(({ url }) => url));

    const pagesToCreate = entries
      .filter(
        ({ value, url }) =>
          // Entries without a raw URL are skipped silently. Pages are stored
          // under the transformed URL, so that is what must be compared.
          Boolean(value.url) && !(url && existingPagesByUrlSet.has(url))
      )
      .map(({ value, url }): NewPagePayload | null => {
        const productId = productsMap.get(value.googleShoppingID);
        if (!productId || !url || !value.img || !value.description) {
          errorStream.write(`${value.googleShoppingID}\n`, 'utf8');
          return null;
        }

        const id = v4();
        const currentLowest = lowestPriceMapByProductFirebaseId.get(productId);
        // Track this page as the product's cheapest when it is the first
        // priced page seen, or strictly cheaper than the current one.
        if (
          value.price &&
          (!currentLowest || currentLowest.price > value.price)
        ) {
          lowestPriceMapByProductFirebaseId.set(productId, {
            price: value.price,
            pageFirebaseId: id,
            updated: true,
          });
        }

        return {
          id,
          firebaseId: id,
          img: value.img,
          name: value.name ?? undefined,
          productFirebaseId: productId,
          price: value.price ?? undefined,
          retailer: value.retailer ?? undefined,
          url,
          urlQuery: value.urlQuery ?? undefined,
          urlOriginal: value.urlOriginal ?? undefined,
          googleShoppingIDs: [value.googleShoppingID],
          annotations: value.annotations ?? undefined,
          description: value.description,
          discoverable: value.discoverable ?? undefined,
          hasReviews: value.hasReviews ?? undefined,
          featured: false,
          source: PageSource.GOOGLE_SHOPPING,
        };
      })
      .filter((payload): payload is NewPagePayload => {
        return !!payload;
      });
    if (pagesToCreate.length > 0) {
      await Page.insert(pagesToCreate);
    }

    // Persist the lowest-priced page for every product whose entry changed,
    // clearing the `updated` flag so later batches do not repeat the write.
    await pAll(
      Array.from(lowestPriceMapByProductFirebaseId.entries())
        .filter(([, { updated }]) => updated)
        .map(([productFirebaseId, value]) => async () => {
          lowestPriceMapByProductFirebaseId.set(productFirebaseId, {
            ...value,
            updated: false,
          });
          const { pageFirebaseId } = value;
          return Page.update(
            { productFirebaseId },
            { lowestPricePageId: pageFirebaseId }
          );
        }),
      { concurrency: 100 }
    );
  };

  return new Promise<void>((resolve, reject) => {
    // `pipe` does not forward source errors, so a missing or unreadable
    // inventory file would otherwise leave this promise pending forever.
    stream.on('error', (error) => {
      reject(error);
    });

    stream.pipe(
      chain([
        StreamArray.withParser(),
        new Batch({ batchSize: 100 }),
        processBatch,
      ])
        .on('error', (error) => {
          reject(error);
        })
        // NOTE(review): `finish` signals that the writable side of the chain
        // is done; confirm it cannot fire before the last batch's async work
        // has completed.
        .on('finish', () => {
          resolve();
        })
    );
  });
};

console.time('main');
main()
  .then(() => {
    console.timeEnd('main');
  })
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });