import IUploadTask from './ICryptUploadTask';
import {ApplyUploadRequest} from '../BackendServerModel';
import {UploadProgress, FileComplete, UploadResult, Multiparts, CryptContentResult, AllFileContext} from '../Model';
import S3, { ListPartsRequest, ManagedUpload, MultipartUpload, PutObjectRequest } from 'aws-sdk/clients/s3';
import OSS from 'ali-oss';
import fs from 'fs';
import TaskStatus, { StatusCheckpoint } from '../TaskStatus';
import ICryptUploadTask from "./ICryptUploadTask";
import {EnctyptObject, enctyptSdk, getCiphertextDigest, getPathDigest} from "../Crypt/Encrypt";
import chackCache, {CheckBucket} from '../StreamCache/CheckCache';
import {GetMd5, GetMd5Buffer} from "../utils/ImageUtils";
import UGCPublish from "../index";
import {CryptUploadDispatcher} from "./CryptUploadDispatcher";

/** Files strictly larger than this (in bytes) take the resumable OSS upload path. */
const RESUMABLE_UPLOAD_THRESHOLD = 10 * 1024 * 1024;

/** Renders a byte array as a lower-case, zero-padded hexadecimal string. */
function toHexString(bytes: Uint8Array): string {
    return Array.from(bytes, byte => byte.toString(16).padStart(2, '0')).join('');
}

/**
 * Builds the single-element result array returned by both upload back ends.
 * The URL is `IUploadTask.proxyUrl` joined with the object key; digest and key
 * are hex-encoded from the encryption object.
 */
function buildUploadResult(bucket: string, objectKey: string, enctyptObject: EnctyptObject, fileSize: number) {
    return [{
        Bucket: bucket,
        Key: objectKey,
        Url: `${IUploadTask.proxyUrl}/${objectKey}`,
        cryptInfo: {
            digest: toHexString(new Uint8Array(enctyptObject.digest)),
            key: toHexString(new Uint8Array(enctyptObject.key)),
            fileSize: fileSize
        }
    }];
}

/**
 * Stores the information needed to resume a multipart upload in the check cache.
 * Runs fire-and-forget: the record is only inserted when the digest is longer
 * than 31 characters, the key is 64 bytes and the IV is 16 bytes. Failures are
 * logged and never interrupt the upload in progress.
 */
function saveResumeRecord(digest: string, md5: any, mediaId: string, objectKey: string, checkpoint: any, enctyptObject: EnctyptObject): void {
    const today = new Date();
    CryptUploadDispatcher._cacheManager.getCheckCache().then((obj) => {
        // Only persist when the key material looks complete.
        if (enctyptObject == undefined || enctyptObject.key == undefined || enctyptObject.iv == undefined) {
            return undefined;
        }
        if (digest == undefined || digest.length <= 31 || enctyptObject.key.byteLength != 64 || enctyptObject.iv.byteLength != 16) {
            return undefined;
        }
        console.log("执行插入语句",digest);
        // Returned so that a rejected insert reaches the catch below.
        return obj.insert({
            plain_sha: digest.toString(),
            md5: md5,
            media_id: mediaId,
            file_size: checkpoint.fileSize,
            object_key: objectKey,
            part_size: checkpoint.partSize,
            upload_id: checkpoint.uploadId,
            key_value: new Uint8Array(enctyptObject.key).toString(),
            iv_value: new Uint8Array(enctyptObject.iv).toString(),
            // Minutes since the Unix epoch, as a string.
            storage_time: (today.getTime() / (1000 * 60)).toString()
        });
    }).catch((e: unknown) => {
        console.log("Failed to save resume record", e);
    });
}

/**
 * Upload task for generic ("other") files whose content is encrypted before upload.
 */
export default class CryptFileUploadTask implements ICryptUploadTask {
    objectKeyPrefix: string = 'other';

    /**
     * Builds the apply-upload request for a local file.
     *
     * @param objectKeyPartial value sent as `FileName`
     * @param file path of the local file; its size is read synchronously with `fs.statSync`
     * @param region value sent as `Region`
     * @param bucket value sent as `BucketName`
     * @param fileExt value sent as `MediaInfo.FileExt`
     * @param serverType value sent as `ServerType`
     * @returns the populated request (`Md5` is left undefined, `CategoryId` is 1, `MediaType` is 'other')
     * @throws whatever `fs.statSync` throws when the file cannot be stat'ed
     */
    makeApplyUploadRequest(
        objectKeyPartial: string,
        file: string,
        region: string,
        bucket: string,
        fileExt: string,
        serverType: number): ApplyUploadRequest {
        const fileSize = fs.statSync(file).size;
        const applyRequest: ApplyUploadRequest = {
            Md5: undefined,
            CategoryId: 1,
            MediaType: 'other',
            FileName: objectKeyPartial,
            FileSize: fileSize,
            Region: region,
            BucketName: bucket,
            ServerType: serverType,
            MediaInfo: {
                FileExt: fileExt
            }
        };
        return applyRequest;
    }

