import { stat } from "node:fs/promises";
import type { FileCreateParams } from "@mixedbread/sdk/resources/stores";
import chalk from "chalk";
import { Command } from "commander";
import { glob } from "glob";
import { z } from "zod";
import { createClient } from "../../utils/client";
import { loadConfig } from "../../utils/config";
import { warnContextualizationDeprecated } from "../../utils/deprecation";
import {
  addGlobalOptions,
  extendGlobalOptions,
  type GlobalOptions,
  parseOptions,
} from "../../utils/global-options";
import { log, spinner } from "../../utils/logger";
import { uploadFromManifest } from "../../utils/manifest";
import { validateMetadata } from "../../utils/metadata";
import { formatBytes, formatCountWithSuffix } from "../../utils/output";
import { checkExistingFiles, resolveStore } from "../../utils/store";
import {
  type FileToUpload,
  type MultipartUploadOptions,
  uploadFilesInBatch,
} from "../../utils/upload";

const UploadStoreSchema = extendGlobalOptions({
  nameOrId: z.string().min(1, { error: '"name-or-id" is required' }),
  patterns: z.array(z.string()).optional(),
  strategy: z
    .enum(["fast", "high_quality"], {
      error: '"strategy" must be either "fast" or "high_quality"',
    })
    .optional(),
  contextualization: z
    .boolean({ error: '"contextualization" must be a boolean' })
    .optional(),
  metadata: z.string().optional(),
  dryRun: z.boolean().optional(),
  parallel: z.coerce
    .number({ error: '"parallel" must be a number' })
    .int({ error: '"parallel" must be an integer' })
    .min(1, { error: '"parallel" must be at least 1' })
    .max(200, { error: '"parallel" must be less than or equal to 200' })
    .optional(),
  unique: z.boolean().optional(),
  manifest: z.string().optional(),
  multipartThreshold: z.coerce
    .number({ error: '"multipart-threshold" must be a number' })
    .min(5, { error: '"multipart-threshold" must be at least 5 MB' })
    .optional(),
  multipartPartSize: z.coerce
    .number({ error: '"multipart-part-size" must be a number' })
    .min(5, { error: '"multipart-part-size" must be at least 5 MB' })
    .optional(),
  multipartConcurrency: z.coerce
    .number({ error: '"multipart-concurrency" must be a number' })
    .int({ error: '"multipart-concurrency" must be an integer' })
    .min(1, { error: '"multipart-concurrency" must be at least 1' })
    .optional(),
});

export interface UploadOptions extends GlobalOptions {
  strategy?: FileCreateParams.Config["parsing_strategy"];
  contextualization?: boolean;
  metadata?: string;
  dryRun?: boolean;
  parallel?: number;
  unique?: boolean;
  manifest?: string;
  multipartThreshold?: number;
  multipartPartSize?: number;
  multipartConcurrency?: number;
}

