/*!
 * @license
 * Copyright Squiz Australia Pty Ltd. All Rights Reserved.
 */
import {
  AttributeValue,
  DynamoDBClient,
  QueryCommand,
  QueryCommandInput,
  QueryCommandOutput,
  UpdateItemCommand,
  UpdateItemCommandInput,
} from '@aws-sdk/client-dynamodb';
import { randomInt } from 'crypto';
import { inject } from 'inversify';
import { JOB_EXECUTION_STATUS } from '../../constants/JobExecutionStatus.constants';
import { ENTITY_TYPES } from '../../constants/Repository.constants';
import {
  PaginationInfo,
  PaginationQueryResponse,
  PAGINATION_DIRECTION_TYPE_AFTER,
  decodeTokenString,
} from '../../core/pagination';
import {
  CreateJobExecutionRequest,
  JobExecution,
  JobExecutionDto,
  JobExecutionSk,
  ListJobExecutionParameters,
  UpdateJobExecutionParameters,
} from '../../model/JobExecution';
import { provideTransient } from '../../transientProvider';
import { Repository } from '../AbstractRepository';
import { InjectTokens } from '../../constants/InjectTokens';
import { convertDynamoDbItemToJobExecution } from './utils';

/**
 * Repository for job execution records stored in DynamoDB.
 *
 * All records handled by an instance share one partition key, built in the
 * constructor as `${ENTITY_TYPES.jobExecution}~${tenant}`. The sort key of a
 * record is `${jobName}~${id}` (see {@link JobExecutionRepository.buildSk}).
 *
 * NOTE(review): `Repository` is referenced without type arguments here; if the
 * base class is generic, confirm the intended type arguments.
 */
@provideTransient(JobExecutionRepository)
export class JobExecutionRepository extends Repository {
  public constructor(
    @inject(DynamoDBClient) dynamoDbClient: DynamoDBClient,
    @inject(InjectTokens.DynamoTableName) tableName: string,
    @inject(InjectTokens.Tenant) tenant?: string,
  ) {
    super(dynamoDbClient, tableName, `${ENTITY_TYPES.jobExecution}~${tenant}`);
  }

  /**
   * Create a new job execution record in the `queued` state.
   *
   * The id is the queue timestamp (ISO string with every character other than
   * letters, digits and spaces removed) followed by `-` and a random number.
   * Properties of `createParams` are spread last, so they override any of the
   * defaults set here that share a name.
   *
   * @param {CreateJobExecutionRequest} createParams The properties of the execution to create.
   * @returns {Promise<JobExecution>} The result of the base repository's `create` call.
   * @memberof JobExecutionRepository
   */
  public async begin(createParams: CreateJobExecutionRequest): Promise<JobExecution> {
    const newJobQueueTime = new Date().toISOString();
    // Append a random 10 digit number on the end of the date string.
    const newJobId = `${newJobQueueTime.replace(/[^a-zA-Z0-9 ]/g, '')}-${randomInt(10 ** 9, 10 ** 10 - 1)}`;

    const newJobExecution: JobExecution = {
      type: `${ENTITY_TYPES.jobExecution}-${createParams.jobName}`,
      ecsTaskId: null,
      timeStarted: null,
      timeFinished: null,
      input: null,
      retryCount: 0,
      lastFailureReason: null,
      retryAfter: null,
      id: newJobId,
      timeQueued: newJobQueueTime,
      status: JOB_EXECUTION_STATUS.queued,
      ...createParams,
    };

    return this.create(newJobExecution);
  }

