文章目录
- 普通文件上传
- 分片上传
- 整体流程
- 技术点分析
- 文件选择方式
- @隐藏input框,自定义trigger
- @拖拽上传
- 分片
- @动态分片
- 计算哈希
- @worker
- @requestIdleCallback
- @抽样
- 请求
- @并发控制
- @进度展示
- @手动中止/暂停
- 合并
- @流式并发合并
- 反思
- 分片命名问题
- 并发控制代码实现的问题
- 参考文献
普通文件上传
一般我们是用 FormData
上传,比较简单,注意 headers
加上"Content-Type": "multipart/form-data"
const pictureUpload = (file) => {
const formData = new FormData();
formData.append("file", file);
return request({
url: "/upload",
method: "post",
headers: { "Content-Type": "multipart/form-data" },
data: formData,
});
};
分片上传
当遇到文件太大、网络不好等情况时,如果发生连接中断、挂掉,那整个文件就白传,需要重头再传,这是非常不人性化的用户体验。所以我们可以根据网络情况将文件分成小碎片,最后在服务端将碎片合并,以降低网络传输中断带来的风险。
整体流程
- 【前端】文件分片,以序号命名(序号在后端被用于保证合并时的顺序)
- 【前端】计算文件hash(hash是文件的摘要,用于唯一标识)
- 【前端】根据 fileName + hash 查询; 【后端】返回文件是否已经存在(秒传),或者返回文件已经上传了哪些片(断点续传)
- 【前端】上传分片,携带分片的命名和文件hash;【 后端】保存分片,建立一个以hash做名字的目录,将所有分片保存在该目录下
- 【前端】分片传递完毕,携带 fileName + hash发起合并请求;【后端】将hash目录下的所有分片合并成文件,并删除碎片
技术点分析
文件选择方式
@隐藏input框,自定义trigger
常规的 input
存在样式问题,大多数情况下会自定义一个 trigger
。
<template>
<div class="container">
<div class="trigger" @click="trigger">
<div v-if="theFile" class="preview">
{{ theFile.name }}
</div>
<div v-else class="cross">
<div class="bar vertical"></div>
<div class="bar horizontal"></div>
</div>
</div>
<input
ref="inputRef"
type="file"
@change="fileHandler"
style="display: none"
/>
</div>
</template>
<style scoped lang="scss">
.container {
width: v-bind(sizePx);
height: v-bind(sizePx);
border: 1px dashed #ccc;
.trigger {
--bar-width: 20px;
width: 100%;
height: 100%;
position: relative;
cursor: pointer;
.bar {
height: var(--bar-width);
width: 80%;
background-color: #ccc;
position: absolute;
top: calc(50% - var(--bar-width) / 2);
left: 10%;
}
.vertical {
transform: rotate(90deg);
}
}
}
</style>
@拖拽上传
利用拖拽事件的dataTransfer
来获取文件
<div
class="cross"
@drop.prevent="fileDropHandler"
@dragover.prevent="dragOverHandler"
@dragleave.prevent="dragLeaveHandler"
>
function fileDropHandler(e: DragEvent) {
if (e.dataTransfer?.files) {
console.log(e.dataTransfer.files);
const file: File = e.dataTransfer.files[0];
}
}
function dragLeaveHandler() {
triggerRef.value.style.border = "1px dashed #ccc";
}
function dragOverHandler() {
triggerRef.value.style.border = "1px solid #a00";
}
分片
固定分片大小
const file = event.target.files[0];
const chunk_size = 100 * 1024; // 100 KB
const chunks = [];
let curIndex = 0;
let curSize = 0;
while (curSize <= file.size) {
chunks[curIndex] = {
blob: file.slice(curSize, curSize + chunk_size),
name: curIndex + "",
};
curIndex++;
curSize = curIndex * chunk_size;
}
console.log(chunks);
@动态分片
根据网络状况来动态分片,切片大小随着网速适应变化。
计算哈希
直接在主线程计算哈希,有两个问题:1.影响主线程其他操作,导致卡死;2. 时间太久。
(计算哈希使用md5算法,由于该算法的实现方式,后分组依赖于前分组的计算结果,故无法并发地分片求hash)【3】
比较常用的两款哈希计算库有 spark-md5
和 hash-wasm
。 这里用后者。
针对第一个问题,可以考虑使用 DedicatedWorker
或者 requestIdleCallback
。
@worker
在vite
项目中,可以直接使用 import MyWorker from './worker?worker'
语法来引入一个worker
import { createMD5 } from "hash-wasm";
self.onmessage = async (e) => {
const md5 = await createMD5();
md5.init();
console.log(e.data);
md5.update(new Uint8Array(await e.data.arrayBuffer()));
const hash = md5.digest();
self.postMessage(hash);
};
import HashWorker from "./HashWorker?worker";
const w = new HashWorker();
w.onmessage = (e) => {
console.log(e.data);
};
w.postMessage(file);
将文件内容从主线程传递到worker也会导致内存暴增,可以利用分片结果进行增量式postMessage
减缓内存压力
import { createMD5 } from "hash-wasm";
import { IHasher } from "hash-wasm/dist/lib/WASMInterface";
let md5: IHasher | null = null;
self.onmessage = async (e) => {
if (!md5) {
md5 = await createMD5();
md5.init();
}
if (e.data.done) {
const hash = md5.digest();
self.postMessage({ done: true, hash });
self.close();
} else {
md5.update(new Uint8Array(await e.data.blob.arrayBuffer()));
self.postMessage({ done: false, progress: e.data.name });
}
};
const worker = new HashWorker();
worker.onmessage = (e) => {
if (e.data.done) {
console.log(e.data.hash);
worker.terminate();
} else {
console.log(e.data.progress);
loadChunk();
}
};
curIndex = 0;
function loadChunk() {
if (curIndex < chunks.length) {
worker.postMessage(chunks[curIndex]);
curIndex++;
} else {
worker.postMessage({ done: true });
}
}
loadChunk();
@requestIdleCallback
因为文件分片了,利用事件循环的空隙来计算哈希,是很精妙的一个思路,不会影响用户交互,但是实测计算速度还是很慢,跟worker没法比。
let md5 = await createMD5();
md5.init();
curIndex = 0;
async function loadChunk() {
if (curIndex < chunks.length) {
md5.update(new Uint8Array(await chunks[curIndex].blob.arrayBuffer()));
curIndex++;
percentage.value = Math.floor((curIndex / chunks.length) * 100);
requestIdleCallback((deadline) => {
if (deadline.timeRemaining() > 1) {
loadChunk();
}
});
} else {
const hash = md5.digest();
console.log(hash);
console.timeEnd("hash");
percentage.value = 100;
}
}
console.time("hash");
loadChunk();
@抽样
针对第二个问题,可以牺牲hash的准确性,减少工作量,从而缩短时间。网上比较多的策略可以是完整保留首尾两片,其余片都取一个bit。我这里直接简单修改一下,只取偶数片来哈希,哈希所花的时间理论上会少一半。
function loadChunk() {
if (curIndex < chunks.length) {
worker.postMessage(chunks[curIndex++]);
curIndex++;
} else {
worker.postMessage({ done: true });
}
}
请求
@并发控制
分片过多,使用串行上传速度肯定慢,但是使用 Promise.all
不限制并发数也会在建立TCP连接的时候浏览器会直接卡死。最好的办法是手动控制并发数。
async function upload(chunks: any[], hash: string, fileName: string) {
const uploadedChunks = await queryChunks({ hash, fileName: fileName });
if (!uploadedChunks) return; // 文件已经存在了,如果完全没传则返回[]
const toUploadChunks = chunks.filter(
(ck) => !uploadedChunks.includes(ck.name)
);
/* -- Promise.all -- */
// await Promise.all(
// toUploadChunks.map((ck) => {
// const fd = new FormData();
// fd.append("file", ck.blob);
// fd.append("hash", hash);
// fd.append("chunkName", ck.name);
// return uploadChunks(fd);
// })
// );
await queueUpload(
toUploadChunks.map((ck) => {
const fd = new FormData();
fd.append("file", ck.blob);
fd.append("hash", hash);
fd.append("chunkName", ck.name);
return fd;
})
);
await mergeChunks({ hash, fileName: fileName });
}
import { uploadChunks } from "@/api/backend";
export async function queueUpload(tasks = [], max = 5) {
return new Promise((resolve, reject) => {
const results = [];
if (tasks.length === 0) {
resolve(results);
return;
}
let curTask = 0;
let count = 0;
function run() {
while (count < max && curTask < tasks.length) {
const task = tasks[curTask++];
count++;
uploadChunks(task)
.then((res) => {
count--;
if (curTask === tasks.length) {
resolve(results);
} else {
results.push(res);
run();
}
})
.catch((e) => reject(e));
}
}
run();
});
}
上面这段代码是有问题的。读者可以自己看一下,我在反思中会解释。
@进度展示
利用xhr的 progress
事件来获取进度。axios
用 onUploadProgress
。
import { uploadChunks } from "@/api/backend";
export async function queueUpload(tasks = [], max = 5, onProgress) {
return new Promise((resolve, reject) => {
const results = [];
if (tasks.length === 0) {
resolve(results);
return;
}
let curTask = 0;
let count = 0;
const progress = [];
function run() {
while (count < max && curTask < tasks.length) {
const task = tasks[curTask++];
count++;
uploadChunks(task, onUploadProgress(curTask))
.then((res) => {
count--;
if (curTask === tasks.length && count === 0) {
resolve(results);
} else {
results.push(res);
run();
}
})
.catch((e) => reject(e));
}
}
function onUploadProgress(curTask) {
return (pe) => {
progress[curTask] = pe.loaded / pe.total;
onProgress(progress.reduce((a, c) => a + c) / tasks.length);
};
}
run();
});
}
至于可视化展示上,自由发挥啦,完全可以做出很多令人拍案叫绝的用户体验。
整体进度可以使用内外两个环来分表表示 hash 和 上传 的进度,外圈完成后开始内圈。
分片进度可以参考大圣的方案,对每一个分片用一个小方块来表示,用背景色的高度来表示当前分片的进度。
<div class="progress">
<div
v-for="p in progress"
class="progress_item"
:style="{
'--p': p + '%',
}"
></div>
</div>
.progress {
display: flex;
flex-wrap: wrap;
border: 1px dashed #ccc;
.progress_item {
margin: 1px;
background-image: linear-gradient(
180deg,
dodgerblue 0%,
dodgerblue var(--p),
white var(--p),
white 100%
);
width: 24px;
height: 24px;
border: 1px dashed #ccc;
}
}
@手动中止/暂停
如果在上传过程中关闭标签页,上传就会被终止,下次进来重新选择文件,拿到hash再查询已传碎片,达到断点续传。但有时有用户可能想优雅地主动点击暂停。这种情况就需要利用 AbortController
来实现取消请求。
const controller = new AbortController();
axios.get('/foo/bar', {
signal: controller.signal
})
controller.abort()
值得注意的是,如果增加了手动中止功能,基本上都要实现恢复/继续上传的功能,那此时就要注意进度展示时的进度显示问题。可能会出现进度条残存,断点续传时出现进度条倒退等问题。
合并
const chunks = readdirSync(storageHashContainerPath);
const numericAscend = (a, b) => +a - +b;
chunks.sort(numericAscend).forEach((ck) => {
const chunkPath = join(storageHashContainerPath, ck);
appendFileSync(storageFilePath, readFileSync(chunkPath));
unlinkSync(chunkPath);
});
@流式并发合并
串行合并效率比较低,因为所有序号已知,且碎片间无依赖关系,完全可以采用并发合并。
// hash + fileName
app.get("/upload/merge", function (req, res) {
console.log(req.query);
const storageHashContainerPath = join(storageDir, req.query.hash);
const storageFilePath = join(storageHashContainerPath, req.query.fileName);
try {
accessSync(storageFilePath);
const msg = "文件已存在";
res.json({ code: 0, msg, result: storageFilePath });
} catch (e) {
try {
accessSync(storageHashContainerPath);
// 合并目录下所有的片,并清理碎片
const chunks = readdirSync(storageHashContainerPath);
console.log(chunks);
// 筛选出分片(约定的分片命名为 `fileName_startBit`)
const getKey = (ck) => +ck.split("_").slice(-1)[0]; // 拿到 startBit
const numericAscend = (a, b) => +a - +b;
const comparator = (a, b) => numericAscend(...[a, b].map(getKey));
/* - sync - */
// chunks
// .filter((ck) => ck.startsWith(req.query.fileName))
// .sort(comparator)
// .forEach((ck) => {
// console.log(ck);
// const chunkPath = join(storageHashContainerPath, ck);
// appendFileSync(storageFilePath, readFileSync(chunkPath));
// unlinkSync(chunkPath);
// });
/* -stream- */
const _chunks = chunks
.filter((ck) => ck.startsWith(req.query.fileName))
.sort(comparator);
Promise.all(
_chunks.map((ck) => {
const chunkPath = join(storageHashContainerPath, ck);
return pipeline(
createReadStream(chunkPath),
createWriteStream(storageFilePath, {
start: getKey(ck),
})
);
})
).then(() => {
Promise.all(
_chunks.map((ck) => {
const chunkPath = join(storageHashContainerPath, ck);
return unlink(chunkPath);
})
).then(() => {
res.json({ code: 0, msg: "文件上传成功", result: storageFilePath });
});
});
} catch (err) {
console.log(err);
res.json({ code: 1, msg: "合并文件出错" });
}
}
});
反思
分片命名问题
对于文件分片,我们传递的时候使用了 序号 来做标识,合并时也按照序号来合并。 其实还可以改用分片在源文件中的起始bit位置,这个位置是升序的,不仅表明了顺序,还可以得出分片在源文件中的位置。唯一的不足可能就是大文件的这个值会比较大,要考虑溢出的风险(考虑JavaScript最大的安全整数是9007199254740991,即意味着允许最大8192TB的文件,这基本是足够了的)。
然后,考虑一个文件,修改命名后,上传两次,会出现什么结果。首先,他们会有相同的hash,也就会出现在同一个hash目录下,那么合并时的逻辑就得考虑,是两个都保留呢,还是后来的覆盖先来的。正常我们应该将两个都保留,因为用户可能只在意文件名,而不会关心hash一不一样。为了将两个文件都保留下来,以及考虑到将来磁盘空间有限,我们肯定要做碎片定期清理,那我们就得做到能够区分文件和分片,因此我们可以对分片命名做一些标记,以此来从目录下的筛选出分片。(这里我遇到一个有意思的BUG,就是你去打开看这个文件没有问题,但是每传一次这个文件,最后合并出来的体积就会增大一点。这个bug就是因为没有区分文件和碎片,每次都合并hash目录下的所有文件,导致合并结果其实包含了已经存在的文件和其他所有分片。O(∩_∩)O哈哈~)
如何作标记呢?给分片加一个统一的前缀,比如 __
不就可以了?
请考虑一下断点续传。如果第一次只传递了一些分片,没有完成合并操作,然后开始传第二个异名同hash的文件,如何区分两者各自的分片呢?
所以我们还应该为分片命名加上文件名来区分不同文件名对应的分片,以避免断点续传时查到错误的分片列表。
综上,我设计了分片命名规则为 fileName_startBit
,后端在合并时根据 fileName
筛选过滤出正确的分片列表,然后选择 startBit
进行合并操作。
const chunks = [];
let curIndex = 0;
let curBit = 0;
while (curBit < file.size) {
const endBit = Math.min(curBit + CHUNK_SIZE, file.size);
chunks[curIndex] = {
blob: file.slice(curBit, endBit),
name: [file.name, curBit].join("_"),
};
curIndex++;
curBit = endBit;
}
我们要获取分片顺序时,取分片名最后一个 _
后的数字即可,文件名本身中有 _
也不会有任何影响。
并发控制代码实现的问题
在上面提到的并发控制的代码中,最后 resolve
的判断条件是有问题的。我在实测中发现概率性的出现了,文件合成了,最后还有多余的分片的情况。
然后看请求,发现merge请求在uplaod未全部完成的时候就已经发送了。
问题很好定位,因为我们最后resolve
的时机不对,不能只判断任务数量达到最大,还得判断占用count
为0,才能确定是最后一个上传请求完成了。修改如下:
if (curTask === tasks.length && count === 0) {
参考文献
- 字节跳动面试官:请你实现一个大文件上传和断点续传 - 掘金
- 字节跳动面试官,我也实现了大文件上传和断点续传 - 掘金
- MD5加密概述,原理及实现_md5加密原理_Oliver_xpl的博客-CSDN博客
- Node.js 多文件 Stream 合并,串行和并发两种模式实现_nodejs打包后怎么合并_高先生的猫的博客-CSDN博客