import type { ValidateFunction } from 'ajv';
import { setTimeout } from 'node:timers/promises';
import type { AnyObject } from 'pinejs-client-core';
import { tasks as tasksEnv } from '../config-loader/env.js';
import type * as Db from '../database-layer/db.js';
import * as permissions from '../sbvr-api/permissions.js';
import { PinejsClient } from '../sbvr-api/sbvr-utils.js';
import { sbvrUtils } from '../server-glue/module.js';
import { ajv } from './common.js';
import type { Task } from './tasks.js';
import type TasksModel from './tasks.js';
import type { Resolvable } from '../sbvr-api/common-types.js';

/** Arguments passed to a task handler's `fn` for a single execution. */
interface TaskArgs<T = AnyObject> {
	/** API client made available to the handler while it runs. */
	api: PinejsClient;
	/** The task's parameter set (the worker passes `{}` when the task has none). */
	params: T;
}

/** Outcome reported by a handler; `status` becomes the task's new status. */
type TaskResponse = Resolvable<{
	status: Task['Read']['status'];
	error?: string;
}>;

/**
 * A named task handler that can be registered with the worker.
 *
 * `T` is the shape of the task's parameter set. When `validate` is provided,
 * the worker runs it against the parameter set before calling `fn` and fails
 * the task without calling `fn` if validation does not pass.
 */
export interface TaskHandler<
	T = NonNullable<Task['Read']['is_executed_with__parameter_set']>,
> {
	name: string;
	fn: (options: TaskArgs<T>) => TaskResponse;
	validate?: ValidateFunction<T>;
}

/** The subset of task fields the worker selects when claiming a task. */
type PartialTask = Pick<
	Task['Read'],
	| 'id'
	| 'is_created_by__actor'
	| 'is_executed_by__handler'
	| 'is_executed_with__parameter_set'
	| 'is_scheduled_with__cron_expression'
	| 'attempt_count'
	| 'attempt_limit'
>;

// Map of SBVR column names to the field names they are selected as. It is
// rendered into the column list of the worker's polling SELECT query. Every
// alias must be a key of PartialTask, which `satisfies` checks at compile time.
const selectColumns = Object.entries({
	id: 'id',
	'is executed by-handler': 'is_executed_by__handler',
	'is executed with-parameter set': 'is_executed_with__parameter_set',
	'is scheduled with-cron expression': 'is_scheduled_with__cron_expression',
	'attempt count': 'attempt_count',
	'attempt limit': 'attempt_limit',
	'is created by-actor': 'is_created_by__actor',
} satisfies Record<string, keyof PartialTask>)
	.map(([key, value]) => `t."${key}" AS "${value}"`)
	.join(', ');

// The worker is responsible for executing tasks in the queue.
// It polls the database for tasks to execute. It will execute
// tasks in parallel up to the configured queue concurrency limit.
export class Worker {
	/** Registered handlers, keyed by the handler name stored on each task. */
	public handlers = new Map<string, TaskHandler>();
	/** Number of poller loops spawned by `start()` that have not exited yet. */
	private currentConcurrency = 0;
	/** Set by `start()` and cleared by `stop()`; pollers wind down once false. */
	private running = false;

	constructor(private readonly client: PinejsClient<TasksModel>) {}

	// Whether a poller may keep going: the worker has not been stopped and the
	// number of spawned pollers does not exceed the configured concurrency
	private canExecute(): boolean {
		return (
			this.running && this.currentConcurrency <= tasksEnv.queueConcurrency
		);
	}

