import { ConflictException, ECSClient, ResourceNotFoundException, RunTaskCommand } from '@aws-sdk/client-ecs';
import { faker } from '@faker-js/faker';
import { AwsClientStub, mockClient } from 'aws-sdk-client-mock';
import 'aws-sdk-client-mock-jest';
import { JobDetailsRepository, JobUploadsRepository } from '../../S3Repository';
import { JOB_EXECUTION_STATUS } from '../../constants/JobExecutionStatus.constants';
import { RuntimeImageTaskDefinitionError } from '../../errors';
import { createMockJobManifest } from '../../manifest';
import { createMockJobContext } from '../../model/JobContext';
import { createMockJobExecution } from '../../model/JobExecution';
import { JobExecutionRepository } from '../../repository';
import { JobContextService } from '../JobContextService/JobContextService';
import { JobManifestService } from '../JobManifestService/JobManifestService';
import { JobExecutionTaskService } from './JobExecutionTaskService';
import { RunJobTaskParams } from './JobExecutionTaskService.defs';

/**
 * Unit tests for JobExecutionTaskService.
 *
 * Every collaborator is replaced with a hand-rolled jest mock that exposes only the methods these tests stub, and
 * ECS traffic is intercepted with aws-sdk-client-mock so RunTaskCommand inputs can be asserted on directly.
 */
