import { PieceMetadataModel } from '@tonyshark/framework'
import { ApQueueJob, exceptionHandler, GetRunForWorkerRequest, PollJobRequest, QueueName, ResumeRunRequest, SavePayloadRequest, SendEngineUpdateRequest, SubmitPayloadsRequest, UpdateFailureCountRequest, UpdateJobRequest } from '@tonyshark/server-shared'
import { ActivepiecesError, ErrorCode, FlowRun, GetFlowVersionForWorkerRequest, GetPieceRequestQuery, PopulatedFlow, RemoveStableJobEngineRequest, UpdateRunProgressRequest, WorkerMachineHealthcheckRequest, WorkerMachineHealthcheckResponse } from '@tonyshark/shared'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import pLimit from 'p-limit'
import { workerMachine } from '../utils/machine'
import { ApAxiosClient } from './ap-axios'

/** Upper bound, in bytes of serialized JSON, for one submit-payloads batch. */
const ONE_MEGABYTE_IN_BYTES = 1024 * 1024

/** How long `poll` waits after a failed request before returning `null`. */
const POLL_FAILURE_BACKOFF_MS = 2000

/** Strips a single trailing `/` so paths can be appended with a leading `/`. */
const removeTrailingSlash = (url: string): string => {
    return url.endsWith('/') ? url.slice(0, -1) : url
}

/**
 * Turns a promise rejection reason into printable text.
 *
 * Rejection reasons are not guaranteed to be `Error` instances, so reading
 * `.message` unguarded can throw (for `null`/`undefined`) or print "undefined".
 */
function describeRejection(reason: unknown): string {
    if (reason instanceof Error) {
        return reason.message
    }
    if (typeof reason === 'object' && reason !== null && 'message' in reason) {
        return String((reason as { message: unknown }).message)
    }
    return String(reason)
}

/**
 * Builds the API client used by a worker, authenticated with `workerToken`.
 *
 * NOTE(review): response types below are bound through each method's declared
 * return type, which assumes `ApAxiosClient.get/post` are generic over the
 * response body — confirm against `./ap-axios`.
 *
 * @param workerToken token passed to `ApAxiosClient` for every request
 */
export const workerApiService = (workerToken: string) => {
    const apiUrl = removeTrailingSlash(workerMachine.getInternalApiUrl())
    const client = new ApAxiosClient(apiUrl, workerToken)

    return {
        /**
         * Posts the machine's system info to the heartbeat endpoint.
         *
         * @returns the endpoint's response, or `null` when the connection was refused
         * @throws any other request error, unchanged
         */
        async heartbeat(): Promise<WorkerMachineHealthcheckResponse | null> {
            const request: WorkerMachineHealthcheckRequest = await workerMachine.getSystemInfo()
            try {
                return await client.post('/v1/worker-machines/heartbeat', request)
            }
            catch (error) {
                if (ApAxiosClient.isApAxiosError(error) && error.error.code === 'ECONNREFUSED') {
                    return null
                }
                throw error
            }
        },

        /**
         * Polls `queueName` for a job.
         *
         * @returns the polled job, or `null` when the request failed
         */
        async poll(queueName: QueueName): Promise<ApQueueJob | null> {
            try {
                const request: PollJobRequest = {
                    queueName,
                }
                // `await` is required so that a rejection is handled by the catch below.
                return await client.get('/v1/workers/poll', {
                    params: request,
                })
            }
            catch (error) {
                // Deliberate best-effort: any failure is swallowed and the caller is
                // delayed before it gets `null` back.
                await new Promise((resolve) => setTimeout(resolve, POLL_FAILURE_BACKOFF_MS))
                return null
            }
        },

        /** Posts `request` to the resume-run endpoint. */
        async resumeRun(request: ResumeRunRequest): Promise<void> {
            await client.post('/v1/workers/resume-run', request)
        },

        /** Posts `request` to the save-payloads endpoint. */
        async savePayloadsAsSampleData(request: SavePayloadRequest): Promise<void> {
            await client.post('/v1/workers/save-payloads', request)
        },

        /**
         * Submits `request.payloads` in batches of at most roughly one megabyte each.
         *
         * Batches are posted one at a time (concurrency limit of 1) and every batch
         * is attempted even if an earlier one failed.
         *
         * @returns the runs returned by all batches, flattened in batch order
         * @throws Error listing every batch failure when at least one batch was rejected
         */
        async startRuns(request: SubmitPayloadsRequest): Promise<FlowRun[]> {
            const batches = splitPayloadsIntoOneMegabyteBatches(request.payloads)
            const limit = pLimit(1)
            const submitBatch = (payloads: unknown[]): Promise<FlowRun[]> =>
                client.post('/v1/workers/submit-payloads', {
                    ...request,
                    payloads,
                })

            const results = await Promise.allSettled(
                batches.map(payloads => limit(() => submitBatch(payloads))),
            )

            const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
            if (failures.length > 0) {
                const errorMessages = failures.map(f => describeRejection(f.reason)).join(', ')
                throw new Error(`Failed to start runs: ${errorMessages}`)
            }

            return results
                .filter((r): r is PromiseFulfilledResult<FlowRun[]> => r.status === 'fulfilled')
                .flatMap(r => r.value)
        },

        /** Posts `request` to the send-engine-update endpoint. */
        async sendUpdate(request: SendEngineUpdateRequest): Promise<void> {
            await client.post('/v1/workers/send-engine-update', request)
        },
    }
}

