大文件分片上传、分片进度以及整体进度、断点续传【前端原生、后端 Koa、Node 原生】(一)

news2025/1/11 1:43:53

分片(500MB)进度效果展示

效果展示,一个分片是 500MB 的

请添加图片描述

分片(10MB)进度效果展示

请添加图片描述


大文件分片上传效果展示

请添加图片描述


前端

思路

前端的思路:将大文件切分成多个小文件,然后并发给后端。

页面构建

先在页面上写几个组件用来获取文件。

<body>
  <input type="file" id="file" />
  <button id="uploadButton">点击上传</button>
</body>

功能函数:生成切片

切分文件的核心函数是 slice,没错,就是这么的神奇啊

我们把切好的 chunk 放到数组里,等待下一步的包装处理

/**
 * 默认切片大小 10 MB
 */
const SIZE = 10 * 1024 * 1024;

/**
 * 功能:生成切片
 */
function handleCreateChunk(file, size = SIZE) {
  const fileChunkList = [];
  let cur = 0;
  while (cur < file.size) {
    fileChunkList.push({
      file: file.slice(cur, cur + size),
    });
    cur += size;
  }
  return fileChunkList;
}

功能函数:请求逻辑

在这里简单封装一下 XMLHttpRequest

/**
 * 功能:封装请求
 * @param {*} param0
 * @returns
 */
function request({ url, method = 'post', data, header = {}, requestList }) {
  return new Promise((resolve, reject) => {
    let xhr = new XMLHttpRequest();
    xhr.open(method, url);
    Object.keys(header).forEach((item) => {
      xhr.setRequestHeader(item, header[item]);
    });
    xhr.onloadend = function (e) {
      resolve({
        data: e.target.response,
      });
    };
    xhr.send(data);
  });
}

功能函数:上传切片

/**
 * 功能: 上传切片
 * 包装好 FormData 之后通过 Promise.all() 并发所有切片
 */
async function uploadChunks(hanldleData, fileName) {
  const requestList = hanldleData
    .map(({ chunk, hash }) => {
      const formData = new FormData();
      formData.append('chunk', chunk);
      formData.append('hash', hash);
      formData.append('filename', fileName);
      return formData;
    })
    .map((formData) => {
      request({
        // url: 'http://localhost:3001/upload',
        url: 'upload',
        data: formData,
      });
    });

  await Promise.all(requestList);
}

/**
 * 功能:触发上传
*/
document.getElementById('uploadButton').onclick = async function () {
  // 切片
  const file = document.getElementById('file').files[0];
  console.log(file);
  const fileName = file.name;
  const fileChunkList = handleCreateChunk(file);
  // 包装
  const hanldleData = fileChunkList.map(({ file }, index) => {
    return {
      chunk: file,
      hash: `${fileName}_${index}`,
    };
  });
  await uploadChunks(hanldleData, fileName);
};

可以在请求中看到有很多个请求并发的上传

在这里插入图片描述

优化:进度条的生成

自己简单撸了几个 cube 进度条

<style>
  #uploadCube {
    margin-top: 10px;
    /* width: 520px; */
    overflow: hidden;
  }

  .cube {
    width: 50px;
    height: 50px;
    background-color: #fff;
    float: left;
    border: 1px solid #000;
    .progress {
      height: 100%;
      line-height: 50px;
      text-align: center;
    }
    .uploading {
      background-color: #409eff;
    }
    .success {
      background-color: #51f400;
    }
    .error {
      background-color: #ff9090;
    }
  }
</style>
<body>
  <input type="file" id="file" />
  <button id="uploadButton">点击上传</button>
  <div id="uploadCube"></div>
</body>
/**
 * 功能:生成页面进度的 HTML
 */
function handleUpdateHTML(progressData) {
  let uploadCube = document.querySelector('#uploadCube');
  let html = '';
  progressData.forEach((item) => {
    const { presentage } = item;
    let className = '';
    if (presentage < 100) {
      className = 'progress uploading';
    } else if (presentage == 100) {
      className = 'progress success';
    }
    html += ` <div class="cube">
    <div class="${className}" style="width: ${presentage}%">${presentage}%</div>
  </div>`;
  });
  uploadCube.innerHTML = html;
}

/**
 * 功能:处理每个 chunk 的 xhr.upload.onprogress,拿到各个 chunk 的上传进度
 * - 1. 同时通过 handleUpdateHTML 更新进度页面
 * - 2. progressData 用来记录各个 chunk 的进度
 */
let progressData = [];
function handleCreateOnProgress(data) {
  return (e) => {
    data.presentage = ((e.loaded / e.total) * 100).toFixed(2);
    console.log(JSON.stringify(progressData));
    handleUpdateHTML(progressData);
  };
}

后端 (Koa)

后端的思路是:

  1. 把 Node 暂存的 chunk 文件转移到我想处理的地方(也可以直接处理,看你的)
  2. 创建写入流,把各个 chunk 合并,前端会给你每个 chunk 的大小,还有 hash 值来定位每个 chunk 的位置

获取 chunk 切片文件

先把上传的接口写好

const Koa = require('koa');
const Views = require('koa-views');
const Router = require('koa-router');
const Static = require('koa-static');
const { koaBody } = require('koa-body');
const fs = require('fs');
const fse = require('fs-extra');

