前言
1:在实际应用中我们经常使用到网盘服务,他们可以高效的上传下载较大文件。那么这些高性能文件传输服务,都需要实现的分片发送、断点续传功能。
2:在Java文件操作中有RandomAccessFile类,他可以支持文件的定位读取和写入,这样就满足了我们对文件分片的最基础需求。
3:Netty服务端启动后,可以向客户端发送文件传输指令;允许接收文件、控制读取位点、记录传输标记、文件接收完成。
4:为了保证传输性能我们采用protostuff二进制流进行传输。
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//对象传输处理
channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));
channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyClientHandler());
}
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开链接" + ctx.channel().localAddress().toString());
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//数据格式验证
if (!(msg instanceof FileTransferProtocol)) return;
FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;
//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
switch (fileTransferProtocol.getTransferType()) {
case 1:
FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();
//Constants.FileStatus {0开始、1中间、2结尾、3完成}
if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {
ctx.flush();
ctx.close();
System.exit(-1);
return;
}
FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());
ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 关注明哥客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));
break;
default:
break;
}
/**模拟传输过程中断,场景测试可以注释掉
*
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 关注明哥客户端传输文件信息[主动断开链接,模拟断点续传]");
ctx.flush();
ctx.close();
System.exit(-1);*/
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
}
public class NettyClient {
//配置服务端NIO线程组
private EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture connect(String inetHost, int inetPort) {
ChannelFuture channelFuture = null;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new MyChannelInitializer());
channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();
this.channel = channelFuture.channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channelFuture && channelFuture.isSuccess()) {
System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");
} else {
System.out.println("itstack-demo-netty client start error. {关注明哥,获取源码}");
}
}
return channelFuture;
}
public void destroy() {
if (null == channel) return;
channel.close();
workerGroup.shutdownGracefully();
}
}
public class FileBurstData {
private String fileUrl; //客户端文件地址
private String fileName; //文件名称
private Integer beginPos; //开始位置
private Integer endPos; //结束位置
private byte[] bytes; //文件字节;再实际应用中可以使用非对称加密,以保证传输信息安全
private Integer status; //Constants.FileStatus {0开始、1中间、2结尾、3完成}
public FileBurstData(){
}
public String getFileUrl() {
return fileUrl;
}
public void setFileUrl(String fileUrl) {
this.fileUrl = fileUrl;
}
public FileBurstData(Integer status){
this.status = status;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public Integer getBeginPos() {
return beginPos;
}
public void setBeginPos(Integer beginPos) {
this.beginPos = beginPos;
}
public Integer getEndPos() {
return endPos;
}
public void setEndPos(Integer endPos) {
this.endPos = endPos;
}
public byte[] getBytes() {
return bytes;
}
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
}
public class FileBurstInstruct {
private Integer status; //Constants.FileStatus {0开始、1中间、2结尾、3完成}
private String clientFileUrl; //客户端文件URL
private Integer readPosition; //读取位置
public FileBurstInstruct(){}
public FileBurstInstruct(Integer status) {
this.status = status;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getClientFileUrl() {
return clientFileUrl;
}
public void setClientFileUrl(String clientFileUrl) {
this.clientFileUrl = clientFileUrl;
}
public Integer getReadPosition() {
return readPosition;
}
public void setReadPosition(Integer readPosition) {
this.readPosition = readPosition;
}
}
public class FileTransferProtocol {
private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据
private Object transferObj; //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstData
public Integer getTransferType() {
return transferType;
}
public void setTransferType(Integer transferType) {
this.transferType = transferType;
}
public Object getTransferObj() {
return transferObj;
}
public void setTransferObj(Object transferObj) {
this.transferObj = transferObj;
}
}
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
//对象传输处理
channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));
channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}
}
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//数据格式验证
if (!(msg instanceof FileTransferProtocol)) return;
FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;
//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
switch (fileTransferProtocol.getTransferType()) {
case 0:
FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();
//断点续传信息,实际应用中需要将断点续传信息保存到数据库中
FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());
if (null != fileBurstInstructOld) {
if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {
CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());
}
//传输完成删除断点信息
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));
ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));
return;
}
//发送信息
FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);
ctx.writeAndFlush(sendFileTransferProtocol);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));
break;
case 2:
FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();
FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);
//保存断点续传信息
CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);
ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 关注明哥服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));
//传输完成删除断点信息
if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {
CacheUtil.burstDataMap.remove(fileBurstData.getFileName());
}
break;
default:
break;
}
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
}
public class NettyServer {
//配置服务端NIO线程组
private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
private EventLoopGroup childGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture bing(int port) {
ChannelFuture channelFuture = null;
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
channelFuture = b.bind(port).syncUninterruptibly();
this.channel = channelFuture.channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channelFuture && channelFuture.isSuccess()) {
System.out.println("itstack-demo-netty server start done. {关注明哥,获取源码}");
} else {
System.out.println("itstack-demo-netty server start error. {关注明哥,获取源码}");
}
}
return channelFuture;
}
public void destroy() {
if (null == channel) return;
channel.close();
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
public Channel getChannel() {
return channel;
}
}
public class CacheUtil {
public static Map<String, FileBurstInstruct> burstDataMap = new ConcurrentHashMap<>();
}
public class FileUtil {
public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {
File file = new File(fileUrl);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式
randomAccessFile.seek(readPosition);
byte[] bytes = new byte[1024 * 100];
int readSize = randomAccessFile.read(bytes);
if (readSize <= 0) {
randomAccessFile.close();
return new FileBurstData(Constants.FileStatus.COMPLETE);//Constants.FileStatus {0开始、1中间、2结尾、3完成}
}
FileBurstData fileInfo = new FileBurstData();
fileInfo.setFileUrl(fileUrl);
fileInfo.setFileName(file.getName());
fileInfo.setBeginPos(readPosition);
fileInfo.setEndPos(readPosition + readSize);
//不足1024需要拷贝去掉空字节
if (readSize < 1024 * 100) {
byte[] copy = new byte[readSize];
System.arraycopy(bytes, 0, copy, 0, readSize);
fileInfo.setBytes(copy);
fileInfo.setStatus(Constants.FileStatus.END);
} else {
fileInfo.setBytes(bytes);
fileInfo.setStatus(Constants.FileStatus.CENTER);
}
randomAccessFile.close();
return fileInfo;
}
public static FileBurstInstruct writeFile(String baseUrl, FileBurstData fileBurstData) throws IOException {
if (Constants.FileStatus.COMPLETE == fileBurstData.getStatus()) {
return new FileBurstInstruct(Constants.FileStatus.COMPLETE); //Constants.FileStatus {0开始、1中间、2结尾、3完成}
}
File file = new File(baseUrl + "/" + fileBurstData.getFileName());
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");//r: 只读模式 rw:读写模式
randomAccessFile.seek(fileBurstData.getBeginPos()); //移动文件记录指针的位置,
randomAccessFile.write(fileBurstData.getBytes()); //调用了seek(start)方法,是指把文件的记录指针定位到start字节的位置。也就是说程序将从start字节开始写数据
randomAccessFile.close();
if (Constants.FileStatus.END == fileBurstData.getStatus()) {
return new FileBurstInstruct(Constants.FileStatus.COMPLETE); //Constants.FileStatus {0开始、1中间、2结尾、3完成}
}
//文件分片传输指令
FileBurstInstruct fileBurstInstruct = new FileBurstInstruct();
fileBurstInstruct.setStatus(Constants.FileStatus.CENTER); //Constants.FileStatus {0开始、1中间、2结尾、3完成}
fileBurstInstruct.setClientFileUrl(fileBurstData.getFileUrl()); //客户端文件URL
fileBurstInstruct.setReadPosition(fileBurstData.getEndPos() + 1); //读取位置
return fileBurstInstruct;
}
}
public class MsgUtil {
/**
* 构建对象;请求传输文件(客户端)
*
* @param fileUrl 客户端文件地址
* @param fileName 文件名称
* @param fileSize 文件大小
* @return 传输协议
*/
public static FileTransferProtocol buildRequestTransferFile(String fileUrl, String fileName, Long fileSize) {
FileDescInfo fileDescInfo = new FileDescInfo();
fileDescInfo.setFileUrl(fileUrl);
fileDescInfo.setFileName(fileName);
fileDescInfo.setFileSize(fileSize);
FileTransferProtocol fileTransferProtocol = new FileTransferProtocol();
fileTransferProtocol.setTransferType(0);//0请求传输文件、1文件传输指令、2文件传输数据
fileTransferProtocol.setTransferObj(fileDescInfo);
return fileTransferProtocol;
}
/**
* 构建对象;文件传输指令(服务端)
* @param status 0请求传输文件、1文件传输指令、2文件传输数据
* @param clientFileUrl 客户端文件地址
* @param readPosition 读取位置
* @return 传输协议
*/
public static FileTransferProtocol buildTransferInstruct(Integer status, String clientFileUrl, Integer readPosition) {
FileBurstInstruct fileBurstInstruct = new FileBurstInstruct();
fileBurstInstruct.setStatus(status);
fileBurstInstruct.setClientFileUrl(clientFileUrl);
fileBurstInstruct.setReadPosition(readPosition);
FileTransferProtocol fileTransferProtocol = new FileTransferProtocol();
fileTransferProtocol.setTransferType(Constants.TransferType.INSTRUCT); //0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
fileTransferProtocol.setTransferObj(fileBurstInstruct);
return fileTransferProtocol;
}
/**
* 构建对象;文件传输指令(服务端)
*
* @return 传输协议
*/
public static FileTransferProtocol buildTransferInstruct(FileBurstInstruct fileBurstInstruct) {
FileTransferProtocol fileTransferProtocol = new FileTransferProtocol();
fileTransferProtocol.setTransferType(Constants.TransferType.INSTRUCT); //0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
fileTransferProtocol.setTransferObj(fileBurstInstruct);
return fileTransferProtocol;
}
/**
* 构建对象;文件传输数据(客户端)
*
* @return 传输协议
*/
public static FileTransferProtocol buildTransferData(FileBurstData fileBurstData) {
FileTransferProtocol fileTransferProtocol = new FileTransferProtocol();
fileTransferProtocol.setTransferType(Constants.TransferType.DATA); //0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
fileTransferProtocol.setTransferObj(fileBurstData);
return fileTransferProtocol;
}
}
测试结果
启动NettyServerTest
启动NettyClientTest *设置传输文件
传输后的效果
好了到这里就结束了netty之Netty传输文件、分片发送、断点续传的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;