import {QiniuError, QiniuErrorName, QiniuRequestError} from '../errors'
import * as utils from '../utils'
import Base, {Progress, UploadInfo, Extra} from './base'
import {
  CreateMultipartUploadCommand,
  UploadPartCommand,
  CompleteMultipartUploadCommand
} from '@aws-sdk/client-s3'
import {S3ClientHttpRequestHandler} from '../utils/S3ClientHttpRequestHandler'
import {createXHR} from '../utils'

/** One entry of the `Parts` list sent with CompleteMultipartUploadCommand. */
export interface AwsMultiPart {
  ETag: string | undefined
  PartNumber: number
}

/** Per-part record kept in memory and persisted to the local resume cache. */
export interface UploadedChunkStorage {
  ETag: string | undefined
  PartNumber: number
  MD5: undefined | string
}

/** Progress counters: one `loaded` value per chunk plus a 0/1 flag for the final merge step. */
export interface ChunkLoaded {
  mkFileProgress: 0 | 1
  chunks: number[]
}

/** A chunk of the file together with its zero-based position. */
export interface ChunkInfo {
  chunk: Blob
  index: number
}

/** Shape of the resume record stored locally: the upload id and the parts uploaded under it. */
export interface LocalInfo {
  data: UploadedChunkStorage[]
  id: string | undefined
}

export interface ChunkPart {
  ETag: string | undefined
  PartNumber: number
}

export interface UploadChunkBody extends Extra {
  Parts: ChunkPart[]
}

/** Returns true when `n`, rendered as a decimal string, is a positive integer without leading zeros. */
function isPositiveInteger(n: number) {
  const re = /^[1-9]\d*$/
  return re.test(String(n))
}

/**
 * Returns true when `error` looks like the S3 `NoSuchUpload` service error, i.e. the
 * multipart upload id is unknown to the server (aborted, completed or expired).
 *
 * NOTE(review): relies on the AWS SDK exposing the S3 error code as `name` (and, for
 * XML error bodies, `Code`) on the thrown object — confirm against the SDK version in use.
 */
function isNoSuchUploadError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false
  }
  const {name, Code} = error as { name?: unknown, Code?: unknown }
  return name === 'NoSuchUpload' || Code === 'NoSuchUpload'
}

/**
 * Resumable multipart upload built on the S3 multipart API
 * (CreateMultipartUpload -> UploadPart x N -> CompleteMultipartUpload).
 * Finished parts are persisted locally so an interrupted upload can skip them next time.
 */
export default class Resume extends Base {
  /**
   * @description The file split into chunks.
   */
  private chunks: Blob[]

  /**
   * @description Per chunk: whether its result was taken from the local cache.
   */
  private usedCacheList: boolean[]

  /**
   * @description Part records loaded from the local cache.
   */
  private cachedUploadedList: UploadedChunkStorage[]

  /**
   * @description Part records completed (or reused) during the current run.
   */
  private uploadedList: UploadedChunkStorage[]

  /**
   * The `Parts` argument passed to CompleteMultipartUploadCommand.
   * @private
   */
  private parts: AwsMultiPart[]

  /**
   * @description Progress of the current upload.
   */
  private loaded: ChunkLoaded

  /**
   * @description Id of the current multipart upload.
   */
  private uploadId: string | undefined

  /**
   * fixme
   * AWS Bucket Owner Account ID (currently not read anywhere in this class).
   * @private
   */
  private expectedBucketOwner: string | undefined