/**
 * Groups `payloads`, preserving order, so that the serialized JSON size of each
 * group stays within one megabyte. A single payload larger than the limit gets
 * a group of its own.
 *
 * @returns at least one batch; `[[]]` for empty input, otherwise no empty batches
 */
function splitPayloadsIntoOneMegabyteBatches(payloads: unknown[]): unknown[][] {
    let currentBatch: unknown[] = []
    const batches: unknown[][] = [currentBatch]
    let currentSize = 0

    for (const payload of payloads) {
        // JSON.stringify returns undefined for `undefined`/functions, which would make
        // Buffer.byteLength throw; such array items serialize as `null` on the wire.
        const payloadSize = Buffer.byteLength(JSON.stringify(payload) ?? 'null')

        // Only roll over when the current batch already holds something, otherwise an
        // oversized first payload would leave an empty batch behind.
        if (currentBatch.length > 0 && currentSize + payloadSize > ONE_MEGABYTE_IN_BYTES) {
            currentBatch = []
            batches.push(currentBatch)
            currentSize = 0
        }

        currentBatch.push(payload)
        currentSize += payloadSize
    }

    return batches
}

/**
 * Builds the API client used on behalf of an engine, authenticated with `engineToken`.
 *
 * NOTE(review): response types below are bound through each method's declared
 * return type, which assumes `ApAxiosClient.get/post` are generic over the
 * response body — confirm against `./ap-axios`.
 *
 * @param engineToken token passed to `ApAxiosClient` for every request
 * @param log logger used for debug timing and for reporting handled exceptions
 */
export const engineApiService = (engineToken: string, log: FastifyBaseLogger) => {
    const apiUrl = removeTrailingSlash(workerMachine.getInternalApiUrl())
    const client = new ApAxiosClient(apiUrl, engineToken)

    return {
        /** Fetches the file `fileId`, requesting the body as an array buffer. */
        async getFile(fileId: string): Promise<Buffer> {
            return client.get(`/v1/engine/files/${fileId}`, {
                responseType: 'arraybuffer',
            })
        },

        /** Posts `request` to the update-job endpoint. */
        async updateJobStatus(request: UpdateJobRequest): Promise<void> {
            await client.post('/v1/engine/update-job', request)
        },

        /** Posts `request` to the update-failure-count endpoint. */
        async updateFailureCount(request: UpdateFailureCountRequest): Promise<void> {
            await client.post('/v1/engine/update-failure-count', request)
        },

        /** Fetches the run identified by `request.runId`. */
        async getRun(request: GetRunForWorkerRequest): Promise<FlowRun> {
            return client.get(`/v1/engine/runs/${request.runId}`, {})
        },

        /** Posts `request` to the update-run endpoint. */
        async updateRunStatus(request: UpdateRunProgressRequest): Promise<void> {
            await client.post('/v1/engine/update-run', request)
        },

        /** Posts `request` to the remove-stale-job endpoint. */
        async removeStaleFlow(request: RemoveStableJobEngineRequest): Promise<void> {
            await client.post('/v1/engine/remove-stale-job', request)
        },

        /** Fetches metadata for the piece `name`; `options` are sent as query parameters. */
        async getPiece(name: string, options: GetPieceRequestQuery): Promise<PieceMetadataModel> {
            return client.get(`/v1/pieces/${encodeURIComponent(name)}`, {
                params: options,
            })
        },

        /**
         * Calls the check-task-limit endpoint.
         *
         * @throws ActivepiecesError with `ErrorCode.QUOTA_EXCEEDED` when the endpoint
         *         answers 402 Payment Required
         */
        async checkTaskLimit(): Promise<void> {
            try {
                await client.get('/v1/engine/check-task-limit', {})
            }
            catch (e) {
                if (ApAxiosClient.isApAxiosError(e) && e.error.response?.status === StatusCodes.PAYMENT_REQUIRED) {
                    throw new ActivepiecesError({
                        code: ErrorCode.QUOTA_EXCEEDED,
                        params: {
                            metric: 'tasks',
                        },
                    })
                }
                // Any other failure is handed to the exception handler and not rethrown.
                exceptionHandler.handle(e, log)
            }
        },

        /**
         * Fetches the flow matching `request`.
         *
         * @returns the flow, or `null` when the endpoint answers 404
         * @throws any other request error, unchanged
         */
        async getFlowWithExactPieces(request: GetFlowVersionForWorkerRequest): Promise<PopulatedFlow | null> {
            const startTime = performance.now()
            log.debug({ request }, '[EngineApiService#getFlowWithExactPieces] start')
            //TODO: Add caching logic
            try {
                // `await` is required so that a rejection is handled by the catch below.
                return await client.get('/v1/engine/flows', {
                    params: request,
                })
            }
            catch (e) {
                if (ApAxiosClient.isApAxiosError(e) && e.error.response?.status === StatusCodes.NOT_FOUND) {
                    return null
                }
                throw e
            }
            finally {
                // Logged on every call, whatever the outcome: no cache exists yet (see TODO).
                log.debug({ request, took: performance.now() - startTime }, '[EngineApiService#getFlowWithExactPieces] cache miss')
            }
        },
    }
}