  /**
   * List job executions in this repository's partition, optionally filtered.
   *
   * Filter handling:
   * - `jobName` and `status`: queries index `lsi1` for `lsi1sk = "${status}~${jobName}"`.
   * - `jobName` only: queries the table for sort keys beginning with `"${jobName}~"`.
   * - `status` only: queries index `lsi1` for `lsi1sk` beginning with `"${status}~"`.
   *
   * Results are requested in descending key order (`ScanIndexForward: false`).
   * Pages are fetched until DynamoDB returns no `LastEvaluatedKey` or until
   * `parameters.limit` items have been collected.
   *
   * @param {ListJobExecutionParameters} parameters Optional filters, limit and consistent-read flag.
   * @returns {Promise<Array<JobExecution>>} The retrieved records.
   * @memberof JobExecutionRepository
   */
  async list(parameters: ListJobExecutionParameters = {}): Promise<Array<JobExecution>> {
    let indexName: string | undefined = undefined;
    const keyConditionExpressions = ['#pk = :pk'];
    const expressionAttributeNames: Record<string, string> = {
      '#pk': 'pk',
    };
    const expressionAttributeValues: Record<string, AttributeValue> = {
      ':pk': { S: this.pk },
    };

    if (parameters.filters) {
      const { jobName, status } = parameters.filters;

      if (jobName && status) {
        indexName = 'lsi1';
        expressionAttributeNames['#lsi1sk'] = 'lsi1sk';
        expressionAttributeValues[':lsi1sk'] = { S: `${status}~${jobName}` };
        keyConditionExpressions.push('#lsi1sk = :lsi1sk');
      } else if (jobName) {
        expressionAttributeNames['#sk'] = 'sk';
        expressionAttributeValues[':skPrefix'] = { S: `${jobName}~` };
        keyConditionExpressions.push('begins_with(#sk, :skPrefix)');
      } else if (status) {
        indexName = 'lsi1';
        expressionAttributeNames['#lsi1sk'] = 'lsi1sk';
        expressionAttributeValues[':lsi1skPrefix'] = { S: `${status}~` };
        keyConditionExpressions.push('begins_with(#lsi1sk, :lsi1skPrefix)');
      }
    }

    /* Indicator of where to paginate from. */
    let startKey: Record<string, AttributeValue> | undefined = undefined;
    const results: Array<Record<string, AttributeValue>> = [];

    // Iterate through pages until no more results are found.
    do {
      const res: QueryCommandOutput = await this.dynamoDbClient.send(
        new QueryCommand({
          ExpressionAttributeNames: expressionAttributeNames,
          ExpressionAttributeValues: expressionAttributeValues,
          KeyConditionExpression: keyConditionExpressions.join(' and '),
          IndexName: indexName,
          TableName: this.tableName,
          ScanIndexForward: false,
          ExclusiveStartKey: startKey,
          Limit: parameters.limit,
          ConsistentRead: parameters.consistentRead,
        }),
      );

      results.push(...(res?.Items || []));
      startKey = res?.LastEvaluatedKey;

      if (parameters.limit !== undefined && results.length >= parameters.limit) {
        // Truncate list to max requested items, stop querying.
        results.length = parameters.limit;
        break;
      }
    } while (startKey);

    return results.map((item) => this.convertToItem(item as JobExecutionDto));
  }

  /**
   * Fetch a single page of executions for one job.
   *
   * Issues exactly one query for sort keys beginning with `"${jobName}~"`.
   * When a pagination token is supplied it is decoded and used as the
   * exclusive start key. When the direction is
   * `PAGINATION_DIRECTION_TYPE_AFTER` the query runs in descending key order;
   * otherwise `ScanIndexForward` is left unset.
   *
   * @param {PaginationInfo} paginationInfo The pagination token and direction.
   * @param {string} jobName The name of the job whose executions are retrieved.
   * @returns {Promise<PaginationQueryResponse<JobExecution>>} The page of records and the
   * `LastEvaluatedKey` returned by DynamoDB (undefined when there are no further pages).
   * @memberof JobExecutionRepository
   */
  async getExecutions(
    paginationInfo: PaginationInfo,
    jobName: string,
  ): Promise<PaginationQueryResponse<JobExecution>> {
    const { token, direction } = paginationInfo;

    const params: QueryCommandInput = {
      ExpressionAttributeValues: {
        ':pk': { S: this.pk },
        ':skPrefix': { S: `${jobName}~` },
      },
      KeyConditionExpression: 'pk = :pk and begins_with(sk, :skPrefix)',
      TableName: this.tableName,
    };

    if (token) {
      params.ExclusiveStartKey = decodeTokenString(token);
    }

    // Executions are sorted in descending order.
    if (direction === PAGINATION_DIRECTION_TYPE_AFTER) {
      params.ScanIndexForward = false;
    }

    const response = await this.dynamoDbClient.send(new QueryCommand(params));

    let items: Array<JobExecution> = [];
    if (response.Items && response.Items.length) {
      items = response.Items.map((item) => this.convertToItem(item as JobExecutionDto));
    }

    return {
      items,
      lastEvaluatedKey: response.LastEvaluatedKey,
    };
  }