const app = new Koa();
const router = new Router();
app.use(Views(__dirname));
app.use(Static(__dirname));
app.use(
  koaBody({
    multipart: true,
    formidable: {
      maxFields: 1000 * 1024 * 1024,
    },
  })
);

router.get('/', async (ctx) => {
  await ctx.render('index.html');
});

/**
 * 功能:上传接口
 * - 从 ctx.request.body 中获取 hash 以及 filename
 * - 从 ctx.request.files 中拿到分片数据
 * - 然后再把 node 帮我们临时存放的 chunk 文件的 filepath 拿到,之后移动到我们想要存放的路径下
 * - filepath 和 hash 是一一对应的关系
 */
router.post('/upload', async (ctx) => {
  const { hash, filename } = ctx.request.body;
  const { filepath } = ctx.request.files?.chunk;
  const chunkPath = `${__dirname}/chunkPath/${filename}`;
  if (!fse.existsSync(chunkPath)) {
    await fse.mkdirs(chunkPath);
  }
  await fse.move(filepath, `${chunkPath}/${hash}`);
  ctx.body = {
    code: 1,
  };
});

app.use(router.routes());
app.listen(3000, () => {
  console.log(`server start: http://localhost:3000`);
});

写完这些就可以拿到 chunk
在这里插入图片描述

合并接口

先写一个接口,用来拿到 hash文件名

/**
 * 功能: merge 接口
 * - hasMergeChunk 变量是上面用来记录的
 * - mergePath 定义一下合并后的文件的路径
 */
router.post('/merge', async (ctx) => {
  // console.log(ctx.request.body);
  const { fileName, size } = ctx.request.body;
  hasMergeChunk = {};
  const mergePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(`${__dirname}/merge`)) {
    fse.mkdirSync(`${__dirname}/merge`);
  }
  await mergeChunk(mergePath, fileName, size);
  ctx.body = {
    data: '成功',
  };
});

合并分片的功能函数

然后开始合并

/**
 * 功能:合并 Chunk
 * - 1. chunkDir: 是 chunks 文件们所在的文件夹的路径
 * - 2. chunkPaths: 是个 Array,数组中包含所有的 chunk 的 path
 * - 3. 因为 每个 chunk 的 path 命名是通过 hash 组成的,所以我们先排序一下,
 * - 算是为 createWriteStream 中的 start 做准备
 * - 4. 为每个 chunk 的 path 创建写入流,写到 mergePath 这个路径下。因为已经
 * - 排序了,所以 start 就是每个文件的 index * eachChunkSize
 * @param {*} mergePath
 * @param {*} name
 * @param {*} eachChunkSize
 */
async function mergeChunk(mergePath, name, eachChunkSize) {
  const chunkDir = `${__dirname}/chunkPath/${name}`;
  const chunkPaths = await fse.readdir(chunkDir);
  chunkPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);

  await Promise.all(
    chunkPaths.map((chunk, index) => {
      const eachChunkPath = `${chunkDir}/${chunk}`;
      const writeStream = fse.createWriteStream(mergePath, {
        start: index * eachChunkSize,
      });
      return pipeStream(eachChunkPath, writeStream);
    })
  );
  console.log('合并完成');
  fse.rmdirSync(chunkDir);
  console.log(`删除 ${chunkDir} 文件夹`);
}

接着就是写入流

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入输出流](https://www.jmjc.tech/less/111)
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测输出流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path); // 输出流
    readStream.pipe(writeStream); // 输出通过管道流向输入
    readStream.on('end', () => {
      hasMergeChunk[path] = 'finish';
      fse.unlinkSync(path); // 删除此文件
      resolve();
      console.log(`合并 No.${path.split('_')[1]}, 已经合并${Object.keys(hasMergeChunk).length}`);
    });
  });
}

至此一个基本的逻辑上传就做好了!


后端 (Node 原生)

想了想还是有必要用原生写一下 ,复习一下。

基础:搭建简单的服务

先写一个基本的服务框架

const http = require('http');
const server = http.createServer();
server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }
  res.end('hello node');
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

基础:资源返回

添加页面的返回,以及资源的返回

const http = require('http');
const server = http.createServer();
const url = require('url');
const fs = require('fs');
const path = require('path');
const MIME = require('./mime.json');
server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  res.setHeader('content-type', 'text/html;charset=utf-8');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }

  let { pathname } = url.parse(req.url); // 解析一下 url,因为 req.url 可能会带一些参数
  //   console.log('req.url:>>', req.url);
  //   console.log('url.parse(req.url):>>', url.parse(req.url));
  console.log(`进入${pathname}`);
  switch (pathname) {
    case '/':
    case '/index': {
      let rs = fs.createReadStream('./index.html');
      rs.pipe(res);
      break;
    }
    case '/favicon.ico': {
      res.end('我没有哦');
      break;
    }
    default: {
      let ext = path.extname(pathname);
      res.setHeader('Content-Type', MIME[ext]); // 通过请求的资源后缀名,来返回对应的 Content-type 的类型
      let rs = fs.createReadStream(`.${pathname}`);
      rs.pipe(res);
    }
  }
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

MIME.json 在这个文章的最底下, 或者可以自己找个更全的。

