/*!
 * @license
 * Copyright Squiz Australia Pty Ltd. All Rights Reserved.
 */
import {
  AttributeValue,
  BatchWriteItemCommand,
  DynamoDBClient,
  GetItemCommand,
  PutItemCommand,
  QueryCommand,
  QueryCommandInput,
  UpdateItemCommand,
} from '@aws-sdk/client-dynamodb';
import { faker } from '@faker-js/faker';
import { AwsClientStub, mockClient } from 'aws-sdk-client-mock';
import 'aws-sdk-client-mock-jest';
import { initIocContainer, iocContainer } from '../../../../mocks/mockIoc';
import { InjectTokens } from '../../constants/InjectTokens';
import { JOB_EXECUTION_STATUS } from '../../constants/JobExecutionStatus.constants';
import { ENTITY_TYPES } from '../../constants/Repository.constants';
import {
  createMockClientGetResponse,
  createMockClientPutResponse,
  createMockClientScanResponse,
} from '../../core/dynamoClient.mock';
import { PaginationInfo } from '../../core/pagination/model/PaginationInfo';
import { PAGINATION_DIRECTION_TYPE_BEFORE } from '../../core/pagination/model/PaginationTypes';
import * as PaginationUtils from '../../core/pagination/utils/PaginationUtils';
import { JobExecution, UpdateJobExecutionParameters } from '../../model/JobExecution/JobExecution';
import { createMockJobExecution, createMockJobExecutions } from '../../model/JobExecution/mocks/JobExecution.mock';
import {
  createMockJobExecutionDto,
  createMockJobExecutionDtos,
} from '../../model/JobExecution/mocks/JobExecutionDto.mock';
import { JobExecutionRepository } from './JobExecutionRepository';

initIocContainer();

/**
 * Unit tests for JobExecutionRepository.
 *
 * Every DynamoDB call is intercepted by an aws-sdk-client-mock stub of
 * DynamoDBClient; the repository, tenant and table name are resolved from the
 * mock IoC container before each test.
 */
