import { EntityTarget, getRepository, SelectQueryBuilder } from 'typeorm';

/**
 * Alias given to the root entity in the query builder. The stream handler
 * strips the `${ENTITY_ALIAS}_` prefix from raw row keys before hydrating.
 */
const ENTITY_ALIAS = 'Entity';

/** Batch size used when `options.batchSize` is not provided. */
const DEFAULT_BATCH_SIZE = 100;

type BatchProcessEntityStreamOptions<Entity> = {
  /**
   * Sets where condition in the query builder.
   */
  where?: Parameters<SelectQueryBuilder<Entity>['where']>[0];
  /**
   * Sets limit in the query builder.
   */
  limit?: Parameters<SelectQueryBuilder<Entity>['limit']>[0];
  /**
   * Sets take in the query builder.
   */
  take?: Parameters<SelectQueryBuilder<Entity>['take']>[0];
  /**
   * Sets offset in the query builder.
   */
  offset?: Parameters<SelectQueryBuilder<Entity>['offset']>[0];
  /**
   * Sets skip in the query builder.
   */
  skip?: Parameters<SelectQueryBuilder<Entity>['skip']>[0];
  /**
   * How large each batch is.
   * @default 100
   */
  batchSize?: number;
};

/**
 * @experimental
 * Batch processes a stream of entities, using `processFn`. Attempts to
 * handle conversion from raw postgres to output.
 * WARNING: this is experimental and may fail in certain edge cases.
 *
 * Rows are read from `queryBuilder.stream()`. Each raw row has the
 * `Entity_` alias prefix removed from its keys and is passed to
 * `repository.create()`. Once `batchSize` entities have accumulated the
 * stream is paused, `processFn` is awaited with the batch, and the stream is
 * resumed. Any remaining entities are processed when the stream ends.
 *
 * @param entity - Entity target whose repository is queried.
 * @param processFn - Called with each batch of entities; may be async.
 * @param options - Optional query-builder settings and batch size. Each
 * query-builder option is applied only when its value is truthy (so, for
 * example, `offset: 0` is not forwarded to the query builder).
 * @returns A promise that resolves after the stream has ended and the final
 * batch (if any) has been processed. It rejects if the stream emits an
 * error, if `processFn` throws or rejects, or if a row cannot be converted;
 * in each case the stream is destroyed with that error.
 */
const batchProcessEntityStream = async <Entity>(
  entity: EntityTarget<Entity>,
  processFn: (pages: Entity[]) => void | Promise<void>,
  options?: BatchProcessEntityStreamOptions<Entity>
): Promise<void> => {
  // Resolved once up front; reused for the query builder and for every row.
  const repository = getRepository(entity);
  const batchSize = options?.batchSize ?? DEFAULT_BATCH_SIZE;
  const aliasPrefix = `${ENTITY_ALIAS}_`;

  let queryBuilder = repository.createQueryBuilder(ENTITY_ALIAS);
  let batch: Entity[] = [];

  // set up all options for controlling the query builder
  if (options?.where) {
    queryBuilder = queryBuilder.where(options.where);
  }
  if (options?.limit) {
    queryBuilder = queryBuilder.limit(options.limit);
  }
  if (options?.take) {
    queryBuilder = queryBuilder.take(options.take);
  }
  if (options?.offset) {
    queryBuilder = queryBuilder.offset(options.offset);
  }
  if (options?.skip) {
    queryBuilder = queryBuilder.skip(options.skip);
  }

  const stream = await queryBuilder.stream();

  // on each batch, run process function and then clear the batch
  const flush = async (): Promise<void> => {
    await processFn(batch);
    batch = [];
  };

  return new Promise<void>((resolve, reject) => {
    // Tear the stream down and surface the failure to the caller.
    const onError = (error: Error) => {
      stream.destroy(error);
      reject(error);
    };

    stream.on('data', (pgPage: Record<string, unknown>) => {
      let page: Entity;
      try {
        // create the entity from the raw row, dropping the alias prefix
        // from each column key (keys without the prefix are kept as-is)
        page = repository.create(
          Object.entries(pgPage).reduce((prev, [key, value]) => {
            return {
              ...prev,
              [key.startsWith(aliasPrefix)
                ? key.slice(aliasPrefix.length)
                : key]: value,
            };
          }, {})
        );
      } catch (error) {
        // A throw inside an event listener would otherwise escape the
        // emitter instead of rejecting the returned promise.
        onError(error instanceof Error ? error : new Error(String(error)));
        return;
      }

      // push the entity to the batch
      batch.push(page);

      if (batch.length >= batchSize) {
        // Stop reading while the batch is being processed.
        stream.pause();
        flush()
          .then(() => {
            stream.resume();
          })
          .catch(onError);
      }
    });

    stream.on('error', onError);

    stream.on('end', () => {
      // Process whatever is left over from the last partial batch.
      if (batch.length > 0) {
        flush().then(resolve).catch(onError);
      } else {
        resolve();
      }
    });
  });
};

export default batchProcessEntityStream;