功能:上传接口

接着写上传的接口,这里参考大圣老师的代码,写一个类来收集方法。

类定义如下,新建一个文件 controller.js

/**
 * module.exports 方法用于在服务器端导出模块,并以 CommonJS 格式提供。
 * - 参考:https://www.delftstack.com/zh/howto/node.js/create-and-export-classes/
 */
const multiparty = require('multiparty');
const fse = require('fs-extra');
class Controller {
  constructor(dirPath) {
    this.chunkPath = dirPath;
  }
  /**
   * multiparty 使用方法:https://www.npmjs.com/package/multiparty
   * - chunkFileDirPath 为关于文件 chunks 的文件夹路径,每个大文件根据文件名生成相关的文件夹
   * - 注意回调函数里的 this
   * @param {*} url
   * @param {*} path
   */
  async handleUpload(req, res) {
    const _this = this;
    const form = new multiparty.Form();
    form.parse(req, async function (err, fields, files) {
      if (err) {
        console.log(err);
        return;
      }
      const [chunk] = files.chunk;
      const [hash] = fields.hash;
      const [filename] = fields.filename;

      const chunkFileDirPath = `${_this.chunkPath}/${filename}`;
      if (!fse.existsSync(chunkFileDirPath)) {
        await fse.mkdirs(chunkFileDirPath);
      }
      await fse.move(chunk?.path, `${chunkFileDirPath}/${hash}`);
      res.end('收到文件 chunks');
    });
  }
}
module.exports = Controller;

然后在主服务里引入这个类,再上传接口这里调用一下类方法。

// ... 
const Controller = require('./controller');
const UPLOAD_DIR = `${__dirname}/chunkPath`; // chunks 上传的文件夹
const controller = new Controller(UPLOAD_DIR);
// ... 
case '/upload': {
    await controller.handleUpload(req, res);
    break;
}

功能:写合并接口

合并的逻辑跟 Koa 几乎没什么差别,只不过我都把方法封装到类里了。

首先写路由

case '/merge': {
  await controller.handleMerge(req, res);
  break;
}

然后在类中定义合并的方法

/**
 * 功能:合并
 * - 1. handlePostData 用来处理 POST 传递的数据,具体怎么处理的请查看方法
 * - 2. 把各个文件的 path 先想清楚要存到哪儿,建议自己写一写。
 * - 我是把所有的 chunks 都放到大目录 chunkPath 中,
 * - 然后在用文件名新建文件夹,再把chunks放到子文件夹中。
 * @param {*} req
 * @param {*} res
 */
async handleMerge(req, res) {
  const postData = await handlePostData(req);
  const { fileName, size: eachChunkSize } = postData;
  const mergePath = `${__dirname}/merge`;
  const mergeFilePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(mergePath)) {
    fse.mkdirSync(mergePath);
  }
  const mergeOptions = { chunksPath: this.chunksPath, mergeFilePath, fileName, eachChunkSize };
  await handleMergeChunks(mergeOptions);
  console.log('Success Merge');
  res.end(
    JSON.stringify({
      code: 1,
      message: 'success merge',
    })
  );
}

这里的 POST 请求需要处理一下

/**
 * 功能:处理 POST 请求
 * - 与 GET 数据相比,POST 数据量大,需要分段。
 * - 通过 req.on('data', function(data) {}) 监听
 * - 当有一段数据到达的时候执行回调,回调函数参数 data 为每段达到的数据。
 * - 当数据全部到达时会触发 req.on('end', function() {}) 里面的回调函数。
 * - 可以通过 JSON.parse(str) 解析成我们想要的 POST 请求数据格式。
 * - 参考资料:https://juejin.cn/post/7142700338414518286#heading-2
 * @param {*} req
 * @returns
 */
function handlePostData(req) {
  return new Promise((resolve, reject) => {
    let allData = '';
    let i = 0;
    req.on('data', function (chunkData) {
      //   console.log(`第 ${++i} 次收到数据`);
      allData += chunkData;
    });
    req.on('end', function () {
      const POST_MESSAGE = JSON.parse(allData);
      resolve(POST_MESSAGE);
    });
  });
}

然后就是合并 chunks,具体的注释我都放到代码里了

/**
 * 功能:合并 chunks
 * - 1. 首先根据 fileChunksDir 拿到所有 chunks 的文件名
 * - 2. 然后拼接成 fileAllChunksPaths <Array> 数组,然后一一创建可写流
 * - 3. fileAllChunksPaths 注意这里需要排序一下,不然就是乱的,这也是我们创建可写流 srart 位置的基础
 * - 4. 然后这里通过 pipeStream 函数用 Promise 包装了一下可读流,代码需要慢慢读去理解。
 * - 5. 我们这里的 可写流们,是根据 chunks 的不同,定义好写入的文件 path,
 * - 以及每个块儿写的开始位置和写入大小,每个可写流都是不一样的!
 * -
 * @param {*} param0
 */
