/*!
 * @license
 * Copyright Squiz Australia Pty Ltd. All Rights Reserved.
 */

import { ConflictException, ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs';
import { createHash } from 'node:crypto';
import { JobDetailsRepository, JobUploadsRepository } from '../../S3Repository';
import { JOB_EXECUTION_STATUS } from '../../constants/JobExecutionStatus.constants';
import { JobManifest } from '../../manifest';
import { JobExecution } from '../../model/JobExecution';
import { JobExecutionRepository } from '../../repository';
import { getAwsTags, resolveRuntimeImageTaskDefinitionArn } from '../../utils';
import { JobContextService } from '../JobContextService/JobContextService';
import { JobManifestService } from '../JobManifestService/JobManifestService';
import { HasReachedMaxTenantConcurrencyParams, RunJobTaskParams } from './JobExecutionTaskService.defs';

/**
 * Starts job executions as ECS tasks and answers concurrency-limit questions
 * based on the running executions recorded in the job execution repository.
 */
export class JobExecutionTaskService {
  constructor(
    private readonly ecsClient: ECSClient,
    private readonly jobManifestService: JobManifestService,
    private readonly jobContextService: JobContextService,
    private readonly jobExecutionRepository: JobExecutionRepository,
    private readonly jobDetailsRepository: JobDetailsRepository,
    private readonly jobUploadRepository: JobUploadsRepository,
  ) {}

  /**
   * Checks whether the number of running job executions has reached the tenant-wide limit.
   *
   * The query is capped at `maxTenantConcurrentTasks` results, because that is all
   * that is needed to decide whether the limit has been reached.
   *
   * NOTE(review): no tenant filter is applied here, so this presumably relies on
   * `jobExecutionRepository` being scoped to a single tenant — confirm.
   *
   * @param params - Carries `maxTenantConcurrentTasks`, the maximum number of running executions allowed.
   * @returns `true` when at least `maxTenantConcurrentTasks` running executions were found.
   */
  public async hasReachedMaxTenantConcurrency(params: HasReachedMaxTenantConcurrencyParams): Promise<boolean> {
    const currentRunningExecutions = await this.jobExecutionRepository.list({
      consistentRead: true,
      limit: params.maxTenantConcurrentTasks,
      filters: {
        status: JOB_EXECUTION_STATUS.running,
      },
    });

    return currentRunningExecutions.length >= params.maxTenantConcurrentTasks;
  }

  /**
   * Checks whether the job's manifest-defined concurrency limit has been reached.
   *
   * The limit comes from the manifest of `jobExecution.version`. Running executions are
   * filtered by `jobName` only, so executions of every version of the job count towards it.
   *
   * @param jobExecution - The execution that is about to be started.
   * @returns `false` when no manifest is found or it declares no numeric `concurrency`;
   *   otherwise `true` when at least `concurrency` executions of the job are running.
   */
  public async hasReachedMaxJobConcurrency(jobExecution: JobExecution): Promise<boolean> {
    const jobManifest = await this.jobManifestService.findJobManifest(jobExecution.jobName, jobExecution.version);
    if (!jobManifest || typeof jobManifest.concurrency !== 'number') {
      return false;
    }

    const runningExecutions = await this.jobExecutionRepository.list({
      consistentRead: true,
      limit: jobManifest.concurrency,
      filters: {
        jobName: jobExecution.jobName,
        status: JOB_EXECUTION_STATUS.running,
      },
    });

    return runningExecutions.length >= jobManifest.concurrency;
  }

  /**
   * Starts an ECS task that runs the given job execution.
   *
   * The method runs these steps in order:
   * 1. Resolve the task definition for the manifest's runtime image.
   * 2. Write the job details file.
   * 3. Pass signed URLs for the details file and the job code artifact to the
   *    container as environment variables.
   *
   * The request carries a deterministic `clientToken` (see `computeJobExecutionClientToken`),
   * so repeated calls for the same execution attempt are idempotent on the ECS side.
   *
   * @param params - Tenant, cluster, task definition and environment details plus the execution to start.
   * @returns One of:
   *   - the ARN of the started task;
   *   - the previously started task's ARN when ECS reports a `ConflictException` that
   *     carries it;
   *   - `undefined` when ECS returned neither a task nor failures, or a conflict without
   *     an ARN.
   * @throws Error when ECS returns failures and no task.
   * @throws Any non-`ConflictException` error raised while starting the task. Errors raised
   *   while preparing the task (e.g. writing the details file) are not caught.
   */
  public async runJobExecutionTask(params: RunJobTaskParams): Promise<string | undefined> {
    const { jobExecution, tenantId } = params;

    const jobManifest = await this.findJobManifest(jobExecution);
    const taskDefinitionArn = resolveRuntimeImageTaskDefinitionArn({
      image: jobManifest?.image,
      taskDefinitionArnsByRuntimeImage: params.taskDefinitionArnsByRuntimeImage,
      legacyTaskDefinitionArn: params.taskDefinitionArn,
    });
    const jobDetailsFileUrl = await this.createJobDetailsFile(tenantId, jobExecution, jobManifest);
    const tags = getAwsTags({
      tenant: tenantId,
      region: params.awsRegion,
      environment: params.shortEnvironment,
    });
    const jobCodeArtifactUrl = await this.jobUploadRepository.getS3SignedUrl(
      `${tenantId}/${jobExecution.jobName}/${jobExecution.version}/export.zip`,
    );

    const environmentVariables = [
      { name: 'JOB_NAME', value: jobExecution.jobName },
      { name: 'JOB_EXECUTION_ID', value: jobExecution.id },
      { name: 'AWS_REGION', value: params.awsRegion },
      { name: 'TENANT_ID', value: tenantId },
      { name: 'SHORT_ENVIRONMENT', value: params.shortEnvironment },
      { name: 'SHORT_REGION', value: params.shortRegion },
      { name: 'JOB_CODE_ARTIFACT_URL', value: jobCodeArtifactUrl },
      { name: 'JOB_DETAILS_FILE_URL', value: jobDetailsFileUrl },
    ];

    const executionContext = this.formatExecutionContext(params);

    try {
      const result = await this.ecsClient.send(
        new RunTaskCommand({
          cluster: params.clusterArn,
          taskDefinition: taskDefinitionArn,
          overrides: {
            containerOverrides: [
              {
                name: 'job-runtime-container',
                environment: environmentVariables,
              },
            ],
          },
          tags: Object.entries(tags).map(([key, value]) => ({ key, value })),
          clientToken: this.computeJobExecutionClientToken(params),
        }),
      );

      // If ECS returned a task, assume it has started.
      // Its ARN can be used to terminate the job task.
      const [startedTask] = result.tasks ?? [];
      if (startedTask) {
        return startedTask.taskArn;
      }

      if (result.failures?.length) {
        // ECS reported explicit failures instead of starting a task: surface them to the caller.
        throw new Error(`Execution start failed: ${JSON.stringify(result.failures)} ${executionContext}`);
      }

      // Task has probably already started.
      // ECS can sometimes return no tasks/failures if attempting to start
      // the same task multiple times in quick succession:
      // * {"tasks":[],"failures":[],"$metadata":{"httpStatusCode":200,...}}
      console.warn(`Execution start returned no ECS task ID ${executionContext}`, JSON.stringify(result));
      return undefined;
    } catch (error) {
      if (!(error instanceof ConflictException)) {
        throw error;
      }

      // Task already started with the provided "clientToken", return the previously started task ARN.
      const previousTaskArn = error.resourceIds?.[0];
      if (previousTaskArn) {
        console.warn(`Execution already started, returning previous task ARN: ${previousTaskArn} ${executionContext}`);
        return previousTaskArn;
      }

      // In some instances the previous task ARN isn't returned.
      console.warn(`Execution already started, no previous task ARN returned ${executionContext}`);
      return undefined;
    }
  }

  /**
   * Builds the `(id: "…", job: …, tenant: …)` suffix used in log and error messages.
   */
  private formatExecutionContext(params: RunJobTaskParams): string {
    return `(id: "${params.jobExecution.id}", job: ${params.jobExecution.jobName}, tenant: ${params.tenantId})`;
  }

  /**
   * Derives the idempotency token for the ECS RunTask request.
   *
   * The token is the SHA-1 hex digest (40 characters) of a key built from these fields:
   * - tenant;
   * - job name;
   * - execution id;
   * - retry count (0 when unset).
   *
   * @see https://docs.aws.amazon.com/AmazonECS/latest/APIReference/ECS_Idempotency.html#RunTaskIdempotency
   */
  private computeJobExecutionClientToken(params: RunJobTaskParams): string {
    // Include retryCount so that retry attempts get a unique clientToken and create new tasks.
    // Without this, ECS would return the old failed task ARN instead of creating a new task.
    const retryCount = params.jobExecution.retryCount ?? 0;
    const key = `jobExecution~${params.tenantId}~${params.jobExecution.jobName}~${params.jobExecution.id}~${retryCount}`;
    return createHash('sha1').update(key).digest('hex');
  }

  /**
   * Writes the job details file for an execution and returns a signed URL to it.
   *
   * The execution's context is looked up via `jobContextService.findJobContext` and passed,
   * together with the manifest and the execution, to `jobDetailsRepository.createJobDetailsFile`.
   *
   * @param tenantId - Tenant that owns the execution.
   * @param executionToStart - The execution the details file describes.
   * @param jobManifest - The job's manifest, or `null` when none was found.
   * @returns The signed URL of the created details file.
   */
  private async createJobDetailsFile(
    tenantId: string,
    executionToStart: JobExecution,
    jobManifest: JobManifest | null,
  ): Promise<string> {
    const jobContext = await this.jobContextService.findJobContext(executionToStart.context);
    const jobDetailsFileName = await this.jobDetailsRepository.createJobDetailsFile(
      tenantId,
      jobContext,
      jobManifest,
      executionToStart,
    );
    return await this.jobDetailsRepository.getS3SignedUrl(jobDetailsFileName);
  }

  /**
   * Looks up the manifest for the execution's job name and version.
   */
  private async findJobManifest(executionToStart: JobExecution): Promise<JobManifest | null> {
    return await this.jobManifestService.findJobManifest(executionToStart.jobName, executionToStart.version);
  }
}