	/**
	 * Executes a claimed task with its registered handler and records the
	 * outcome through `update`, using the polling transaction `tx`.
	 *
	 * A handler that throws (or rejects) is recorded as a failed attempt with
	 * the thrown message, so it goes through the normal retry logic. Any other
	 * unexpected error, such as failing to persist the outcome, is logged and
	 * terminates the process.
	 */
	private async execute(task: PartialTask, tx: Db.Tx): Promise<void> {
		try {
			// Get specified handler
			const handler = this.handlers.get(task.is_executed_by__handler);
			const startedOnTime = new Date();

			// This should never actually happen, since the poll query only
			// selects tasks whose handler is registered
			if (handler == null) {
				await this.update(
					tx,
					task,
					startedOnTime,
					'failed',
					'Matching task handler not found, this should never happen!',
				);
				return;
			}

			// Validate parameters before execution so we can fail early if
			// the parameter set is invalid. This can happen if the handler
			// definition changes after a task is added to the queue.
			if (
				handler.validate != null &&
				!handler.validate(task.is_executed_with__parameter_set)
			) {
				await this.update(
					tx,
					task,
					startedOnTime,
					'failed',
					`Invalid parameter set: ${ajv.errorsText(handler.validate.errors)}`,
				);
				return;
			}

			// Execute handler and update task with results
			let status: Task['Read']['status'];
			let error: string | undefined;
			try {
				const results = await handler.fn({
					api: new PinejsClient({}),
					params: task.is_executed_with__parameter_set ?? {},
				});
				status = results.status;
				error = results.error;
			} catch (err) {
				// Treat a throwing handler as a failed attempt. Letting the error
				// escape would reach the process.exit(1) below while the polling
				// transaction is still open, so the outcome would presumably never
				// be committed and the task would be picked up again after restart.
				status = 'failed';
				error = err instanceof Error ? err.message : String(err);
			}
			await this.update(tx, task, startedOnTime, status, error);
		} catch (err) {
			// This shouldn't happen, but if it does we want to log and kill the process
			console.error(
				`Failed to execute task ${task.id} with handler ${task.is_executed_by__handler}:`,
				err,
			);
			process.exit(1);
		}
	}

	/**
	 * Persists the outcome of one attempt and schedules any follow-up work.
	 *
	 * - A failed attempt with attempts remaining is re-queued with an
	 *   exponential backoff delay.
	 * - When the task ends up 'failed' or 'succeeded' and has a cron
	 *   expression, a fresh task with the same configuration is created.
	 *
	 * @param tx - transaction the patch/post run in
	 * @param task - the task that was attempted
	 * @param startedOnTime - when the attempt started
	 * @param status - status reported for this attempt
	 * @param errorMessage - optional error to store on the task
	 */
	private async update(
		tx: Db.Tx,
		task: PartialTask,
		startedOnTime: Date,
		status: Task['Read']['status'],
		errorMessage?: string,
	): Promise<void> {
		const attemptCount = task.attempt_count + 1;
		const body: Partial<Task['Write']> = {
			started_on__time: startedOnTime,
			ended_on__time: new Date(),
			status,
			attempt_count: attemptCount,
			...(errorMessage != null && { error_message: errorMessage }),
		};

		// Re-enqueue if the task failed but has attempts left. attemptCount
		// includes the attempt that just finished (the initial one counts too),
		// so with this comparison a task runs at most attempt_limit times total.
		if (status === 'failed' && attemptCount < task.attempt_limit) {
			body.status = 'queued';

			// Schedule next attempt using exponential backoff
			body.is_scheduled_to_execute_on__time =
				this.getNextAttemptTime(attemptCount);
		}

		// Patch current task
		await this.client.patch({
			resource: 'task',
			passthrough: {
				tx,
				req: permissions.root,
			},
			id: task.id,
			body,
		});

		// Create new task with same configuration if previous
		// iteration completed and has a cron expression
		if (
			body.status != null &&
			['failed', 'succeeded'].includes(body.status) &&
			task.is_scheduled_with__cron_expression != null
		) {
			await this.client.post({
				resource: 'task',
				passthrough: {
					tx,
					req: permissions.root,
				},
				options: {
					returnResource: false,
				},
				body: {
					attempt_limit: task.attempt_limit,
					is_created_by__actor: task.is_created_by__actor,
					is_executed_by__handler: task.is_executed_by__handler,
					is_executed_with__parameter_set: task.is_executed_with__parameter_set,
					is_scheduled_with__cron_expression:
						task.is_scheduled_with__cron_expression,
				},
			});
		}
	}

	// Calculate the next attempt time using exponential backoff: e^attempt
	// seconds (rounded up), with the exponent capped at 10 (roughly 6 hours)
	private getNextAttemptTime(attempt: number): Date {
		const delayMS = Math.ceil(Math.exp(Math.min(10, attempt))) * 1000;
		return new Date(Date.now() + delayMS);
	}

