/** @file Batch run module. */

import { randomUUID } from "node:crypto";

import { DEFAULT_CONCURRENCY } from "../defaults.ts";
import { isAbortError } from "../http/abort.ts";
import { hasStructuredError } from "../http/retry.ts";
import { ensureHttpClientDeps } from "../http/scrape-deps.ts";
import { scrapeUrl, type ScrapePipelineDeps, type ScrapeResult } from "../scrape/pipeline.ts";
import { resultChars } from "../scrape/result-chars.ts";
import { normalizeMaybe } from "../storage/db/row-fields.ts";
import {
  appendJobError,
  structuredErrorToJobError,
  unknownToJobError,
} from "../storage/jobs/errors.ts";
import { setupScrapeJob } from "../storage/jobs/setup.ts";
import { storeResponse, type StoreResponseOptions } from "../storage/responses/store.ts";
import { truncateAndStore } from "../storage/responses/truncate.ts";
import type { CommonScrapeOptions, StructuredError } from "../types.ts";

/**
 * Progress notification passed to {@link BatchScrapeOptions.onProgress}.
 *
 * `current` is `0` for the initial "queued" event, the zero-based URL index
 * for "processing" events, and `index + 1` for "done"/"error" events. Because
 * URLs are handled by several workers, events are not guaranteed to arrive in
 * index order.
 */
export interface BatchProgress {
  state: "queued" | "processing" | "done" | "error";
  current: number;
  total: number;
  url?: string;
}

/** Options accepted by {@link runBatchScrape}. */
export interface BatchScrapeOptions extends CommonScrapeOptions, StoreResponseOptions {
  /** Upper bound on the number of workers; clamped to `[1, urls.length]`. */
  concurrency?: number;
  /** Forwarded to `ensureHttpClientDeps` together with `concurrency`. */
  perHostConcurrency?: number;
  /** When exactly `true`, results are stored via `storeResponse` without truncation. */
  storeFullResults?: boolean;
  /** Invoked synchronously on queue, on start of each URL, and on completion of each URL. */
  onProgress?: (progress: BatchProgress) => void;
}

/** A URL that was scraped without a reported error. */
export interface BatchItemSuccess {
  ok: true;
  index: number;
  url: string;
  result: ScrapeResult;
}

/** A URL whose scrape reported or threw an error. */
export interface BatchItemFailure {
  ok: false;
  index: number;
  url: string;
  error: StructuredError;
}

/** Per-URL outcome, discriminated on `ok`. */
export type BatchItemResult = BatchItemSuccess | BatchItemFailure;

/** Value resolved by {@link runBatchScrape}. */
export interface BatchScrapeResult {
  items: BatchItemResult[];
  responseId?: string;
  fullOutputPath?: string;
  jobId: string;
  jobManifestPath?: string;
  truncated: boolean;
  summary: string;
}

/**
 * Scrapes every URL in `urls` with a bounded pool of workers, records progress
 * in the job manifest, and stores the collected results.
 *
 * Individual URL failures do not reject the batch; they are returned as
 * {@link BatchItemFailure} entries. URLs that map to the same
 * `normalizeMaybe(url)` key share a single scrape.
 *
 * @param urls - URLs to scrape; result items keep the input order.
 * @param options - Scrape, storage and concurrency options.
 * @param deps - Pipeline dependencies, completed by `ensureHttpClientDeps`.
 * @param signal - Optional abort signal; aborting rejects the batch.
 * @returns The per-URL results plus job and storage identifiers.
 * @throws The abort reason when `signal` is aborted (the job is marked
 *   "paused"), or any other batch-level error (the job is marked "error").
 */
