import { addJob, addJobs, JobKind } from '@spokenio/jobqueue-producer';
import randomatic from 'randomatic';
import {
  Body,
  Controller,
  Post,
  Res,
  Route,
  Security,
  SuccessResponse,
  TsoaResponse,
} from 'tsoa';

import logger from '@/logger';
import { AggregateProductVariantRepository } from '@/repositories';
import {
  getProductIdentifiersFromData,
  getVariantIdentifiersFromData,
} from '@/repositories/shared/identifiers';
import { HttpStatusCodes } from '@/routes/types';
import { UserRole } from '@/types';
import { getRetailerFromUrl, getRetailerKindFromUrl } from '@/utils/retailer';

import { CreateProductInput, CreateProductPayload } from './types';

/** Reason reported when an input carries neither kind of source URL. */
const MISSING_SOURCE_REASON =
  'Either retailerUrls or googleShoppingUrl is required';

/**
 * Tells whether an input names at least one source to ingest from: a
 * non-empty `retailerUrls` list or a truthy `googleShoppingUrl`.
 */
function hasIngestionSource(input: CreateProductInput): boolean {
  return Boolean(input.retailerUrls?.length) || Boolean(input.googleShoppingUrl);
}

/**
 * Returns the caller-supplied `ingestionId`, or a freshly generated
 * 13-character one built from randomatic's `'a0'` pattern when none is given.
 */
function resolveIngestionId(input: CreateProductInput): string {
  return input.ingestionId ?? randomatic('a0', 13);
}

/**
 * Admin-only endpoints that enqueue background jobs to fetch product data
 * from a Google Shopping URL and/or a list of retailer URLs.
 */
@Route('aggregate-products')
@Security('basic', [UserRole.ADMIN])
export class DataIngestionController extends Controller {
  /**
   * Enqueues the fetch jobs for a single product.
   *
   * One job is added per source present in the body: a Google Shopping job
   * when `googleShoppingUrl` is set, and a retailer job when `retailerUrls`
   * is non-empty. Both jobs share the same ingestion ID.
   *
   * @param requestBody Product sources and an optional `ingestionId`.
   * @param invalidInput Responder used to reply 422 when no source is given.
   * @returns `{ status: 'queued' }` once the jobs have been added.
   */
  @Post()
  @SuccessResponse('201', 'Created')
  async createAggregateProduct(
    @Body() requestBody: CreateProductInput,
    @Res()
    invalidInput: TsoaResponse<
      HttpStatusCodes.UNPROCESSABLE_ENTITY,
      { reason: string }
    >
  ): Promise<CreateProductPayload> {
    if (!hasIngestionSource(requestBody)) {
      return invalidInput(HttpStatusCodes.UNPROCESSABLE_ENTITY, {
        reason: MISSING_SOURCE_REASON,
      });
    }

    const { retailerUrls, googleShoppingUrl } = requestBody;
    const ingestionID = resolveIngestionId(requestBody);

    if (googleShoppingUrl) {
      const jobInfo = await addJob(JobKind.FETCH_GOOGLE_SHOPPING_INFO_1, {
        url: googleShoppingUrl,
        ingestionID,
      });
      logger.info(
        `Started job to fetch google shopping info. GoogleShoppingURL:${googleShoppingUrl} JobID:${jobInfo?.id} ingestionID:${ingestionID}`
      );
    }

    if (retailerUrls?.length) {
      const jobInfo = await addJob(JobKind.FETCH_RETAILER_INFO_1, {
        urls: retailerUrls,
        ingestionID,
      });
      const joinedUrls = retailerUrls.join(', ');
      logger.info(
        `Started job to fetch retailer info. Retailer URLs:${joinedUrls} JobID:${jobInfo?.id} ingestionID:${ingestionID}`
      );
    }

    return {
      status: 'queued',
    };
  }