	/**
	 * Runs one polling iteration and then re-schedules itself. `start()`
	 * spawns one such loop per unit of concurrency.
	 *
	 * Each iteration claims at most one queued task. It locks the task's row
	 * with `FOR UPDATE SKIP LOCKED` so concurrent pollers skip tasks already
	 * claimed, and executes the task inside that same transaction. If a task
	 * was executed, the next iteration starts immediately; otherwise the
	 * poller first waits for the queue interval. Once `canExecute()` returns
	 * false, the loop stops and gives back its concurrency slot.
	 */
	private poll(): void {
		let executed = false;
		void (async () => {
			try {
				if (!this.canExecute()) {
					return;
				}
				if (this.handlers.size === 0) {
					// No handlers currently added so just wait for next poll in case one is added in the meantime
					return;
				}
				await sbvrUtils.db.transaction(async (tx) => {
					// Tasks scheduled up to one poll interval (rounded up to whole
					// seconds) in the future are eligible as well
					const result = await tx.executeSql(
						`SELECT ${selectColumns}
						FROM task AS t
						WHERE
							t."is executed by-handler" = ANY($1) AND
							t."status" = 'queued' AND
							t."attempt count" <= t."attempt limit" AND
							(
								t."is scheduled to execute on-time" IS NULL OR
								t."is scheduled to execute on-time" <= CURRENT_TIMESTAMP + $2 * INTERVAL '1 SECOND'
							)
						ORDER BY
							t."is scheduled to execute on-time" ASC,
							t."id" ASC
						LIMIT 1
						FOR UPDATE SKIP LOCKED`,
						[
							Array.from(this.handlers.keys()),
							Math.ceil(tasksEnv.queueIntervalMS / 1000),
						],
					);

					// Execute task if one was found
					if (result.rows.length > 0) {
						await this.execute(result.rows[0] as PartialTask, tx);
						executed = true;
					}
				});
			} catch (err) {
				console.error('Failed polling for tasks:', err);
				// Add some delay after an error to avoid retrying at max speed
				// TODO: This should be configurable/adaptive
				await setTimeout(100);
			} finally {
				if (!executed) {
					await setTimeout(tasksEnv.queueIntervalMS);
				}
				if (this.canExecute()) {
					this.poll();
				} else {
					console.info('Stopping task worker poller');
					// If stopping, decrement concurrency count so on start it will be recreated as necessary
					this.currentConcurrency--;
				}
			}
		})();
	}

	/**
	 * Signals all pollers to stop. Each poller exits after finishing its
	 * current iteration, so this does not interrupt a running task.
	 */
	public stop(): void {
		this.running = false;
	}

	/**
	 * Starts polling for tasks, spawning pollers up to the configured queue
	 * concurrency. Pollers that are still alive from a previous run count
	 * towards that limit.
	 *
	 * @throws Error if the database engine is not postgres
	 */
	public start(): void {
		// Tasks only support postgres for now
		if (sbvrUtils.db.engine !== 'postgres') {
			throw new Error(
				'Database does not support tasks, giving up on starting worker',
			);
		}

		this.running = true;
		// Spawn children to poll for and execute tasks, only spawning additional up to the limit in the case we already have some running
		for (
			;
			this.currentConcurrency < tasksEnv.queueConcurrency;
			this.currentConcurrency++
		) {
			console.info(
				`Spawning task worker poller ${this.currentConcurrency + 1} of ${tasksEnv.queueConcurrency}`,
			);
			this.poll();
		}
	}

	/**
	 * This should be called after all handlers should have been registered in order to assert
	 * that there are no tasks in the database with handlers that have not been registered and
	 * will therefore never be executed as that could cause unexpected behavior.
	 *
	 * If any such tasks are found, an error is thrown.
	 */
	public async assertNoUnknownHandlers(): Promise<void> {
		// Check for any pending tasks with unknown handlers
		const tasksWithUnknownHandlers = await this.client.get({
			resource: 'task',
			passthrough: {
				req: permissions.root,
			},
			options: {
				$filter: {
					status: 'queued',
					// With no handlers registered, every queued task is unknown
					...(this.handlers.size > 0 && {
						$not: {
							is_executed_by__handler: {
								$in: Array.from(this.handlers.keys()),
							},
						},
					}),
				},
			},
		});
		if (tasksWithUnknownHandlers.length > 0) {
			throw new Error(
				`Found tasks with unknown handlers: ${tasksWithUnknownHandlers
					.map((task) => `${task.id}(${task.is_executed_by__handler})`)
					.join(', ')}`,
			);
		}
	}
}