/*! * @license * Copyright Squiz Australia Pty Ltd. All Rights Reserved. */ import { injectable } from 'inversify'; import 'reflect-metadata'; import { AttributeValue, BatchWriteItemCommand, DeleteItemCommand, DeleteItemCommandInput, DynamoDBClient, GetItemCommand, GetItemCommandInput, PutItemCommand, QueryCommand, QueryCommandOutput, UpdateItemCommand, UpdateItemCommandInput, } from '@aws-sdk/client-dynamodb'; import { BaseDto, BaseItem, ListParameters } from '../../job/model/Base/Base'; /** * Generic abstract class for representing repositories interacting with DynamoDB. * @export * @abstract * @class Repository * @template T The entity definition. * @template U The corresponding DB record attributes for the entity. */ @injectable() export abstract class Repository> { public constructor( protected dynamoDbClient: DynamoDBClient, protected tableName: string, protected pk: string, ) {} /** * Save item into database. * @abstract * @param {*} input The item to store. * @returns {Promise} The item stored. * @memberof Repository */ async create(input: T): Promise { const dynamoDbRecord = this.convertToDynamoDbItem(input); await this.dynamoDbClient.send( new PutItemCommand({ Item: dynamoDbRecord, TableName: this.tableName, }), ); return input; } /** * Get one record from database. * @abstract * @param {string} sk The sort key used to identify the record. * @returns {Promise} The retrieved record. * @memberof Repository */ async findOne(sk: SK): Promise { const params: GetItemCommandInput = { Key: { pk: { S: this.pk }, sk: { S: this.buildSk(sk) }, }, TableName: this.tableName, }; const res = await this.dynamoDbClient.send(new GetItemCommand(params)); return res.Item ? this.convertToItem(res.Item as U) : null; } /** * Get list of records from database. * @abstract * @returns {Promise>} The retrieved records. * @memberof Repository */ async list(parameters: ListParameters = {}): Promise> { // Apply pagination for the scan. // By default, the Scan operation processes data sequentially. // DynamoDB returns data in increments of 1 MB increments. let startKey: Record | undefined = undefined; /* Indicator of where to paginate from. */ const results: Array> = []; // Iterate through pages until no more results are found. do { const res: QueryCommandOutput = await this.dynamoDbClient.send( new QueryCommand({ ExpressionAttributeNames: { '#pk': 'pk', }, ExpressionAttributeValues: { ':pk': { S: this.pk }, }, KeyConditionExpression: '#pk = :pk', TableName: this.tableName, ExclusiveStartKey: startKey, Limit: parameters.limit, ConsistentRead: parameters.consistentRead, }), ); results.push(...(res.Items ?? [])); startKey = res.LastEvaluatedKey; if (parameters.limit !== undefined && results.length >= parameters.limit) { // Truncate list to max requested items, stop querying. results.length = parameters.limit; break; } } while (startKey); return results.length ? results.map((item) => this.convertToItem(item as U)) : []; } /** * Update record in database. * @param {string} sk The sort key used to identify the record. * @param {Partial} input The (partial) record to update (properties used in the SK cannot be updated). * @returns {Promise} The updated record. * @memberof Repository */ async update(sk: SK, input: Partial>): Promise { // Initialise update command const updateParams: UpdateItemCommandInput = { Key: { pk: { S: this.pk }, sk: { S: this.buildSk(sk) }, }, ReturnValues: 'ALL_NEW', TableName: this.tableName, }; const expressionAttributeNames: Record = {}; const expressionAttributeValues: Record = {}; // To avoid conflict with reserved keywords, refer to attributes with an additional '#' prefix for (const key of Object.keys(input)) { if (input[key as keyof Partial>] === null) { expressionAttributeNames[`#${key}`] = key; expressionAttributeValues[`:${key}`] = { NULL: true }; } else { const val: string = input[key as keyof Partial>] as string; expressionAttributeNames[`#${key}`] = key; expressionAttributeValues[`:${key}`] = { S: val }; } } const updateExpressionPairs = Object.keys(input).map((key) => `#${key} = :${key}`); const updateExpression = `SET ${updateExpressionPairs.join(', ')}`; updateParams.ExpressionAttributeNames = expressionAttributeNames; updateParams.ExpressionAttributeValues = expressionAttributeValues; updateParams.UpdateExpression = updateExpression; const updateItemCommand = new UpdateItemCommand(updateParams); const updatedRecord = await this.dynamoDbClient.send(updateItemCommand); return this.convertToItem(updatedRecord.Attributes as U); } /** * Delete record from database. * @abstract * @param {string} sk The sort key used to identify the record. * @returns {void} * @memberof Repository */ async delete(sk: T | SK): Promise { const deleteCommandParams: DeleteItemCommandInput = { Key: { pk: { S: this.pk }, sk: { S: this.buildSk(sk) }, }, TableName: this.tableName, }; await this.dynamoDbClient.send(new DeleteItemCommand(deleteCommandParams)); return; } async batchDelete(sk: Array): Promise<{ unprocessedItems: Array }> { if (sk.length === 0) { return { unprocessedItems: [] }; } const skMap: Record = {}; sk.forEach((sk) => { skMap[this.buildSk(sk)] = sk; }); const result = await this.dynamoDbClient.send( new BatchWriteItemCommand({ RequestItems: { [this.tableName]: Object.keys(skMap).map((sk) => ({ DeleteRequest: { Key: { pk: { S: this.pk }, sk: { S: sk }, }, }, })), }, }), ); return { unprocessedItems: result.UnprocessedItems?.[this.tableName]?.map((item) => { const sk = item.DeleteRequest?.Key?.sk.S; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion return skMap[sk!]; }) || [], }; } /** * Convert the DynanomDB attributes of the entity into a DynamoDB item. * @abstract * @param {T} item The DynamoDB item attributes to convert. * @returns {Record} The DynamoDB record. * @memberof Repository */ protected abstract convertToDynamoDbItem(item: T): Record; /** * Convert DynamoDB item to entity. * @abstract * @param {Record} record The record to convert. * @returns {T} The converted entity. * @memberof Repository */ protected abstract convertToItem(record: U): T; /** * Converts a sort key to a string for querying/putting in Dynamo. * @param {SK} sk * @return {string} * @protected */ protected abstract buildSk(sk: T | SK): string; }