import BackendServer from '../BackendServer'; import {ApplyUploadRequest, SdkSettingsResult} from '../BackendServerModel'; import {UploadProgress, UploadComplete, UploadResult, AllFileContext, VideoComplete} from '../Model'; import S3 from 'aws-sdk/clients/s3'; import OSS from 'ali-oss'; import fs from 'fs'; import path from 'path'; import crypto from 'crypto'; import IUploadTask from './ICryptUploadTask'; import { getPathDigest } from '../Crypt/Encrypt'; import {SencondPass} from "../PassFunction/SencondPass"; import CacheManager from "../StreamCache/CacheManager"; import {CheckPass} from "../PassFunction/CheckPass"; import {TimeUtil} from "../utils/TimeUtil"; import UGCPublish from "../index"; import {CheckBucket} from "../StreamCache/CheckCache"; import {VedioExceptionModel} from "../utils/ExceptionUtil"; /** * 抽象上传方法 */ export class CryptUploadDispatcher { private _url: string = ''; private _token: string; private static _s3: S3 | OSS | null = null; private static _time: number = 0; private static _settings: SdkSettingsResult | null = null; private static _timeSettings: number = 0; static _cacheManager: CacheManager = new CacheManager(); static _sencondPass: SencondPass = new SencondPass(); static _checkPass: CheckPass = new CheckPass(); static _timeUtil: TimeUtil = new TimeUtil(); private _region = 'ap-northeast-1'; private _bucket = 'didisit'; private _receive_proxy: string | null = null; private _minor_region: string = ''; private _minor_bucket: string = ''; private _minor_receive_proxy: string = ''; private _minor_used: boolean = false; private _objectKeyPrefix: string; private _uploadHandler?: (mediaId: string, listener: (progress: UploadProgress) => void) => void; private _listener?: (progress: UploadProgress) => void; private _uploadTask: IUploadTask; private _uploadType: string; constructor(uploadTask: IUploadTask, url: string, token: string,type:string) { this._uploadType = type; this._uploadTask = uploadTask; this._objectKeyPrefix = uploadTask.objectKeyPrefix; this._url = url; this._token = token; } set onUpload(uploadHandler: (mediaId: string, listener: (progress: UploadProgress) => void) => void) { this._uploadHandler = uploadHandler; } set progressListener(listener: (progress: UploadProgress) => void) { this._listener = listener; } /** * 开始上传 * @param file 文件路径 * @param mark 标识 * @param duration 时长 * @param listener 进度监听 */ async start(file: string, mark: string, duration: number | undefined, listener: (progress: UploadProgress) => void): Promise { // 否则直接返回结果 let allFileInfo: AllFileContext = { /** 密文的Md5 */ md5:"", /** 原文的路径*/ file:file, /** uploadTaskType:string */ uploadTaskType:"", /** client */ client: null, /** 原文件大小 */ fileSize: 0, /** 当前日期数据 */ today: null, /** 转换后的时间日期 */ timeData: null, /** 摘要信息 */ digest: null, /** 媒体ID */ mediaId: "", /** 封面信息 */ coverFile: "", /** 媒体信息 */ mediaType: "", /** 减去上传的oss或者S3的地址的信息 */ objectKeyPartial: "", /** 文件后缀 */ fileExt: "", /** 类型id */ categoryId: 0, /** 媒体信息 */ mediaInfo: "", /** 时间长度 */ duration: duration, /** 桶明 */ bucket:null }; /** 获取参数信息 */ let verifInfo; try { verifInfo = await UGCPublish.processUtil.getAllFileContextParam(this._uploadType, file, this._objectKeyPrefix,this, this._bucket ,allFileInfo); if (verifInfo.exception == false) { console.log("获取参数信息失败"); return verifInfo.Complete; } allFileInfo = verifInfo.allFileInfo; }catch (e) { console.log("获取参数信息异常",e); return verifInfo.Complete; } /** 执行获取续传信息参数 */ let checkpoint; try { checkpoint = await UGCPublish.processUtil.getCheckPassInfo(allFileInfo.digest.toString()); if (checkpoint == null){ console.log("续传信息不存在"); } }catch (e){ console.log("获取续传信息异常",e); } /** 创建上上传信息 */ let applyRequest; try { applyRequest = await UGCPublish.processUtil.getMakeApplyUploadRequest(this._uploadTask,this._minor_used,allFileInfo,file, this._region, this._bucket); /** 存在错误信息则直接返回 */ if (applyRequest.exception != true ){ return applyRequest.Complete; } }catch (e) { verifInfo.Complete = { Exception: VedioExceptionModel.E_GET_FILE_INFO_AND_CLIENT_INFOEXCEPTION }; return verifInfo.Complete; } /** 将applyRequest的信息添加到上下文中 */ let allFileInfoAddApplyUploadRequest; try { allFileInfoAddApplyUploadRequest = await UGCPublish.processUtil.applyUploadRequestAddAllFileInfo(checkpoint,allFileInfo,applyRequest); /** 存在错误信息则直接返回 */ if (allFileInfoAddApplyUploadRequest.exception != true){ return allFileInfoAddApplyUploadRequest.Complete; } allFileInfo = allFileInfoAddApplyUploadRequest.allFileInfo; }catch (e){ return allFileInfoAddApplyUploadRequest.Complete; } /** 开始上传任务 */ let uploadRet = await this._uploadTask.makeUploadTask(allFileInfo.client, this._bucket, allFileInfo.objectKeyPartial, allFileInfo.mediaId, file, allFileInfo.fileExt, checkpoint, allFileInfo.today,this._receive_proxy, this._minor_receive_proxy, this._bucket,allFileInfo, listener); /** 更新上传状态 */ try { await CryptUploadDispatcher._sencondPass.SencondPassStorage(uploadRet, allFileInfo.timeData, file, this._bucket, allFileInfo); }catch (e){ console.log("保存秒传数据异常"); } return this._uploadTask.makeResult(allFileInfo.mediaId, allFileInfo.fileSize, mark, uploadRet); } /** * 异步获取存储对象 * @remark 拿取s3对象的时候使用该方法 */ public async getS3orOss() { await this.getSettings(); if (CryptUploadDispatcher._s3 == null || (new Date().getTime() - CryptUploadDispatcher._time) > 50 * 60000) { if (!this._minor_used) { /** console.log("开始获取S3客户端"); */ let bs = new BackendServer(this._url, this._token); let s3 = await bs.CreateS3(this._region); this.setS3(s3); return s3; } else { /** console.log("开始获取OSS客户端"); */ let bs = new BackendServer(this._url, this._token); let oss = await bs.CreateOSSClient(this._minor_region, this._minor_bucket); this.setS3(oss); return oss; } } else { return CryptUploadDispatcher._s3; } } /** * 设置s3对象 * @param s3 * @remark 会更新生成s3的时间 */ private setS3(s3: S3 | OSS) { CryptUploadDispatcher._s3 = s3; CryptUploadDispatcher._time = new Date().getTime(); } private async getSettings() { //console.log("开始获取参数配置"); if (CryptUploadDispatcher._settings == null || (new Date().getTime() - CryptUploadDispatcher._timeSettings) > 50 * 6000) { let bs = new BackendServer(this._url, this._token); //console.log("开始获取SDK设置"); let ret = await bs.GetSdkSettings(); //console.log("获取SDK设置成功"); // 设置major配置 this._bucket = ret.Vod.bucket; this._region = ret.Vod.region; this._receive_proxy = ret.Vod.receive_proxy; // 设置minor配置 // 考虑国内网互传情况 if (ret.Vod.minor && ret.Vod.minor.bucket && ret.Vod.minor.region) { this._minor_bucket = ret.Vod.minor.bucket; this._minor_region = ret.Vod.minor.region; this._minor_receive_proxy = ret.Vod.minor.receive_proxy; } if (ret.Vod.country && ret.Vod.country == "中国") { //console.log("获取国内地址"); this._minor_used = true; } else { //console.log("获取国外地址"); this._minor_used = false; } let proxyUrl = this._minor_used ? this._minor_receive_proxy : ret.Vod.receive_proxy; if (proxyUrl.length > 1 && proxyUrl.charAt(proxyUrl.length - 1) == '/') { proxyUrl = proxyUrl.substr(0, proxyUrl.length - 1); } IUploadTask.proxyUrl = proxyUrl; } } /** * 计算md5值 * @param file */ private GetMd5(file: string): Promise { return new Promise((resolve, reject) => { let hash = crypto.createHash('md5'); let stream = fs.createReadStream(file); stream.on('data', (buffer) => { hash.update(buffer); }); stream.on('end', () => { resolve(hash.digest('hex').toUpperCase()); }); stream.on('error', err => { reject(err); }) }); } private ReplaceUrl(urlstr: string): string { if (urlstr == '') return urlstr; if (this._receive_proxy == null) return urlstr; let url = new URL(urlstr); return this._receive_proxy + '/' + url.pathname; } private getObjectKeyPartial(objectKey: string): string { let index = objectKey.lastIndexOf('/'); if (index > 0) { return objectKey.substring(0, index); } else { return ''; } } }