// SPDX-License-Identifier: MIT /** * BlobService client for artifact storage operations. */ import type * as grpc from '@grpc/grpc-js'; import type { BlobServiceClient, BlobUploadResponse, BlobDownloadChunk, BlobDeleteResponse, ArtifactInfo, } from '../generated/neumann.js'; import { ConnectionError, NotFoundError, InternalError, InvalidArgumentError, } from '../types/errors.js'; import type { NeumannError } from '../types/errors.js'; /** * Options for blob upload. */ export interface BlobUploadOptions { /** Content type (MIME type). */ contentType?: string; /** Creator identifier. */ createdBy?: string; /** Tags for categorization. */ tags?: string[]; /** IDs of linked artifacts. */ linkedTo?: string[]; /** Custom metadata. */ custom?: Record; } /** * Result of a blob upload operation. */ export interface BlobUploadResult { /** Generated artifact ID. */ artifactId: string; /** Total size in bytes. */ size: number; /** SHA-256 checksum. */ checksum: string; } /** * Artifact metadata. */ export interface ArtifactMetadata { id: string; filename: string; contentType: string; size: number; checksum: string; chunkCount: number; created: Date; modified: Date; createdBy: string; tags: string[]; linkedTo: string[]; custom: Record; } /** * Build upload metadata with proper typing for exactOptionalPropertyTypes. */ function buildUploadMetadata( filename: string, options: BlobUploadOptions ): { filename: string; contentType?: string; createdBy?: string; tags: string[]; linkedTo: string[]; custom: Record; } { const result: { filename: string; contentType?: string; createdBy?: string; tags: string[]; linkedTo: string[]; custom: Record; } = { filename, tags: options.tags ?? [], linkedTo: options.linkedTo ?? [], custom: options.custom ?? {}, }; if (options.contentType !== undefined) { result.contentType = options.contentType; } if (options.createdBy !== undefined) { result.createdBy = options.createdBy; } return result; } /** * Service client for blob/artifact operations. */ export class BlobClient { private client: BlobServiceClient; private metadata: grpc.Metadata; constructor(client: BlobServiceClient, metadata: grpc.Metadata) { this.client = client; this.metadata = metadata; } /** * Upload a blob from a buffer. * * @param filename - The filename for the artifact. * @param data - The blob data as a Buffer or Uint8Array. * @param options - Upload options. * @returns Upload result with artifact ID. */ async uploadBlob( filename: string, data: Buffer | Uint8Array, options: BlobUploadOptions = {} ): Promise { return new Promise((resolve, reject) => { const call = this.client.Upload( this.metadata, (err: grpc.ServiceError | null, response: BlobUploadResponse) => { if (err) { reject(this.handleError(err)); return; } resolve({ artifactId: response.artifactId, size: response.size, checksum: response.checksum, }); } ); // Send metadata first call.write({ metadata: buildUploadMetadata(filename, options), }); // Send data in chunks (64KB chunks) const CHUNK_SIZE = 64 * 1024; const buffer = Buffer.from(data); for (let offset = 0; offset < buffer.length; offset += CHUNK_SIZE) { const chunk = buffer.subarray(offset, offset + CHUNK_SIZE); call.write({ chunk }); } call.end(); }); } /** * Upload a blob from an async iterable (streaming). * * @param filename - The filename for the artifact. * @param chunks - Async iterable of data chunks. * @param options - Upload options. * @returns Upload result with artifact ID. */ async uploadBlobStreaming( filename: string, chunks: AsyncIterable, options: BlobUploadOptions = {} ): Promise { return new Promise((resolve, reject) => { const call = this.client.Upload( this.metadata, (err: grpc.ServiceError | null, response: BlobUploadResponse) => { if (err) { reject(this.handleError(err)); return; } resolve({ artifactId: response.artifactId, size: response.size, checksum: response.checksum, }); } ); // Send metadata first call.write({ metadata: buildUploadMetadata(filename, options), }); // Stream chunks with proper error handling const streamChunks = async (): Promise => { try { for await (const chunk of chunks) { call.write({ chunk }); } call.end(); } catch (err) { call.destroy(err instanceof Error ? err : new Error(String(err))); } }; streamChunks().catch((err: unknown) => reject(this.handleError(err as grpc.ServiceError))); }); } /** * Download a blob as an async iterable of chunks. * Automatically cancels the stream on early break or error. * * @param artifactId - The artifact ID to download. * @returns Async iterable of data chunks. */ async *downloadBlob(artifactId: string): AsyncIterable { const stream = this.client.Download({ artifactId }, this.metadata) as AsyncIterable< BlobDownloadChunk > & { cancel?: () => void }; try { for await (const chunk of stream) { if (chunk.data && chunk.data.length > 0) { yield chunk.data; } if (chunk.isFinal) { break; } } } finally { if (typeof stream.cancel === 'function') { try { stream.cancel(); } catch { // Ignore cancel errors } } } } /** * Download a blob as a complete buffer. * * @param artifactId - The artifact ID to download. * @returns The complete blob data. */ async downloadBlobFull(artifactId: string): Promise { const chunks: Uint8Array[] = []; for await (const chunk of this.downloadBlob(artifactId)) { chunks.push(chunk); } return Buffer.concat(chunks); } /** * Delete a blob. * * @param artifactId - The artifact ID to delete. * @returns True if deletion was successful. */ async deleteBlob(artifactId: string): Promise { return new Promise((resolve, reject) => { this.client.Delete( { artifactId }, this.metadata, (err: grpc.ServiceError | null, response: BlobDeleteResponse) => { if (err) { reject(this.handleError(err)); return; } resolve(response.success); } ); }); } /** * Get blob metadata. * * @param artifactId - The artifact ID. * @returns Artifact metadata. */ async getBlobMetadata(artifactId: string): Promise { return new Promise((resolve, reject) => { this.client.GetMetadata( { artifactId }, this.metadata, (err: grpc.ServiceError | null, response: ArtifactInfo) => { if (err) { reject(this.handleError(err)); return; } resolve({ id: response.id, filename: response.filename, contentType: response.contentType, size: response.size, checksum: response.checksum, chunkCount: response.chunkCount, created: new Date(response.created), modified: new Date(response.modified), createdBy: response.createdBy, tags: response.tags, linkedTo: response.linkedTo, custom: response.custom ?? {}, }); } ); }); } private handleError(err: grpc.ServiceError): NeumannError { const code = err.code as number; const NOT_FOUND = 5; const INVALID_ARGUMENT = 3; const UNAVAILABLE = 14; if (code === NOT_FOUND) { return new NotFoundError(err.details || 'Artifact not found'); } if (code === INVALID_ARGUMENT) { return new InvalidArgumentError(err.details || 'Invalid argument'); } if (code === UNAVAILABLE) { return new ConnectionError(err.details || 'Service unavailable'); } return new InternalError(err.details || err.message || 'Internal error'); } }