import IUploadTask from './ICryptUploadTask'; import {ApplyUploadRequest} from '../BackendServerModel'; import {UploadProgress, VideoComplete, UploadResult, Multiparts, AllFileContext} from '../Model'; import S3, {ListPartsRequest, PutObjectRequest} from 'aws-sdk/clients/s3'; import OSS from 'ali-oss'; import fs from 'fs'; import {GetCoverImage, GetMd5Buffer} from '../utils/ImageUtils'; import { StatusCheckpoint } from '../TaskStatus'; import ICryptUploadTask from "./ICryptUploadTask"; import {EnctyptObject, enctyptSdk, getPathDigest} from "../Crypt/Encrypt"; import {stringToUint8Array} from "../utils/TransferFunction"; import chackCache, {CheckBucket} from "../StreamCache/CheckCache"; import {SencondPassBucket} from "../StreamCache/SencondPassCache"; import {CryptUploadDispatcher} from "./CryptUploadDispatcher"; import UGCPublish from "../index"; import {VedioExceptionModel} from "../utils/ExceptionUtil"; export default class CryptVideoUploadTask implements ICryptUploadTask { ffmpegPath?: string; duration?: number; width?: number; height?: number; objectKeyPrefix: string = 'short_video'; makeApplyUploadRequest(objectKeyPartial: string, file: string, region: string, bucket: string, fileExt: string, serverType: number): ApplyUploadRequest { let fileSize = fs.statSync(file).size; let applyRequest: ApplyUploadRequest = { Md5: undefined, CategoryId: 1, MediaType: 'video', FileName: objectKeyPartial, FileSize: fileSize, Region: region, BucketName: bucket, ServerType: serverType, MediaInfo: { FileExt: '.mp4' } }; return applyRequest; } makeResult(mediaId: string, fileSize: number, mark: string, uploadRet: UploadResult[]): VideoComplete { if (!(this.duration && this.width && this.height)) { throw 'have no video info'; } //console.log("aa",uploadRet[0].Error) let videoComplete: VideoComplete = { id: mediaId, url: uploadRet[0].Url, cover: uploadRet[1].Url, mark: mark, //Exception: uploadRet[0].Error, duration: this.duration, filesize: fileSize, width: this.width, height: this.height, cryptInfo: uploadRet[0].cryptInfo, cryptInfoCover:uploadRet[1].cryptInfo }; return videoComplete; } async makeUploadTask(s3: S3 | OSS, bucket: string, objectKeyPartial: string, mediaId: string, file: string, fileExt: string, statusCp: CheckBucket | undefined,today:Date,_receive_proxy, _minor_receive_proxy, _bucket,allFileInfo:AllFileContext, listener: (progress: UploadProgress) => void): Promise { let coverKey = await UGCPublish.processUtil.getCoverKey(objectKeyPartial,allFileInfo.mediaId); console.log("id",coverKey); if (coverKey == null){ return [{ Error: VedioExceptionModel.E_GET_FILE_COVER_EXCEPTION, }]; } console.log("获取文件位置"); /** 获取文件位置 */ console.log(file); allFileInfo.coverFile = await UGCPublish.processUtil.getCoverFile(this.ffmpegPath, file); if (allFileInfo.coverFile == null){ console.log("获取封面失败"); return [{ Error: VedioExceptionModel.E_GET_FILE_COVER_EXCEPTION }]; } /** 获取视频秒传信息 */ let result = await UGCPublish.processUtil.getVedioSencodePassInfo(statusCp,allFileInfo); //console.log("result:",result); /** 如果信息错误则直接返回错误编码*/ if (result != null){ return result; } /** 获取视频的Uri key*/ let videoKey = await UGCPublish.processUtil.getVedioKey(statusCp,allFileInfo); console.log("videoKey",videoKey); let promises: Array> = []; /** 用来判断是否插入执行续传语句*/ let sequelInsert: boolean = true; let resultEnctyptInfo = await UGCPublish.processUtil.getEnctyptObject(allFileInfo,statusCp); /** 如果信息错误则直接返回错误编码*/ if (resultEnctyptInfo.Complete == null){ result[0].Error = resultEnctyptInfo.exception; result[1].Error = resultEnctyptInfo.exception; return result; } /** 创建加密对象 */ let enctyptObjectVideo = resultEnctyptInfo.Complete; /** 上传加密对象 */ let promisesInfo = await UGCPublish.processUtil.uploadVedioAndCover(this,s3,coverKey, videoKey,sequelInsert, allFileInfo, enctyptObjectVideo,statusCp,listener); /** 返回错误信息 注意这次是不需要强制返回的 */ if (promisesInfo[0].exception != 0 || promisesInfo[1].exception != 0 ){ result[0].Error = resultEnctyptInfo[0].exception; result[1].Error = resultEnctyptInfo[1].exception; } /** 等待上传完成直接删除 */ await chackCache.build().then((obj)=>{ obj.delete(allFileInfo.digest.toString()); }); /** 等待两个上传任务完成 */ return await Promise.all([promisesInfo[0].Complete,promisesInfo[1].Complete]); } private async UploadImage(client: S3 | OSS, bucket: string, objectKey: string, fileSize: number, enctyptObject: EnctyptObject, isGif: boolean, listener: (progress: UploadProgress) => void): Promise { if (client instanceof S3) { let request: PutObjectRequest = { Bucket: bucket, ContentLength: Buffer.from(enctyptObject.ciphertext).length, ContentType: isGif ? 'image/gif' : 'image/jpg', Key: objectKey, Tagging: 'public=1', Body: Buffer.from(enctyptObject.ciphertext) }; let upload = client.upload(request); upload.on('httpUploadProgress', progress => { listener.call(listener, { Uploaded: progress.loaded, Total: progress.total }); }); let sendData = await upload.promise(); //console.log(sendData); } else if (client instanceof OSS) { let sendData = await client.multipartUpload(objectKey, new Buffer(enctyptObject.ciphertext),{}); //console.log(sendData); } else { throw 'makeUploadTask uploader object is wrong.'; } //let url = `https://${that._bucket}.s3-${that._region}.amazonaws.com/${objectKey}`; let url = `${IUploadTask.proxyUrl}/${objectKey}`; const toHexString = bytes => bytes.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), ''); return { Url: url, Bucket: bucket, Key: objectKey, cryptInfo:{ digest:toHexString(new Uint8Array(enctyptObject.digest)), key:toHexString(new Uint8Array(enctyptObject.key)), fileSize:fileSize } }; } private async UploadVideo(client: S3 | OSS, bucket: string, objectKey: string, sequelInsert:boolean,mediaId:string, file:string,digest:string, fileSize: number, enctyptObject: EnctyptObject,statusCp: CheckBucket | undefined, listener: (progress: UploadProgress) => void): Promise { let md5 = await GetMd5Buffer(Buffer.from(enctyptObject.ciphertext)); //console.log("进入到视频上传函数"); if (client instanceof S3) { if (enctyptObject == undefined) { /** 创建流数据 */ let fileStream = fs.readFileSync(file); enctyptObject = enctyptSdk(fileStream); } /*** * 普通上传数据接口 */ let request: PutObjectRequest = { Bucket: bucket, ContentLength: Buffer.from(enctyptObject.ciphertext).length, ContentType: 'video/mp4', Key: objectKey, Tagging: 'public=2', Body: Buffer.from(enctyptObject.ciphertext) }; let upload = client.upload(request); upload.on('httpUploadProgress', progress => { listener.call(listener, {Uploaded: progress.loaded, Total: progress.total}); }) let sendData = await upload.promise(); //console.log('upload success!'); } else if (client instanceof OSS) { //console.log("obejctkey",objectKey); if (fileSize < 10 * 1024 * 1024) { if (enctyptObject == undefined) { /** 创建流数据 */ let fileStream = fs.readFileSync(file); enctyptObject = enctyptSdk(fileStream); } let sendData = await client.multipartUpload(objectKey, Buffer.from(enctyptObject.ciphertext), {}); } else { //console.log("enctyptObject",enctyptObject); let parts = statusCp ? await this.getUploadedParts(client, bucket, objectKey, statusCp.upload_id) : []; let noRecord = true; if (enctyptObject == undefined) { /** 创建流数据 */ let fileStream = fs.readFileSync(file); enctyptObject = enctyptSdk(fileStream); } //这是一个上传监听 let progress = async (p: any, checkpoint: any) => { if (checkpoint) { if (noRecord) { let status: StatusCheckpoint = { md5: md5, media_id: mediaId, file_size: checkpoint.fileSize, object_key: objectKey, part_size: checkpoint.partSize, upload_id: checkpoint.uploadId }; /** 加入续传信息 */ if (sequelInsert == true) { let today = new Date(); await CryptUploadDispatcher._cacheManager.getCheckCache().then((obj) => { /** 保证信息正确到情况下 进行保存*/ if (enctyptObject != undefined && enctyptObject.key != undefined && enctyptObject.iv != undefined) { if (digest != undefined && digest.length > 31 && enctyptObject.key.byteLength == 64 && enctyptObject.iv.byteLength == 16) { console.log("执行插入语句,",digest); obj.insert({ plain_sha: digest.toString(), md5: md5, media_id: mediaId, file_size: checkpoint.fileSize, object_key: objectKey, part_size: checkpoint.partSize, upload_id: checkpoint.uploadId, key_value: new Uint8Array(enctyptObject.key).toString(), iv_value: new Uint8Array(enctyptObject.iv).toString(), storage_time: (today.getTime() / (1000 * 60)).toString() }); } } }); } noRecord = false; } } // 状态通知 listener.call(listener, {Uploaded: p * 10000, Total: 10000}); }; let checkpoint; if (statusCp) { //console.log("开始执行续传逻辑"); checkpoint = { md5: md5, file: Buffer.from(enctyptObject.ciphertext), fileSize: statusCp.file_size, name: objectKey, partSize: statusCp.part_size, uploadId: statusCp.upload_id, doneParts: parts }; //console.log("读取数据", checkpoint); } //console.log("开始视频上传"); //这里发送流数据 let sendData = await client.multipartUpload(objectKey, Buffer.from(enctyptObject.ciphertext), { progress, checkpoint }); } } else { throw 'makeUploadTask uploader object is wrong.'; } let param = {Bucket: bucket, Key: objectKey}; let url = `${IUploadTask.proxyUrl}/${objectKey}`; const toHexString = bytes => bytes.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), ''); return { Bucket: bucket, Key: objectKey, Url: url, cryptInfo: { digest: toHexString(new Uint8Array(enctyptObject.digest)), key: toHexString(new Uint8Array(enctyptObject.key)), fileSize: fileSize } }; } /** * 获取已上传的分片 * @param client S3或oss对象 * @param bucket 桶 * @param objectKey key * @param uploadId 上传id */ async getUploadedParts(client: S3 | OSS, bucket: string, objectKey: string, uploadId: string) :Promise { if (client instanceof OSS) { let result = await client.listParts(objectKey, uploadId); let parts = result.parts; return parts.map(part => { return { number: parseInt(part.PartNumber + ''), etag: part.ETag, } }) } else if (client instanceof S3) { let request: ListPartsRequest = { Bucket: bucket, Key: objectKey, UploadId: uploadId }; return new Promise((resolve, reject) => { client.listParts(request, (err, data) => { if (err) reject(err); else { if (data.Parts) { let arr = data.Parts.map(part => { return { number: part.PartNumber ?? 0, etag: part.ETag ?? '', } }) resolve(arr); } else { resolve([]); } } }); }); } else { throw 'makeUploadTask client object is wrong.'; } } }