  /**
   * Enqueues the fetch jobs for a batch of products.
   *
   * For every entry that names a source, product and variant identifiers are
   * derived from its retailer URLs, a variant row is upserted with price 0
   * (presumably a placeholder until the fetch jobs deliver real data — TODO
   * confirm), and the corresponding fetch jobs are added in bulk.
   *
   * @param requestBody Products to ingest; each needs at least one source.
   * @param invalidInput Responder used to reply 422 with one reason per
   *   entry that has no source.
   * @returns One `{ status: 'queued' }` per job that was enqueued (Google
   *   Shopping jobs plus retailer jobs).
   */
  @Post('batch')
  @SuccessResponse('201', 'Created')
  async batchCreateAggregateProducts(
    @Body() requestBody: CreateProductInput[],
    @Res()
    invalidInput: TsoaResponse<
      HttpStatusCodes.UNPROCESSABLE_ENTITY,
      { reasons: string[] }
    >
  ): Promise<CreateProductPayload[]> {
    const errors: string[] = [];
    const fetchGoogleShoppingInfoJobs: {
      url: string;
      ingestionID?: string | undefined;
      productIds: string[];
    }[] = [];
    const fetchRetailerInfoJobs: {
      urls: string[];
      ingestionID: string;
      productIds: string[];
    }[] = [];
    const variantsToCreate: {
      productIds: string[];
      variantIds: string[];
    }[] = [];

    requestBody.forEach((body, index) => {
      if (!hasIngestionSource(body)) {
        errors.push(`Product at index ${index}: ${MISSING_SOURCE_REASON}`);
        return;
      }

      const { retailerUrls, googleShoppingUrl } = body;
      const ingestionID = resolveIngestionId(body);
      const urls = retailerUrls ?? [];

      const productIds = urls.flatMap((url) =>
        getProductIdentifiersFromData({
          url,
          retailer: getRetailerFromUrl(url),
        })
      );

      // The ingestion ID is always recorded as a variant identifier, so this
      // list is never empty.
      const variantIds = [
        `ingestionid:${ingestionID}`,
        ...urls.flatMap((url) => {
          try {
            return getVariantIdentifiersFromData({
              url,
              retailer: getRetailerKindFromUrl(url),
            });
          } catch {
            // Best effort: a URL whose variant identifiers cannot be derived
            // simply contributes none.
            return [];
          }
        }),
      ];

      if (googleShoppingUrl) {
        fetchGoogleShoppingInfoJobs.push({
          url: googleShoppingUrl,
          ingestionID,
          productIds,
        });
      }

      if (retailerUrls?.length) {
        fetchRetailerInfoJobs.push({
          urls: retailerUrls,
          ingestionID,
          productIds,
        });
      }

      // Without product identifiers there is nothing to attach a variant to.
      if (productIds.length) {
        variantsToCreate.push({
          variantIds,
          productIds,
        });
      }
    });

    // Skip the repository and the queue entirely when there is nothing to send.
    if (variantsToCreate.length > 0) {
      const repo = new AggregateProductVariantRepository();
      await repo.bulkUpsert(
        variantsToCreate.map(({ productIds, variantIds }) => ({
          data: {
            price: 0,
          },
          productIdentifiers: productIds,
          variantIdentifiers: variantIds,
        }))
      );
    }

    if (fetchGoogleShoppingInfoJobs.length > 0) {
      await addJobs(
        JobKind.FETCH_GOOGLE_SHOPPING_INFO_1,
        fetchGoogleShoppingInfoJobs.map((data) => ({ data }))
      );
    }

    if (fetchRetailerInfoJobs.length > 0) {
      await addJobs(
        JobKind.FETCH_RETAILER_INFO_1,
        fetchRetailerInfoJobs.map((data) => ({ data }))
      );
    }

    // NOTE(review): validation errors are reported only after the valid
    // entries have been upserted and queued, so a 422 response does not mean
    // nothing was processed. Order kept as-is; confirm whether the batch
    // should be rejected up front instead.
    if (errors.length > 0) {
      return invalidInput(HttpStatusCodes.UNPROCESSABLE_ENTITY, {
        reasons: errors,
      });
    }

    // One status per enqueued job, counting both job kinds.
    return [...fetchGoogleShoppingInfoJobs, ...fetchRetailerInfoJobs].map(
      () => ({
        status: 'queued',
      })
    );
  }
}