import BackendServer from '../BackendServer'; import { SdkSettingsResult } from '../BackendServerModel'; import { UploadProgress, UploadComplete } from '../Model'; import S3 from 'aws-sdk/clients/s3'; import OSS from 'ali-oss'; import fs from 'fs'; import path from 'path'; import crypto from 'crypto'; import IUploadTask from './IUploadTask'; import FileUploadTask from './FileUploadTask'; import TaskStatus, { StatusCheckpoint } from '../TaskStatus'; /** * 抽象上传方法 */ export class UploadDispatcher { private _url: string = ''; private _token :string; private static _s3: S3 | OSS | null = null; private static _time: number = 0; private static _settings: SdkSettingsResult | null = null; private static _timeSettings: number = 0; private _region = 'ap-northeast-1'; private _bucket = 'didisit'; private _receive_proxy: string | null = null; private _minor_region: string = ''; private _minor_bucket: string = ''; private _minor_receive_proxy: string = ''; private _minor_used: boolean = false; private _objectKeyPrefix: string; private _uploadHandler?: (mediaId: string, listener: (progress: UploadProgress) => void) => void; private _listener?: (progress: UploadProgress) => void; private _uploadTask: IUploadTask; constructor(uploadTask: IUploadTask, url: string, token :string) { this._uploadTask = uploadTask; this._objectKeyPrefix = uploadTask.objectKeyPrefix; this._url = url; this._token = token; } set onUpload(uploadHandler: (mediaId: string, listener: (progress: UploadProgress) => void) => void) { this._uploadHandler = uploadHandler; } set progressListener(listener: (progress: UploadProgress) => void) { this._listener = listener; } /** * 开始上传 * @param file 文件路径 * @param mark 标识 * @param duration 时长 * @param listener 进度监听 */ async start(file: string, mark: string, duration: number | undefined, listener: (progress: UploadProgress) => void): Promise { let fileSize = fs.statSync(file).size; let fileExt = path.extname(file).toLowerCase(); let md5 = await this.GetMd5(file); // 生成objectKey前缀 objectKeyPartial let today = new Date() let month = today.getMonth() + 1; let day = today.getDate(); let y = today.getFullYear(); let m = month >= 10 ? month.toString() : '0' + month; let d = day >= 10 ? day : '0' + day; let objectKeyPartial = `${this._objectKeyPrefix}/${y}/${m}/${d}/`; let client = await this.getS3orOss(); let bs = new BackendServer(this._url, this._token); // 查找上传状态缓存 let checkpoint: StatusCheckpoint | undefined; try { let taskStatus = await TaskStatus.build(); checkpoint = await taskStatus.query(md5); if (checkpoint) await taskStatus.delete(md5); // 拿到checkpoint之后立即从本地删除 } catch (e) { console.debug(e); } // 秒传查询 let ret = await bs.QueryFileExistingOnServer(md5, fileSize, duration); let fileNeedUpload = false; if(this._uploadTask instanceof FileUploadTask) { if(ret.MediaInfo?.ImageGroup) { // 之前已经按图片上传 fileNeedUpload = true; } } // 状态为0,未上传,则开始上传 // 否则直接返回结果 if (ret.Status === 0 || fileNeedUpload) { let mediaId: string; // 有缓存的上传状态,则不用再次申请mediaId // 否则申请上传mediaId if (checkpoint) { // 更新 midiaId、objectkeyPartial、fileExt mediaId = checkpoint.media_id; objectKeyPartial = this.getObjectKeyPartial(checkpoint.object_key); fileExt = path.extname(checkpoint.object_key); } else { let serverType = this._minor_used ? 2 : 3; // // 申请上传mediaId let applyRequest = this._uploadTask.makeApplyUploadRequest(md5, objectKeyPartial, file, this._region, this._bucket, fileExt, serverType); if(duration == undefined || duration == 0) { ; } else if(duration > 0) { applyRequest.StorageDuration = duration; } else if(duration < 0) { applyRequest.StorageDuration = -1; } let applyRet = await bs.ApplyUpload(applyRequest); // 状态为1,且MediaID有值, 则申请成功 // 失败抛出异常 if (applyRet.Status == 1 && applyRet.MediaID) { mediaId = applyRet.MediaID; } else { throw 'apply upload failed!'; } } // 开始上传任务 let uploadRet = await this._uploadTask.makeUploadTask(client, this._bucket, objectKeyPartial, mediaId, file, fileExt, md5, checkpoint, listener); // 更新上传状态 bs.UpdateByMediaId(mediaId, 1).catch(e => console.error(`UpdateByMediaId: mediaId=${mediaId}, ${e}`)); // 生成结果 return this._uploadTask.makeResult(mediaId, fileSize, mark, uploadRet); } else { // 已存在 //console.log('file exsit:' + ret.MediaID); let result: { Url: string, Bucket: string, Key: string }[] | undefined; let fileExt = ret?.MediaInfo?.FileExt ?? ''; if (ret.MediaType == 'image') { let imageGroups = ret.MediaInfo?.ImageGroup; if(imageGroups) { result = imageGroups.map(g => { let realExtname = g.Name == 'Original' ? fileExt : '.jpg'; let realObjectKey = `${ret.FileName}${ret.MediaID}_${g.Resolution.width}x${g.Resolution.height}${realExtname}`; let url = ret.ServerType === 3 ? `${this._receive_proxy}/${realObjectKey}` : `${this._minor_receive_proxy}/${realObjectKey}`; return { Key: realObjectKey, Url: url, Bucket: this._bucket } }); } else { // imageGroup不存在,则返回文件链接 if(ret.MediaInfo?.FileExt) { let realObjectKey = `${ret.FileName}${ret.MediaID}${ret.MediaInfo.FileExt}`; let url = ret.ServerType === 3 ? `${this._receive_proxy}/${realObjectKey}` : `${this._minor_receive_proxy}/${realObjectKey}`; result = [{ Key: realObjectKey, Url: url, Bucket: this._bucket }]; } } } else if (ret.MediaType == 'video') { let arr = ret.FileName.split('/'); arr[0] = 'images'; arr[arr.length - 1] = ret.MediaID + '.jpg'; let proxyUrl = ret.ServerType === 3 ? this._receive_proxy : this._minor_receive_proxy; // 封面objectkey let coverKey = arr.join('/'); let coverUrl = `${proxyUrl}/${coverKey}`; let videoKey = `${ret.FileName}${ret.MediaID}.mp4`; let videoUrl = `${proxyUrl}/${videoKey}`; result = [ { Key: videoKey, Url: videoUrl, Bucket: this._bucket }, { Key: coverKey, Url: coverUrl, Bucket: this._bucket } ]; } else { //if(ret.MediaType == 'other' || ret.MediaType == 'audio') let realFileExt = ret.MediaInfo?.FileExt ? ret.MediaInfo.FileExt : fileExt; let realObjectKey = `${ret.FileName}${ret.MediaID}${realFileExt}`; let proxyUrl = ret.ServerType === 3 ? this._receive_proxy : this._minor_receive_proxy; let url = `${proxyUrl}/${realObjectKey}`; result = [{ Key: realObjectKey, Url: url, Bucket: this._bucket }]; } if (!result) { throw 'upload result is empty.'; } return this._uploadTask.makeResult(ret.MediaID, fileSize, mark, result); } } /** * 异步获取存储对象 * @remark 拿取s3对象的时候使用该方法 */ private async getS3orOss() { await this.getSettings(); if (UploadDispatcher._s3 == null || (new Date().getTime() - UploadDispatcher._time) > 50 * 60000) { if (!this._minor_used) { let bs = new BackendServer(this._url, this._token); let s3 = await bs.CreateS3(this._region); this.setS3(s3); return s3; } else { let bs = new BackendServer(this._url, this._token); let oss = await bs.CreateOSSClient(this._minor_region, this._minor_bucket); this.setS3(oss); return oss; } } else { return UploadDispatcher._s3; } } /** * 设置s3对象 * @param s3 * @remark 会更新生成s3的时间 */ private setS3(s3: S3 | OSS) { UploadDispatcher._s3 = s3; UploadDispatcher._time = new Date().getTime(); } private async getSettings() { if (UploadDispatcher._settings == null || (new Date().getTime() - UploadDispatcher._timeSettings) > 50 * 6000) { let bs = new BackendServer(this._url, this._token); let ret = await bs.GetSdkSettings(); // 设置major配置 this._bucket = ret.Vod.bucket; this._region = ret.Vod.region; this._receive_proxy = ret.Vod.receive_proxy; // 设置minor配置 // 考虑国内网互传情况 if (ret.Vod.minor && ret.Vod.minor.bucket && ret.Vod.minor.region) { this._minor_bucket = ret.Vod.minor.bucket; this._minor_region = ret.Vod.minor.region; this._minor_receive_proxy = ret.Vod.minor.receive_proxy; } if (ret.Vod.country && ret.Vod.country == "中国") { this._minor_used = true; } else { this._minor_used = false; } let proxyUrl = this._minor_used ? this._minor_receive_proxy : ret.Vod.receive_proxy; if (proxyUrl.length > 1 && proxyUrl.charAt(proxyUrl.length - 1) == '/') { proxyUrl = proxyUrl.substr(0, proxyUrl.length - 1); } IUploadTask.proxyUrl = proxyUrl; } } /** * 计算md5值 * @param file */ private GetMd5(file: string): Promise { return new Promise((resolve, reject) => { let hash = crypto.createHash('md5'); let stream = fs.createReadStream(file); stream.on('data', (buffer) => { hash.update(buffer); }); stream.on('end', () => { resolve(hash.digest('hex').toUpperCase()); }); stream.on('error', err => { reject(err); }) }); } private ReplaceUrl(urlstr: string): string { if (urlstr == '') return urlstr; if (this._receive_proxy == null) return urlstr; let url = new URL(urlstr); return this._receive_proxy + '/' + url.pathname; } private getObjectKeyPartial(objectKey: string): string { let index = objectKey.lastIndexOf('/'); if (index > 0) { return objectKey.substring(0, index); } else { return ''; } } }