import assert from 'assert';

import {NotFoundError} from '../../errors';
import type {ResultType} from '../../types';
import type {Handler} from '../common/handlers';
import {makeSqsHandler} from '../common/handlers';

/** Loads the existing TARGET that corresponds to a SOURCE record. */
type Loader<SOURCE, TARGET> = (record: SOURCE) => Promise<TARGET>;

/**
 * Computes the input for creating a new TARGET from a SOURCE record. A falsy
 * result means there is nothing to create.
 */
type Creator<SOURCE, CREATE_TARGET_INPUT> = (
  record: SOURCE
) => Promise<CREATE_TARGET_INPUT | undefined>;

/**
 * Computes the input for updating an existing TARGET from a SOURCE record. A
 * falsy result means there is nothing to update.
 */
type Updater<SOURCE, TARGET, UPDATE_TARGET_INPUT> = (
  record: SOURCE,
  target: TARGET
) => Promise<UPDATE_TARGET_INPUT | undefined>;

/** User-supplied callbacks that map a SOURCE record onto its TARGET. */
interface Projector<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT> {
  load: Loader<SOURCE, TARGET>;
  create: Creator<SOURCE, CREATE_TARGET_INPUT>;
  update: Updater<SOURCE, TARGET, UPDATE_TARGET_INPUT>;
}

/** Persistence functions the enricher uses to read SOURCE and write TARGET. */
interface SDK<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT> {
  createTargetModel: (
    target: CREATE_TARGET_INPUT
  ) => Promise<ResultType<TARGET>>;
  // NOTE(review): the value type of this Record could not be recovered from
  // the reviewed text; `any` is the most permissive choice and accepts every
  // existing unmarshaller. Tighten it to the project's attribute-value type
  // once confirmed.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  unmarshallSourceModel: (item: Record<string, any>) => SOURCE;
  updateTargetModel: (
    target: UPDATE_TARGET_INPUT
  ) => Promise<ResultType<TARGET>>;
}

/**
 * Returns a handler, built with `makeSqsHandler`, that takes the DynamoDB
 * `NewImage` of each record, unmarshalls it into a SOURCE and enriches it into
 * a TARGET.
 *
 * @param enricher - callbacks that load, create and update the TARGET
 * @param sdk - functions that unmarshall SOURCE and persist TARGET
 * @returns the handler produced by `makeSqsHandler`
 * @throws AssertionError (from the record callback) when a record has no
 * `dynamodb.NewImage` or when unmarshalling yields a falsy value
 */
export function makeEnricher<
  SOURCE,
  TARGET,
  CREATE_TARGET_INPUT,
  UPDATE_TARGET_INPUT
>(
  enricher: Projector<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT>,
  sdk: SDK<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT>
): Handler {
  const {unmarshallSourceModel} = sdk;

  return makeSqsHandler(async (unmarshalledRecord) => {
    // Read the image once so the assertion and the use refer to the same value.
    const newImage = unmarshalledRecord.dynamodb?.NewImage;
    assert(newImage);

    const source = unmarshallSourceModel(newImage);
    assert(source);

    await enrich(enricher, sdk, source);
  });
}

/**
 * Enriches the source model into the target model.
 *
 * Loads the current target and, if `update` yields an input, persists it with
 * `updateTargetModel`. If a `NotFoundError` is thrown along the way, falls
 * back to `create` and, if that yields an input, persists it with
 * `createTargetModel`.
 *
 * @returns the result of the update or create call, or `undefined` when the
 * projector produced nothing to write
 * @throws any error other than `NotFoundError` raised while loading or
 * updating, and any error raised while creating
 */
async function enrich<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT>(
  enricher: Projector<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT>,
  sdk: SDK<SOURCE, TARGET, CREATE_TARGET_INPUT, UPDATE_TARGET_INPUT>,
  source: SOURCE
): Promise<ResultType<TARGET> | undefined> {
  const {create, load, update} = enricher;
  const {createTargetModel, updateTargetModel} = sdk;

  try {
    const item = await load(source);
    const modelToUpdate = await update(source, item);
    if (modelToUpdate) {
      return await updateTargetModel(modelToUpdate);
    }
    return undefined;
  } catch (err) {
    // NOTE(review): the try block also covers `update` and `updateTargetModel`,
    // so a NotFoundError from either of those takes the create path too. That
    // scope is kept as it was; confirm it is intended.
    if (!(err instanceof NotFoundError)) {
      throw err;
    }

    const modelToCreate = await create(source);
    if (modelToCreate) {
      return await createTargetModel(modelToCreate);
    }

    // The creator declined to produce a model. Treat this as a no-op, like the
    // update path, instead of rethrowing the NotFoundError and failing the
    // record.
    return undefined;
  }
}