  /**
   * Update record in database.
   *
   * Builds a `SET` update expression from the keys of `input`. `null` values
   * are written as DynamoDB `NULL`, numbers as `N`, and everything else as `S`.
   * Keys whose value is `undefined` are skipped. When `status` is updated, the
   * derived `lsi1sk` and `gsi1pk` attributes are updated with it.
   *
   * @param {JobExecutionSk} sk The sort key used to identify the record.
   * @param {UpdateJobExecutionParameters} input The (partial) record to update.
   * @param options Optional condition expression plus extra expression attribute
   * names/values; these are merged over the generated ones, so on a name clash
   * the caller-supplied entry wins.
   * @returns {Promise<JobExecution>} The updated record.
   * @memberof JobExecutionRepository
   */
  async update(
    sk: JobExecutionSk,
    input: UpdateJobExecutionParameters,
    options?: {
      ConditionExpression?: string;
      ExpressionAttributeValues?: Record<string, AttributeValue>;
      ExpressionAttributeNames?: Record<string, string>;
    },
  ): Promise<JobExecution> {
    // Initialise update command
    const updateParams: UpdateItemCommandInput = {
      Key: {
        pk: { S: this.pk },
        sk: { S: this.buildSk(sk) },
      },
      ReturnValues: 'ALL_NEW',
      TableName: this.tableName,
    };

    const expressionAttributeNames: Record<string, string> = {};
    const expressionAttributeValues: Record<string, AttributeValue> = {};
    const updateExpressionPairs: Array<string> = [];

    // To avoid conflict with reserved keywords, refer to attributes with an additional '#' prefix
    for (const key of Object.keys(input)) {
      const val = input[key as keyof UpdateJobExecutionParameters];

      // An undefined value cannot be expressed as an AttributeValue. Skip the key
      // before registering its '#name' so no unused placeholder is sent.
      if (val === undefined) {
        continue;
      }

      expressionAttributeNames[`#${key}`] = key;

      if (val === null) {
        expressionAttributeValues[`:${key}`] = { NULL: true };
      } else if (typeof val === 'number') {
        expressionAttributeValues[`:${key}`] = { N: val.toString() };
      } else {
        expressionAttributeValues[`:${key}`] = { S: val as string };
      }

      updateExpressionPairs.push(`#${key} = :${key}`);

      if (key === 'status') {
        // Keep the index attributes derived from status in sync with it.
        expressionAttributeNames[`#lsi1sk`] = 'lsi1sk';
        expressionAttributeValues[`:lsi1sk`] = { S: `${val || ''}~${sk.jobName}` };
        expressionAttributeNames[`#gsi1pk`] = 'gsi1pk';
        expressionAttributeValues[`:gsi1pk`] = { S: `${ENTITY_TYPES.jobExecution}~${val}` };
        updateExpressionPairs.push(`#lsi1sk = :lsi1sk`);
        updateExpressionPairs.push(`#gsi1pk = :gsi1pk`);
      }
    }

    updateParams.ExpressionAttributeNames = expressionAttributeNames;
    updateParams.ExpressionAttributeValues = expressionAttributeValues;
    updateParams.UpdateExpression = `SET ${updateExpressionPairs.join(', ')}`;

    // Add condition expression if provided
    if (options?.ConditionExpression) {
      updateParams.ConditionExpression = options.ConditionExpression;
    }

    if (options?.ExpressionAttributeValues) {
      // Merge existing expression attribute values with the ones from options
      updateParams.ExpressionAttributeValues = {
        ...updateParams.ExpressionAttributeValues,
        ...options.ExpressionAttributeValues,
      };
    }

    // Add expression attribute names if provided
    if (options?.ExpressionAttributeNames) {
      updateParams.ExpressionAttributeNames = {
        ...updateParams.ExpressionAttributeNames,
        ...options.ExpressionAttributeNames,
      };
    }

    const updateItemCommand = new UpdateItemCommand(updateParams);
    const updatedRecord = await this.dynamoDbClient.send(updateItemCommand);

    return this.convertToItem(updatedRecord.Attributes as JobExecutionDto);
  }

  /**
   * Build the sort key string for a job execution.
   *
   * @param {JobExecutionSk} sk The job name and execution id.
   * @returns {string} The sort key in the form `${jobName}~${id}`.
   * @memberof JobExecutionRepository
   */
  buildSk(sk: JobExecutionSk): string {
    return `${sk.jobName}~${sk.id}`;
  }

  /**
   * Convert a job execution into its DynamoDB attribute representation.
   *
   * Falsy optional fields are written as DynamoDB `NULL`. The index attributes
   * `gsi1pk` and `lsi1sk` are derived from the execution's status and job name.
   *
   * @param {JobExecution} item The job execution to convert.
   * @returns {JobExecutionDto} The DynamoDB item.
   * @memberof JobExecutionRepository
   */
  /* istanbul ignore next */
  convertToDynamoDbItem(item: JobExecution): JobExecutionDto {
    return {
      context: { S: item.context },
      ecsTaskId: item.ecsTaskId ? { S: item.ecsTaskId } : { NULL: true },
      gsi1pk: { S: `${ENTITY_TYPES.jobExecution}~${item.status}` },
      id: { S: item.id },
      jobName: { S: item.jobName },
      lsi1sk: { S: `${item.status}~${item.jobName}` },
      pk: { S: this.pk },
      sk: { S: this.buildSk(item) },
      status: { S: item.status },
      timeFinished: item.timeFinished ? { S: item.timeFinished } : { NULL: true },
      timeQueued: { S: item.timeQueued },
      // Persist the actual start time (not the queue time) when one is set.
      timeStarted: item.timeStarted ? { S: item.timeStarted } : { NULL: true },
      version: item.version ? { S: item.version } : { NULL: true },
      input: item.input ? { S: JSON.stringify(item.input) } : { NULL: true },
      retryCount: { N: item.retryCount.toString() },
      lastFailureReason: item.lastFailureReason ? { S: item.lastFailureReason } : { NULL: true },
      retryAfter: item.retryAfter ? { S: item.retryAfter } : { NULL: true },
    };
  }

  /**
   * Convert a DynamoDB item into a job execution by delegating to
   * `convertDynamoDbItemToJobExecution`.
   *
   * @param {JobExecutionDto} dbItem The DynamoDB item to convert.
   * @returns {JobExecution} The converted job execution.
   * @memberof JobExecutionRepository
   */
  convertToItem(dbItem: JobExecutionDto): JobExecution {
    return convertDynamoDbItemToJobExecution(dbItem);
  }
}