import IUploadTask from './IUploadTask';
import { ApplyUploadRequest } from '../BackendServerModel';
import { UploadProgress, VideoComplete, UploadResult } from '../Model';
import S3, { PutObjectRequest } from 'aws-sdk/clients/s3';
import OSS from 'ali-oss';
import fs from 'fs';
import { GetCoverImage } from '../utils/ImageUtils';
import TaskStatus, { StatusCheckpoint } from '../TaskStatus';

/**
 * Upload task for a video file.
 *
 * Uploads the video together with a cover image obtained from `GetCoverImage`,
 * using either an AWS S3 client or an Aliyun OSS client.
 */
export default class VideoUploadTask implements IUploadTask {
    /** Passed to `GetCoverImage` when the cover image is produced. */
    ffmpegPath?: string;
    /** Video duration; must be set before `makeResult` is called. */
    duration?: number;
    /** Video width; must be set before `makeResult` is called. */
    width?: number;
    /** Video height; must be set before `makeResult` is called. */
    height?: number;
    objectKeyPrefix: string = 'short_video';

    /**
     * Builds the "apply upload" request for the given video file.
     *
     * @param md5 MD5 of the file, forwarded as `Md5`.
     * @param objectKeyPartial Forwarded as `FileName`.
     * @param file Local path of the video; its size on disk is reported as `FileSize`.
     * @param region Forwarded as `Region`.
     * @param bucket Forwarded as `BucketName`.
     * @param fileExt Not used: the request always reports `.mp4`.
     * @param serverType Forwarded as `ServerType`.
     * @returns The populated request object.
     */
    makeApplyUploadRequest(md5: string, objectKeyPartial: string, file: string, region: string, bucket: string, fileExt: string, serverType: number): ApplyUploadRequest {
        const fileSize = fs.statSync(file).size;
        const applyRequest: ApplyUploadRequest = {
            Md5: md5,
            CategoryId: 1,
            MediaType: 'video',
            FileName: objectKeyPartial,
            FileSize: fileSize,
            Region: region,
            BucketName: bucket,
            ServerType: serverType,
            MediaInfo: {
                FileExt: '.mp4'
            }
        };
        return applyRequest;
    }

    /**
     * Builds the completion record reported once both uploads have finished.
     *
     * @param mediaId Identifier stored as `id`.
     * @param fileSize Stored as `filesize`.
     * @param mark Stored as `mark`.
     * @param uploadRet Upload results in the order produced by `makeUploadTask`:
     *   index 0 is the video, index 1 is the cover image.
     * @returns The completion record.
     * @throws The string `'have no video info'` when `duration`, `width` or
     *   `height` is unset or zero.
     */
    makeResult(mediaId: string, fileSize: number, mark: string, uploadRet: UploadResult[]): VideoComplete {
        if (!(this.duration && this.width && this.height)) {
            throw 'have no video info';
        }
        const videoComplete: VideoComplete = {
            id: mediaId,
            url: uploadRet[0].Url,
            cover: uploadRet[1].Url,
            mark: mark,
            duration: this.duration,
            filesize: fileSize,
            width: this.width,
            height: this.height
        };
        return videoComplete;
    }

    /**
     * Uploads the video and its cover image concurrently.
     *
     * @param s3 S3 or OSS client used for both uploads.
     * @param bucket Target bucket.
     * @param objectKeyPartial Key prefix; the video key is this prefix followed by
     *   `<mediaId>.mp4` (presumably the prefix ends with `/` - confirm with callers).
     * @param mediaId Media identifier used in both object keys.
     * @param file Local path of the video file.
     * @param fileExt Not used by this implementation.
     * @param md5 MD5 of the video, used to key the multipart status record.
     * @param statusCp Not used by this implementation.
     * @param listener Receives progress of the video upload; cover progress is not reported.
     * @returns `[videoResult, coverResult]`.
     */
    async makeUploadTask(s3: S3 | OSS, bucket: string, objectKeyPartial: string, mediaId: string, file: string, fileExt: string, md5: string, statusCp: StatusCheckpoint | undefined, listener: (progress: UploadProgress) => void): Promise<UploadResult[]> {
        // Cover object key: same path as the video, but rooted at 'images' and
        // with the last segment replaced by '<mediaId>.jpg'.
        const coverKeyParts = objectKeyPartial.split('/');
        coverKeyParts[0] = 'images';
        coverKeyParts[coverKeyParts.length - 1] = mediaId + '.jpg';
        const coverKey = coverKeyParts.join('/');

        const coverFile = await GetCoverImage(this.ffmpegPath, file);
        const coverSize = fs.statSync(coverFile).size;
        const videoSize = fs.statSync(file).size;

        // Video object key.
        const videoKey = `${objectKeyPartial}${mediaId}.mp4`;

        // Start the video upload.
        const videoUpload = this.UploadVideo(s3, bucket, videoKey, mediaId, videoSize, md5, fs.createReadStream(file), listener);
        // Start the cover upload.
        const coverUpload = this.UploadImage(s3, bucket, coverKey, coverSize, fs.createReadStream(coverFile), false, () => { });

        // Wait for both uploads to finish.
        return await Promise.all([videoUpload, coverUpload]);
    }