    /**
     * Converts the first upload result into a `FileComplete` record.
     *
     * @param mediaId value stored as `id`
     * @param fileSize value stored as `filesize`
     * @param mark value stored as `mark`
     * @param uploadRet upload results; only the first entry is read
     * @throws Error when `uploadRet` is empty or its first entry has no `cryptInfo`
     */
    makeResult(mediaId: string, fileSize: number, mark: string, uploadRet: { Url: string, Bucket: string, Key: string,cryptInfo?: CryptContentResult }[]): FileComplete {
        const first = uploadRet[0];
        if (first == null || first.cryptInfo == null) {
            // Previously surfaced as an opaque TypeError on property access.
            throw new Error('makeResult: upload result is missing or has no cryptInfo.');
        }
        const fileComplete: FileComplete = {
            id: mediaId,
            url: first.Url,
            filesize: fileSize,
            mark: mark,
            cryptInfo: {
                digest: first.cryptInfo.digest,
                fileSize: first.cryptInfo.fileSize,
                key: first.cryptInfo.key,
            }
        };
        return fileComplete;
    }

    /**
     * Runs the upload pipeline for one file through `UGCPublish.processUtil`:
     * instant-upload lookup, object-key resolution, encryption/resume lookup,
     * and finally the upload itself.
     *
     * Only `statusCp`, `allFileInfo` and `listener` are used here; the remaining
     * parameters are accepted but not read by this implementation.
     *
     * @returns the instant-upload result when one is found; `[{Error: code}]` when
     *          encryption or upload reports a non-zero `exception`; otherwise the
     *          `Complete` value of the first upload result
     */
    async makeUploadTask(client: S3 | OSS, bucket: string, objectKeyPartial: string, mediaId: string, file: string, fileExt: string, statusCp: CheckBucket | undefined, today: Date, _receive_proxy: unknown, _minor_receive_proxy: unknown, _bucket: unknown, allFileInfo: AllFileContext, listener: (progress: UploadProgress) => void): Promise<any> {
        // The result shape is decided by helpers outside this file, hence Promise<any>.

        // Look for a cached instant-upload result and return it as-is when present.
        const instantResult = await UGCPublish.processUtil.getFileSencodePassInfo(statusCp, allFileInfo);
        if (instantResult != null && instantResult[0] != null) {
            return instantResult;
        }
        console.log("秒传查询失败");

        // Resolve the object key and record it on the shared file context.
        const objectKey = await UGCPublish.processUtil.getFileKey(statusCp, allFileInfo);
        allFileInfo.objectKey = [objectKey];

        // Look up local resume state and obtain the encryption object.
        const resultEnctyptInfo = await UGCPublish.processUtil.getEnctyptObject(allFileInfo, statusCp);
        if (resultEnctyptInfo.exception != 0) {
            return [({Error: resultEnctyptInfo.exception})];
        }

        // Upload the file.
        const uploadFileInfo = await UGCPublish.processUtil.uploadFile(this, allFileInfo, resultEnctyptInfo.Complete, statusCp, listener);
        if (uploadFileInfo[0].exception != 0) {
            return [({Error: uploadFileInfo[0].exception})];
        }
        return uploadFileInfo[0].Complete;
    }

    /**
     * Uploads the ciphertext to S3 with a managed upload.
     *
     * @param client S3 client
     * @param bucket target bucket
     * @param objectKey target object key
     * @param enctyptObject source of the ciphertext, digest and key
     * @param fileSize size reported back in `cryptInfo.fileSize`
     * @param listener receives `{Uploaded, Total}` from each `httpUploadProgress` event
     * @returns a single-element array describing the uploaded object
     * @throws when the managed upload rejects
     */
    private async fileUploadS3Task(client: S3 ,bucket: string, objectKey: string, enctyptObject: EnctyptObject, fileSize: number,listener:(progress: UploadProgress) => void) {
        // Build the buffer once; it is used for both the length and the body.
        const ciphertext = Buffer.from(enctyptObject.ciphertext);
        const request: PutObjectRequest = {
            Bucket: bucket,
            ContentLength: ciphertext.length,
            Key: objectKey,
            Tagging: 'public=1',
            Body: ciphertext,
        };
        const options: ManagedUpload.ManagedUploadOptions = {
            leavePartsOnError: true
        };
        const upload = client.upload(request, options);
        upload.on('httpUploadProgress', progress => {
            listener.call(listener, {Uploaded: progress.loaded, Total: progress.total});
        });
        await upload.promise();
        return buildUploadResult(bucket, objectKey, enctyptObject, fileSize);
    }