  /**
   * @returns
   * @description Implements Base's `run`: validates the chunk size, prepares the upload
   * session, uploads every chunk through a concurrency-limited pool and finally asks the
   * server to merge the parts. Errors are rethrown to the caller.
   * @throws QiniuError (InvalidChunkSize) when `config.chunkSize` exceeds 1024.
   */
  protected async run() {
    this.logger.info('start run Resume.')

    // Missing or non-positive-integer chunk sizes fall back to 5.
    // NOTE(review): unit is presumably MB (decided by utils.getChunks) — confirm.
    if (!this.config.chunkSize || !isPositiveInteger(this.config.chunkSize)) {
      this.config.chunkSize = 5
    }

    if (this.config.chunkSize > 1024) {
      throw new QiniuError(
        QiniuErrorName.InvalidChunkSize,
        'chunkSize maximum value is 1024'
      )
    }

    // Mainly needed to obtain the uploadId.
    await this.initBeforeUploadChunks()

    const pool = new utils.Pool(
      (chunkInfo: ChunkInfo) => this.uploadChunk(chunkInfo),
      this.config.concurrentRequestLimit
    )

    const localKey = this.getLocalKey()
    const uploadChunks = this.chunks.map((chunk, index) => pool.enqueue({chunk, index}))

    try {
      // Upload all chunks.
      await Promise.all(uploadChunks)
      // Merge them.
      await this.mkFileReq()
    } catch (error) {
      // The uploadId is invalid or the request parameters are wrong (usually because the
      // locally stored uploadId has expired). Drop the cache so the next attempt starts a
      // fresh session instead of failing on the same dead id forever.
      const staleSession =
        (error instanceof QiniuRequestError && (error.code === 612 || error.code === 400))
        || isNoSuchUploadError(error)
      if (staleSession) {
        utils.removeLocalFileInfo(localKey, this.logger)
      }

      throw error
    }

    // Upload succeeded: clear the local cache.
    utils.removeLocalFileInfo(localKey, this.logger)
  }

  /**
   * Uploads one chunk as part `index + 1`, or reuses the cached record for it.
   * Without `config.checkByMD5` any cached record is reused as-is; with it, the record is
   * reused only when the chunk's MD5 matches the cached one.
   * @throws QiniuError (RequestError) when the response status is not 200.
   */
  private async uploadChunk(chunkInfo: ChunkInfo) {
    const {index, chunk} = chunkInfo
    const cachedInfo = this.cachedUploadedList[index]
    this.logger.info(`upload part ${index}, cache:`, cachedInfo)

    const shouldCheckMD5 = this.config.checkByMD5
    const reuseSaved = () => {
      this.usedCacheList[index] = true
      this.updateChunkProgress(chunk.size, index)
      this.uploadedList[index] = cachedInfo
      this.updateLocalCache()
    }

    // FIXME: should at least compare the size.
    if (cachedInfo && !shouldCheckMD5) {
      reuseSaved()
      return
    }

    const md5 = await utils.computeMd5(chunk)
    this.logger.info('computed part md5.', md5)

    if (cachedInfo && md5 === cachedInfo.MD5) {
      reuseSaved()
      return
    }

    // Cache not used: mark this chunk accordingly.
    this.usedCacheList[index] = false

    const onProgress = (data: Progress) => {
      this.updateChunkProgress(data.loaded, index)
    }

    // Each request gets its own XHR-backed handler; the XHR is registered via addXhr.
    const xhr = createXHR()
    const requestHandler = new S3ClientHttpRequestHandler(xhr)
    this.addXhr(xhr)
    requestHandler.onProgress$.subscribe(progress => {
      this.updateChunkProgress(progress.progressEvent.loaded, index)
    })
    this.s3Client.config.requestHandler = requestHandler

    this.logger.info(`part ${index + 1} start uploading.`)
    const res = await this.s3Client.send(new UploadPartCommand({
      Body: chunk,
      Bucket: this.bucketName,
      Key: this.key,
      PartNumber: index + 1,
      UploadId: this.getUploadInfo().id
    }))

    const metadata = res.$metadata
    if (metadata && metadata.httpStatusCode !== 200) {
      throw new QiniuError(QiniuErrorName.RequestError, `上传分片${index + 1}失败`)
    }
    this.logger.info(`part ${index + 1} upload completed.`)

    // In some browsers the XHR progress event never fires (progress stays null), so the
    // progress is updated manually after every completed part.
    onProgress({
      loaded: chunk.size,
      total: chunk.size
    })

    this.uploadedList[index] = {
      ETag: res.ETag,
      PartNumber: index + 1,
      MD5: md5
    }

    this.updateLocalCache()
  }