export function createUploadCommand(): Command {
  const command = addGlobalOptions(
    new Command("upload")
      .description("Upload files to a store")
      .argument("<name-or-id>", "Name or ID of the store")
      .argument(
        "[patterns...]",
        'File patterns to upload (e.g., "*.md", "docs/**/*.pdf")'
      )
      .option("--strategy <strategy>", "Processing strategy")
      .option(
        "--contextualization",
        "Deprecated (ignored): contextualization is now configured at the store level"
      )
      .option("--metadata <metadata>", "Additional metadata as JSON string")
      .option("--dry-run", "Preview what would be uploaded", false)
      .option("--parallel <number>", "Number of concurrent uploads (1-200)")
      .option(
        "--unique",
        "Update existing files instead of creating duplicates",
        false
      )
      .option("--manifest <file>", "Upload using manifest file")
      .option(
        "--multipart-threshold <mb>",
        "File size threshold in MB to trigger multipart upload"
      )
      .option(
        "--multipart-part-size <mb>",
        "Size of each part in MB for multipart upload"
      )
      .option(
        "--multipart-concurrency <number>",
        "Number of concurrent part uploads for multipart upload"
      )
  );

  command.action(async (nameOrId: string, patterns: string[]) => {
    let activeSpinner: ReturnType<typeof spinner> | null = null;
    try {
      const mergedOptions = command.optsWithGlobals();
      const parsedOptions = parseOptions(UploadStoreSchema, {
        ...mergedOptions,
        nameOrId,
        patterns,
      });

      if (parsedOptions.contextualization) {
        warnContextualizationDeprecated("store upload");
      }

      const client = createClient(parsedOptions);

      activeSpinner = spinner();
      activeSpinner.start("Initializing upload...");
      const store = await resolveStore(client, parsedOptions.nameOrId);
      const config = loadConfig();
      activeSpinner.stop("Upload initialized");
      activeSpinner = null;

      const MB = 1024 * 1024;
      const multipartUpload: MultipartUploadOptions = {
        ...(parsedOptions.multipartThreshold != null && {
          threshold: parsedOptions.multipartThreshold * MB,
        }),
        ...(parsedOptions.multipartPartSize != null && {
          partSize: parsedOptions.multipartPartSize * MB,
        }),
        ...(parsedOptions.multipartConcurrency != null && {
          concurrency: parsedOptions.multipartConcurrency,
        }),
      };

      // Handle manifest file upload
      if (parsedOptions.manifest) {
        return await uploadFromManifest(
          client,
          store.id,
          parsedOptions.manifest,
          parsedOptions,
          multipartUpload
        );
      }

      if (!parsedOptions.patterns || parsedOptions.patterns.length === 0) {
        log.error(
          "No file patterns provided. Use --manifest for manifest-based uploads."
        );
        process.exit(1);
      }

      // Get configuration values with precedence: command-line > config defaults > built-in defaults
      const strategy =
        parsedOptions.strategy ?? config.defaults?.upload?.strategy ?? "fast";
      const parallel =
        parsedOptions.parallel ?? config.defaults?.upload?.parallel ?? 100;
      const metadata = validateMetadata(parsedOptions.metadata);

      // Collect all files matching patterns
      const files: string[] = [];
      for (const pattern of parsedOptions.patterns) {
        const matches = await glob(pattern, {
          nodir: true,
          absolute: false,
        });
        files.push(...matches);
      }

      // Remove duplicates
      const uniqueFiles = [...new Set(files)];

      if (uniqueFiles.length === 0) {
        log.warn("No files found matching the patterns.");
        return;
      }

      let totalSize = 0;
      for (const file of uniqueFiles) {
        try {
          totalSize += (await stat(file)).size;
        } catch {
          // File may not exist
        }
      }

      console.log(
        `Found ${formatCountWithSuffix(uniqueFiles.length, "file")} matching the ${
          patterns.length > 1 ? "patterns" : "pattern"
        } (${formatBytes(totalSize)})`
      );

      if (parsedOptions.dryRun) {
        console.log(chalk.blue("Dry run - files that would be uploaded:"));
        for (const file of uniqueFiles) {
          try {
            const stats = await stat(file);
            console.log(`\n${file} (${formatBytes(stats.size)})`);
            console.log(`  Strategy: ${strategy}`);
            if (metadata && Object.keys(metadata).length > 0) {
              console.log(`  Metadata: ${JSON.stringify(metadata)}`);
            }
          } catch (_error) {
            console.log(`  ${file} (${chalk.red("✗ File not found")})`);
          }
        }
        return;
      }

      // Handle --unique flag: check for existing files
      let existingFiles: Map<string, string> = new Map();
      if (parsedOptions.unique) {
        activeSpinner = spinner();
        activeSpinner.start("Checking for existing files...");
        try {
          existingFiles = await checkExistingFiles(
            client,
            store.id,
            uniqueFiles
          );
          activeSpinner.stop(
            `Found ${formatCountWithSuffix(existingFiles.size, "existing file")}`
          );
          activeSpinner = null;
        } catch (error) {
          activeSpinner.stop();
          activeSpinner = null;
          log.error("Failed to check existing files");
          throw error;
        }
      }

      // Transform files to shared format
      const filesToUpload: FileToUpload[] = uniqueFiles.map((filePath) => ({
        path: filePath,
        strategy,
        metadata,
      }));

      // Upload files with progress tracking
      await uploadFilesInBatch(client, store.id, filesToUpload, {
        unique: parsedOptions.unique || false,
        existingFiles,
        parallel,
        multipartUpload,
      });
    } catch (error) {
      activeSpinner?.stop();
      if (error instanceof Error) {
        log.error(error.message);
      } else {
        log.error("Failed to upload files");
      }
      process.exit(1);
    }
  });

  return command;
}