export async function runBatchScrape(
  urls: readonly string[],
  options: BatchScrapeOptions = {},
  deps: ScrapePipelineDeps = {},
  signal?: AbortSignal,
): Promise<BatchScrapeResult> {
  type ItemOutcome =
    | { ok: true; result: ScrapeResult }
    | { ok: false; error: StructuredError };

  // Slots stay `undefined` until the URL at that index has been processed.
  const items = Array.from<BatchItemResult | undefined>({ length: urls.length });
  const jobId = randomUUID();
  const jobSetup = await setupScrapeJob(
    {
      jobId,
      jobType: "batch",
      params: { urls, ...options },
      mode: options.mode,
      format: options.format,
    },
    options,
  );
  let { jobManifestPath, errors, totalBytes, totalChars, truncatedPages } = jobSetup;
  const jobWriter = jobSetup.writer;

  // Captured once so later manifest updates do not move the start time forward.
  const startedAt = new Date().toISOString();

  // One in-flight or finished scrape per normalized URL.
  const cache = new Map<string, Promise<ItemOutcome>>();

  let next = 0; // Next URL index to hand to a worker.
  let processed = 0; // Number of filled slots in `items`.
  let halted = false; // Set when any worker fails so the others stop taking work.

  const concurrency = Math.max(
    1,
    Math.min(options.concurrency ?? DEFAULT_CONCURRENCY.global, urls.length || 1),
  );
  const sharedDeps = ensureHttpClientDeps(deps, {
    concurrency,
    perHostConcurrency: options.perHostConcurrency,
    retryAttempts: options.retryAttempts,
  });

  options.onProgress?.({ state: "queued", current: 0, total: urls.length });
  await updateBatchJob("running", 0, 0, undefined, true);

  /** Pulls URL indices off the shared counter until none remain or the batch halts. */
  async function worker(): Promise<void> {
    try {
      while (!halted && next < urls.length) {
        if (signal?.aborted) {
          throw signal.reason ?? new DOMException("Batch aborted", "AbortError");
        }
        const index = next++;
        const url = urls[index];
        options.onProgress?.({
          state: "processing",
          current: index,
          total: urls.length,
          url,
        });

        const item = await scrapeCached(url);
        items[index] = item.ok
          ? { ok: true, index, url, result: item.result }
          : { ok: false, index, url, error: item.error };
        processed += 1;

        if (item.ok) {
          totalBytes += item.result.downloadedBytes ?? 0;
          totalChars += resultChars(item.result);
          if (item.result.truncated) truncatedPages += 1;
        } else {
          errors = appendJobError(errors, structuredErrorToJobError(item.error));
        }

        await updateBatchJob("running", processed, errors.length);
        options.onProgress?.({
          state: item.ok ? "done" : "error",
          current: index + 1,
          total: urls.length,
          url,
        });
      }
    } catch (error) {
      // Tell sibling workers to stop after their current URL.
      halted = true;
      throw error;
    }
  }

  /** Returns the shared scrape for `url`, starting it on first request. */
  function scrapeCached(url: string): Promise<ItemOutcome> {
    const key = normalizeMaybe(url);
    const existing = cache.get(key);
    if (existing) return existing;
    const promise = scrapeItem(url);
    cache.set(key, promise);
    return promise;
  }

  /** Scrapes one URL and converts reported or thrown errors into a failure outcome. */
  async function scrapeItem(url: string): Promise<ItemOutcome> {
    try {
      const result = await scrapeUrl(url, options, sharedDeps, signal);
      return result.error ? { ok: false, error: result.error } : { ok: true, result };
    } catch (error) {
      // A caller-requested abort is a batch-level event, not a failure of this
      // URL: let it propagate so the job is marked "paused" instead of
      // recording a non-retryable item error.
      if (signal?.aborted === true && isAbortError(error, signal)) throw error;
      return { ok: false, error: toStructuredError(error, url) };
    }
  }

  // Wait for every worker to settle before writing a terminal status; otherwise
  // a still-running worker could overwrite it with a late "running" update.
  const settled = await Promise.allSettled(Array.from({ length: concurrency }, () => worker()));
  const rejected = settled.find(
    (outcome): outcome is PromiseRejectedResult => outcome.status === "rejected",
  );
  if (rejected) {
    const error: unknown = rejected.reason;
    errors = appendJobError(errors, unknownToJobError(error, "batch"));
    await updateBatchJob(
      isAbortError(error, signal) ? "paused" : "error",
      processed,
      errors.length,
      undefined,
      true,
    );
    throw error;
  }

  const completed = items.filter((item): item is BatchItemResult => item !== undefined);
  const summary = summarize(completed);

  if (options.storeFullResults === true) {
    const metadata = await storeResponse(completed, options);
    await updateBatchJob("done", completed.length, errors.length, [metadata.responseId], true);
    return {
      items: completed,
      responseId: metadata.responseId,
      fullOutputPath: metadata.fullOutputPath,
      jobId,
      jobManifestPath,
      truncated: false,
      summary,
    };
  }

  const truncated = await truncateAndStore(summary, completed, options);
  await updateBatchJob(
    "done",
    completed.length,
    errors.length,
    truncated.metadata?.responseId ? [truncated.metadata.responseId] : undefined,
    true,
  );
  return {
    items: completed,
    responseId: truncated.metadata?.responseId,
    fullOutputPath: truncated.metadata?.fullOutputPath,
    jobId,
    jobManifestPath,
    truncated: truncated.truncated,
    summary: truncated.text,
  };

  /**
   * Writes the current counters to the job manifest and remembers the manifest
   * path when the writer returns one.
   *
   * `force` is forwarded to the writer unchanged.
   * NOTE(review): presumably it bypasses write throttling — confirm against the writer.
   */
  async function updateBatchJob(
    status: "running" | "done" | "error" | "paused",
    urlsProcessed: number,
    urlsFailed: number,
    responseIds?: string[],
    force = false,
  ): Promise<void> {
    const updated = await jobWriter.update(
      {
        status,
        startedAt,
        completedAt: status === "running" ? undefined : new Date().toISOString(),
        urlsProcessed,
        urlsFailed,
        errors,
        totalBytes,
        totalChars,
        truncatedPages,
        responseIds,
      },
      { force },
    );
    if (updated) jobManifestPath = updated.path;
  }
}

/** Builds the one-line human-readable summary of a finished batch. */
function summarize(items: readonly BatchItemResult[]): string {
  const ok = items.filter((item) => item.ok).length;
  const failed = items.length - ok;
  return `Batch scrape complete: ${ok} succeeded, ${failed} failed, ${items.length} total.`;
}

/**
 * Converts a thrown value into a `StructuredError` for `url`.
 *
 * Errors that already carry a structured payload are passed through; anything
 * else becomes a non-retryable `BATCH_ITEM_FAILED` error.
 */
function toStructuredError(error: unknown, url: string): StructuredError {
  if (hasStructuredError(error)) return error.structured;
  return {
    code: "BATCH_ITEM_FAILED",
    phase: "batch",
    message: error instanceof Error ? error.message : "Batch item failed",
    retryable: false,
    url,
  };
}