async function handleMergeChunks({ chunksPath, mergeFilePath, fileName, eachChunkSize }) {
  const fileChunksDir = `${chunksPath}/${fileName}`;
  const fileAllChunksPaths = await fse.readdir(fileChunksDir);
  console.log(fileAllChunksPaths);
  fileAllChunksPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);
  const promiseArray = fileAllChunksPaths.map((chunk, index, array) => {
    const eachChunkPath = `${fileChunksDir}/${chunk}`;
    const writeStream = fse.createWriteStream(mergeFilePath, {
      start: index * eachChunkSize,
    });
    return pipeStream(eachChunkPath, writeStream, array.length);
  });
  await Promise.all(promiseArray);
}

把创建写文件流功能也拆分出来。不了解流的概念的话,首先了解一下什么是输入可读流。

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 方便记忆:可读流 通过 管道 流入 可写流。    可读流  =======> 可写流
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream, length) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.pipe(writeStream);
    readStream.on('end', function () {
      hasMergeChunk[path] = 'finished';
      fse.unlinkSync(path);
      resolve();
      console.log(
        `doing: No.${path.split('_')[1]} progress: [ ${Object.keys(hasMergeChunk).length} / ${length} ]`
      );
    });
  });
}

结语

真的只是收藏不点赞嘛…

上传还有合并这两大功能基本上也就完成啦!觉得有用的话,请点个赞吧~谢谢吴彦祖们!!!
请添加图片描述


参考文章

  1. 字节跳动面试官:请你实现一个大文件上传和断点续传
  2. 字节跳动面试官,我也实现了大文件上传和断点续传

Q & A

Q: 发送片段之后的合并可能出现错误

这个情况分析了一下是前端的锅啊,前端的 await Promise.all() 并不能保证后端的文件流都写完了。

在这里插入图片描述

Q: 进度条直接从 0 到了 100

我发现我的请求写错了

在这里插入图片描述

完整代码

前端

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
    <script src="request.js"></script>
    <style>
      #uploadCube {
        margin-top: 10px;
        /* width: 520px; */
        overflow: hidden;
      }

      .cube {
        width: 50px;
        height: 50px;
        background-color: #fff;
        float: left;
        border: 1px solid #000;
        .progress {
          height: 100%;
          line-height: 50px;
          text-align: center;
        }
        .uploading {
          background-color: #409eff;
        }
        .success {
          background-color: #51f400;
        }
        .error {
          background-color: #ff9090;
        }
      }
    </style>
  </head>
  <body>
    <input type="file" id="file" />
    <button id="uploadButton">点击上传</button>
    <!-- <button id="mergeButton">点击合并</button> -->
    <div id="uploadCube"></div>
  </body>

  <script>
    /**
     * 默认切片大小
     */
    const SIZE = 10 * 1024 * 1024;

    /**
     * 功能:生成切片
     */
    function handleCreateChunk(file, size = SIZE) {
      const fileChunkList = [];
      let cur = 0;
      while (cur < file.size) {
        fileChunkList.push({
          file: file.slice(cur, cur + size),
        });
        progressData.push({ presentage: 0 });
        cur += size;
      }
      return fileChunkList;
    }

    /**
     * 功能:生成页面进度的 HTML
     */
    function handleUpdateHTML(progressData) {
      let uploadCube = document.querySelector('#uploadCube');
      let html = '';
      progressData.forEach((item) => {
        const { presentage } = item;
        let className = '';
        if (presentage < 100) {
          className = 'progress uploading';
        } else if (presentage == 100) {
          className = 'progress success';
        }
        html += ` <div class="cube">
        <div class="${className}" style="width: ${presentage}%">${presentage}%</div>
      </div>`;
      });
      uploadCube.innerHTML = html;
    }

    /**
     * 功能:处理每个 chunk 的 xhr.upload.onprogress,拿到各个 chunk 的上传进度
     * - 1. 同时通过 handleUpdateHTML 更新进度页面
     * - 2. progressData 用来记录各个 chunk 的进度
     */
    let progressData = [];
    function handleCreateOnProgress(data) {
      return (e) => {
        data.presentage = ((e.loaded / e.total) * 100).toFixed(2);
        console.log(JSON.stringify(progressData));
        handleUpdateHTML(progressData);
      };
    }

    /**
     * 功能: 上传切片
     * - 注意 map 里别忘了写 return
     */
    async function uploadChunks(hanldleData, fileName) {
      const requestList = hanldleData
        .map(({ chunk, hash, index }) => {
          const formData = new FormData();
          formData.append('chunk', chunk);
          formData.append('hash', hash);
          formData.append('filename', fileName);
          return { formData, index };
        })
        .map(({ formData, index }) => {
          return request({
            url: 'upload',
            data: formData,
            onprogress: handleCreateOnProgress(progressData[index]),
          });
        });
      await Promise.all(requestList).then((res) => {
        console.log('所有上传结束', res);
      });
      console.log('发送合并请求');
      await request({
        url: 'merge',
        headers: {
          'content-type': 'application/json',
        },
        data: JSON.stringify({
          size: SIZE,
          fileName,
        }),
      });
    }

    document.getElementById('uploadButton').onclick = async function () {
      // 切片
      const file = document.getElementById('file').files[0];
      const fileName = file.name;
      const fileChunkList = handleCreateChunk(file);
      // 包装
      const hanldleData = fileChunkList.map(({ file }, index) => {
        return {
          chunk: file,
          hash: `${fileName}_${index}`,
          index,
        };
      });
      await uploadChunks(hanldleData, fileName);
    };
  </script>