    /**
     * Uploads an image stream to S3 or OSS.
     *
     * @param client S3 or OSS client.
     * @param bucket Target bucket (used in the S3 request and echoed in the result).
     * @param objectKey Destination object key.
     * @param fileSize Size in bytes, sent as `ContentLength` on S3.
     * @param filestream Source stream; always destroyed before this method settles.
     * @param isGif Selects `image/gif` instead of `image/jpeg` as the S3 content type.
     * @param listener Receives S3 upload progress; not invoked on the OSS path.
     * @returns Bucket, key and proxy URL of the uploaded object.
     * @throws The string `'makeUploadTask uploader object is wrong.'` when
     *   `client` is neither an S3 nor an OSS instance.
     */
    private async UploadImage(client: S3 | OSS, bucket: string, objectKey: string, fileSize: number, filestream: fs.ReadStream, isGif: boolean, listener: (progress: UploadProgress) => void): Promise<UploadResult> {
        try {
            if (client instanceof S3) {
                const request: PutObjectRequest = {
                    Bucket: bucket,
                    ContentLength: fileSize,
                    // 'image/jpeg' is the registered media type; 'image/jpg' is not.
                    ContentType: isGif ? 'image/gif' : 'image/jpeg',
                    Key: objectKey,
                    Tagging: 'public=1',
                    Body: filestream
                };
                const upload = client.upload(request);
                upload.on('httpUploadProgress', progress => {
                    listener.call(listener, { Uploaded: progress.loaded, Total: progress.total });
                });
                const sendData = await upload.promise();
                console.log(sendData);
            } else if (client instanceof OSS) {
                const sendData = await client.put(objectKey, filestream);
                console.log(sendData);
            } else {
                throw 'makeUploadTask uploader object is wrong.';
            }
        } finally {
            // Release the file descriptor on every path, including failures.
            // destroy() is a no-op if the stream has already been closed.
            filestream.destroy();
        }

        // The returned URL goes through the configured proxy rather than
        // pointing at the storage endpoint directly.
        const url = `${IUploadTask.proxyUrl}/${objectKey}`;
        return { Url: url, Bucket: bucket, Key: objectKey };
    }

    /**
     * Uploads the video to S3 or OSS.
     *
     * On OSS the file is re-read from `filestream.path`: files under 10 MiB use
     * a single `put`, larger ones use a multipart upload.
     *
     * @param client S3 or OSS client.
     * @param bucket Target bucket (used in the S3 request and echoed in the result).
     * @param objectKey Destination object key.
     * @param mediaId Media identifier stored in the multipart status record.
     * @param fileSize Size in bytes; sent as `ContentLength` on S3 and used to
     *   choose between `put` and multipart upload on OSS.
     * @param md5 MD5 of the video; keys the multipart status record.
     * @param filestream Source stream; always destroyed before this method settles.
     * @param listener Receives upload progress (not invoked for OSS single `put`).
     * @returns Bucket, key and proxy URL of the uploaded object.
     * @throws The string `'makeUploadTask uploader object is wrong.'` when
     *   `client` is neither an S3 nor an OSS instance.
     */
    private async UploadVideo(client: S3 | OSS, bucket: string, objectKey: string, mediaId: string, fileSize: number, md5: string, filestream: fs.ReadStream, listener: (progress: UploadProgress) => void): Promise<UploadResult> {
        const multipartThreshold = 10 * 1024 * 1024;

        try {
            if (client instanceof S3) {
                const request: PutObjectRequest = {
                    Bucket: bucket,
                    ContentLength: fileSize,
                    ContentType: 'video/mp4',
                    Key: objectKey,
                    Tagging: 'public=2',
                    Body: filestream
                };
                const upload = client.upload(request);
                upload.on('httpUploadProgress', progress => {
                    listener.call(listener, { Uploaded: progress.loaded, Total: progress.total });
                });
                await upload.promise();
                console.log('upload success!');
            } else if (client instanceof OSS) {
                if (fileSize < multipartThreshold) {
                    await client.put(objectKey, filestream.path);
                } else {
                    await this.multipartUploadToOss(client, objectKey, mediaId, md5, filestream.path, listener);
                }
            } else {
                throw 'makeUploadTask uploader object is wrong.';
            }
        } finally {
            // The OSS path never reads from the stream (it uploads by path), so
            // without this the opened descriptor would stay open. Also covers
            // failed S3 uploads and the invalid-client path.
            filestream.destroy();
        }

        const url = `${IUploadTask.proxyUrl}/${objectKey}`;
        return { Bucket: bucket, Key: objectKey, Url: url };
    }

    /**
     * Performs an OSS multipart upload, recording the first checkpoint in
     * `TaskStatus` and removing that record once the upload completes.
     * Status persistence is best-effort: failures are logged, not propagated.
     */
    private async multipartUploadToOss(client: OSS, objectKey: string, mediaId: string, md5: string, filePath: string | Buffer, listener: (progress: UploadProgress) => void): Promise<void> {
        // Initialise the status-record store; the upload proceeds without it on failure.
        let taskStatus: TaskStatus | null = null;
        try {
            taskStatus = await TaskStatus.build();
        } catch (e) {
            console.debug('TaskStatus.build() error:' + String(e));
        }

        let noRecord = true;

        // Progress callback.
        const progress = (p: number, checkpoint: any) => {
            if (checkpoint && noRecord) {
                // Persist the status record once, on the first checkpoint seen.
                const status: StatusCheckpoint = {
                    md5: md5,
                    media_id: mediaId,
                    file_size: checkpoint.fileSize,
                    object_key: objectKey,
                    part_size: checkpoint.partSize,
                    upload_id: checkpoint.uploadId
                };
                // Fire-and-forget, but with a real handler so a failed insert
                // does not become an unhandled promise rejection.
                taskStatus?.insert(status).catch((e: unknown) => {
                    console.debug('TaskStatus.insert() error:' + String(e));
                });
                noRecord = false;
            }
            // Notify the listener; `p` is multiplied by 10000 against a fixed total of 10000.
            listener.call(listener, { Uploaded: p * 10000, Total: 10000 });
        };

        await client.multipartUpload(objectKey, filePath, { progress });

        // Remove the status record now that the upload has completed.
        // NOTE(review): the result is not awaited; confirm whether delete() can reject.
        taskStatus?.delete(md5);
    }
}