import IUploadTask from './IUploadTask';
import { ApplyUploadRequest } from '../BackendServerModel';
import { UploadProgress, FileComplete, UploadResult, Multiparts } from '../Model';
import OSS from 'ali-oss';
import fs from 'fs';
import TaskStatus, { StatusCheckpoint } from '../TaskStatus';
import {S3} from "aws-sdk";
import {ListPartsRequest, ManagedUpload, PutObjectRequest} from "aws-sdk/clients/s3";

/**
 * Upload task for generic files (media type 'other').
 *
 * Builds the "apply upload" request, performs the upload through either an
 * AWS S3 client or an Aliyun OSS client, and converts the upload outcome into
 * a FileComplete record.
 */
export default class FileUploadTask implements IUploadTask {
    objectKeyPrefix: string = 'other';

    /**
     * Builds the request used to apply for an upload of `file`.
     *
     * The file size is read synchronously from disk, so `file` must exist.
     *
     * @param md5 MD5 of the file, forwarded as `Md5`.
     * @param objectKeyPartial Forwarded as `FileName`.
     * @param file Local path of the file to upload.
     * @param region Forwarded as `Region`.
     * @param bucket Forwarded as `BucketName`.
     * @param fileExt Forwarded as `MediaInfo.FileExt`.
     * @param serverType Forwarded as `ServerType`.
     * @returns The populated request (CategoryId 1, MediaType 'other').
     * @throws Whatever `fs.statSync` throws when `file` cannot be stat'ed.
     */
    makeApplyUploadRequest(md5: string, objectKeyPartial: string, file: string, region: string, bucket: string, fileExt: string, serverType: number): ApplyUploadRequest {
        const fileSize = fs.statSync(file).size;
        const applyRequest: ApplyUploadRequest = {
            Md5: md5,
            CategoryId: 1,
            MediaType: 'other',
            FileName: objectKeyPartial,
            FileSize: fileSize,
            Region: region,
            BucketName: bucket,
            ServerType: serverType,
            MediaInfo: {
                FileExt: fileExt
            }
        };
        return applyRequest;
    }

    /**
     * Converts the upload outcome into a FileComplete record.
     *
     * Only the first entry of `uploadRet` is used (its `Url`).
     *
     * @param mediaId Becomes `id`.
     * @param fileSize Becomes `filesize`.
     * @param mark Becomes `mark`.
     * @param uploadRet Result list as returned by `makeUploadTask`.
     * @throws Error when `uploadRet` is empty.
     */
    makeResult(mediaId: string, fileSize: number, mark: string, uploadRet: { Url: string, Bucket: string, Key: string }[]): FileComplete {
        const first = uploadRet[0];
        if (!first) {
            // Fail with a clear message instead of "cannot read property 'Url' of undefined".
            throw new Error('makeResult: uploadRet is empty.');
        }
        const fileComplete: FileComplete = {
            id: mediaId,
            url: first.Url,
            filesize: fileSize,
            mark: mark
        };
        return fileComplete;
    }

