import {
  DynamoDBDocument,
  QueryCommandInput,
  UpdateCommandOutput,
  PutCommandInput,
  DeleteCommandInput,
  BatchWriteCommandInput,
  BatchWriteCommandOutput,
  BatchGetCommandInput,
  BatchGetCommandOutput,
} from '@aws-sdk/lib-dynamodb';
import { Transaction, DynamoDbManager } from './DynamoDbManager';
import { MissingKeyValuesError } from '../error/MissingKeyValuesError';
import { InvalidDbSchemaError } from '../error/InvalidDbSchemaError';
import { InvalidDataFormatError } from '../error/InvalidDataFormatError';
import { UnknownKeyAttributeError } from '../error/UnknownKeyAttributeError';

/** Sort-key filter that matches keys starting with `keyword`. */
export type QueryFilterTypeBeginsWith = {
  type: 'begins_with';
  keyword: string;
};

export type QueryOptions = {
  // whether to match sort key along with the partition key
  // if set to true this will return 1 item maximum
  useSortKey?: boolean;
  // table index to use (one of tables GSIs)
  index?: keyof TableIndexes;
  // number of items to limit in the result
  limit?: number;
  // result order based on the sort key
  order?: 'desc' | 'asc';
  // filter operation on the sort key
  filter?: QueryFilterTypeBeginsWith;
};

interface Reader<SHAPE, DATA_CLASS> {
  queryItems(partialItem: Partial<SHAPE>, options?: QueryOptions): Promise<DATA_CLASS[]>;
  getItem(id: string | Partial<SHAPE>): Promise<DATA_CLASS | undefined>;
  getItems(partialItem: Partial<SHAPE>[]): Promise<DATA_CLASS[]>;
}

interface Writer<SHAPE, DATA_CLASS> {
  createItem(item: Partial<SHAPE>): Promise<DATA_CLASS>;
  updateItem(partialItem: Partial<SHAPE>): Promise<DATA_CLASS | undefined>;
  deleteItem(partialItem: Partial<SHAPE>): Promise<number>;
  deleteItems(partialItem: Partial<SHAPE>[]): Promise<void>;
}

type Repository<SHAPE, DATA_CLASS> = Reader<SHAPE, DATA_CLASS> & Writer<SHAPE, DATA_CLASS>;
type Repositories = Record<string, Repository<unknown, unknown>>;

/** Attribute name and value format of a partition key / sort key pair. */
export type TableKeys = {
  pk: {
    attributeName: string;
    format: string;
  };
  sk: {
    attributeName: string;
    format: string;
  };
};

/** Index definitions keyed by index name. */
export type TableIndexes = Record<string, TableKeys>;

/** Key value format (e.g. `item#{id}`) keyed by key attribute name. */
export type KeysFormat = Record<string, string>;

export type EntityDefinition = {
  keys: TableKeys;
  indexes: TableIndexes;
  fieldsAsJsonString: string[];
  optionalIndexes?: (keyof TableIndexes)[];
};

/** Number of times a batch request is re-sent for unprocessed entries after the first attempt. */
const MAX_REATTEMPTS = 3;

/** Matches `{placeholder}` tokens (ASCII letters, dots and backslashes) in a key format. */
const KEY_PLACEHOLDER_PATTERN = /{[a-zA-Z\\.]+?}/g;

/**
 * Tells whether a rejected write failed because its ConditionExpression was not met.
 * Checks the `name` property only, so any thrown object carrying that name qualifies.
 */
function isConditionalCheckFailure(e: unknown): boolean {
  return (
    typeof e === 'object' && e !== null && (e as { name?: unknown }).name === 'ConditionalCheckFailedException'
  );
}

/**
 * Base repository for one entity type stored in a DynamoDB table.
 *
 * Partition key, sort key and index key values are derived from the item
 * properties using the formats declared in the entity definition
 * (see {@link AbstractDynamoDbRepository.getKey}).
 */