describe('JobExecutionRepository', (): void => {
  let dynamoClient: AwsClientStub<DynamoDBClient>;
  let jobExecutionRepository: JobExecutionRepository;
  let tenant: string;
  let tableName: string;

  beforeEach(() => {
    // Fresh stub per test so recorded commands and queued responses never leak between tests.
    dynamoClient = mockClient(DynamoDBClient);
    dynamoClient.reset();

    jobExecutionRepository = iocContainer.get(JobExecutionRepository);
    tenant = iocContainer.get(InjectTokens.Tenant);
    tableName = iocContainer.get(InjectTokens.DynamoTableName);
  });

  describe('list', () => {
    it(`should return an empty list`, async (): Promise<void> => {
      dynamoClient.on(QueryCommand).resolvesOnce(createMockClientScanResponse());

      const response = await jobExecutionRepository.list({
        filters: { jobName: 'does-not-exist' },
      });

      expect(response).toStrictEqual([]);
      expect(dynamoClient).toHaveReceivedCommandTimes(QueryCommand, 1);
    });

    it(`should return a list of existing job executions`, async (): Promise<void> => {
      const mockJobExecution = createMockJobExecution();
      dynamoClient.on(QueryCommand).resolvesOnce(
        createMockClientScanResponse({
          Items: [createMockJobExecutionDto(mockJobExecution)],
          LastEvaluatedKey: undefined,
        }),
      );

      const response = await jobExecutionRepository.list();

      expect(response).toEqual([mockJobExecution]);
      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, {
        ExclusiveStartKey: undefined,
        ExpressionAttributeNames: { '#pk': 'pk' },
        ExpressionAttributeValues: { ':pk': { S: `jobExecution~${tenant}` } },
        IndexName: undefined,
        KeyConditionExpression: '#pk = :pk',
        ScanIndexForward: false,
        TableName: tableName,
      });
    });

    it(`should return a list of existing job executions by name`, async (): Promise<void> => {
      const mockName = `mock`;
      const mockJobExecutions = [
        createMockJobExecution({ jobName: mockName }),
        createMockJobExecution({ jobName: mockName }),
      ];
      dynamoClient.on(QueryCommand).resolvesOnce(
        createMockClientScanResponse({
          Items: mockJobExecutions.map((execution) => createMockJobExecutionDto(execution)),
          LastEvaluatedKey: undefined,
        }),
      );

      const response = await jobExecutionRepository.list({
        filters: { jobName: mockName },
      });

      expect(response).toEqual(mockJobExecutions);
      // Filtering by job name is expected to query the base table with a sort-key prefix.
      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, {
        ExclusiveStartKey: undefined,
        ExpressionAttributeNames: { '#pk': 'pk', '#sk': 'sk' },
        ExpressionAttributeValues: {
          ':pk': { S: `jobExecution~${tenant}` },
          ':skPrefix': { S: `${mockName}~` },
        },
        IndexName: undefined,
        KeyConditionExpression: '#pk = :pk and begins_with(#sk, :skPrefix)',
        ScanIndexForward: false,
        TableName: tableName,
      });
    });

    it(`should return a list of existing job executions by status`, async (): Promise<void> => {
      const mockStatus = JOB_EXECUTION_STATUS.running;
      const mockJobExecutions = [
        createMockJobExecution({ status: mockStatus }),
        createMockJobExecution({ status: mockStatus }),
      ];
      dynamoClient.on(QueryCommand).resolvesOnce(
        createMockClientScanResponse({
          Items: mockJobExecutions.map((execution) => createMockJobExecutionDto(execution)),
          LastEvaluatedKey: undefined,
        }),
      );

      const response = await jobExecutionRepository.list({
        filters: { status: mockStatus },
      });

      expect(response).toEqual(mockJobExecutions);
      // Filtering by status is expected to query the 'lsi1' index with a prefix on its sort key.
      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, {
        ExclusiveStartKey: undefined,
        ExpressionAttributeNames: {
          '#lsi1sk': 'lsi1sk',
          '#pk': 'pk',
        },
        ExpressionAttributeValues: {
          ':lsi1skPrefix': { S: `${mockStatus}~` },
          ':pk': { S: `jobExecution~${tenant}` },
        },
        IndexName: 'lsi1',
        KeyConditionExpression: '#pk = :pk and begins_with(#lsi1sk, :lsi1skPrefix)',
        ScanIndexForward: false,
        TableName: tableName,
      });
    });

    it(`should paginate results to return a list of existing job executions`, async (): Promise<void> => {
      const mockRecordsPage1 = [
        createMockJobExecution({ jobName: 'testJob1' }),
        createMockJobExecution({ jobName: 'testJob1' }),
        createMockJobExecution({ jobName: 'testJob1' }),
      ];
      const mockRecordsPage2 = [
        createMockJobExecution({ jobName: 'testJob2' }),
        createMockJobExecution({ jobName: 'testJob2' }),
        createMockJobExecution({ jobName: 'testJob2' }),
      ];
      // Page one carries a LastEvaluatedKey; page two does not, ending the pagination.
      dynamoClient
        .on(QueryCommand)
        .resolvesOnce(
          createMockClientScanResponse({
            Items: mockRecordsPage1.map((execution) => createMockJobExecutionDto(execution)),
            LastEvaluatedKey: createMockJobExecutionDto(mockRecordsPage1.slice(-1)[0]),
          }),
        )
        .resolvesOnce(
          createMockClientScanResponse({
            Items: mockRecordsPage2.map((execution) => createMockJobExecutionDto(execution)),
            LastEvaluatedKey: undefined,
          }),
        );

      const response = await jobExecutionRepository.list({
        filters: { jobName: 'testJob1' },
        limit: 4,
      });

      // With a limit of 4, only the first record of the second page is expected in the result.
      expect(response).toEqual([mockRecordsPage1[0], mockRecordsPage1[1], mockRecordsPage1[2], mockRecordsPage2[0]]);
      expect(dynamoClient).toHaveReceivedCommandTimes(QueryCommand, 2);
      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, {
        Limit: 4,
      });
    });

    it(`should stop paginating if unexpected result is returned`, async (): Promise<void> => {
      // A response with neither Items nor LastEvaluatedKey.
      dynamoClient.on(QueryCommand).resolvesOnce({});

      const response = await jobExecutionRepository.list({
        filters: { jobName: 'testJob1' },
      });

      expect(response).toEqual([]);
      expect(dynamoClient).toHaveReceivedCommandTimes(QueryCommand, 1);
    });
  });

  describe('findOne', () => {
    it(`should return job execution by timeQueued and id and handle any null values`, async (): Promise<void> => {
      const mockJobExecutionRecord = createMockJobExecutionDto({
        ecsTaskId: 'ecs-task-id',
        timeFinished: new Date().toISOString(),
        timeStarted: new Date().toISOString(),
        version: '1.0.0',
      });
      dynamoClient.on(GetItemCommand).resolvesOnce(
        createMockClientGetResponse({
          Item: mockJobExecutionRecord,
        }),
      );

      const response = await jobExecutionRepository.findOne({
        jobName: mockJobExecutionRecord.jobName.S as string,
        id: mockJobExecutionRecord.id.S as string,
      });

      expect(response).toStrictEqual({
        context: mockJobExecutionRecord.context.S as string,
        ecsTaskId: mockJobExecutionRecord.ecsTaskId.S as string,
        id: mockJobExecutionRecord.id.S as string,
        jobName: mockJobExecutionRecord.jobName.S as string,
        status: mockJobExecutionRecord.status.S as string,
        timeFinished: mockJobExecutionRecord.timeFinished.S as string,
        timeQueued: mockJobExecutionRecord.timeQueued.S as string,
        timeStarted: mockJobExecutionRecord.timeStarted.S as string,
        type: `jobExecution-${mockJobExecutionRecord.jobName.S as string}`,
        version: mockJobExecutionRecord.version.S as string,
        // The DTO stores input either as a NULL attribute or as a JSON string.
        input: mockJobExecutionRecord.input.NULL ? null : JSON.parse(mockJobExecutionRecord.input.S as string),
        retryCount: 0,
        lastFailureReason: null,
        retryAfter: null,
      } as JobExecution);
      expect(dynamoClient).toHaveReceivedCommandWith(GetItemCommand, {});
    });

    it(`should return null if job execution does not exist`, async (): Promise<void> => {
      dynamoClient.on(GetItemCommand).resolvesOnce(
        createMockClientGetResponse({
          Item: undefined,
        }),
      );

      const response = await jobExecutionRepository.findOne({ jobName: 'null', id: 'null' });

      expect(response).toBeNull();
      expect(dynamoClient).toHaveReceivedCommandTimes(GetItemCommand, 1);
    });
  });

  describe('create', () => {
    it(`should create a new job execution record and return job execution record in the response`, async (): Promise<void> => {
      const newJobExecutionToCreate = createMockJobExecution();
      const mockResponse = createMockClientPutResponse({
        Attributes: {
          jobName: { S: newJobExecutionToCreate.jobName },
          pk: { S: ENTITY_TYPES.jobExecution },
          sk: { S: `${newJobExecutionToCreate.jobName}` },
        },
      });
      dynamoClient.on(PutItemCommand).resolvesOnce(mockResponse);

      const response = await jobExecutionRepository.create(newJobExecutionToCreate);

      expect(response).toStrictEqual({
        ...newJobExecutionToCreate,
      });
      expect(dynamoClient).toHaveReceivedCommandWith(PutItemCommand, {});
    });
  });

  describe('begin', () => {
    it(`should create a new queued job record`, async (): Promise<void> => {
      const newJobExecutionToCreate = createMockJobExecution({
        status: JOB_EXECUTION_STATUS.queued,
        ecsTaskId: null,
        timeFinished: null,
        timeStarted: null,
      });
      const mockResponse = createMockClientPutResponse({
        Attributes: {
          pk: { S: `${ENTITY_TYPES.jobExecution}-${newJobExecutionToCreate.jobName}` },
          sk: { S: `${newJobExecutionToCreate.timeQueued}-${newJobExecutionToCreate.id}` },
          name: { S: newJobExecutionToCreate.jobName },
          context: { S: newJobExecutionToCreate.context },
          version: { S: newJobExecutionToCreate.version },
          ecsTaskId: { NULL: true },
          timeStarted: { NULL: true },
          timeFinished: { NULL: true },
        },
      });
      dynamoClient.on(PutItemCommand).resolvesOnce(mockResponse);

      const response = await jobExecutionRepository.begin({
        context: newJobExecutionToCreate.context,
        jobName: newJobExecutionToCreate.jobName,
        version: newJobExecutionToCreate.version,
        input: newJobExecutionToCreate.input,
      });

      expect(response.id).not.toBeNull();
      expect(response.timeQueued).not.toBeNull();
      expect(response.ecsTaskId).toBeNull();
      expect(response.timeFinished).toBeNull();
      expect(response.timeStarted).toBeNull();
      expect(response.type).toBe(`jobExecution-${newJobExecutionToCreate.jobName}`);
      expect(response.jobName).toBe(newJobExecutionToCreate.jobName);
      expect(response.version).toBe(newJobExecutionToCreate.version);
      expect(response.input).toBe(newJobExecutionToCreate.input);
      expect(dynamoClient).toHaveReceivedCommandWith(PutItemCommand, {});
    });
  });

  describe('getExecutions', () => {
    const jobName = 'something';
    const jobExecutionId = faker.string.uuid();
    let mockExclusiveStartKey: Record<string, AttributeValue>;
    let mockPaginationInfo: PaginationInfo;
    let mockParams: QueryCommandInput;
    let decodeTokenStringSpy: jest.SpyInstance;

    beforeEach(() => {
      mockExclusiveStartKey = {
        pk: { S: `${ENTITY_TYPES.jobExecution}~${tenant}` },
        sk: { S: `${jobName}~${jobExecutionId}` },
      };
      mockPaginationInfo = {
        direction: PAGINATION_DIRECTION_TYPE_BEFORE,
        limit: 100,
        requestUrl: 'mockUrl',
        token: 'mockToken',
      };
      mockParams = {
        ExclusiveStartKey: mockExclusiveStartKey,
        ExpressionAttributeValues: {
          ':pk': { S: `${ENTITY_TYPES.jobExecution}~${tenant}` },
          ':skPrefix': { S: `${jobName}~` },
        },
        KeyConditionExpression: 'pk = :pk and begins_with(sk, :skPrefix)',
        TableName: tableName,
      };
      // Stub token decoding so 'mockToken' always maps to the known start key.
      decodeTokenStringSpy = jest
        .spyOn(PaginationUtils, 'decodeTokenString')
        .mockReturnValue(mockExclusiveStartKey);
    });

    afterEach(() => {
      // Undo the spy so the stubbed decodeTokenString does not leak into later suites.
      decodeTokenStringSpy.mockRestore();
    });

    it('should return an empty array if no executions are found', async () => {
      dynamoClient.on(QueryCommand).resolvesOnce({
        Items: [],
        LastEvaluatedKey: undefined,
      });

      const result = await jobExecutionRepository.getExecutions(mockPaginationInfo, jobName);

      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, mockParams);
      expect(result.items).toEqual([]);
      expect(result.lastEvaluatedKey).toBeUndefined();
    });

    it('should return an array of job executions if found', async () => {
      const mockJobExecutionRecords = createMockJobExecutionDtos();
      const mockLastEvaluatedKey = {
        pk: { S: `${ENTITY_TYPES.job}-${jobName}` },
        sk: { S: 'mockSk' },
      };
      dynamoClient.on(QueryCommand).resolvesOnce({
        Items: mockJobExecutionRecords,
        LastEvaluatedKey: mockLastEvaluatedKey,
      });

      const result = await jobExecutionRepository.getExecutions(mockPaginationInfo, jobName);

      expect(dynamoClient).toHaveReceivedCommandWith(QueryCommand, mockParams);
      expect(result.items).toHaveLength(mockJobExecutionRecords.length);
      expect(result.lastEvaluatedKey).toStrictEqual(mockLastEvaluatedKey);
    });
  });

  describe('update', () => {
    it('Updates the record properties', async () => {
      const jobExecution = createMockJobExecution({ status: JOB_EXECUTION_STATUS.queued });
      const updatedAttributes: UpdateJobExecutionParameters = {
        input: faker.lorem.words(),
        ecsTaskId: faker.string.uuid(),
        version: faker.system.semver(),
        timeStarted: faker.date.recent().toISOString(),
        timeFinished: null,
        context: faker.lorem.words(),
      };
      const updatedJobExecution = createMockJobExecution({
        ...jobExecution,
        ...updatedAttributes,
      });
      dynamoClient.on(UpdateItemCommand).resolvesOnce({
        Attributes: createMockJobExecutionDto(updatedJobExecution),
      });

      const result = await jobExecutionRepository.update(jobExecution, updatedAttributes);

      expect(result).toEqual(updatedJobExecution);
      expect(dynamoClient).toHaveReceivedCommandWith(UpdateItemCommand, {
        ExpressionAttributeNames: {
          '#context': 'context',
          '#ecsTaskId': 'ecsTaskId',
          '#input': 'input',
          '#timeFinished': 'timeFinished',
          '#timeStarted': 'timeStarted',
          '#version': 'version',
        },
        ExpressionAttributeValues: {
          ':context': { S: updatedAttributes.context! },
          ':ecsTaskId': { S: updatedAttributes.ecsTaskId! },
          ':input': { S: updatedAttributes.input },
          ':timeFinished': { NULL: true },
          ':timeStarted': { S: updatedAttributes.timeStarted! },
          ':version': { S: updatedAttributes.version! },
        },
        Key: {
          pk: { S: `jobExecution~${tenant}` },
          sk: { S: `${jobExecution.jobName}~${jobExecution.id}` },
        },
        ReturnValues: 'ALL_NEW',
        TableName: tableName,
        UpdateExpression:
          'SET #input = :input, #ecsTaskId = :ecsTaskId, #version = :version, #timeStarted = :timeStarted, #timeFinished = :timeFinished, #context = :context',
      });
    });

    it('Updates the record keys when the status is updated', async () => {
      const jobExecution = createMockJobExecution({ status: JOB_EXECUTION_STATUS.queued });
      const updatedJobExecution = createMockJobExecution({
        ...jobExecution,
        status: JOB_EXECUTION_STATUS.running,
      });
      dynamoClient.on(UpdateItemCommand).resolvesOnce({
        Attributes: createMockJobExecutionDto(updatedJobExecution),
      });

      const result = await jobExecutionRepository.update(jobExecution, {
        status: updatedJobExecution.status,
      });

      expect(result).toEqual(updatedJobExecution);
      // A status change is expected to rewrite the status-derived index keys as well.
      expect(dynamoClient).toHaveReceivedCommandWith(UpdateItemCommand, {
        ExpressionAttributeNames: {
          '#gsi1pk': 'gsi1pk',
          '#lsi1sk': 'lsi1sk',
          '#status': 'status',
        },
        ExpressionAttributeValues: {
          ':gsi1pk': { S: `jobExecution~${updatedJobExecution.status}` },
          ':lsi1sk': { S: `${updatedJobExecution.status}~${updatedJobExecution.jobName}` },
          ':status': { S: updatedJobExecution.status },
        },
        Key: {
          pk: { S: `jobExecution~${tenant}` },
          sk: { S: `${jobExecution.jobName}~${jobExecution.id}` },
        },
        ReturnValues: 'ALL_NEW',
        TableName: tableName,
        UpdateExpression: 'SET #status = :status, #lsi1sk = :lsi1sk, #gsi1pk = :gsi1pk',
      });
    });
  });

  describe('batchDelete', () => {
    it('should delete the requested dynamo records', async () => {
      dynamoClient.on(BatchWriteItemCommand).resolves({});
      const jobExecutions = createMockJobExecutions(3);

      const result = await jobExecutionRepository.batchDelete(jobExecutions);

      expect(result).toEqual({ unprocessedItems: [] });
      expect(dynamoClient).toHaveReceivedCommandWith(BatchWriteItemCommand, {
        RequestItems: {
          [tableName]: [
            {
              DeleteRequest: {
                Key: {
                  pk: { S: `jobExecution~${tenant}` },
                  sk: { S: `${jobExecutions[0].jobName}~${jobExecutions[0].id}` },
                },
              },
            },
            {
              DeleteRequest: {
                Key: {
                  pk: { S: `jobExecution~${tenant}` },
                  sk: { S: `${jobExecutions[1].jobName}~${jobExecutions[1].id}` },
                },
              },
            },
            {
              DeleteRequest: {
                Key: {
                  pk: { S: `jobExecution~${tenant}` },
                  sk: { S: `${jobExecutions[2].jobName}~${jobExecutions[2].id}` },
                },
              },
            },
          ],
        },
      });
    });

    it('should return without attempting to delete if an empty array of SK is provided', async () => {
      const result = await jobExecutionRepository.batchDelete([]);

      expect(result).toEqual({ unprocessedItems: [] });
      expect(dynamoClient).not.toHaveReceivedCommand(BatchWriteItemCommand);
    });

    it('should return the unprocessed SK items if some items fail to delete', async () => {
      const jobExecutions = createMockJobExecutions(3);
      // DynamoDB reports the second record as unprocessed.
      dynamoClient.on(BatchWriteItemCommand).resolves({
        UnprocessedItems: {
          [tableName]: [
            {
              DeleteRequest: {
                Key: {
                  pk: { S: `jobExecution~${tenant}` },
                  sk: { S: `${jobExecutions[1].jobName}~${jobExecutions[1].id}` },
                },
              },
            },
          ],
        },
      });

      const result = await jobExecutionRepository.batchDelete(jobExecutions);

      expect(result).toEqual({ unprocessedItems: [jobExecutions[1]] });
    });
  });
});