</html>

后端 Koa

const Koa = require('koa');
const Views = require('koa-views');
const Router = require('koa-router');
const Static = require('koa-static');
const { koaBody } = require('koa-body');
const fse = require('fs-extra');

const app = new Koa();
const router = new Router();
app.use(Views(__dirname));
app.use(Static(__dirname));
app.use(
  koaBody({
    multipart: true,
    formidable: {
      maxFileSize: 1000 * 1024 * 1024,
    },
  })
);

router.get('/', async (ctx) => {
  await ctx.render('index.html');
});

/**
 * 功能:上传接口
 * - 从 ctx.request.body 中获取 hash 以及 filename
 * - 从 ctx.request.files 中拿到分片数据
 * - 然后再把 node 帮我们临时存放的 chunk 文件的 filepath 拿到,之后移动到我们想要存放的路径下
 * - filepath 和 hash 是一一对应的关系
 */
router.post('/upload', async (ctx) => {
  const { hash, filename } = ctx.request.body;
  const { filepath } = ctx.request.files?.chunk;
  const chunkPath = `${__dirname}/chunkPath/${filename}`;
  if (!fse.existsSync(chunkPath)) {
    await fse.mkdirs(chunkPath);
  }
  await fse.move(filepath, `${chunkPath}/${hash}`);
  ctx.body = {
    code: 1,
  };
});

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path); // 可读流
    readStream.pipe(writeStream); // 可读流通过管道流向可写流
    readStream.on('end', () => {
      hasMergeChunk[path] = 'finish';
      fse.unlinkSync(path); // 删除此文件
      resolve();
      console.log(`合并 No.${path.split('_')[1]}, 已经合并${Object.keys(hasMergeChunk).length}`);
    });
  });
}

/**
 * 功能:合并 Chunk
 * - 1. chunkDir: 是 chunks 文件们所在的文件夹的路径
 * - 2. chunkPaths: 是个 Array,数组中包含所有的 chunk 的 path
 * - 3. 因为 每个 chunk 的 path 命名是通过 hash 组成的,所以我们先排序一下,
 * - 算是为 createWriteStream 中的 start 做准备
 * - 4. 为每个 chunk 的 path 创建写入流,写到 mergePath 这个路径下。因为已经
 * - 排序了,所以 start 就是每个文件的 index * eachChunkSize
 * - 5. 每个写入流都用 Promise 包装了一下,然后用 await Promise.all() 等待处理完
 * @param {*} mergePath
 * @param {*} name
 * @param {*} eachChunkSize
 */
async function mergeChunk(mergePath, name, eachChunkSize) {
  const chunkDir = `${__dirname}/chunkPath/${name}`;
  const chunkPaths = await fse.readdir(chunkDir);
  chunkPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);

  await Promise.all(
    chunkPaths.map((chunk, index) => {
      const eachChunkPath = `${chunkDir}/${chunk}`;
      // 创建输入流,并为每个 chunk 定好位置
      const writeStream = fse.createWriteStream(mergePath, {
        start: index * eachChunkSize,
      });
      return pipeStream(eachChunkPath, writeStream);
    })
  );
  console.log('合并完成');
  fse.rmdirSync(chunkDir);
  console.log(`删除 ${chunkDir} 文件夹`);
}

/**
 * 功能: merge 接口
 * - hasMergeChunk 变量是上面用来记录的
 * - mergePath 定义一下合并后的文件的路径
 */
router.post('/merge', async (ctx) => {
  // console.log(ctx.request.body);
  const { fileName, size } = ctx.request.body;
  hasMergeChunk = {};
  const mergePath = `${__dirname}/merge/${fileName}`;
  if (!fse.existsSync(`${__dirname}/merge`)) {
    fse.mkdirSync(`${__dirname}/merge`);
  }
  await mergeChunk(mergePath, fileName, size);
  ctx.body = {
    data: '成功',
  };
});

app.use(router.routes());
app.listen(3000, () => {
  console.log(`server start: http://localhost:3000`);
});

request.js 的封装

/**
 * 功能:封装请求
 * - 1. xhr.upload.onprogress 注意不要拉下 upload
 * @param {*} param0
 * @returns
 */
function request({ url, method = 'post', data, headers = {}, onprogress = (e) => e, requestList }) {
  return new Promise((resolve, reject) => {
    let xhr = new XMLHttpRequest();
    xhr.open(method, url);
    Object.keys(headers).forEach((item) => {
      xhr.setRequestHeader(item, headers[item]);
    });
    xhr.upload.onprogress = onprogress;
    xhr.onloadend = function (e) {
      resolve({
        data: e.target.response,
      });
    };
    xhr.send(data);
  });
}

后端原生

主服务

const http = require('http');
const server = http.createServer();
const url = require('url');
const fs = require('fs');
const path = require('path');
const MIME = require('./mime.json');

const Controller = require('./controller');
const UPLOAD_DIR = `${__dirname}/chunkPath`; // chunks 上传的文件夹
const controller = new Controller(UPLOAD_DIR);