export abstract class AbstractDynamoDbRepository<SHAPE, DATA_CLASS extends SHAPE>
  implements Reader<SHAPE, DATA_CLASS>, Writer<SHAPE, DATA_CLASS>
{
  protected client: DynamoDBDocument;
  protected keys: TableKeys;
  protected indexes: TableIndexes;
  protected keysFormat: KeysFormat;
  protected optionalIndexes: (keyof TableIndexes)[] = [];
  // fields listed in this property are stored as a JSON string value in db
  protected fieldsAsJsonString: string[];

  /**
   * @param tableName name of the DynamoDB table
   * @param dbManager provides the document client and collects transaction write items
   * @param entityName entity name, used in error messages
   * @param entityDefinition keys, indexes and JSON-string fields of the entity
   * @param classRef constructor used to build the model object from raw data
   */
  constructor(
    protected tableName: string,
    protected dbManager: DynamoDbManager,
    protected entityName: string,
    protected entityDefinition: EntityDefinition,
    protected classRef: { new (data?: Record<string, unknown>): DATA_CLASS },
  ) {
    this.client = dbManager.client;
    this.keys = entityDefinition.keys;
    this.indexes = entityDefinition.indexes;
    this.fieldsAsJsonString = entityDefinition.fieldsAsJsonString;
    this.keysFormat = {
      [this.keys.pk.attributeName]: this.keys.pk.format,
      [this.keys.sk.attributeName]: this.keys.sk.format,
    };

    // register the key formats of every index and remember which indexes are optional
    for (const [indexName, index] of Object.entries(this.indexes)) {
      this.keysFormat[index.pk.attributeName] = index.pk.format;
      this.keysFormat[index.sk.attributeName] = index.sk.format;
      if (entityDefinition.optionalIndexes?.includes(indexName)) {
        this.optionalIndexes.push(indexName);
      }
    }
  }

  /**
   * Get the single item matching the key fields value in the given
   * partial item.
   *
   * @param item partial item holding the properties used by the pk/sk formats
   * @returns the hydrated item, or undefined when no item has that key
   *
   * @throws MissingKeyValuesError if key field values are missing
   */
  public async getItem(item: Partial<SHAPE>): Promise<DATA_CLASS | undefined> {
    const output = await this.client.get({
      TableName: this.tableName,
      Key: {
        [this.keys.pk.attributeName]: this.getPk(item),
        [this.keys.sk.attributeName]: this.getSk(item),
      },
    });

    return output.Item === undefined ? undefined : this.hydrateItem(output.Item);
  }

  /**
   * Get the items matching the key fields value of each given partial item.
   * Duplicate keys are requested once. Uses batchGet() to request 100 items in a batch.
   *
   * @param items partial items holding the properties used by the pk/sk formats
   * @returns the hydrated items that were found
   *
   * @throws MissingKeyValuesError if key field values are missing
   */
  public async getItems(items: Partial<SHAPE>[]): Promise<DATA_CLASS[]> {
    // this is the maximum items allowed by BatchGetItem()
    const batchSize = 100;
    const keys = this.getItemsKeys(items);
    const result: DATA_CLASS[] = [];

    for (let offset = 0; offset < keys.length; offset += batchSize) {
      const batchResult = await this.getBatchItems(keys.slice(offset, offset + batchSize));
      result.push(...batchResult);
    }

    return result;
  }

  /**
   * Fetch one batch of items by primary key, re-requesting the keys that are
   * reported back as unprocessed.
   *
   * @param keys primary keys of the items to fetch
   * @returns the hydrated items returned for this table
   *
   * @throws Error when keys are still unprocessed after MAX_REATTEMPTS re-attempts
   */
  private async getBatchItems(keys: { [key: string]: string }[]): Promise<DATA_CLASS[]> {
    const resultItems: DATA_CLASS[] = [];
    let requestKeys: BatchGetCommandInput['RequestItems'] = {
      [this.tableName]: {
        Keys: keys,
      },
    };
    let reattemptsCount = 0;

    while (requestKeys && Object.keys(requestKeys).length) {
      if (reattemptsCount++ > MAX_REATTEMPTS) {
        throw new Error('Maximum allowed retries exceeded for unprocessed items');
      }

      const output: BatchGetCommandOutput = await this.client.batchGet({
        RequestItems: requestKeys,
      });
      // whatever was not processed becomes the next request
      requestKeys = output.UnprocessedKeys;

      const tableItems = output.Responses?.[this.tableName];
      if (tableItems && tableItems.length) {
        resultItems.push(...tableItems.map((rawItem) => this.hydrateItem(rawItem)));
      }
    }

    return resultItems;
  }

  /**
   * Delete items in a batch. Duplicate keys are deleted once.
   * Uses batchWrite() with 25 items
   *
   * @param items partial items holding the properties used by the pk/sk formats
   *
   * @throws MissingKeyValuesError if key field values are missing
   */
  public async deleteItems(items: Partial<SHAPE>[]): Promise<void> {
    // this is the maximum items allowed by BatchWriteItem()
    const batchSize = 25;
    const keys = this.getItemsKeys(items);

    for (let offset = 0; offset < keys.length; offset += batchSize) {
      await this.deleteBatchItems(keys.slice(offset, offset + batchSize));
    }
  }

  /**
   * Delete one batch of items by primary key, re-sending the delete requests
   * that are reported back as unprocessed.
   *
   * @param keys primary keys of the items to delete
   *
   * @throws Error when requests are still unprocessed after MAX_REATTEMPTS re-attempts
   */
  private async deleteBatchItems(keys: { [key: string]: string }[]): Promise<void> {
    let requestItems: BatchWriteCommandInput['RequestItems'] = {
      [this.tableName]: keys.map((key) => ({
        DeleteRequest: {
          Key: key,
        },
      })),
    };
    let reattemptsCount = 0;

    while (requestItems && Object.keys(requestItems).length) {
      if (reattemptsCount++ > MAX_REATTEMPTS) {
        throw new Error('Maximum allowed retries exceeded for unprocessed items');
      }

      const response: BatchWriteCommandOutput = await this.client.batchWrite({
        RequestItems: requestItems,
      });
      requestItems = response.UnprocessedItems;
    }
  }

  /**
   * Build the primary key of every given item, keeping only the first
   * occurrence of each distinct (pk, sk) pair.
   *
   * @param items partial items holding the properties used by the pk/sk formats
   * @returns the distinct primary keys, in order of first appearance
   *
   * @throws MissingKeyValuesError if key field values are missing
   */
  private getItemsKeys(items: Partial<SHAPE>[]): { [key: string]: string }[] {
    const pkName = this.keys.pk.attributeName;
    const skName = this.keys.sk.attributeName;
    const uniqueKeys = new Map<string, { [key: string]: string }>();

    for (const item of items) {
      const pk = this.getPk(item);
      const sk = this.getSk(item);
      // JSON array encoding keeps the (pk, sk) pair unambiguous whatever characters the keys contain
      const identity = JSON.stringify([pk, sk]);
      if (!uniqueKeys.has(identity)) {
        uniqueKeys.set(identity, { [pkName]: pk, [skName]: sk });
      }
    }

    return Array.from(uniqueKeys.values());
  }

  /**
   * Finds all the items matching the partition key or
   * the gsi key (when gsi index name is specified)
   *
   * The sort key is added to the key condition when `options.useSortKey` or
   * `options.filter` is set; with a filter, the filter keyword is used as the
   * sort key value instead of the value derived from the item.
   *
   * @param item partial item holding the properties used by the key formats
   * @param options index, sort key usage, filter, order and limit of the query
   * @returns the hydrated items, at most `options.limit` when a limit is given
   *
   * @throws MissingKeyValuesError if key field values are missing or the index is not defined
   */
  public async queryItems(item: Partial<SHAPE>, options?: QueryOptions): Promise<DATA_CLASS[]> {
    const useSortKey = options?.useSortKey || options?.filter;
    const index = options?.index;

    let pkName = this.keys.pk.attributeName;
    let skName = this.keys.sk.attributeName;
    let indexName: keyof TableIndexes | undefined;

    if (index) {
      const indexDefinition = this.indexes[index];
      if (indexDefinition === undefined) {
        throw new MissingKeyValuesError(`Table index '${index}' not defined on entity ${this.entityName}`);
      }
      indexName = index;
      pkName = indexDefinition.pk.attributeName;
      skName = indexDefinition.sk.attributeName;
    }

    const pk = this.getKey(item, pkName);

    const keyConditionExpression = ['#pkName = :pkValue'];
    const expressionAttributeNames: Record<string, string> = { '#pkName': pkName };
    const expressionAttributeValues: Record<string, unknown> = { ':pkValue': pk };

    if (useSortKey) {
      const sk = options?.filter?.keyword ?? this.getKey(item, skName);
      keyConditionExpression.push(this.getFilterKeyConditionExpression(options?.filter));
      expressionAttributeNames['#skName'] = skName;
      expressionAttributeValues[':skValue'] = sk;
    }

    const queryCommandInput: QueryCommandInput = {
      TableName: this.tableName,
      KeyConditionExpression: keyConditionExpression.join(' AND '),
      ExpressionAttributeNames: expressionAttributeNames,
      ExpressionAttributeValues: expressionAttributeValues,
    };

    if (indexName) {
      queryCommandInput.IndexName = String(indexName);
    }

    if (options?.order !== undefined) {
      queryCommandInput.ScanIndexForward = options.order === 'asc';
    }

    return await this.queryFullPartition(queryCommandInput, options?.limit);
  }

  /**
   * Evaluate filter condition for sort key
   *
   * @param filter sort key filter; plain equality is used when undefined
   * @returns the sort key part of the key condition expression
   *
   * @throws Error if the filter type is not supported
   */
  private getFilterKeyConditionExpression(filter: QueryOptions['filter']): string {
    if (filter === undefined) {
      return '#skName = :skValue';
    }
    if (filter.type === 'begins_with') {
      return 'begins_with(#skName, :skValue)';
    }

    throw new Error(`Invalid query filter type: ${(filter as { type?: unknown })?.type}`);
  }

  /**
   * Get the list of objects by given params, paginating until all results are returned for a full partition.
   * Pagination stops early once `limit` items have been collected.
   *
   * @param params - The DynamoDB query params.
   * @param limit - The maximum number of items to return.
   * @returns Promise of array of items.
   */
  private async queryFullPartition(params: QueryCommandInput, limit?: number): Promise<DATA_CLASS[]> {
    const items: Array<DATA_CLASS> = [];
    let exclusiveStartKey: QueryCommandInput['ExclusiveStartKey'] = undefined;

    do {
      const queryInput: QueryCommandInput = {
        ...params,
        ExclusiveStartKey: exclusiveStartKey,
      };
      const result = await this.client.query(queryInput);
      exclusiveStartKey = result.LastEvaluatedKey;

      if (result.Items && result.Items.length) {
        const pageItems = result.Items.map((rawItem) => this.hydrateItem(rawItem));
        // only take as many items from this page as the limit still allows
        items.push(...(limit === undefined ? pageItems : pageItems.slice(0, limit - items.length)));

        if (limit !== undefined && items.length >= limit) {
          exclusiveStartKey = undefined;
        }
      }
    } while (exclusiveStartKey);

    return items;
  }

  /**
   * Update the existing item matching the key fields value
   * in the passed newValue. Only the properties whose value changed are written,
   * together with the index key attributes whose format references them.
   * The passed `newValue` object is not modified.
   *
   * @param newValue key fields plus the properties to change
   * @param transaction when it has an id, the update is queued on that transaction instead of executed
   *
   * @returns undefined when the item does not exist; the merged value when nothing
   *          changed or when queued on a transaction; otherwise the item as stored after the update
   * @throws MissingKeyValuesError if key field values are missing
   * @throws InvalidDbSchemaError if the merged value has properties unknown to the model
   */
  public async updateItem(newValue: Partial<SHAPE>, transaction: Transaction = {}): Promise<DATA_CLASS | undefined> {
    const oldValue = await this.getItem(newValue);
    if (oldValue === undefined) {
      return undefined;
    }

    const value = { ...oldValue, ...newValue };
    this.assertValueMatchesModel(value);

    // JSON-string fields are compared and stored in their serialized form.
    // Work on a copy so that the caller's object keeps its original values.
    const newColumns: Record<string, unknown> = { ...newValue };
    const oldColumns = oldValue as Record<string, unknown>;
    this.convertSelectedValuesToJsonString(newColumns);
    this.convertSelectedValuesToJsonString(oldColumns);

    const updateExpression: string[] = [];
    const expressionAttributeNames: Record<string, string> = {};
    const expressionAttributeValues: Record<string, unknown> = {};
    const updatedAttributes: string[] = [];

    for (const modelProperty of Object.keys(newColumns)) {
      const propValue = newColumns[modelProperty];
      if (propValue === oldColumns[modelProperty]) {
        // don't need to update the properties that are unchanged
        continue;
      }
      const propName = `#${modelProperty}`;
      const propValuePlaceHolder = `:${modelProperty}`;
      updateExpression.push(`${propName} = ${propValuePlaceHolder}`);
      expressionAttributeNames[propName] = modelProperty;
      expressionAttributeValues[propValuePlaceHolder] = propValue;
      updatedAttributes.push(modelProperty);
    }

    if (!updatedAttributes.length) {
      // nothing to update
      return value;
    }

    // also update the gsi attributes if needed
    for (const index of Object.values(this.indexes)) {
      for (const keyAttributeName of [index.pk.attributeName, index.sk.attributeName]) {
        const keyFormat = this.keysFormat[keyAttributeName];
        // literal substring test: the attribute name must not be interpreted as a pattern
        if (updatedAttributes.some((attr) => keyFormat.includes(`{${attr}}`))) {
          const propName = `#${keyAttributeName}`;
          const propValuePlaceHolder = `:${keyAttributeName}`;
          updateExpression.push(`${propName} = ${propValuePlaceHolder}`);
          expressionAttributeNames[propName] = keyAttributeName;
          expressionAttributeValues[propValuePlaceHolder] = this.getKey(value, keyAttributeName);
        }
      }
    }

    const updateCommandInput = {
      TableName: this.tableName,
      Key: {
        [this.keys.pk.attributeName]: this.getPk(newValue),
        [this.keys.sk.attributeName]: this.getSk(newValue),
      },
      UpdateExpression: 'SET ' + updateExpression.join(', '),
      ExpressionAttributeValues: expressionAttributeValues,
      ExpressionAttributeNames: expressionAttributeNames,
      ConditionExpression: `attribute_exists(${this.keys.pk.attributeName})`,
    };

    if (transaction.id?.length) {
      // this command will be executed together with
      // other db write commands in the "transaction" block
      this.dbManager.addWriteTransactionItem(transaction.id, {
        Update: updateCommandInput,
      });
      return new this.classRef(value);
    }

    let output: UpdateCommandOutput;
    try {
      output = await this.client.update({
        ...updateCommandInput,
        ReturnValues: 'ALL_NEW',
      });
    } catch (e: unknown) {
      // the item disappeared between the read and the write
      if (isConditionalCheckFailure(e)) {
        return undefined;
      }
      throw e;
    }

    return output.Attributes ? this.hydrateItem(output.Attributes) : undefined;
  }

  /**
   * Adds new item to the table
   *
   * Unless `options.overrideExisting` is set, the put is conditional on the
   * partition key attribute not existing yet; the error raised by the client
   * for a failed condition is not caught here.
   *
   * @param value item to store
   * @param transaction when it has an id, the put is queued on that transaction instead of executed
   * @param additionalValue Additional item properties that are not part of the DATA_CLASS;
   *        they are only used to evaluate the key values and are not stored as columns
   * @param options `overrideExisting`: skip the "item must not exist" condition
   *
   * @returns the passed value
   * @throws MissingKeyValuesError if key field values are missing (except for optional indexes)
   * @throws InvalidDbSchemaError if the value has properties unknown to the model
   */
  public async createItem(
    value: DATA_CLASS,
    transaction: Transaction = {},
    additionalValue: Partial<SHAPE> = {},
    options: { overrideExisting?: boolean } = { overrideExisting: false },
  ): Promise<DATA_CLASS> {
    this.assertValueMatchesModel(value);

    const columns: Record<string, unknown> = {};
    for (const modelProperty of Object.keys(value as object)) {
      columns[modelProperty] = value[modelProperty as keyof DATA_CLASS];
    }
    this.convertSelectedValuesToJsonString(columns);

    const keySource = { ...value, ...additionalValue };
    const keyFields: Record<string, unknown> = {
      [this.keys.pk.attributeName]: this.getPk(keySource),
      [this.keys.sk.attributeName]: this.getSk(keySource),
    };

    for (const [indexName, index] of Object.entries(this.indexes)) {
      try {
        keyFields[index.pk.attributeName] = this.getKey(keySource, index.pk.attributeName);
        keyFields[index.sk.attributeName] = this.getKey(keySource, index.sk.attributeName);
      } catch (e) {
        if ((e as Error).name === 'MissingKeyValuesError' && this.optionalIndexes.includes(indexName)) {
          // ignore optional index fields missing error;
          // drop both key attributes so that no partial index key is written
          delete keyFields[index.pk.attributeName];
          delete keyFields[index.sk.attributeName];
          continue;
        }
        throw e;
      }
    }

    const putCommandInput: PutCommandInput = {
      TableName: this.tableName,
      Item: {
        ...keyFields,
        ...columns,
      },
      ConditionExpression: options.overrideExisting
        ? undefined
        : `attribute_not_exists(${this.keys.pk.attributeName})`,
    };

    if (transaction.id?.length) {
      // this command will be executed together with
      // other db write commands in the "transaction block"
      this.dbManager.addWriteTransactionItem(transaction.id, {
        Put: putCommandInput,
      });
      return value;
    }

    await this.client.put(putCommandInput);

    return value;
  }

  /**
   * Deletes an item from the table
   *
   * @param partialItem partial item holding the properties used by the pk/sk formats
   * @param transaction when it has an id, the delete is queued on that transaction instead of executed
   * @returns 1 when the item was deleted or the delete was queued on a transaction,
   *          0 when the item does not exist
   * @throws MissingKeyValuesError if key field values are missing
   */
  public async deleteItem(partialItem: Partial<SHAPE>, transaction: Transaction = {}): Promise<number> {
    const deleteCommandInput: DeleteCommandInput = {
      TableName: this.tableName,
      Key: {
        [this.keys.pk.attributeName]: this.getPk(partialItem),
        [this.keys.sk.attributeName]: this.getSk(partialItem),
      },
      ConditionExpression: `attribute_exists(${this.keys.pk.attributeName})`,
    };

    if (transaction.id?.length) {
      // For transaction block, don't worry if the item being deleted does not exist
      delete deleteCommandInput.ConditionExpression;

      // this command will be executed together with
      // other db write commands in the "transaction block"
      this.dbManager.addWriteTransactionItem(transaction.id, {
        Delete: deleteCommandInput,
      });
      return 1;
    }

    try {
      await this.client.delete(deleteCommandInput);
    } catch (e: unknown) {
      if (isConditionalCheckFailure(e)) {
        return 0;
      }
      throw e;
    }

    return 1;
  }

  /**
   * Return repo model object from the db value.
   * Fields declared as JSON strings are parsed in place on the given item.
   *
   * @param item raw item as read from the table
   * @returns the model object built with the class reference
   *
   * @throws InvalidDataFormatError if a JSON-string field does not hold a string
   */
  protected hydrateItem(item: Record<string, unknown>): DATA_CLASS {
    for (const fieldName of Object.keys(item)) {
      if (!this.fieldsAsJsonString.includes(fieldName)) {
        continue;
      }
      const rawValue = item[fieldName];
      if (typeof rawValue !== 'string') {
        throw new InvalidDataFormatError(`Field '${fieldName}' defined as JSON String has a non-string data`);
      }
      item[fieldName] = JSON.parse(rawValue);
    }

    return new this.classRef(item);
  }

  /**
   * Replace, in place, the value of every field declared as JSON string
   * with its JSON serialization.
   *
   * @param item item whose JSON-string fields are serialized
   */
  protected convertSelectedValuesToJsonString(item: Record<string, unknown>): void {
    for (const fieldName of Object.keys(item)) {
      if (this.fieldsAsJsonString.includes(fieldName)) {
        item[fieldName] = JSON.stringify(item[fieldName]);
      }
    }
  }

  /**
   * Evaluate the partition key value from the partial item
   * @param item
   * @returns string
   * @throws MissingKeyValuesError
   */
  protected getPk(item: Partial<SHAPE>): string {
    return this.getKey(item, this.keys.pk.attributeName);
  }

  /**
   * Evaluate the sort key value from the partial item
   * @param item
   * @returns string
   *
   * @throws MissingKeyValuesError
   */
  protected getSk(item: Partial<SHAPE>): string {
    return this.getKey(item, this.keys.sk.attributeName);
  }

  /**
   * Evaluate the key value from the item, by replacing each `{property}`
   * placeholder of the key format with the value of that property.
   * A dotted placeholder (`{a.b}`) reads a nested property. Values are read
   * from a JSON round-trip of the item; a null value is rendered as an empty string.
   *
   * Example 1:
   * Input:
   *  - item: {id: foo, name: 'some-name' }
   *  - attributeName: pk
   *  - this.keysFormat = { pk: 'item#{id}', 'sk': '#meta', ... }
   * Output:
   *  - 'item#foo'
   *
   * Example 2:
   * Input:
   *  - item: {id: foo, name: 'some-name', itemType: 'A' }
   *  - attributeName: sk
   *  - this.keysFormat = { pk: 'item#{id}', 'sk': 'type#{itemType}', ... }
   * Output:
   *  - 'type#A'
   *
   * Example 3:
   * Input:
   *  - item: {id: foo, name: 'some-name' }
   *  - attributeName: sk
   *  - this.keysFormat = { pk: 'item#{id}', 'sk': 'name-type#{itemType}{name}', ... }
   * Output:
   *  - Error: "Key field "itemType" must be specified in the input item"
   *
   * @param item
   * @param attributeName
   *
   * @returns string
   * @throws UnknownKeyAttributeError if no format is defined for the key attribute
   * @throws MissingKeyValuesError if a placeholder cannot be resolved from the item,
   *         or if placeholders remain after substitution
   */
  protected getKey(item: Partial<SHAPE>, attributeName: keyof KeysFormat): string {
    let keyFormat = this.keysFormat[attributeName];
    if (keyFormat == undefined || !keyFormat.length) {
      throw new UnknownKeyAttributeError(
        `Key format not defined or empty for key attribute '${attributeName}' in entity ${this.entityName}`,
      );
    }

    const placeholders = keyFormat.match(KEY_PLACEHOLDER_PATTERN) ?? [];
    // the snapshot is never modified, so one copy serves every placeholder
    const source: unknown = placeholders.length ? JSON.parse(JSON.stringify(item)) : undefined;

    for (const placeholder of placeholders) {
      const property = placeholder.slice(1, -1);
      let value: unknown = source;

      for (const field of property.split('.')) {
        // a null on the way down the path cannot be traversed: report it as a missing key value
        value = value == null ? undefined : (value as Record<string, unknown>)[field];
        if (value === undefined) {
          throw new MissingKeyValuesError(
            `Key field "${String(property)}" must be specified in the input item in entity ${
              this.entityName
            }`,
          );
        }
      }

      const resolved = String(value ?? '');
      // replacer function: the value is inserted verbatim, '$' sequences are not expanded
      keyFormat = keyFormat.replace(placeholder, () => resolved);
    }

    const moreMatches = keyFormat.match(KEY_PLACEHOLDER_PATTERN);
    if (moreMatches?.length) {
      throw new MissingKeyValuesError(
        `Cannot resolve key placeholder(s) for key attribute format '${this.keysFormat[attributeName]} in entity ${
          this.entityName
        }: '${moreMatches.join("','")}'`,
      );
    }

    return keyFormat;
  }

  /**
   * Validate the data matches with "DATA_MODEL": the model must be
   * constructible from the value, and the value must not carry properties
   * that the constructed model does not have.
   *
   * @param value
   * @return void
   *
   * @throws InvalidDbSchemaError if the value has excess properties
   */
  private assertValueMatchesModel(value: unknown): void {
    // Trigger AssertionError if model instantiation fails
    // see the DATA_CLASS model class
    const obj = new this.classRef(value as Record<string, unknown>);

    const modelProperties = Object.keys(obj as object);
    const excessProperties = Object.keys(value as object).filter((prop) => !modelProperties.includes(prop));

    if (excessProperties.length > 0) {
      throw new InvalidDbSchemaError(`Excess properties in entity ${this.entityName}: ${excessProperties.join(', ')}`);
    }
  }
}