import {
  ApiRequest,
  ApiClientConfig,
  RequestStatus,
  logger,
  ApiError,
} from "../utils";
import { RateLimiter } from "./RateLimiter";

/** Orders requests so that a larger `priority` value sorts first. */
const priorityComparator = (a: ApiRequest, b: ApiRequest): number =>
  b.priority - a.priority;

/**
 * Holds pending requests ordered by priority and hands them to a
 * caller-supplied callback, keeping at most `concurrency` requests in flight
 * and pausing whenever the rate limiter reports a non-zero delay.
 */
export class RequestQueue {
  private pendingRequests: ApiRequest[] = [];
  /** IDs of requests whose callback has been started but has not settled yet. */
  private readonly activeRequests = new Set<ApiRequest["id"]>();
  private readonly concurrency: number;
  private readonly rateLimiter: RateLimiter;
  private readonly processRequestCallback: (
    request: ApiRequest
  ) => Promise<unknown>;
  /** Re-entrancy guard: true while a `processQueue` drain loop is running. */
  private isProcessing = false;

  /**
   * @param config Client configuration; only `config.queue.concurrency` is read here.
   * @param rateLimiter Limiter consulted before every dispatch.
   * @param processRequestCallback Executes a single request; its settlement
   *   frees the request's concurrency slot.
   */
  constructor(
    config: ApiClientConfig,
    rateLimiter: RateLimiter,
    processRequestCallback: (request: ApiRequest) => Promise<unknown>
  ) {
    this.concurrency = config.queue.concurrency;
    this.rateLimiter = rateLimiter;
    this.processRequestCallback = processRequestCallback;

    logger.log(
      `RequestQueue initialized with concurrency: ${this.concurrency}`
    );

    if (typeof window !== "undefined") {
      // Use an arrow function so `this` stays bound to the queue. Passing the
      // bare method would run it with `this === window` and throw.
      window.addEventListener("online", () => this.scheduleProcessing());
    }
  }

  /**
   * Adds a request to the queue.
   * @param request The ApiRequest object to enqueue.
   */
  public enqueue(request: ApiRequest): void {
    request.status = "QUEUED";
    this.pendingRequests.push(request);
    // Array.prototype.sort is stable, so requests of equal priority keep
    // their insertion order.
    this.pendingRequests.sort(priorityComparator);

    logger.log(
      `Request ${request.id} added to queue. Priority: ${request.priority}. Pending: ${this.pendingRequests.length}`
    );

    this.scheduleProcessing();
  }

  /**
   * Dispatches pending requests until the concurrency limit is reached or the
   * queue is empty, sleeping whenever the rate limiter asks for a delay.
   * A call made while another drain loop is running returns immediately.
   */
  public async processQueue(): Promise<void> {
    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;

    try {
      while (
        this.activeRequests.size < this.concurrency &&
        this.pendingRequests.length > 0
      ) {
        const delay = this.rateLimiter.checkLimitAndGetDelay();
        if (delay > 0) {
          logger.warn(
            `Rate limit enforced. Pausing queue processing for ${delay}ms.`
          );
          await new Promise<void>((resolve) => setTimeout(resolve, delay));
          // Re-check everything: requests may have been canceled or finished
          // while we were sleeping.
          continue;
        }

        const nextRequest = this.pendingRequests.shift();
        if (nextRequest === undefined) {
          break;
        }
        this.dispatch(nextRequest);
      }
    } finally {
      // Always release the guard; otherwise one thrown error would leave the
      // queue permanently stalled.
      this.isProcessing = false;
    }

    if (this.pendingRequests.length === 0 && this.activeRequests.size === 0) {
      logger.log("Request Queue is empty.");
    }
  }

  /**
   * Attempts to cancel a request by its ID.
   * Only requests still waiting in the pending list can be canceled; requests
   * already handed to the callback are not affected.
   * @param requestId The ID of the request to cancel.
   * @returns True if a pending request was found and canceled.
   */
  public cancelRequest(requestId: string): boolean {
    const remaining: ApiRequest[] = [];
    let canceled = false;

    for (const request of this.pendingRequests) {
      if (request.id !== requestId) {
        remaining.push(request);
        continue;
      }

      canceled = true;
      request.status = "CANCELED";
      request.reject({
        name: "RequestCanceledError",
        message: `Request ${requestId} was manually canceled from the queue.`,
        classification: "UNKNOWN",
      } as ApiError);
      logger.warn(`Request ${requestId} canceled from pending queue.`);
    }

    this.pendingRequests = remaining;
    return canceled;
  }

  /** Starts a drain pass in the background and logs any failure it reports. */
  private scheduleProcessing(): void {
    void this.processQueue().catch((error: unknown) => {
      logger.error("Request queue processing failed:", error);
    });
  }

  /** Marks a request active, runs the callback, and frees the slot once it settles. */
  private dispatch(request: ApiRequest): void {
    this.activeRequests.add(request.id);
    this.rateLimiter.recordRequestStart();
    request.status = "PENDING";

    // NOTE(review): the line break inside this message is preserved from the
    // original template literal — confirm it is intended.
    logger.log(
      `Processing request ${request.id}. \nActive: ${this.activeRequests.size}/${this.concurrency}`
    );

    let execution: Promise<unknown>;
    try {
      execution = Promise.resolve(this.processRequestCallback(request));
    } catch (error: unknown) {
      // A synchronous throw must still release the slot, so route it through
      // the same settle path as an asynchronous rejection.
      execution = Promise.reject(error);
    }

    void execution
      .catch((error: unknown) => {
        logger.error(
          `Error processing request ${request.id} through callback:`,
          error
        );
      })
      .finally(() => {
        this.activeRequests.delete(request.id);
        logger.log(
          `Request ${request.id} finished execution. Active: ${this.activeRequests.size}`
        );
        // Defer to a macrotask so the next pass starts from a clean stack.
        setTimeout(() => this.scheduleProcessing(), 0);
      });
  }
}