server.on('request', async (req, res) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', '*');
  res.setHeader('content-type', 'text/html;charset=utf-8');
  if (req.method === 'OPTION') {
    res.status = 200;
    res.end();
    return;
  }
  let { pathname } = url.parse(req.url); // 解析一下 url,因为 req.url 可能会带一些参数
  // console.log('req.url:>>', req.url); console.log('url.parse(req.url):>>', url.parse(req.url));
  console.log(`进入${pathname}`);
  switch (pathname) {
    case '/':
    case '/index': {
      let rs = fs.createReadStream('./index.html');
      rs.pipe(res);
      break;
    }
    case '/upload': {
      await controller.handleUpload(req, res);
      break;
    }
    case '/merge': {
      await controller.handleMerge(req, res);
      break;
    }
    case '/favicon.ico': {
      res.end('我没有哦');
      break;
    }
    default: {
      let ext = path.extname(pathname);
      res.setHeader('Content-Type', MIME[ext]); // 通过请求的资源后缀名,来返回对应的 Content-type 的类型
      let rs = fs.createReadStream(`.${pathname}`);
      rs.pipe(res);
    }
  }
});
const POST = 3000;
server.listen(POST, null, null, () => {
  console.log(`server start: http://localhost:${POST}`);
});

/**
 * module.expothis.chunksPathrts 方法用于在服务器端导出模块,并以 CommonJS 格式提供。
 * - 参考:https://www.delftstack.com/zh/howto/node.js/create-and-export-classes/
 */
const multiparty = require('multiparty');
const fse = require('fs-extra');
const { handlePostData, handleMergeChunks } = require('./tools');
class Controller {
  constructor(dirPath) {
    this.chunksPath = dirPath;
  }

  /**
   * 功能:合并
   * - 1. handlePostData 用来处理 POST 传递的数据,具体怎么处理的请查看方法
   * - 2. 把各个文件的 path 先想清楚要存到哪儿,建议自己写一写。
   * - 我是把所有的 chunks 都放到大目录 chunkPath 中,
   * - 然后在用文件名新建文件夹,再把chunks放到子文件夹中。
   * @param {*} req
   * @param {*} res
   */
  async handleMerge(req, res) {
    const postData = await handlePostData(req);
    const { fileName, size: eachChunkSize } = postData;
    const mergePath = `${__dirname}/merge`;
    const mergeFilePath = `${__dirname}/merge/${fileName}`;
    if (!fse.existsSync(mergePath)) {
      fse.mkdirSync(mergePath);
    }
    const mergeOptions = { chunksPath: this.chunksPath, mergeFilePath, fileName, eachChunkSize };
    await handleMergeChunks(mergeOptions);
    console.log('Success Merge');
    res.end(
      JSON.stringify({
        code: 1,
        message: 'success merge',
      })
    );
  }

  /**
   * multiparty 使用方法:https://www.npmjs.com/package/multiparty
   * - chunkFileDirPath 为关于文件 chunks 的文件夹路径,每个大文件根据文件名生成相关的文件夹
   * - 注意回调函数里的 this
   * @param {*} url
   * @param {*} path
   */
  async handleUpload(req, res) {
    const _this = this;
    const form = new multiparty.Form();
    form.parse(req, async function (err, fields, files) {
      if (err) {
        console.log(err);
        return;
      }
      const [chunk] = files.chunk;
      const [hash] = fields.hash;
      const [filename] = fields.filename;

      const chunkFileDirPath = `${_this.chunksPath}/${filename}`;
      if (!fse.existsSync(chunkFileDirPath)) {
        await fse.mkdirs(chunkFileDirPath);
      }
      await fse.move(chunk?.path, `${chunkFileDirPath}/${hash}`);
      res.end('收到文件 chunks');
    });
  }
}
module.exports = Controller;

工具函数

/**
 * 学习:__dirname 就是跟文件一起的,不会因为引用关系而恒定
 */
// console.log(__dirname);
const fse = require('fs-extra');
/**
 * 功能:处理 POST 请求
 * - 与 GET 数据相比,POST 数据量大,需要分段。
 * - 通过 req.on('data', function(data) {}) 监听
 * - 当有一段数据到达的时候执行回调,回调函数参数 data 为每段达到的数据。
 * - 当数据全部到达时会触发 req.on('end', function() {}) 里面的回调函数。
 * - 可以通过 JSON.parse(str) 解析成我们想要的 POST 请求数据格式。
 * - 参考资料:https://juejin.cn/post/7142700338414518286#heading-2
 * @param {*} req
 * @returns
 */
function handlePostData(req) {
  return new Promise((resolve, reject) => {
    let allData = '';
    let i = 0;
    req.on('data', function (chunkData) {
      //   console.log(`第 ${++i} 次收到数据`);
      allData += chunkData;
    });
    req.on('end', function () {
      const POST_MESSAGE = JSON.parse(allData);
      resolve(POST_MESSAGE);
    });
  });
}

/**
 * 功能:创建 pipe 写文件流
 * - 1. [首先了解一下什么是输入可读流](https://www.jmjc.tech/less/111)
 * - 方便记忆:可读流 通过 管道 流入 可写流。    可读流  =======> 可写流
 * - 2. hasMergeChunk 变量用于记录一下那些已经合并完成了,也可以写成数组,都行。
 * - 3. 可以检测可读流的 end 事件,表示我这个 chunk 已经流完了,然后写一下善后逻辑。
 * @param {*} path
 * @param {*} writeStream
 * @returns
 */
