文章目录
- 前言
- 《多文件云传输》框架概述
- 简介
- 技术实现
- 框架基本思想
- 《多文件云传输》框架思考
- 需求分析
- 《多文件云传输》框架实现
- 数据基础实现
- 技术难点实现
前言
《多文件云传输》框架的实现是本人的一个编程训练项目,为了提升本人的编程能力、JAVA 编程思想,基于框架的角度出发,完成对资源文件传输的实现,对《多文件云传输》框架技术的个人理解和底层实现方法进行阐述。
此《多文件云传输》框架 “不重复造方轮子” ,只是自我的编程训练的项目。
作为信息时代下的信息获取者(包括个人和 APP 层),都会有从网上下载 app、视频文件、音频文件等文件的需求,基于本人对文件传输其中共通之处的思考,为了方便更多人的使用,提高资源文件传输效率,其独立于某个 APP 而产生的框架,其强大之处在于适用于不同的 APP,不被约束,而提出《多文件云传输》这个框架。
《多文件云传输》框架概述
简介
《多文件云传输》框架又称 分布式带有自平衡、断点续传的多文件云传输框架,即是一种在分布式系统中进行多文件云传输的方式,它可以自动调整传输任务的负载,实现传输任务的自平衡,从而提高传输效率和可靠性。
-
多文件传输
即在文件传输过程中,同时传输多个文件的操作。
-
所谓的“云”传输
“云”传输是指资源请求者向资源拥有者请求资源文件结束后,转变成为资源拥有者。
- 多个资源拥有者可以向资源请求者发送同一资源;
- 多个资源拥有者可以向资源请求者发送同一资源的不同文件;
- 多个资源拥有者可以向资源请求者发送同一资源的同一文件的不同片段;
-
自平衡(负载均衡)
负载均衡是指将流量分配到多个服务器或计算机上,以达到均衡负载、提高系统可用性和性能的目的。
- 大文件的分片传输,将同一文件分割成大小相同的文件片段进行传输。
- 设置资源发送方的健康值,避免一个资源发送方同时参与过多资源发送,负载过重的问题。
- 基于默认的或 APP 自己实现的节点选择策略,判断是否允许某资源发送方参加本次发送任务,从而实现负载均衡。
-
断点续传
- 断点续传是指在文件传输过程中,当传输中断或失败时,可以从中断或失败的位置继续传输文件,而不需要重新开始传输。
- 将文件分成小片段,记录已传输和未传输的片段,重新传输时调取未传输的片段继续传输。
断点续传可以提高文件传输的可靠性和效率,特别是在大文件传输和不稳定的网络环境下,其作用更为明显。
技术实现
《多文件云传输》框架涵盖了许多技术问题:
- 基于个人开发的《资源发现》框架
- 带有目录树结构的资源的目录结构的复制
- 单个文件或多个文件的传输
- 资源分配策略,基于网络的大文件的分片传输
- 未接受文件片段信息的计算
- 多个发送方向一个接收方发送不同文件片段
- 可实现负载均衡
- 可实现断点续传
- 多线程安全问题(单例模式应用)
等等技术问题……
框架基本思想
多文件云传输,基于分布式系统中进行多文件云传输的方式,自动调整负载均衡,实现传输过程的自平衡,最终完成资源文件的传输过程实现。
《多文件云传输》框架思考
需求分析
所谓“多文件云传输”,假设最开始的原始文件资源只有一个资源最初拥有者的总服务器(APP服务器)拥有该资源,各个用户向资源总服务器发出请求,通过网络来获取自己想要的资源,可如果这个资源文件非常火爆,请求的用户很多的话,这时候我们的资源最初拥有者的服务器压力就会很大,服务器就会出现故障。所以假如每一个向资源最初拥有者的服务器发出请求获取到该资源后,自己也成为一个新的拥有该资源的服务器,长此以往,可作为发送节点的节点个数也逐渐增多,资源最初拥有者的服务器的压力就会减小,甚至不参加未来资源的发送。
基于这个多文件传输的方式,我们大概明确了,多文件云传输实现中必然存在资源发现的过程,而所谓的资源发现也基于服务发现(服务发现架构中有主要的三者组成:注册中心、服务提供者、服务消费者)
基于《服务发现》,我们的资源发现也就有的雏形,也就有了核心的三大角色:资源注册中心,资源请求者,资源拥有者
资源发现的具体内容请见: HB个人博客:模拟实现 《资源发现》框架
当一个资源请求者客户端请求资源时,会从注册中心得到拥有该资源的所有网络节点地址列表,该资源请求者会选出当前压力最小的K个发送者,对他们请求这个资源的不同部分,并将有下线或异常掉线的资源节点地址汇报给资源注册中心,资源注册中心将该资源地址注销,最终资源请求者得到他所请求的资源,并在得到该资源后,在资源注册中心注册资源,成为新的资源拥有者。
所以我们《多文件云传输》所希望实现的功能也就如下:
-
实现文件“云”传输:当资源请求者请求到相关资源后,变成为资源拥有者,并参与后续资源的发送。
-
多个资源发送者向同一资源请求者发送同一资源。
-
多个资源发送者向同一资源请求者发送同一资源的不同文件。
-
多个资源发送者向同一资源请求者发送同一文件的不同片段(文件较大)。
-
将文件分割成片段是为了负载均衡,让多个资源发送者发送量相近;
-
当资源发送者数量较多时,该发送者将不再参与这个资源的发送。
-
在特殊情况下:
1、例如资源发送端向请求端发送资源时,它还没发送完就下线了,最终请求端将接收不到完整的资源,我们希望接收端能清楚自己的哪些文件没有收到,并重新向其他拥有该资源的在线节点进行进行请求。
2、请求端接收到一半,电脑本地机宕机了,我们希望他重新开机后,能从断点处继续下载,而不是重头下载。需要将文件分成小片段,记录已传输和未传输的片段,重新传输时调取未传输的片段继续传输。
-
资源接受者在成为某个资源的发送者后,其依然可以是其他资源的资源请求者。
《多文件云传输》框架实现
数据基础实现
- 文件信息
public class FileInfo {
private int fileId;
private String fileName;
private long fileLength;
FileInfo() {
}
……一系列getter、setter、toString、hashCode、equals方法
}
- 文件片段头信息
经过试验得出,文件片段传输过程可能会出错,向byte数组中读取数据流length长度的数据会出问题,于是用一下的方式解决
public static byte[] receive(InputStream dis, int length) throws IOException {
byte[] receiveData = new byte[length];
int offset = 0;
int len;
while (length > 0) {
len = dis.read(receiveData, offset, length);
offset += len;
length -= len;
}
return receiveData;
}
1、为了保证文件片段传输完整性,需要知道文件片段的长度(length)
2、需要知道片段所属的文件,就是文件句柄(fileId)
3、需要知道片段在文件中的位置,就是偏移量(offset)
4、保证片段传输的准确性:校验和(sum)
/**
* 功能:
* 1. 文件片段头的功能是为了描述一个文件片段属于哪一个文件,在此文件中的偏移量是多少,以及该片段内容的长度。
* 2. 由于网络间的传输是以byte为单位的,所以我们需要提供将文件片段头变换为byte[]类型的方法,当然我们也提供了提供反变换的方法。
*/
public class FileSection {
public static final int FILE_SECTION_HEAD_LEN = 24;
public static final int EOF = -1;
public static final int ACCEPT_ERROR = -2;
private int fileId;
private long offset;
private long length;
private int sum;
public FileSection() {
}
……一系列getter、setter、toString、hashCode、equals方法
}
技术难点实现
- 文件控制块池——极大加快了程序运行速度
以前的处理是在片段传输中存在大量的文件开关操作,极大影响代码执行时间,因而引进文件控制块池子,避免了因为文件传输频繁打开和关闭文件,加快程序运行速度。
/**
* 资源文件的随机访问控制指针键集。<br>
* 作用:相当于RandomAccessFile池,以避免对同一个文件的频繁打开和关闭,以提高
* 文件访问效率。
* 该类既用于资源发送者在发送文件片段前,读指定文件的指定片段;
* 也用于资源接受者在接收文件片段后,写入指定文件的指定片段。
* 但是,这个类只关心:提供RandomAccessFile,不关心具体的文件读写操作。
* 另外,本类提供关闭指定文件RandomAccessFile的操作。
* 本类中的成员:SourceFileList sourceFileList是指定资源的原始文件列表。
*/
public class RandomAccessFileMap {
public static final int READ_ONLY = 1;
public static final int READ_WRITE = 2;
private SourceFileInfoList sourceFileInfoList;
/**
* 以文件编号为键,以文件随机访问控制块为值。
*/
private Map<Integer, RandomAccessFile> rafPool;
private int mode;
RandomAccessFileMap(SourceFileInfoList sourceFileInfoList) {
this.sourceFileInfoList = sourceFileInfoList;
this.rafPool = new HashMap<Integer, RandomAccessFile>();
}
void setMode(int mode) {
this.mode = mode;
}
/**
* 多线程安全的、单例的,获取fileId对应的文件的RandomAccessFile对象。
* @param fileId
* @return
* @throws FileNotFoundException
*/
RandomAccessFile getRandomAccessFile(int fileId) throws FileNotFoundException {
RandomAccessFile raf = this.rafPool.get(fileId);
if (raf == null) {
synchronized (this.rafPool) {
raf = this.rafPool.get(fileId);
if (raf == null) {
raf = openFile(fileId);
this.rafPool.put(fileId, raf);
}
}
}
return raf;
}
private RandomAccessFile openFile(int fileId) throws FileNotFoundException {
String filePath = this.sourceFileInfoList.getFileName(fileId);
String mode = this.mode == RandomAccessFileMap.READ_ONLY ? "r" : "rw";
return new RandomAccessFile(filePath, mode);
}
/**
* 关闭RandomAccessFile后,只是将Map的值改为null,并没有从Map中删除这个键值对!
* @param fileId
* @throws IOException
*/
void closeRandomAccessFile(int fileId) throws IOException {
synchronized (this.rafPool) {
RandomAccessFile randomAccessFile = this.rafPool.get(fileId);
if (randomAccessFile != null) {
randomAccessFile.close();
randomAccessFile = null;
}
}
}
}
- 带有目录树结构的资源的目录结构的复制
对于资源,存在“源资源绝对根”和“目标绝对根”两个概念,需要将源资源绝对根下的目录结构,复制到目标绝对根下。
public void sourceRequesting(SourceFileInfoList sourceFileList, JFrame owner) {
SourcePool sourcePool = SourcePool.getInstance();
sourcePool.addSourceFileList(sourceFileList);
this.requestFileCount = 0;
this.sourceFileList = sourceFileList;
List<FileInfo> orgFileList = sourceFileList.getFileInfoList();
//获取资源文件的源资源绝对根;
String absoluteRoot = sourceFileList.getAbsoluteRoot();
for (FileInfo fileInfo : orgFileList) {
int fileId = fileInfo.getFileId();
long fileSize = fileInfo.getFileLength();
String filePath = absoluteRoot + fileInfo.getFileName();
//将源资源绝对根下的目录结构,复制到目标绝对根下
File file = new File(filePath);
if (!file.exists() || file.length() != fileSize) {
FileSection fileSection = new FileSection();
fileSection.setFileId(fileId);
fileSection.setOffset(0);
fileSection.setLength(fileSize);
UnreceivedSection unreceivedSection = new UnreceivedSection(fileSection);
this.unreceivedSectionPool.put(fileId, unreceivedSection);
this.requestFileSectionPool.addFileSection(fileSection);
++this.requestFileCount;
}
}
- 基于网络的大文件的分片传输
我们收到了资源的信息列表,可以知道每一个文件的文件编号、文件大小
我们根据文件的大小进行分片,设置一个固定的文件片段长度,DEFAULT_SECTION_LENGTH
,它的长度为1 << 15(32768 kb)= 32MB,然后我们将每一个文件片段都分片为 32MB 大小的文件片段进行传输。如果该文件的大小小于默认大小,不用进行分片,如果大于默认大小就要进行分片。
public class RequestFileSectionPool {
public static final int DEFAULT_SECTION_LENGTH = 1 << 15;
private int maxSectionLength;
private List<FileSection> requestSections;
RequestFileSectionPool() {
this.maxSectionLength = DEFAULT_SECTION_LENGTH;
this.requestSections = new LinkedList<FileSection>();
}
…………
void addFileSection(FileSection fileSection) {
int fileId = fileSection.getFileId();
long restLength = fileSection.getLength();
long orgOffset = fileSection.getOffset();
long offset = 0;
while (restLength > 0) {
long len = restLength > this.maxSectionLength ? this.maxSectionLength : restLength;
FileSection section = new FileSection();
section.setFileId(fileId);
section.setOffset(orgOffset + offset);
section.setLength(len);
this.requestSections.add(section);
offset += len;
restLength -= len;
}
}
}
- 未接受文件片段信息的计算
在接收文件片段信息时,得到的文件头部信息包括:- 文件编号(fileId);
- 片段偏移量(offset);
- 片段长度(length);
- 校验和(sum)
未接收文件片段信息计算基本思路:
首先对于每一个将要接收的文件,存在文件编号、文件长度两个基本信息,可以由上述基本信息构建一个“未接收文件片段信息列表”,这个列表初始只有一个元素:偏移量(0)、长度(文件长度)
以后,每完成接收一段文件片段内容,就将所接收到的片段信息,从上述信息中“删除”,其实是:根据当前所接收到的片段信息,将未接收片段分成两部分:左侧和右侧。
数据推导过程:
假设文件长度为12个单位(也许11.3个单位),写成如下图:
_ _ _ _ _ _ _ _ _ _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
又假设,当前接收到的片段信息为:
接收片段:
4 : 3
而初始未接收片段信息是:
0 : 12
可以按照4 : 3将0 : 12分成左右两部分:
_ _ _ _ x x x _ _ _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
0 : 4
7 : 5
再假设,现在又接收到片段:
1 : 3
_ y y y x x x _ _ _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
分成的未接收片段信息是:
0 : 1
7 : 5
再假设现在又接收到:
8 : 3
_ y y y x x x _ z z z _
0 1 2 3 4 5 6 7 8 9 0 1
1
未接收片段信息是:
0 : 1
7 : 1
11 : 1
上面的处理过程是“视觉”过程,需要找出“数学”过程!
class SegmentInfo {
offset
length
}
class UnreceivedSegment {
id
List<SegmentInfo> segmentList;
}
UnreceivedSegment对象unreceivedSegment初值为:
id : X
List<?> segmentList只有一个元素:
os.offset : os.len
0 : 12
当前接收到的片段信息是:rs
rs.offset : rs.len
3 : 3
_ _ _ x x x _ _ _ _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
os.leftOffset : os.leftLen
os.offset : rs.offset - os.offset (0 : 3)
os.rightOffset : os.rightLen
rs.offset + rs.len : os.offset + os.len - (rs.offset + rs.len) (6 : 6)
当前接收到的片段信息是:rs
rs.offset : rs.len
8 : 3
当前未接收文件片段如下:
0 : 3
6 : 6
_ _ _ x x x _ _ y y y _
0 1 2 3 4 5 6 7 8 9 0 1
1
第一步应该确定对哪个未接收片段进行分解(计算):
第一项:0 : 3不符合要求;
第二项:6 : 6符合分解要求!
那么,符合分解要求的条件是:
rs.offset + rs.len <= os.offet + os.len
os.leftOffset : os.leftLen
os.offset : rs.offset - os.offset (6 : 2)
os.rightOffset : os.rightLen
rs.offset + rs.len : os.offset + os.len - (rs.offset + rs.len) (11 : 1)
当前接收到的片段信息是:rs
rs.offset : rs.len
3 : 3
_ _ _ x x x _ _ _ _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
所形成的未接收片段信息:
0 : 3
6 : 6
再接收到的片段信息:
rs.offset : rs.len
6 : 3
_ _ _ x x x x x x _ _ _
0 1 2 3 4 5 6 7 8 9 0 1
1
os.leftOffset : os.leftLen
os.offset : rs.offset - os.offset (6 : 0)
os.rightOffset : os.rightLen
rs.offset + rs.len : os.offset + os.len - (rs.offset + rs.len) (9 : 3)
当前未接收片段信息应该是:
0 : 3
9 : 3
未接收文件片段信息逻辑分析:
1、定位需要分解的片段;(按上述条件进行定位,且,第一个满足条件的片段就是应该被分解的片段)
2、删除这个片段;
3、生成右片段,若右片段长度不为0,则,插入到当前位置;
4、生成左片段,若左片段长度不为0,则,插入到当前位置。
/**
*未接收文件片段信息计算基本思路:首先对于每一个将要接收的文件,是存在如下基本信息的:
*文件编号;
*文件长度。
*可以有上述基本信息构建一个“未接收文件片段信息列表”,这个列表初始只有一个元素:
*偏移量:0
*长度:文件长度
*
*以后,每完成接收一段文件片段内容,就将所接收到的片段信息,从上述信息中删除,其实是:
*根据当前所接收到的片段信息,将未接收片段分成两部分:左侧和右侧。
*计算公式:
*os.leftOffset : os.leftLen
*os.offset : rs.offset - os.offset (6 : 0)
*
*os.rightOffset : os.rightLen
*rs.offset + rs.len : os.offset + os.len - (rs.offset + rs.len) (9 : 3)
*
*这样,在多线程安全处理后,对于每一个需要接收的文件来说,未接收文件片段信息总是准确的
*/
void receiveSection(FileSection section) {
synchronized (this.sectionList) {
//获取section在列表中哪个部分的下标
int index = locationSection(section);
if (index == -1) {
throw new RuntimeException("片段出错:("
+ section.getFileId() + " : "
+ section.getOffset() + ":"
+ section.getLength() + ")");
}
FileSection os = this.sectionList.remove(index);
//根据剩余右侧的长度判断右片段是否存在
int len = (int) (os.getOffset() + os.getLength() - section.getOffset() - section.getLength());
if (len > 0) {
FileSection right = new FileSection();
right.setOffset(section.getOffset() + section.getLength());
right.setLength(len);
this.sectionList.add(index, right);
}
//根据剩余左侧的长度判断左片段是否存在
len = (int) (section.getOffset() - os.getOffset());
if (len > 0) {
FileSection left = new FileSection();
left.setOffset(os.getOffset());
left.setLength((int) (section.getOffset() - os.getOffset()));
this.sectionList.add(index, left);
}
}
}
//定位section属于列表中哪个片段
private int locationSection(FileSection section) {
int index;
for (index = 0; index < this.sectionList.size(); index++) {
FileSection os = this.sectionList.get(index);
if (section.getOffset() + section.getLength() <= os.getOffset() + os.getLength()) {
return index;
}
}
return -1;
}
UnReceiveSection也为我们的断点续传实现提供了基础准备。
- 断点续传实现
如果特殊情况下:
1、例如资源发送端向请求端发送资源时,它还没发送完就下线了,最终请求端将接收不到完整的资源,我们希望接收端能清楚自己的哪些文件没有收到,并重新向其他拥有该资源的在线节点进行进行请求。
2、请求端接收到一半,电脑本地机宕机了,我们希望他重新开机后,能从断点处继续下载,而不是重头下载。
所以博主就提出了断点续传的基础 UnReceiveSection 类
用从资源拥有者处得到的资源信息列表,与本地机文件比对,收集本次应该接收的文件列表;
@Override
public void run() {
try {
Source source = sourceFileList.getSource();
this.dos.writeUTF(ArgumentMaker.gson.toJson(source));
FileSection sectionHead = SourceReceiver.this.requestFileSectionPool.pop();
while (!FileSection.isEof(sectionHead)) {
FileSection.sendFileSectionHead(this.dos, sectionHead);
FileSection fileSection = FileSection.receiveFileSectionHead(this.dis);
int fileId = fileSection.getFileId();
long offset = fileSection.getOffset();
int length = (int) fileSection.getLength();
int orgSum = fileSection.getSum();
byte[] context = MecIo.receive(this.dis, length);
int sum = TypeValue.getSum(context);
if (sum != orgSum) {
SourceReceiver.this.requestFileSectionPool.push(sectionHead);
// System.out.println("本次片段【" + fileSection + "】接收有问题,需要重新接收");
} else {
RandomAccessFile file = SourceReceiver.this.randomAccessFileMap.getRandomAccessFile(fileId);
MecIo.write(file, offset, context);
UnreceivedSection unreceivedSection = SourceReceiver.this.unreceivedSectionPool.get(fileId);
synchronized (unreceivedSection) {
unreceivedSection.receiveSection(fileSection);
if (unreceivedSection.isReceived()) {
SourceReceiver.this.randomAccessFileMap.closeRandomAccessFile(fileId);
if (SourceReceiver.this.fileReceiveProgress != null) {
// SourceReceiver.this.fileReceiveProgress.
}
}
if (SourceReceiver.this.fileReceiveProgress != null) {
SourceReceiver.this.fileReceiveProgress.receivedFileSection(fileId, length);
if (unreceivedSection.isReceived()) {
SourceReceiver.this.fileReceiveProgress.receivedOver(fileId);
}
}
}
}
sectionHead = SourceReceiver.this.requestFileSectionPool.pop();
}
FileSection.sendFileSectionHead(this.dos, sectionHead);
} catch (IOException e) {
}
}
- 多个发送方向一个接收方发送不同文件片段
@Override
public void afterShowModalDialog() {
new Thread(new Runnable() {
@Override
public void run() {
if (fileReceiveProgress != null) {
fileReceiveProgress.getRequestFileCount(requestFileCount);
}
Source source = sourceFileList.getSource();
List<Node> sendAddressList = sourceRequester.getAddresses(source, maxSenderCount);
if (fileReceiveProgress != null) {
fileReceiveProgress.getSenderCount(sendAddressList.size());
}
for (Node senderAddress : sendAddressList) {
String ip = senderAddress.getIp();
int port = senderAddress.getPort();
try {
Socket sender = new Socket(ip, port);
new SourceFileReceiver(sender);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();
}
- 负载均衡实现(健康值)
获取资源文件发送方的健康值,基于默认的或 APP 自己实现的节点选择策略,判断是否允许参加本次发送任务,从而实现负载均衡。
在初始化的时候,获取本地健康值,若健康值大于最大允许量(非常不健康),则,拒绝本次发送任务; 并关闭本次连接。
synchronized (address) {
int health = this.address.getHealth();
if (health == SendServer.DEATH_HEALTH || health >= this.maxSendingCount) {
refuseSending();
close();
return;
}
1、健康值允许参加本次发送任务:
/**
* 1、获取本地健康值;
* 2、对健康值增1;
* 3、向自愿请求方(资源文件接收方)发送本地健康值;
* 4、向资源注册中心汇报新的健康值;
* 5、启动发送线程。
* @throws IOException
*/
private void sending() throws IOException {
synchronized (this.address) {
int health = this.address.getHealth();
sendHealth(++health);
this.sendServerSourceRegistryCenterAction.reportHealth(health);
}
new Thread(this).start();
}
2、健康值不允许本次发送任务:
/**
* 设置本地健康值为DEATH_HEALTH;
* 向资源请求方(资源文件接收方)发送本地健康值,并向资源注册中心报告健康值。
* 此时的健康值为最差值!
* @throws IOException
*/
private void refuseSending() throws IOException {
this.address.setHealth(SendServer.DEATH_HEALTH);
sendHealth(this.address.getHealth());
this.sendServerSourceRegistryCenterAction.reportHealth(SendServer.DEATH_HEALTH);
}
private void sendHealth(int health) throws IOException {
byte[] byteHealth = TypeValue.intTobytes(health);
MecIo.send(this.dos, byteHealth);
}
- 多线程安全问题(单例模式应用)
- 单例模式
private volatile static SourcePool me;
static SourcePool getInstance() {
if (SourcePool.me == null) {
synchronized (SourcePool.class) {
if (SourcePool.me == null) {
SourcePool.me = new SourcePool();
}
}
}
return SourcePool.me;
}
- 与APP无关
本框架可以根据需求在不同的APP上实现不同的功能,如:
1、选择资源绝对根获取其下面的所有文件进行注册
2、选择资源绝对跟和其下的某个文件进行单一注册
3、进度条的实现:根据资源数量获取文件传输的进度并显示在进度条中