  /**
   * Completes the multipart upload by sending the ETag/PartNumber of every uploaded part.
   * @throws QiniuError (RequestError) when the response status is not 200.
   */
  private async mkFileReq() {
    this.parts = []
    for (let i = 0, n = this.uploadedList.length; i < n; i++) {
      const part = this.uploadedList[i]
      this.parts[i] = {
        ETag: part.ETag,
        PartNumber: part.PartNumber
      }
    }

    const xhr = createXHR()
    const requestHandler = new S3ClientHttpRequestHandler(xhr)
    this.addXhr(xhr)
    this.s3Client.config.requestHandler = requestHandler

    // CompleteMultipartUploadCommand has a bug in SDK 3.24.0, see
    // https://github.com/aws/aws-sdk-js-v3/issues/1814 — the SDK has to be patched by hand for now.
    const res = await this.s3Client.send(new CompleteMultipartUploadCommand({
      Bucket: this.bucketName,
      Key: this.key,
      UploadId: this.getUploadInfo().id,
      MultipartUpload: {
        Parts: this.parts
      }
    }))

    const metadata = res.$metadata
    if (metadata && metadata.httpStatusCode !== 200) {
      throw new QiniuError(QiniuErrorName.RequestError, `合并分片失败`)
    }

    this.logger.info('finish Resume Progress.')
    this.updateMkFileProgress(1)
  }

  /**
   * Prepares the upload session: resets per-run state, restores the upload id and part
   * records from the local cache or creates a new multipart upload, then splits the file
   * into chunks and publishes the initial progress.
   * @throws QiniuError (RequestError) when the server returns no UploadId.
   */
  private async initBeforeUploadChunks() {
    this.uploadedList = []
    this.usedCacheList = []
    const cachedInfo = utils.getLocalFileInfo(this.getLocalKey(), this.logger)

    // Parts are only valid together with the uploadId they were uploaded under, so resuming
    // needs the locally stored uploadId. Without a cached record — or with one that carries
    // no id, which could never be resumed — a new uploadId is requested.
    if (!cachedInfo || !cachedInfo.id) {
      this.logger.info('init upload parts from api.')
      const res = await this.s3Client.send(new CreateMultipartUploadCommand({
        Bucket: this.bucketName,
        Key: this.key,
        ContentType: 'application/octet-stream'
      }))
      if (!res.UploadId) {
        throw new QiniuError(QiniuErrorName.RequestError, `创建上传任务失败`)
      }
      this.logger.info(`inited upload parts of id: ${res.UploadId}.`)
      this.uploadId = res.UploadId
      this.cachedUploadedList = []
    } else {
      const infoMessage = [
        'resume upload parts from local cache,',
        `total ${cachedInfo.data.length} part,`,
        `id is ${cachedInfo.id}.`
      ]

      this.logger.info(infoMessage.join(' '))
      this.cachedUploadedList = cachedInfo.data
      this.uploadId = cachedInfo.id
    }

    this.chunks = utils.getChunks(this.file, this.config.chunkSize)
    this.loaded = {
      mkFileProgress: 0,
      chunks: this.chunks.map(_ => 0)
    }
    this.notifyResumeProgress()
  }

  /** Returns the current upload id wrapped as UploadInfo (the url is always empty here). */
  private getUploadInfo(): UploadInfo {
    return {
      id: this.uploadId,
      url: ''
    }
  }

  /** Builds the local cache key from the file name, the object key and the file size. */
  private getLocalKey() {
    return utils.createLocalKey(this.file.name, this.key, this.file.size)
  }

  /** Persists the current upload id and the completed part records to the local cache. */
  private updateLocalCache() {
    utils.setLocalFileInfo(this.getLocalKey(), {
      id: this.uploadId,
      data: this.uploadedList
    }, this.logger)
  }

  /** Records the loaded amount for one chunk and publishes the new progress. */
  private updateChunkProgress(loaded: number, index: number) {
    this.loaded.chunks[index] = loaded
    this.notifyResumeProgress()
  }

  /** Records the merge-step flag and publishes the new progress. */
  private updateMkFileProgress(progress: 0 | 1) {
    this.loaded.mkFileProgress = progress
    this.notifyResumeProgress()
  }

  /** Recomputes the total and per-chunk progress and hands it to `onData`. */
  private notifyResumeProgress() {
    this.progress = {
      total: this.getProgressInfoItem(
        utils.sum(this.loaded.chunks) + this.loaded.mkFileProgress,
        // FIXME: inaccurate fileSize
        this.file.size + 1 // Keeps the total below 100% until the merge step has finished.
      ),
      chunks: this.chunks.map((chunk, index) => {
        const fromCache = this.usedCacheList[index]
        return this.getProgressInfoItem(this.loaded.chunks[index], chunk.size, fromCache)
      }),
      uploadInfo: {
        id: this.uploadId,
        url: ''
      }
    }
    this.onData(this.progress)
  }
}