let hasMergeChunk = {};
function pipeStream(path, writeStream, length) {
  return new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.pipe(writeStream);
    readStream.on('end', function () {
      hasMergeChunk[path] = 'finished';
      fse.unlinkSync(path);
      resolve();
      console.log(
        `doing: No.${path.split('_')[1]} progress: [ ${Object.keys(hasMergeChunk).length} / ${length} ]`
      );
    });
  });
}

/**
 * 功能:合并 chunks
 * - 1. 首先根据 fileChunksDir 拿到所有 chunks 的文件名
 * - 2. 然后拼接成 fileAllChunksPaths <Array> 数组,然后一一创建可写流
 * - 3. fileAllChunksPaths 注意这里需要排序一下,不然就是乱的,这也是我们创建可写流 srart 位置的基础
 * - 4. 然后这里通过 pipeStream 函数用 Promise 包装了一下可读流,代码需要慢慢读去理解。
 * - 5. 我们这里的 可写流们,是根据 chunks 的不同,定义好写入的文件 path,
 * - 以及每个块儿写的开始位置和写入大小,每个可写流都是不一样的!
 * -
 * @param {*} param0
 */
async function handleMergeChunks({ chunksPath, mergeFilePath, fileName, eachChunkSize }) {
  const fileChunksDir = `${chunksPath}/${fileName}`;
  const fileAllChunksPaths = await fse.readdir(fileChunksDir);
  console.log(fileAllChunksPaths);
  fileAllChunksPaths.sort((a, b) => a.split('_')[1] - b.split('_')[1]);
  const promiseArray = fileAllChunksPaths.map((chunk, index, array) => {
    const eachChunkPath = `${fileChunksDir}/${chunk}`;
    const writeStream = fse.createWriteStream(mergeFilePath, {
      start: index * eachChunkSize,
    });
    return pipeStream(eachChunkPath, writeStream, array.length);
  });
  await Promise.all(promiseArray);
}