    /**
     * Uploads the ciphertext to OSS with `multipartUpload`.
     *
     * When `fileSize` exceeds 10 MiB the upload is resumable: previously uploaded
     * parts are listed (if `statusCp` is given) and passed as a checkpoint, and the
     * first checkpoint reported by the SDK is saved to the check cache.
     *
     * @param client OSS client
     * @param digest plaintext digest stored as `plain_sha` in the resume record
     * @param bucket target bucket
     * @param objectKey target object key
     * @param enctyptObject source of the ciphertext, digest, key and IV
     * @param file not used by this implementation
     * @param fileSize selects the upload path and is reported in `cryptInfo.fileSize`
     * @param statusCp resume state from a previous attempt, if any
     * @param mediaId media id stored in the resume record
     * @param listener receives `{Uploaded: p * 10000, Total: 10000}` for each progress value `p`
     * @returns a single-element array describing the uploaded object
     * @throws when listing parts or the multipart upload fails
     */
    private async fileUploadOSSTask(client: OSS ,digest:string,bucket: string, objectKey: string, enctyptObject: EnctyptObject ,file: string, fileSize: number,statusCp: CheckBucket | undefined, mediaId: string,listener:(progress: UploadProgress) => void){
        const ciphertext = Buffer.from(enctyptObject.ciphertext);
        const notify = (p: any) => {
            listener.call(listener, {
                Uploaded: p * 10000,
                Total: 10000
            });
        };

        let uploadOptions;
        if (fileSize > RESUMABLE_UPLOAD_THRESHOLD) {
            // The MD5 only feeds the resume record, so it is computed on this path only.
            const md5 = await GetMd5Buffer(ciphertext);
            const parts = statusCp ? await this.getUploadedParts(client, bucket, objectKey, statusCp.upload_id) : [];

            let noRecord = true;
            const progress = (p: any, checkpoint: any) => {
                if (checkpoint && noRecord) {
                    // Save resume information once, on the first checkpoint.
                    saveResumeRecord(digest, md5, mediaId, objectKey, checkpoint, enctyptObject);
                    noRecord = false;
                }
                notify(p);
            };

            let checkpoint;
            if (statusCp) {
                checkpoint = {
                    file: ciphertext,
                    fileSize: statusCp.file_size,
                    name: statusCp.object_key,
                    partSize: statusCp.part_size,
                    uploadId: statusCp.upload_id,
                    doneParts: parts
                };
            }
            uploadOptions = {progress, checkpoint};
        } else {
            const progress = (p: any, checkpoint: any) => {
                notify(p);
            };
            uploadOptions = {progress};
        }

        try {
            await client.multipartUpload(objectKey, ciphertext, uploadOptions);
        } catch (e) {
            console.log(e);
            // Propagate the failure instead of reporting an upload that never
            // completed; the S3 path rejects in the same situation.
            throw e;
        }

        return buildUploadResult(bucket, objectKey, enctyptObject, fileSize);
    }

    /**
     * Lists the parts already uploaded for a multipart upload.
     *
     * @param client S3 or OSS client
     * @param bucket bucket name (used by the S3 request only)
     * @param objectKey object key
     * @param uploadId multipart upload id
     * @returns part numbers and ETags; for S3, a missing number becomes 0 and a missing ETag ''
     * @throws Error when `client` is neither an OSS nor an S3 instance, or when the listing fails
     */
    async getUploadedParts(client: S3 | OSS, bucket: string, objectKey: string, uploadId: string): Promise<{ number: number; etag: string }[]> {
        if (client instanceof OSS) {
            const result = await client.listParts(objectKey, uploadId);
            return result.parts.map(part => ({
                number: parseInt(part.PartNumber + '', 10),
                etag: part.ETag,
            }));
        }
        if (client instanceof S3) {
            const request: ListPartsRequest = {
                Bucket: bucket,
                Key: objectKey,
                UploadId: uploadId
            };
            const data = await client.listParts(request).promise();
            return (data.Parts ?? []).map(part => ({
                number: part.PartNumber ?? 0,
                etag: part.ETag ?? '',
            }));
        }
        throw new Error('makeUploadTask client object is wrong.');
    }
}