describe('JobExecutionTaskService', () => {
  let mockEcsClient: AwsClientStub<ECSClient>;
  let mockJobManifestService: jest.Mocked<JobManifestService>;
  let mockJobContextService: jest.Mocked<JobContextService>;
  let mockJobExecutionRepository: jest.Mocked<JobExecutionRepository>;
  let mockJobDetailsRepository: jest.Mocked<JobDetailsRepository>;
  let mockJobUploadRepository: jest.Mocked<JobUploadsRepository>;
  let jobExecutionTaskService: JobExecutionTaskService;

  // Signed URLs returned by the mocked S3 repositories; asserted against the container environment overrides.
  const jobDetailsSignedUrl = faker.internet.url();
  const jobCodeArtifactSignedUrl = faker.internet.url();

  // Baseline parameters shared by the runJobExecutionTask tests; individual tests spread and override as needed.
  const defaultRunJobTaskParams: RunJobTaskParams = {
    jobExecution: createMockJobExecution(),
    awsRegion: 'ap-southeast-2',
    shortRegion: 'au',
    shortEnvironment: 'dev',
    clusterArn: `clusterArn`,
    taskDefinitionArn: `legacyTaskDefinitionArn`,
    taskDefinitionArnsByRuntimeImage: {
      'node:20': 'node20TaskDefinitionArn',
      'node:24': 'node24TaskDefinitionArn',
    },
    tenantId: 'some_tenant',
  };

  beforeEach(() => {
    // Stubbing at the class level means the `new ECSClient({})` handed to the service below is intercepted too.
    mockEcsClient = mockClient(ECSClient);

    // Partial mocks: only the members exercised by the service under test are provided, hence the double cast.
    mockJobManifestService = {
      findJobManifest: jest.fn(),
    } as unknown as jest.Mocked<JobManifestService>;

    mockJobContextService = {
      findJobContext: jest.fn(),
    } as unknown as jest.Mocked<JobContextService>;

    mockJobExecutionRepository = {
      list: jest.fn(),
    } as unknown as jest.Mocked<JobExecutionRepository>;

    mockJobDetailsRepository = {
      createJobDetailsFile: jest.fn(),
      getS3SignedUrl: jest.fn().mockResolvedValue(jobDetailsSignedUrl),
    } as unknown as jest.Mocked<JobDetailsRepository>;

    mockJobUploadRepository = {
      getS3SignedUrl: jest.fn().mockResolvedValue(jobCodeArtifactSignedUrl),
    } as unknown as jest.Mocked<JobUploadsRepository>;

    jobExecutionTaskService = new JobExecutionTaskService(
      new ECSClient({}),
      mockJobManifestService,
      mockJobContextService,
      mockJobExecutionRepository,
      mockJobDetailsRepository,
      mockJobUploadRepository,
    );
  });

  describe(`hasReachedMaxTenantConcurrency`, () => {
    it('should return false if number of running jobs is less than max', async (): Promise<void> => {
      const maxTenantConcurrentTasks = 3;
      mockJobExecutionRepository.list.mockResolvedValue([createMockJobExecution(), createMockJobExecution()]);

      const response = await jobExecutionTaskService.hasReachedMaxTenantConcurrency({
        maxTenantConcurrentTasks,
      });

      expect(response).toBe(false);
      expect(mockJobExecutionRepository.list).toHaveBeenCalledWith({
        limit: maxTenantConcurrentTasks,
        consistentRead: true,
        filters: { status: JOB_EXECUTION_STATUS.running },
      });
    });

    it('should return true if number of running jobs is more than max', async (): Promise<void> => {
      const maxTenantConcurrentTasks = 1;
      mockJobExecutionRepository.list.mockResolvedValue([createMockJobExecution(), createMockJobExecution()]);

      const response = await jobExecutionTaskService.hasReachedMaxTenantConcurrency({
        maxTenantConcurrentTasks,
      });

      expect(response).toBe(true);
      expect(mockJobExecutionRepository.list).toHaveBeenCalledWith({
        limit: maxTenantConcurrentTasks,
        consistentRead: true,
        filters: { status: JOB_EXECUTION_STATUS.running },
      });
    });
  });

  describe(`runJobExecutionTask`, () => {
    it('should return task arn after starting an ECS task', async (): Promise<void> => {
      const mockArn = `mock-arn`;
      const mockTask = { taskArn: mockArn };
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest({ image: 'node:24' });
      // Fixed id/name/version so the expected clientToken below stays stable.
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });

      mockEcsClient.on(RunTaskCommand).resolves({ tasks: [mockTask] });
      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);

      const response = await jobExecutionTaskService.runJobExecutionTask({
        ...defaultRunJobTaskParams,
        jobExecution: jobExecution,
      });

      expect(response).toBe(mockArn);
      expect(mockEcsClient).toHaveReceivedCommandWith(RunTaskCommand, {
        cluster: 'clusterArn',
        overrides: {
          containerOverrides: [
            {
              name: 'job-runtime-container',
              environment: [
                {
                  name: 'JOB_NAME',
                  value: jobExecution.jobName,
                },
                {
                  name: 'JOB_EXECUTION_ID',
                  value: jobExecution.id,
                },
                {
                  name: 'AWS_REGION',
                  value: defaultRunJobTaskParams.awsRegion,
                },
                {
                  name: 'TENANT_ID',
                  value: defaultRunJobTaskParams.tenantId,
                },
                {
                  name: 'SHORT_ENVIRONMENT',
                  value: defaultRunJobTaskParams.shortEnvironment,
                },
                {
                  name: 'SHORT_REGION',
                  value: defaultRunJobTaskParams.shortRegion,
                },
                {
                  name: 'JOB_CODE_ARTIFACT_URL',
                  value: jobCodeArtifactSignedUrl,
                },
                {
                  name: 'JOB_DETAILS_FILE_URL',
                  value: jobDetailsSignedUrl,
                },
              ],
            },
          ],
        },
        tags: [
          { key: 'Environment', value: defaultRunJobTaskParams.shortEnvironment },
          { key: 'Region', value: defaultRunJobTaskParams.awsRegion },
          { key: 'Product', value: 'job-runner' },
          { key: 'Department', value: 'platform services team' },
          { key: 'Purpose', value: 'job runner service' },
          { key: 'map-migrated', value: 'migUJ1IX3DUXG' },
          { key: 'cmp:tenant', value: 'some_tenant' },
        ],
        taskDefinition: defaultRunJobTaskParams.taskDefinitionArnsByRuntimeImage['node:24'],
        // sha1 of execution job name/id + tenant + retryCount.
        clientToken: '4725755e08138edde031a177c36a269ecdbed5a3',
      });
      expect(mockJobDetailsRepository.createJobDetailsFile).toHaveBeenCalledWith(
        defaultRunJobTaskParams.tenantId,
        jobContext,
        jobManifest,
        jobExecution,
      );
      expect(mockJobUploadRepository.getS3SignedUrl).toHaveBeenCalledWith(
        `${defaultRunJobTaskParams.tenantId}/${jobExecution.jobName}/${jobExecution.version}/export.zip`,
      );
    });

    it('should run the task if the job manifest and context do not exist', async () => {
      // We don't allow deleting jobs but contexts can be deleted. Allow the job to be run even if they have been
      // deleted and error inside of the container so that:
      // 1. We don't block the queue with potentially broken executions.
      // 2. Job status/logs more clearly indicate the job is broken rather than potentially remaining queued.
      const mockArn = `mock-arn`;

      mockEcsClient.on(RunTaskCommand).resolves({ tasks: [{ taskArn: mockArn }] });
      mockJobContextService.findJobContext.mockResolvedValue(null);
      mockJobManifestService.findJobManifest.mockResolvedValue(null);

      await jobExecutionTaskService.runJobExecutionTask(defaultRunJobTaskParams);

      expect(mockJobDetailsRepository.createJobDetailsFile).toHaveBeenCalledWith(
        defaultRunJobTaskParams.tenantId,
        null,
        null,
        defaultRunJobTaskParams.jobExecution,
      );
      // With no manifest to name an image, the node:20 task definition is the one expected here.
      expect(mockEcsClient).toHaveReceivedCommandWith(RunTaskCommand, {
        taskDefinition: defaultRunJobTaskParams.taskDefinitionArnsByRuntimeImage['node:20'],
      });
    });

    it('should use the legacy task definition ARN when the node:20 map entry is missing', async () => {
      const mockArn = `mock-arn`;

      mockEcsClient.on(RunTaskCommand).resolves({ tasks: [{ taskArn: mockArn }] });
      mockJobContextService.findJobContext.mockResolvedValue(createMockJobContext());
      mockJobManifestService.findJobManifest.mockResolvedValue(createMockJobManifest({ image: 'node:20' }));

      await jobExecutionTaskService.runJobExecutionTask({
        ...defaultRunJobTaskParams,
        taskDefinitionArnsByRuntimeImage: {},
      });

      expect(mockEcsClient).toHaveReceivedCommandWith(RunTaskCommand, {
        taskDefinition: defaultRunJobTaskParams.taskDefinitionArn,
      });
    });

    it('should throw before creating an ECS task when the runtime image is unsupported', async () => {
      mockJobManifestService.findJobManifest.mockResolvedValue(createMockJobManifest({ image: 'node:26' }));

      await expect(jobExecutionTaskService.runJobExecutionTask(defaultRunJobTaskParams)).rejects.toThrow(
        RuntimeImageTaskDefinitionError,
      );

      expect(mockEcsClient).not.toHaveReceivedCommand(RunTaskCommand);
      expect(mockJobDetailsRepository.createJobDetailsFile).not.toHaveBeenCalled();
    });

    it('should throw before creating an ECS task when the runtime image has no task definition ARN', async () => {
      mockJobManifestService.findJobManifest.mockResolvedValue(createMockJobManifest({ image: 'node:24' }));

      await expect(
        jobExecutionTaskService.runJobExecutionTask({
          ...defaultRunJobTaskParams,
          taskDefinitionArnsByRuntimeImage: {
            'node:20': 'node20TaskDefinitionArn',
          },
        }),
      ).rejects.toThrow(RuntimeImageTaskDefinitionError);

      expect(mockEcsClient).not.toHaveReceivedCommand(RunTaskCommand);
      expect(mockJobDetailsRepository.createJobDetailsFile).not.toHaveBeenCalled();
    });

    it('should handle ConflictException with a previously started task arn', async () => {
      const mockArn = `mock-arn`;
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest();
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });

      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);
      mockEcsClient.on(RunTaskCommand).rejects(
        new ConflictException({
          message: 'The RunTask request could not be processed due to conflicts.',
          resourceIds: [mockArn],
          $metadata: {},
        }),
      );

      const response = await jobExecutionTaskService.runJobExecutionTask({
        ...defaultRunJobTaskParams,
        jobExecution: jobExecution,
      });

      expect(response).toBe(mockArn);
    });

    it('should handle ConflictException with no previously started task arn', async () => {
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest();
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });

      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);
      mockEcsClient.on(RunTaskCommand).rejects(
        new ConflictException({
          message: 'The RunTask request could not be processed due to conflicts.',
          resourceIds: [],
          $metadata: {},
        }),
      );

      const response = await jobExecutionTaskService.runJobExecutionTask({
        ...defaultRunJobTaskParams,
        jobExecution: jobExecution,
      });

      // A ConflictException may be thrown without returning the ARN of the task.
      expect(response).toBe(undefined);
    });

    it('should handle when ECS returns no task ARNs or failures', async () => {
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest();
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });

      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);
      mockEcsClient.on(RunTaskCommand).resolves({ tasks: [], failures: [] });

      const response = await jobExecutionTaskService.runJobExecutionTask({
        ...defaultRunJobTaskParams,
        jobExecution: jobExecution,
      });

      // Similar to a ConflictException being thrown. ECS appears to sometimes return an empty tasks and failures
      // array when the idempotency key is used.
      expect(response).toBe(undefined);
    });

    it('should throw when ECS returns no task ARNs and has failures', async () => {
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest();
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });
      const failures = [
        {
          arn: faker.string.uuid(),
          detail: 'Insufficient capacity to start job',
          reason: faker.lorem.words(),
        },
      ];

      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);
      mockEcsClient.on(RunTaskCommand).resolves({ tasks: [], failures });

      await expect(
        jobExecutionTaskService.runJobExecutionTask({
          ...defaultRunJobTaskParams,
          jobExecution: jobExecution,
        }),
      ).rejects.toThrow(
        `Execution start failed: ${JSON.stringify(failures)} (id: "20250917T005949395Z-9602417782", job: jobName, tenant: some_tenant)`,
      );
    });

    it('should re-throw unhandled errors when starting a task', async () => {
      const expectedError = new ResourceNotFoundException({ message: faker.lorem.words(), $metadata: {} });
      const jobContext = createMockJobContext();
      const jobManifest = createMockJobManifest();
      const jobExecution = createMockJobExecution({
        id: `20250917T005949395Z-9602417782`,
        jobName: 'jobName',
        version: 'jobVersion',
      });

      mockJobContextService.findJobContext.mockResolvedValue(jobContext);
      mockJobManifestService.findJobManifest.mockResolvedValue(jobManifest);
      mockEcsClient.on(RunTaskCommand).rejects(expectedError);

      await expect(
        jobExecutionTaskService.runJobExecutionTask({
          ...defaultRunJobTaskParams,
          jobExecution: jobExecution,
        }),
      ).rejects.toThrow(expectedError);
    });
  });
});