module.exports = {
  handlePostData,
  handleMergeChunks,
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1283530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

百望云供应链协同解决方案入选北大创新评论产业研究案例库

11月28日-29日&#xff0c;百望云受邀出席《北大创新评论》2023 Inno China 中国产业创新大会&#xff0c;从战略构建、生态塑造、科技创新等议题出发&#xff0c;与学术专家、产业专家、企业代表共赴盛会&#xff0c;思享汇聚。会上&#xff0c;《北大创新评论产业研究案例库&…

提升--21---JMM(Java内存模型)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 JMM--Java Memory ModelJMM 定义JMM规则&#xff1a;线程间通信的步骤&#xff1a; JMM的三大特性&#xff1a;原子性&#xff08;Atomicity&#xff09;可见性&…

uniapp是否可以用vant等移动端UI库、使用步骤以及需要注意的问题

文章目录 使用vant步骤使用中遇到的问题在浏览器中的运行效果综上&#xff0c;不建议uniapp项目使用vant。 使用vant步骤 首先vant可以兼容uniapp&#xff0c;直接用vant版就好。微信小程序专用版本是&#xff1a;vant-weapp。 基本使用步骤&#xff1a; 1、安装 # 安装 Va…

Django创建基本的app应用并配置URL路径-成功运行服务

开发环境&#xff1a;Pycharm2021 Win11 首先创建虚拟环境: 可参考&#xff1a; Pycharm开发环境下创建python运行的虚拟环境&#xff08;自动执行安装依赖包&#xff09;_pycharm自动下载依赖包_heda3的博客-CSDN博客 1、安装 Django 在虚拟环境下安装pip install django …

任意密码重置+CRRF

一、XSS漏洞 在商城的搜索处&#xff0c;输入标准语句的传参直接就可以弹窗 二、逻辑漏洞-用户枚举 在用户注册界面&#xff0c;点击发送验证码&#xff0c;然后用BURP发包 更改手机号传参&#xff0c;这里手机号传参没有进行加密&#xff0c;直接用手机号的位置进行爆破 正确的…

根文件系统lib库添加与初步测试

一. 简介 我们在编译 busybox源码时&#xff0c;选择的是动态编译&#xff0c;所以&#xff0c;制作生成的 根文件系统中/bin或 /sbin目录下软件运行时会调用到一些库文件的。库文件就是交叉编译器的库文件。 前面我们编译 busybox源码时&#xff0c;选择动态编译&#xff0…

JavaEE之多线程编程(一):基础篇

文章目录 一、关于操作系统一、认识进程 process二、认识线程三、进程和线程的区别&#xff08;重点&#xff01;&#xff09;四、Java的线程和操作系统线程的关系五、第一个多线程编程 一、关于操作系统 【操作系统】 驱动程序&#xff1a; 如&#xff1a;我们知道JDBC的驱动程…

【接口测试】POST请求提交数据的三种方式及Postman实现

1. 什么是POST请求&#xff1f; POST请求是HTPP协议中一种常用的请求方法&#xff0c;它的使用场景是向客户端向服务器提交数据&#xff0c;比如登录、注册、添加等场景。另一种常用的请求方法是GET&#xff0c;它的使用场景是向服务器获取数据。 2. POST请求提交数据的常见编…

笔记-模拟角频率和数字角频率的关系理解

先建议阅读前人此文&#xff08;点击这里&#xff09;&#xff0c;有助于理解。 模拟频率&#xff1a;f 模拟角频率&#xff1a;Ω 数字角频率&#xff1a;ω 其中&#xff1a;在模拟信号中Ω 2πf 正弦波表示&#xff1a;sin(2πft) sin(Ωt) 数字信号就是离散的&#xff…

Facebook推广工具功能科普!

随着社交媒体的普及&#xff0c;Facebook已经成为全球使用最广泛的社交平台之一&#xff0c;对于广大营销人员来说&#xff0c;利用Facebook推广工具进行营销已经成为不可或缺的一部分。 那么&#xff0c;这些推广工具到底有哪些功能呢?本文将为您揭秘Facebook推广工具的强大…

Pytest 使用及调用方法

使用python -m pytest调用pytest 2.0版本新增 你可以在命令行中通过Python编译器来调用Pytest执行测试: python -m pytest [...] 通过python调用会将当前目录也添加到sys.path中,除此之外,这几乎等同于命令行直接调用pytest [...]。 可能出现的执行退出code 执行pytest可能…

公众号50个数量怎么操作?

一般可以申请多少个公众号&#xff1f;公众号申请限额在过去几年内的经历了很多变化。对公众号申请限额进行调整是出于多种原因&#xff0c;确保公众号内容的质量和合规性。企业公众号的申请数量从50个到5个最后到2个&#xff0c;对于新媒体公司来说&#xff0c;这导致做不了公…

移动端APP自动化测试框架-UiAutomator2基础

很早以前&#xff0c;我用uiautomatorjava实践过Android APP自动化测试&#xff0c;不过今天要提的不是uiautomator&#xff0c;而是uiautomator2。听起来uiautomator2像是uiautomator的升级版&#xff0c;但是这两款框架仅仅是名字上比较相似&#xff0c;实际上没有任何关联。…

蓝桥杯物联网竞赛_STM32L071_10_温度传感器扩展模块

原理图&#xff1a; 温度传感器原理图&#xff1a; 其中芯片可以通过SCL和SDA引脚通过I2C通信向温度传感器指定地址获取温度的模拟量 再利用公式将模拟量转换成相应温度即可 实验板接口原理图&#xff1a; 模拟量转相应温度公式&#xff1a; CubMx配置&#xff1a; Keil配置&…

深度学习算法:探索人工智能的前沿

目录 引言 第一部分&#xff1a;深度学习的基础 1.1 什么是深度学习&#xff1f; 1.2 神经网络的演化 第二部分&#xff1a;深度学习的关键技术 2.1 卷积神经网络&#xff08;CNN&#xff09; 2.2 循环神经网络&#xff08;RNN&#xff09; 2.3 长短时记忆网络&#xf…

Vue3集成ThreeJS实现3D效果,threejs+Vite+Vue3+TypeScript 实战课程【一篇文章精通系列】

Vue3集成ThreeJS实现3D效果&#xff0c;threejsViteVue3TypeScript 实战课程【一篇文章精通系列】 项目简介一、项目初始化1、添加一些依赖项 二、创建3D【基础搭建】1、绘制板子&#xff0c;立方体&#xff0c;球体2、材质和光照3、材质和光照和动画4、性能监控5、交互控制6、…

Liunx系统使用超详细(三)

本篇内容开始逐渐描述有关liunx的各种命令的使用方法&#xff01; 目录 一、目录和文件区别 1.1目录&#xff1a; 1.2文件&#xff1a; 1.3总结&#xff1a; 二、Linux命令的写法 三、linux命令清屏 四、pwd命令 五、ls命令 5.1 ls&#xff1a; 5.2 ls -l&#xff1a…

【C++】异常处理 ⑧ ( 标准异常类 | 标准异常类继承结构 | 常用的标准异常类 | 自定义异常类继承 std::exception 基类 )

文章目录 一、抛出 / 捕获 多个类型异常对象1、标准异常类2、标准异常类继承结构3、常用的标准异常类 二、自定义异常类继承 std::exception 基类1、自定义异常类继承 std::exception 基类2、完整代码示例 - 自定义异常类继承 std::exception 基类 一、抛出 / 捕获 多个类型异常…

茄子科技张韶全:跨多云大数据平台DataCake在OceanBase的实践

11 月 16 日&#xff0c;OceanBase 在北京顺利举办 2023 年度发布会&#xff0c;正式宣布&#xff1a;将持续践行“一体化”产品战略&#xff0c;为关键业务负载打造一体化数据库。其中&#xff0c;在“数字化转型升级实践专场”&#xff0c;我们有幸邀请到了茄子科技大数据技术…

经验分享|MySQL分区实战(RANGE)

概述 分区概述 在 MySQL 中&#xff0c; InnoDB存储引擎长期以来一直支持表空间的概念。在 MySQL 8.0 中&#xff0c;同一个分区表的所有分区必须使用相同的存储引擎。但是&#xff0c;也可以为同一 MySQL 服务器甚至同一数据库中的不同分区表使用不同的存储引擎。 通俗地讲…