    /**
     * Uploads `file` to `bucket` and resolves once the upload has finished.
     *
     * - S3 client: managed upload of a read stream; progress is reported in bytes.
     * - OSS client: multipart upload. When `statusCp` is given, the upload is
     *   resumed from that checkpoint using the parts the server already holds.
     *   The first checkpoint reported by the SDK is stored through TaskStatus,
     *   and the stored record is deleted after a successful upload. Progress is
     *   reported as a fraction scaled to a total of 10000.
     *
     * NOTE(review): the return type is spelled structurally; it is presumably
     * equivalent to `UploadResult[]` from ../Model — confirm and switch if so.
     *
     * @param client S3 or OSS client.
     * @param bucket Target bucket.
     * @param objectKeyPartial Key prefix; ignored when `statusCp` supplies the key.
     * @param mediaId Media id, part of the generated object key.
     * @param file Local path of the file to upload.
     * @param fileExt Extension appended to the generated object key.
     * @param md5 MD5 of the file; identifies the stored checkpoint record.
     * @param statusCp Previously stored checkpoint, or undefined for a fresh upload.
     * @param listener Receives progress notifications.
     * @returns A single-element list describing the uploaded object.
     * @throws The string 'makeUploadTask client object is wrong.' for an unsupported
     *         client; upload errors from the SDK are propagated unchanged.
     */
    async makeUploadTask(client: S3 | OSS, bucket: string, objectKeyPartial: string, mediaId: string, file: string, fileExt: string, md5: string, statusCp: StatusCheckpoint | undefined, listener: (progress: UploadProgress) => void): Promise<{ Bucket: string, Key: string, Url: string }[]> {
        const fileSize = fs.statSync(file).size;
        const objectKey = statusCp ? statusCp.object_key : `${objectKeyPartial}${mediaId}${fileExt}`;

        if (client instanceof S3) {
            // Create the file stream.
            const fileStream = fs.createReadStream(file);
            try {
                const request: PutObjectRequest = {
                    Bucket: bucket,
                    ContentLength: fileSize,
                    Key: objectKey,
                    Tagging: 'public=1',
                    Body: fileStream
                };
                const options: ManagedUpload.ManagedUploadOptions = {
                    leavePartsOnError: true
                };
                const upload = client.upload(request, options);
                upload.on('httpUploadProgress', progress => {
                    listener.call(listener, {
                        Uploaded: progress.loaded,
                        Total: progress.total
                    });
                });
                await upload.promise();
            } finally {
                // Release the file descriptor even when the upload fails midway.
                fileStream.destroy();
            }
        } else if (client instanceof OSS) {
            const parts = statusCp ? await this.getUploadedParts(client, bucket, objectKey, statusCp.upload_id) : [];
            const taskStatus = await TaskStatus.build();
            let noRecord = true;

            // Upload progress callback: records the checkpoint once, then notifies the listener.
            const progress = (p: any, cpt: any) => {
                if (cpt && noRecord) {
                    // Record the status so the upload can be resumed later.
                    const status: StatusCheckpoint = {
                        md5: md5,
                        media_id: mediaId,
                        file_size: cpt.fileSize,
                        object_key: objectKey,
                        part_size: cpt.partSize,
                        upload_id: cpt.uploadId
                    };
                    // Best effort: a failed insert must not abort the upload,
                    // but it must not become an unhandled rejection either.
                    taskStatus.insert(status).catch((err: unknown) => {
                        console.warn('failed to record upload status.', err);
                    });
                    noRecord = false;
                }
                // Status notification.
                listener.call(listener, {
                    Uploaded: p * 10000,
                    Total: 10000
                });
            };

            // Resume from the stored checkpoint when one was supplied.
            const checkpoint = statusCp
                ? {
                    file: file,
                    fileSize: fileSize,
                    name: objectKey,
                    partSize: statusCp.part_size,
                    uploadId: statusCp.upload_id,
                    doneParts: parts
                }
                : undefined;

            // Send the data and wait until the upload has really completed;
            // a failure propagates to the caller and keeps the status record.
            await client.multipartUpload(objectKey, file, { checkpoint, progress });

            // Delete the status record after completion (best effort).
            try {
                await taskStatus?.delete(md5);
            } catch (err) {
                console.warn('failed to delete upload status.', err);
            }
        } else {
            throw 'makeUploadTask client object is wrong.';
        }

        console.log('upload success!');
        return [{
            Bucket: bucket,
            Key: objectKey,
            Url: `${IUploadTask.proxyUrl}/${objectKey}`
        }];
    }

    /**
     * Fetches the parts already uploaded for a multipart upload.
     *
     * NOTE(review): only the first page returned by `listParts` is read;
     * confirm whether uploads can exceed one page of parts.
     * NOTE(review): the return type is spelled structurally; it is presumably
     * equivalent to `Multiparts[]` from ../Model — confirm and switch if so.
     *
     * @param client S3 or OSS client.
     * @param bucket Bucket (used by the S3 request only).
     * @param objectKey Object key.
     * @param uploadId Multipart upload id.
     * @returns Part number and ETag of each listed part; empty when none are listed.
     * @throws The string 'makeUploadTask client object is wrong.' for an unsupported
     *         client; SDK errors are propagated unchanged.
     */
    async getUploadedParts(client: S3 | OSS, bucket: string, objectKey: string, uploadId: string): Promise<{ number: number, etag: string }[]> {
        if (client instanceof OSS) {
            const result = await client.listParts(objectKey, uploadId);
            return result.parts.map(part => ({
                number: parseInt(part.PartNumber + '', 10),
                etag: part.ETag,
            }));
        } else if (client instanceof S3) {
            const request: ListPartsRequest = {
                Bucket: bucket,
                Key: objectKey,
                UploadId: uploadId
            };
            const data = await client.listParts(request).promise();
            return (data.Parts ?? []).map(part => ({
                number: part.PartNumber ?? 0,
                etag: part.ETag ?? '',
            }));
        } else {
            throw 'makeUploadTask client object is wrong.';
        }
    }
}