目录
1、什么是断点续传
2、分块文件
3、合并文件
4、 Minio 分布式文件系统整合断点续传
4.1 进行文件分块上传到 Minio
4.2 进行 Minio 中分块文件的合并
5、使用 Minio 进行断点续传的注意事项
相信很多小伙伴在上传下载图片或者视频的时候,突然间(没错就是这么秃然),断电或者断网了,但是等恢复正常后,之前的图片或视频又需要重0开始进行上传或者下载!体验感那可是相当的差劲,这不,断点续传技术孕育而生......
1、什么是断点续传
由来:
传统的文件传输方式通常是一次性地将整个文件进行传输,如果在传输过程中发生中断或失败,需要重新开始传输整个文件,这可能会浪费时间和带宽资源。而通过断点续传的机制,可以在传输过程中记录下已经成功传输的部分,如果传输中断,则可以从中断的位置继续传输,节省时间和资源
定义:
断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性
断点续传通常涉及以下几个关键要素:
-
传输状态的保存:需要在传输过程中记录已经成功传输的部分,通常使用文件或数据库来保存传输状态信息
-
传输中断的处理:当传输中断时,需要能够检测到中断事件,并记录下中断的位置信息
-
传输的恢复:在中断后重新开始传输时,根据记录的中断位置信息,可以通过读取文件或数据库中的状态信息,定位到中断的位置,并从该位置开始继续传输
-
数据校验:为确保传输的准确性,通常会使用校验和或哈希算法对传输的数据进行校验,以检测数据的完整性
断点续传大致流程图:
流程解读:
前端上传文件前,先将其分成若干块 >>>>>> 一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传 >>>>>> 各分块上传完成最后在服务端合并文件
2、分块文件
定义:
分块文件(Chunked File)是将一个文件切分为较小的块或片段的文件格式或存储方式。每个块都是源文件的一部分,通过切分和组合这些块,可以还原出完整的原始文件
使用场景:
-
分布式存储:在分布式系统中,将大文件切分为多个块并存储在不同的节点上。这种方式可以提高存储和读取的效率,并允许数据并行处理
-
块存储设备:某些存储设备(如磁盘阵列)将数据组织为固定大小的块,并使用块地址来访问和管理数据
-
数据传输和网络传输:通过将文件切分为多个块,可以分批传输和处理大文件,从而提高数据传输的速度和可靠性
-
容灾备份:将文件切分为多个块,并存储在不同的位置或设备上,以提供容灾备份和恢复能力
代码实现:
首先,我们得指定需要进行分块传输的视频文件以及分块存储的目的地;同时,因为文件需要分块,所以要定义每个分块文件的大小
//1.指定目标源文件
File sourceFile = new File("C:\\Users\\DELL\\Videos\\cy01.mp4");
//2.分块文件的存储路径
String chunkPath = "C:\\Users\\DELL\\Videos\\chunk\\";
//2.1分块文件的大小
int chunkSize = 1024*1024; //兆 MB
//2.2分块文件的个数
int chunkNum = (int) Math.ceil(sourceFile.length()*1.0 / chunkSize); //进行向上取整
一开始,该分块路径下并没有任何的文件
然后,经过上面的定义过后,使用流的方式进行读写文件数据
这里,选择使用 RandomAccessFile类 用于对文件进行随机访问,即可以在文件中以任意顺序读取和写入数据。相比于其他文件读写类,RandomAccessFile 提供了更灵活的访问方式,其具有:非顺序访问、可读写性、文件编辑与更新、大型文件的处理以及记录式文件的处理功能
总之,RandomAccessFile类 的作用是提供对文件的随机访问功能,可以根据需要定位到文件的任意位置进行读取和写入。它适用于需要处理大文件、进行非顺序访问以及在特定位置修改数据等场景
//3.使用流从源文件上读数据,向分块文件中写数据
byte[] bytes = new byte[1024];
//3.1 读数据
RandomAccessFile readInfo = new RandomAccessFile(sourceFile, "r");
for(int i=0;i<chunkNum;i++){
File chunkFile = new File(chunkPath + i);
//3.2写数据
RandomAccessFile writeInfo = new RandomAccessFile(chunkFile, "rw");
int length;
while ((length=readInfo.read(bytes))!=-1){
writeInfo.write(bytes,0,length);
if(chunkFile.length()>=chunkSize){ //若比定义的块大,则结束写入
break;
}
}
writeInfo.close();
}
readInfo.close();
运行结果:
很显然,之前 7+ MB 的文件,现在有序的分成了多块,达到了预想的效果
完整代码:
//1.指定目标源文件
File sourceFile = new File("C:\\Users\\DELL\\Videos\\cy01.mp4");
//2.分块文件的存储路径
String chunkPath = "C:\\Users\\DELL\\Videos\\chunk\\";
//2.1分块文件的大小
int chunkSize = 1024*1024; //兆 MB
//2.2分块文件的个数
int chunkNum = (int) Math.ceil(sourceFile.length()*1.0 / chunkSize); //进行向上取整
//3.使用流从源文件上读数据,向分块文件中写数据
byte[] bytes = new byte[1024];
//3.1 读数据
RandomAccessFile readInfo = new RandomAccessFile(sourceFile, "r");
for(int i=0;i<chunkNum;i++){
File chunkFile = new File(chunkPath + i);
//3.2写数据
RandomAccessFile writeInfo = new RandomAccessFile(chunkFile, "rw");
int length;
while ((length=readInfo.read(bytes))!=-1){
writeInfo.write(bytes,0,length);
if(chunkFile.length()>=chunkSize){ //若比定义的块大,则结束写入
break;
}
}
writeInfo.close();
}
readInfo.close();
3、合并文件
定义:
将多个部分下载的文件块(或分块)组合成一个完整的文件。在文件传输过程中,一般会将大文件分成多个较小的文件块进行传输。当其中一个或多个文件块传输失败或中断时,合并文件的步骤就变得重要
代码实现:
收集文件块:首先,需要从不同的来源(例如服务器或其他传输源)收集已经成功下载的文件块。这些文件块可能以不同的顺序或位置存在
//1.指定需要合并的分块文件
File chunkFile = new File("C:\\Users\\DELL\\Videos\\chunk\\");
//2.源文件
File sourceFile = new File("C:\\Users\\DELL\\Videos\\cy01.mp4");
//3.自定义合并后的文件的名称以及路径
File mergeFile = new File("C:\\Users\\DELL\\Videos\\mergeFile.mp4");
//4.取出所有的分块文件
File[] chunkFiles = chunkFile.listFiles();
确定文件块顺序:根据文件块的序号或其他标识信息,确定文件块的正确顺序。这是因为文件块可能以异步方式传输或以不同的顺序到达
注意:这里需要将分块的文件进行排序,然后按顺序的进行合并;好比源文件是一个完整的积木,现在被打乱了,分成了很多快小积木,如果将中间部分的小积木放在原来的最底下,那么完整的积木指定是拼不成功的;所以,需要按顺序一步步来
//4.1 将数组转成 list 集合
List<File> chunkFileList = Arrays.asList(chunkFiles);
//4.2进行排序
chunkFileList.sort(new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
}
});
合并文件块:将收集到的文件块按照正确的顺序合并到一个完整的文件中。这涉及将每个文件块的内容按照其在整个文件中的位置进行追加或插入
//5.遍历分块文件,向之前定义的合并文件中进行写数据
RandomAccessFile writeInfo = new RandomAccessFile(mergeFile,"rw");
byte[] bytes = new byte[1024];
chunkFileList.stream()
.forEach(new Consumer<File>() {
@Override
public void accept(File chunkSortFile) {
try {
RandomAccessFile readInfo = new RandomAccessFile(chunkSortFile, "r");
int length;
while ((length= readInfo.read(bytes))!=-1){
writeInfo.write(bytes,0,length);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
writeInfo.close();
检验和校验:在合并文件完成后,通常会进行校验以确保合并后的文件完整性。可以使用校验和或哈希算法(如MD5、SHA1等)对整个文件进行校验,以验证文件的准确性和完整性;这里使用MD5进行校验前后文件
//6.将合并完成后的文件与源文件进行校验
String sourceMD5 = DigestUtils.md5Hex(new FileInputStream(sourceFile));
String mergeMD5 = DigestUtils.md5Hex(new FileInputStream(mergeFile));
if(sourceMD5.equals(mergeMD5)){
System.out.println("文件合并成功!");
}
运行结果:
可见,合并的文件大小与源文件一致,效果达成 o(* ̄▽ ̄*)ブ
完整代码:
//1.指定需要合并的分块文件
File chunkFile = new File("C:\\Users\\DELL\\Videos\\chunk\\");
//2.源文件
File sourceFile = new File("C:\\Users\\DELL\\Videos\\cy01.mp4");
//3.自定义合并后的文件的名称以及路径
File mergeFile = new File("C:\\Users\\DELL\\Videos\\mergeFile.mp4");
//4.取出所有的分块文件
File[] chunkFiles = chunkFile.listFiles();
if(chunkFiles!=null) {
//4.1 将数组转成 list 集合
List<File> chunkFileList = Arrays.asList(chunkFiles);
//4.2进行排序
chunkFileList.sort(new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
}
});
//5.遍历分块文件,向之前定义的合并文件中进行写数据
RandomAccessFile writeInfo = new RandomAccessFile(mergeFile,"rw");
byte[] bytes = new byte[1024];
chunkFileList.stream()
.forEach(new Consumer<File>() {
@Override
public void accept(File chunkSortFile) {
try {
RandomAccessFile readInfo = new RandomAccessFile(chunkSortFile, "r");
int length;
while ((length= readInfo.read(bytes))!=-1){
writeInfo.write(bytes,0,length);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
writeInfo.close();
//6.将合并完成后的文件与源文件进行校验
String sourceMD5 = DigestUtils.md5Hex(new FileInputStream(sourceFile));
String mergeMD5 = DigestUtils.md5Hex(new FileInputStream(mergeFile));
if(sourceMD5.equals(mergeMD5)){
System.out.println("文件合并成功!");
}
}
4、 Minio 分布式文件系统整合断点续传
大致流程图:
流程解读:
- 前端对文件进行分块 >>>>>> 前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存
- 在则不再上传 >>>>>> 如果分块文件不存在则前端开始上传 >>>>>> 前端请求媒资服务上传分块
- >>>>>> 媒资服务将分块上传至Minio >>>>>> 前端将分块上传完毕请求媒资服务合并分块 >>>>>>
- 媒资服务判断分块上传完成则请求Minio合并文件 >>>>>> 合并完成校验合并后的文件是否完整,
- 如果不完整则删除文件
4.1 进行文件分块上传到 Minio
首先,定义 minio 的地址以及账号和密码
MinioClient minioClient =
MinioClient.builder()
//分布式文件 minio 地址
.endpoint("http://localhost:9000")
//对应的 minio 账号以及密码
.credentials("minioadmin", "minioadmin")
.build();
先是通过传入的文件后缀,以流的方式获取该文件的类型,即 contentType;然后通过 获取本地分块文件的集合,以动态的获取分块的数量;最后,调用 Minio Java SDK 提供的 UploadObjectArgs 方法进行分块文件的上传(这里部分地方就先写死了)
//1.通过扩展名得到媒资类型
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4");
String streamValue = MediaType.APPLICATION_OCTET_STREAM_VALUE; //默认的字节流类型
if(extensionMatch!=null){
streamValue = extensionMatch.getMimeType();
}
//1.1获取到本地目录下的分块文件的集合
String chunkFilePath = "C:\\Users\\DELL\\Videos\\chunk\\"; //指定本地分块文件地址
File chunkFile = new File(chunkFilePath);
File[] chunkFileList = chunkFile.listFiles(); //以集合的形式获取分块文件
if(chunkFileList!=null) {
for (int i = 0; i < chunkFileList.length; i++) {
//2.文件相关参数的传递
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket("mediafiles") //桶的名称
.filename(chunkFilePath + i)
.object("chunk/" + i) //传入 minio 中后的对象名
.contentType(streamValue) //设置媒体文件的类型
.build();
//3.进行上传文件
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传第" + i + "个分块文件成功!");
}
}
本地文件中:
Minio 中:
运行之后:
4.2 进行 Minio 中分块文件的合并
前提引入:
这里使用 ComposeSourse 类型进行合并,下面是该类通常的使用场景:
媒体合成:如果您正在开发一个多媒体应用程序,
ComposeSource
可以用来表示各种媒体元素的组合,例如音频、视频或图像的集合。您可以将这些元素组合成一个复合媒体源文档合成:如果您正在处理文档或报告生成,
ComposeSource
可以用来表示文本块、表格、图表等多个组成部分。您可以将这些部分组合成一个完整的文档或报告界面元素组合:在用户界面设计中,
ComposeSource
可以用来表示具有不同样式和布局的界面元素,例如按钮、文本字段、标签等。您可以使用ComposeSource
来描述界面的不同组成部分,以便灵活地构建和渲染用户界面数据结构组合:
ComposeSource
也可以表示具有层次结构的数据组合,例如树、图或图表数据。您可以使用ComposeSource
来存储和操作这些复杂的数据结构
首先,创建一个 ComposeSource 类型的集合,将 minio 中 chunk 目录下的分块文件进行封装, 便于之后的统一处理
//1.获取 minio 中分块文件的信息
List<ComposeSource> sources = new ArrayList<>();
//2.将需要合并的文件放入 sources 集合进行统一处理
String chunkFilePath = "C:\\Users\\DELL\\Videos\\chunk\\"; //指定本地分块文件地址
File chunkFile = new File(chunkFilePath);
File[] chunkFileList = chunkFile.listFiles(); //以集合的形式获取分块文件
if(chunkFileList!=null) {
sources = Stream.iterate(0, i -> ++i)
.limit(chunkFileList.length)
.map(i -> ComposeSource.builder()
.bucket("mediafiles")
.object("chunk/" + i)
.build()).collect(Collectors.toList());
}
这里进行合并操作,合并之前,需要自定义 object 合并后的文件名(注意:这里的文件后缀名需 要与源文件一致);然后传入之前被封装好的分块文件的集合 sources ,最后调用 API 进行合并
//3.指定合并后,minio 中的文件(自定义名称或者目录位置)
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket("mediafiles")
.object("mergeFile.mp4")
.sources(sources) //指定需要进行合并的分块文件
.build();
//4.进行合并文件
minioClient.composeObject(composeObjectArgs);
接下来,就到了校验环节;先使用 Minio Java SDK 提供的 GetObjectArgs 方法,获取到 Minio 中合并之后文件的数据流;然后使用 MD5 分别进行 Minio 合并文件与本地源文件的校验
//5.将 minio 中合并后的文件与本地源文件进行校验
//5.1进行指定分布式文件
GetObjectArgs getObjectArgs = GetObjectArgs.builder()
.bucket("mediafiles")
.object("mergeFile.mp4") //这里是合并后的文件
.build();
//5.2获取 minio 合并文件中的数据流
FilterInputStream minioStream = minioClient.getObject(getObjectArgs);
//6.进行校验
//6.1 minio 合并文件
String minio_MD5 = DigestUtils.md5DigestAsHex(inputStream);
//6.2 源文件
String source_MD5 = DigestUtils.md5DigestAsHex(new FileInputStream(new File("C:\\Users\\DELL\\Videos\\cy02.mp4")));
if(minio_MD5.equals(source_MD5)){
System.out.println("合并文件成功!");
}
inputStream.close();
运行结果:
这里有警告,主要是因为版本的问题,这个问题对合并文件影响不大
以上需要注意的是,有些小伙伴可能会出现以下的错误:
java.lang.IllegalArgumentException: source testbucket/chunk/0: size 1048576 must be greater than 5242880
原因分析:minio合并文件默认分块最小5MB,我们将分块改为5MB再次测试
minio 中合并的文件可正常播放;可见,合并文件成功 o(* ̄▽ ̄*)ブ
完整代码:
//1.获取 minio 中分块文件的信息
List<ComposeSource> sources = new ArrayList<>();
//2.将需要合并的文件放入 sources 集合进行统一处理
String chunkFilePath = "C:\\Users\\DELL\\Videos\\chunk\\"; //指定本地分块文件地址
File chunkFile = new File(chunkFilePath);
File[] chunkFileList = chunkFile.listFiles(); //以集合的形式获取分块文件
if(chunkFileList!=null) {
sources = Stream.iterate(0, i -> ++i)
.limit(chunkFileList.length)
.map(i -> ComposeSource.builder()
.bucket("mediafiles")
.object("chunk/" + i)
.build()).collect(Collectors.toList());
}
//3.指定合并后,minio 中的文件(自定义名称或者目录位置)
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket("mediafiles")
.object("mergeFile.mp4")
.sources(sources) //指定需要进行合并的分块文件
.build();
//4.进行合并文件
minioClient.composeObject(composeObjectArgs);
//5.将 minio 中合并后的文件与本地源文件进行校验
//5.1进行指定分布式文件
GetObjectArgs getObjectArgs = GetObjectArgs.builder()
.bucket("mediafiles")
.object("mergeFile.mp4") //这里是合并后的文件
.build();
//5.2获取 minio 合并文件中的数据流
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
//6.进行校验
//6.1 minio 合并文件
String input_MD5 = DigestUtils.md5DigestAsHex(inputStream);
//6.2 源文件
String out_MD5 = DigestUtils.md5DigestAsHex(new FileInputStream(new File("C:\\Users\\DELL\\Videos\\cy02.mp4")));
if(input_MD5.equals(out_MD5)){
System.out.println("合并文件成功!");
}
inputStream.close();
5、使用 Minio 进行断点续传的注意事项
- 分片大小:在进行断点续传时,需要将文件拆分为较小的分片。较小的分片大小有助于在连接中断或错误发生时更轻松地恢复传输。通常建议使用256 KB到1 MB的分片大小
- 分片上传:将文件按照分片进行上传,而不是一次性上传整个文件。可以使用MinIO提供的
putObject
方法来进行分片上传。在上传每个分片时,将分片编号和总分片数作为参数提供给MinIO客户端- 断点续传逻辑:在上传过程中,需要确保在传输失败或连接中断时能够恢复上传。为此,可以记录已成功上传的分片,并在需要时从上次中断的位置继续上传。可以使用标记来追踪已上传的分片编号
- 检查已上传的分片:在恢复上传过程中,首先需要检查已上传的分片,并确定哪些分片需要继续上传。可以使用MinIO提供的
listObjects
方法来列出存储桶中已经上传的分片- 并发上传:为了加快上传速度,可以考虑使用并发上传的方式。使用多个线程或协程同时上传不同的分片可以提高整体上传性能。请确保对并发上传进行适当的调度和同步,以避免竞争条件和数据冲突
- 对象合并:当所有分片都上传完毕后,需要执行分片合并操作将它们合并为完整的对象。可以使用MinIO提供的
composeObject
方法或其他相应的API来执行分片合并操作- 错误处理和重试:上传过程中可能会遇到网络错误、连接中断或其他异常情况。需要合理处理这些错误,并进行重试机制以确保上传的可靠性。设置适当的重试次数和重试间隔,以应对可能的传输问题