import { logger } from "@oh-my-pi/pi-utils"; const DELIVERY_RETRY_BASE_MS = 500; const DELIVERY_RETRY_MAX_MS = 30_000; const DELIVERY_RETRY_JITTER_MS = 200; const DEFAULT_RETENTION_MS = 5 * 60 * 1000; const DEFAULT_MAX_RUNNING_JOBS = 15; export interface AsyncJob { id: string; type: "bash" | "task"; status: "running" | "completed" | "failed" | "cancelled"; startTime: number; label: string; abortController: AbortController; promise: Promise; resultText?: string; errorText?: string; /** * Registry id of the agent that registered the job (e.g. "0-Main", * "3-AuthLoader"). Used by scoped cancel/list APIs so a subagent's teardown * does not cancel its parent's jobs. Undefined for callers that don't * supply an id (e.g. legacy tests, SDK consumers without an agent context). */ ownerId?: string; } export interface AsyncJobManagerOptions { onJobComplete: (jobId: string, text: string, job?: AsyncJob) => void | Promise; maxRunningJobs?: number; retentionMs?: number; } interface AsyncJobDelivery { jobId: string; text: string; attempt: number; nextAttemptAt: number; lastError?: string; ownerId?: string; promise?: Promise; } export interface AsyncJobDeliveryState { queued: number; delivering: boolean; nextRetryAt?: number; pendingJobIds: string[]; } export interface AsyncJobRegisterOptions { id?: string; /** Registry id of the agent that owns this job; used to scope cancelAll. */ ownerId?: string; onProgress?: (text: string, details?: Record) => void | Promise; } /** * Filter applied to job query/cancel APIs. With `ownerId`, results are * restricted to jobs registered by that agent (registry id from * `AgentRegistry`, e.g. "0-Main", "3-AuthLoader"). */ export interface AsyncJobFilter { ownerId?: string; } export class AsyncJobManager { static #instance: AsyncJobManager | undefined; /** Process-global instance shared by internal URL protocol handlers and tools. */ static instance(): AsyncJobManager | undefined { return AsyncJobManager.#instance; } /** Install or clear the process-global instance. */ static setInstance(value: AsyncJobManager | undefined): void { AsyncJobManager.#instance = value; } /** Reset the process-global instance. Test-only. */ static resetForTests(): void { AsyncJobManager.#instance = undefined; } readonly #jobs = new Map(); readonly #deliveries: AsyncJobDelivery[] = []; readonly #inFlightDeliveries: AsyncJobDelivery[] = []; readonly #suppressedDeliveries = new Set(); readonly #watchedJobs = new Set(); readonly #evictionTimers = new Map(); readonly #onJobComplete: AsyncJobManagerOptions["onJobComplete"]; readonly #maxRunningJobs: number; readonly #retentionMs: number; #deliveryLoop: Promise | undefined; #disposed = false; #filterJobs(jobs: Iterable, filter?: AsyncJobFilter): AsyncJob[] { const ownerId = filter?.ownerId; if (!ownerId) return Array.from(jobs); const out: AsyncJob[] = []; for (const job of jobs) { if (job.ownerId === ownerId) out.push(job); } return out; } constructor(options: AsyncJobManagerOptions) { this.#onJobComplete = options.onJobComplete; this.#maxRunningJobs = Math.max(1, Math.floor(options.maxRunningJobs ?? DEFAULT_MAX_RUNNING_JOBS)); this.#retentionMs = Math.max(0, Math.floor(options.retentionMs ?? DEFAULT_RETENTION_MS)); } register( type: "bash" | "task", label: string, run: (ctx: { jobId: string; signal: AbortSignal; reportProgress: (text: string, details?: Record) => Promise; }) => Promise, options?: AsyncJobRegisterOptions, ): string { if (this.#disposed) { throw new Error("Async job manager is disposed"); } const runningCount = this.getRunningJobs().length; if (runningCount >= this.#maxRunningJobs) { throw new Error( `Background job limit reached (${this.#maxRunningJobs}). Wait for running jobs to finish or cancel one.`, ); } const id = this.#resolveJobId(options?.id); this.#suppressedDeliveries.delete(id); const abortController = new AbortController(); const startTime = Date.now(); const job: AsyncJob = { id, type, status: "running", startTime, label, abortController, promise: Promise.resolve(), ownerId: options?.ownerId, }; const reportProgress = async (text: string, details?: Record): Promise => { if (!options?.onProgress) return; try { await options.onProgress(text, details); } catch (error) { logger.warn("Async job progress callback failed", { jobId: id, error: error instanceof Error ? error.message : String(error), }); } }; job.promise = (async () => { try { const text = await run({ jobId: id, signal: abortController.signal, reportProgress }); if (job.status === "cancelled") { job.resultText = text; this.#scheduleEviction(id); return; } job.status = "completed"; job.resultText = text; this.#enqueueDelivery(id, text); this.#scheduleEviction(id); } catch (error) { if (job.status === "cancelled") { job.errorText = error instanceof Error ? error.message : String(error); this.#scheduleEviction(id); return; } const errorText = error instanceof Error ? error.message : String(error); job.status = "failed"; job.errorText = errorText; this.#enqueueDelivery(id, errorText); this.#scheduleEviction(id); } })(); this.#jobs.set(id, job); return id; } /** * Cancel a single job by id. When `filter.ownerId` is set and does not * match the job's owner, the call is treated as not-found (returns false) * so cross-agent cancellation is rejected at the manager level. */ cancel(id: string, filter?: AsyncJobFilter): boolean { const job = this.#jobs.get(id); if (!job) return false; if (filter?.ownerId && job.ownerId !== filter.ownerId) return false; if (job.status !== "running") return false; job.status = "cancelled"; job.abortController.abort(); this.#scheduleEviction(id); return true; } getJob(id: string): AsyncJob | undefined { return this.#jobs.get(id); } getRunningJobs(filter?: AsyncJobFilter): AsyncJob[] { return this.#filterJobs(this.#jobs.values(), filter).filter(job => job.status === "running"); } getRecentJobs(limit = 10, filter?: AsyncJobFilter): AsyncJob[] { return this.#filterJobs(this.#jobs.values(), filter) .filter(job => job.status !== "running") .sort((a, b) => b.startTime - a.startTime) .slice(0, limit); } getAllJobs(filter?: AsyncJobFilter): AsyncJob[] { return this.#filterJobs(this.#jobs.values(), filter); } getDeliveryState(filter?: AsyncJobFilter): AsyncJobDeliveryState { const deliveries = this.#filterDeliveries(filter); const inFlightDeliveries = this.#filterInFlightDeliveries(filter); const nextRetryAt = deliveries.reduce((next, delivery) => { if (next === undefined) return delivery.nextAttemptAt; return Math.min(next, delivery.nextAttemptAt); }, undefined); return { queued: deliveries.length + inFlightDeliveries.length, delivering: inFlightDeliveries.length > 0 || (this.#deliveryLoop !== undefined && deliveries.length > 0), nextRetryAt, pendingJobIds: deliveries.concat(inFlightDeliveries).map(delivery => delivery.jobId), }; } hasPendingDeliveries(filter?: AsyncJobFilter): boolean { return this.getDeliveryState(filter).queued > 0; } watchJobs(jobIds: string[]): number { const uniqueJobIds = Array.from(new Set(jobIds.map(id => id.trim()).filter(id => id.length > 0))); for (const jobId of uniqueJobIds) { this.#watchedJobs.add(jobId); } return uniqueJobIds.length; } unwatchJobs(jobIds: string[]): number { const uniqueJobIds = Array.from(new Set(jobIds.map(id => id.trim()).filter(id => id.length > 0))); let removed = 0; for (const jobId of uniqueJobIds) { if (this.#watchedJobs.delete(jobId)) { removed += 1; } } return removed; } acknowledgeDeliveries(jobIds: string[]): number { const uniqueJobIds = Array.from(new Set(jobIds.map(id => id.trim()).filter(id => id.length > 0))); if (uniqueJobIds.length === 0) return 0; for (const jobId of uniqueJobIds) { this.#suppressedDeliveries.add(jobId); } const before = this.#deliveries.length; this.#deliveries.splice( 0, this.#deliveries.length, ...this.#deliveries.filter(delivery => !this.isDeliverySuppressed(delivery.jobId)), ); return before - this.#deliveries.length; } /** * Cancel running jobs. With `filter.ownerId` set, cancels only jobs the * matching agent registered; with no filter, cancels every running job * (used by `dispose()` to nuke the manager's state). */ cancelAll(filter?: AsyncJobFilter): void { for (const job of this.getRunningJobs(filter)) { job.status = "cancelled"; job.abortController.abort(); this.#scheduleEviction(job.id); } } async waitForAll(): Promise { await Promise.all(Array.from(this.#jobs.values()).map(job => job.promise)); } async drainDeliveries(options?: { timeoutMs?: number; filter?: AsyncJobFilter }): Promise { const timeoutMs = options?.timeoutMs; const filter = options?.filter; const hasDeadline = timeoutMs !== undefined; const deadline = hasDeadline ? Date.now() + Math.max(timeoutMs, 0) : Number.POSITIVE_INFINITY; while (this.hasPendingDeliveries(filter)) { if (filter?.ownerId) { const delivered = await this.#deliverNextFiltered(filter, deadline); if (delivered) continue; return false; } const inFlightDeliveries = this.#filterInFlightDeliveries(); if (inFlightDeliveries.length > 0 && this.#filterDeliveries().length === 0) { const delivered = await this.#waitForDeliveryPromise(inFlightDeliveries[0]?.promise, deadline); if (delivered) continue; return false; } this.#ensureDeliveryLoop(); const loop = this.#deliveryLoop; if (!loop) { continue; } if (!hasDeadline) { await loop; continue; } const remainingMs = deadline - Date.now(); if (remainingMs <= 0) { return false; } await Promise.race([loop, Bun.sleep(remainingMs)]); if (Date.now() >= deadline && this.hasPendingDeliveries(filter)) { return false; } } return true; } async dispose(options?: { timeoutMs?: number }): Promise { this.#disposed = true; this.#clearEvictionTimers(); this.cancelAll(); await this.waitForAll(); const drained = await this.drainDeliveries({ timeoutMs: options?.timeoutMs ?? 3_000 }); this.#clearEvictionTimers(); this.#jobs.clear(); this.#deliveries.length = 0; this.#inFlightDeliveries.length = 0; this.#suppressedDeliveries.clear(); this.#watchedJobs.clear(); return drained; } #resolveJobId(preferredId?: string): string { preferredId = preferredId?.trim(); if (!preferredId) { let candidate = 1; while (true) { const id = `bg_${candidate}`; if (!this.#jobs.has(id)) { return id; } candidate += 1; } } const base = preferredId.trim(); if (!this.#jobs.has(base)) return base; let suffix = 2; let candidate = `${base}-${suffix}`; while (this.#jobs.has(candidate)) { suffix += 1; candidate = `${base}-${suffix}`; } return candidate; } #scheduleEviction(jobId: string): void { if (this.#retentionMs <= 0) { this.#jobs.delete(jobId); this.#suppressedDeliveries.delete(jobId); this.#watchedJobs.delete(jobId); return; } const existing = this.#evictionTimers.get(jobId); if (existing) { clearTimeout(existing); } const timer = setTimeout(() => { this.#evictionTimers.delete(jobId); this.#jobs.delete(jobId); this.#suppressedDeliveries.delete(jobId); this.#watchedJobs.delete(jobId); }, this.#retentionMs); timer.unref(); this.#evictionTimers.set(jobId, timer); } #clearEvictionTimers(): void { for (const timer of this.#evictionTimers.values()) { clearTimeout(timer); } this.#evictionTimers.clear(); } #filterDeliveries(filter?: AsyncJobFilter): AsyncJobDelivery[] { const ownerId = filter?.ownerId; if (!ownerId) return this.#deliveries.filter(delivery => !this.isDeliverySuppressed(delivery.jobId)); return this.#deliveries.filter( delivery => delivery.ownerId === ownerId && !this.isDeliverySuppressed(delivery.jobId), ); } #filterInFlightDeliveries(filter?: AsyncJobFilter): AsyncJobDelivery[] { const ownerId = filter?.ownerId; if (!ownerId) return this.#inFlightDeliveries.filter(delivery => !this.isDeliverySuppressed(delivery.jobId)); return this.#inFlightDeliveries.filter( delivery => delivery.ownerId === ownerId && !this.isDeliverySuppressed(delivery.jobId), ); } async #deliverNextFiltered(filter: AsyncJobFilter, deadline: number): Promise { while (true) { let selected: AsyncJobDelivery | undefined; for (const delivery of this.#deliveries) { if (delivery.ownerId !== filter.ownerId) continue; if (this.isDeliverySuppressed(delivery.jobId)) continue; if (!selected || delivery.nextAttemptAt < selected.nextAttemptAt) { selected = delivery; } } if (!selected) { const inFlight = this.#filterInFlightDeliveries(filter); if (inFlight.length === 0) return true; return this.#waitForDeliveryPromise(inFlight[0]?.promise, deadline); } const now = Date.now(); if (selected.nextAttemptAt > now) { if (selected.nextAttemptAt > deadline) return false; await Bun.sleep(selected.nextAttemptAt - now); continue; } const index = this.#deliveries.indexOf(selected); if (index === -1) continue; this.#deliveries.splice(index, 1); if (this.isDeliverySuppressed(selected.jobId)) continue; return this.#waitForDeliveryPromise(this.#deliverDelivery(selected), deadline); } } isDeliverySuppressed(jobId: string): boolean { return this.#suppressedDeliveries.has(jobId) || this.#watchedJobs.has(jobId); } #enqueueDelivery(jobId: string, text: string): void { // Skip delivery if already acknowledged if (this.isDeliverySuppressed(jobId)) { return; } this.#deliveries.push({ jobId, text, attempt: 0, nextAttemptAt: Date.now(), ownerId: this.#jobs.get(jobId)?.ownerId, }); this.#ensureDeliveryLoop(); } #ensureDeliveryLoop(): void { if (this.#deliveryLoop) { return; } this.#deliveryLoop = this.#runDeliveryLoop() .catch(error => { logger.error("Async job delivery loop crashed", { error: String(error) }); }) .finally(() => { this.#deliveryLoop = undefined; if (this.#deliveries.length > 0) { this.#ensureDeliveryLoop(); } }); } async #runDeliveryLoop(): Promise { while (this.#deliveries.length > 0) { const delivery = this.#deliveries[0]; if (this.isDeliverySuppressed(delivery.jobId)) { this.#deliveries.shift(); continue; } const waitMs = delivery.nextAttemptAt - Date.now(); if (waitMs > 0) { await Bun.sleep(waitMs); } if (this.#deliveries[0] !== delivery) { continue; } if (this.isDeliverySuppressed(delivery.jobId)) { this.#deliveries.shift(); continue; } this.#deliveries.shift(); await this.#deliverDelivery(delivery); } } #deliverDelivery(delivery: AsyncJobDelivery): Promise { const promise = (async () => { this.#inFlightDeliveries.push(delivery); try { await this.#onJobComplete(delivery.jobId, delivery.text, this.#jobs.get(delivery.jobId)); } catch (error) { delivery.attempt += 1; delivery.lastError = error instanceof Error ? error.message : String(error); delivery.nextAttemptAt = Date.now() + this.#getRetryDelay(delivery.attempt); if (!this.isDeliverySuppressed(delivery.jobId)) { this.#deliveries.push(delivery); } logger.warn("Async job completion delivery failed", { jobId: delivery.jobId, attempt: delivery.attempt, nextRetryAt: delivery.nextAttemptAt, error: delivery.lastError, }); } finally { const index = this.#inFlightDeliveries.indexOf(delivery); if (index !== -1) this.#inFlightDeliveries.splice(index, 1); if (this.#deliveries.length > 0) this.#ensureDeliveryLoop(); } })(); delivery.promise = promise; return promise; } async #waitForDeliveryPromise(promise: Promise | undefined, deadline: number): Promise { if (!promise) return true; if (deadline === Number.POSITIVE_INFINITY) { await promise; return true; } const remainingMs = deadline - Date.now(); if (remainingMs <= 0) return false; let timedOut = false; await Promise.race([ promise, Bun.sleep(remainingMs).then(() => { timedOut = true; }), ]); return !timedOut; } #getRetryDelay(attempt: number): number { const exp = Math.min(Math.max(attempt - 1, 0), 8); const backoffMs = DELIVERY_RETRY_BASE_MS * 2 ** exp; const jitterMs = Math.floor(Math.random() * DELIVERY_RETRY_JITTER_MS); return Math.min(DELIVERY_RETRY_MAX_MS, backoffMs + jitterMs); } }