/*! * @license * Copyright Squiz Australia Pty Ltd. All Rights Reserved. */ import { AttributeValue, DynamoDBClient, QueryCommand, QueryCommandInput, QueryCommandOutput, } from '@aws-sdk/client-dynamodb'; import { inject } from 'inversify'; import { InjectTokens } from '../../constants/InjectTokens'; import { ENTITY_TYPES } from '../../constants/Repository.constants'; import { getRuntimeImageOrDefault } from '../../constants/RuntimeImage.constants'; import { decodeTokenString, PAGINATION_DIRECTION_TYPE_BEFORE, PaginationInfo, PaginationQueryResponse, } from '../../core/pagination'; import { CreateJobRequest, Job, JobDto, JobSk, ListJobParameters } from '../../model/Job'; import { parsePartitionKey } from '../../ParsePartitionKey'; import { provideTransient } from '../../transientProvider'; import { Repository } from '../AbstractRepository'; @provideTransient(JobRepository) export class JobRepository extends Repository { public constructor( @inject(DynamoDBClient) dynamoDbClient: DynamoDBClient, @inject(InjectTokens.DynamoTableName) tableName: string, @inject(InjectTokens.Tenant) tenant: string, ) { super(dynamoDbClient, tableName, `${ENTITY_TYPES.job}~${tenant}`); } public async create(input: CreateJobRequest): Promise { const newJob: Job = { ...input, image: getRuntimeImageOrDefault(input.image), type: ENTITY_TYPES.job, }; return super.create(newJob); } /** * Get list of records from database. * @abstract * @returns {Promise>} The retrieved records. * @memberof Repository */ async list(parameters: ListJobParameters = {}): Promise> { const keyConditionExpressions = ['#pk = :pk']; const filterExpressions: Array = []; const expressionAttributeNames: Record = { '#pk': 'pk', }; const expressionAttributeValues: Record = { ':pk': { S: this.pk }, }; if (parameters.filters) { const { name, version } = parameters.filters; if (name && version) { expressionAttributeNames['#sk'] = 'sk'; expressionAttributeValues[':sk'] = { S: `${name}~${version}` }; keyConditionExpressions.push(`#sk = :sk`); } else if (name) { expressionAttributeNames['#sk'] = 'sk'; expressionAttributeValues[':skPrefix'] = { S: `${name}~` }; keyConditionExpressions.push(`begins_with(#sk, :skPrefix)`); } else if (version) { expressionAttributeNames['#version'] = 'version'; expressionAttributeValues[':version'] = { S: version }; filterExpressions.push(`#version = :version`); } } // Apply pagination for the scan. // By default, the Scan operation processes data sequentially. // DynamoDB returns data in increments of 1 MB increments. let startKey: Record | undefined = undefined; /* Indicator of where to paginate from. */ const results: Array> = []; // Iterate through pages until no more results are found. do { const res: QueryCommandOutput = await this.dynamoDbClient.send( new QueryCommand({ ExpressionAttributeNames: expressionAttributeNames, ExpressionAttributeValues: expressionAttributeValues, KeyConditionExpression: keyConditionExpressions.join(' and '), FilterExpression: filterExpressions.length > 0 ? filterExpressions.join(' and ') : undefined, TableName: this.tableName, ExclusiveStartKey: startKey, Limit: parameters.limit, ConsistentRead: parameters.consistentRead, }), ); results.push(...(res?.Items || [])); startKey = res?.LastEvaluatedKey; if (parameters.limit !== undefined && results.length >= parameters.limit) { // Truncate list to max requested items, stop querying. results.length = parameters.limit; break; } } while (startKey); return results.length ? results.map((item) => this.convertToItem(item as JobDto)) : []; } /** * Get list of records from database. * @abstract * @returns {Promise>} The retrieved records. * @memberof Repository */ async getJobs(paginationInfo: PaginationInfo): Promise> { const { token, direction } = paginationInfo; const params: QueryCommandInput = { ExpressionAttributeValues: { ':pk': { S: this.pk }, }, KeyConditionExpression: 'pk = :pk', TableName: this.tableName, }; if (token) { params.ExclusiveStartKey = decodeTokenString(token); } if (direction === PAGINATION_DIRECTION_TYPE_BEFORE) { params.ScanIndexForward = false; } const response = await this.dynamoDbClient.send(new QueryCommand(params)); let items: Array = []; if (response.Items && response.Items.length) { items = response.Items.map((item) => this.convertToItem(item as JobDto)); } return { items, lastEvaluatedKey: response.LastEvaluatedKey, }; } convertToDynamoDbItem(item: Job): JobDto { return { pk: { S: this.pk }, sk: { S: this.buildSk(item) }, name: { S: item.name }, description: item.description ? { S: item.description } : { NULL: true }, version: { S: item.version }, image: { S: item.image }, }; } convertToItem(dbItem: JobDto): Job { const { pk, name, version } = dbItem; const image = (dbItem as Partial).image?.S; let description; // TODO https://jira.squiz.net/browse/CONN-609 remove this check if we tidy up // jobs added before this field was implemented if (!dbItem.description) { description = { S: null }; } else { description = dbItem.description; } return { name: name.S as string, description: description.S ? (description.S as string) : '', type: parsePartitionKey(pk.S as string).entityType, version: version.S as string, image: getRuntimeImageOrDefault(image), }; } protected buildSk(sk: JobSk): string { return `${sk.name}~${sk.version}`; } }