写在前面
在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。
1:核心点介绍
1.1:RandomAccessFile
RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传
的效果,其实也就是实现断点续传了。
1.1:client和server交互协议的封装
定义如下的类来封装交互协议:
public class FileTransferProtocol {
private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据
private Object transferObj; //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstData
public Integer getTransferType() {
return transferType;
}
public void setTransferType(Integer transferType) {
this.transferType = transferType;
}
public Object getTransferObj() {
return transferObj;
}
public void setTransferObj(Object transferObj) {
this.transferObj = transferObj;
}
}
其中transferType有如下的值:
1:0请求传输文件
客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令
客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据
用来封装具体要上传的数据,位置信息等
1.3:protostuff
数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。
2:正式编码
2.1:server
server main:
package com.dahuyou.netty.transferfile.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
//配置服务端NIO线程组
private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
private EventLoopGroup childGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture bing(int port) {
ChannelFuture channelFuture = null;
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
channelFuture = b.bind(port).syncUninterruptibly();
this.channel = channelFuture.channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channelFuture && channelFuture.isSuccess()) {
System.out.println("netty server start done. {}");
} else {
System.out.println("netty server start error. {}");
}
}
return channelFuture;
}
public void destroy() {
if (null == channel) return;
channel.close();
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
public Channel getChannel() {
return channel;
}
}
MyChannelInitializer:
package com.dahuyou.netty.transferfile.server;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
//对象传输处理
channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));
channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}
}
这里设置了基于protostuff的编解码器,以及消息处理的handler:
package com.dahuyou.netty.transferfile.server;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");*/
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//数据格式验证
if (!(msg instanceof FileTransferProtocol)) return;
FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;
//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
switch (fileTransferProtocol.getTransferType()) {
case 0:
FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();
//断点续传信息,实际应用中需要将断点续传信息保存到数据库中
FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());
if (null != fileBurstInstructOld) {
if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {
CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());
}
//传输完成删除断点信息
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));
ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));
return;
}
//发送信息
FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);
ctx.writeAndFlush(sendFileTransferProtocol);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));
break;
case 2:
FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();
FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);
//保存断点续传信息
CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);
ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));
//传输完成删除断点信息
if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {
CacheUtil.burstDataMap.remove(fileBurstData.getFileName());
}
break;
default:
break;
}
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
}
主要看方法channelRead,分为如下几种情况:
0:
根据是否是续传返回不同的消息,控制client上传的不同行为
2:
如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传
2.2:client
client main:
package com.dahuyou.netty.transferfile.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
//配置服务端NIO线程组
private EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture connect(String inetHost, int inetPort) {
ChannelFuture channelFuture = null;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new MyChannelInitializer());
channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();
this.channel = channelFuture.channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channelFuture && channelFuture.isSuccess()) {
System.out.println("netty client start done. {}");
} else {
System.out.println("netty client start error. {}");
}
}
return channelFuture;
}
public void destroy() {
if (null == channel) return;
channel.close();
workerGroup.shutdownGracefully();
}
}
MyChannelInitializer:
package com.dahuyou.netty.transferfile.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//对象传输处理
channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));
channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyClientHandler());
}
}
同样设置了protostuff的编解码器,以及消息处理类:
package com.dahuyou.netty.transferfile.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");*/
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开链接" + ctx.channel().localAddress().toString());
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//数据格式验证
if (!(msg instanceof FileTransferProtocol)) return;
FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;
//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'
switch (fileTransferProtocol.getTransferType()) {
case 1:
FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();
//Constants.FileStatus {0开始、1中间、2结尾、3完成}
if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {
ctx.flush();
ctx.close();
System.exit(-1);
return;
}
FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());
ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));
break;
default:
break;
}
/**模拟传输过程中断,场景测试可以注释掉
*
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");
ctx.flush();
ctx.close();
System.exit(-1);*/
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
}
主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:
public class FileUtil {
private static final int READ_BYTE_ONCE = 1024;
public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {
File file = new File(fileUrl);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式
// 这里体现了断点续传的续哦!!!
randomAccessFile.seek(readPosition);
}
}
randomAccessFile.seek(readPosition);
这里跳一下子
就体现了断点续传的续哦!!!
2.3:测试
server启动类:
package com.dahuyou.netty.transferfile.test;
import com.dahuyou.netty.transferfile.server.NettyServer;
public class NettyServerTest {
public static void main(String[] args) {
System.out.println("hi netty server");
//启动服务
new NettyServer().bing(7397);
}
}
client启动类:
package com.dahuyou.netty.transferfile.test;
import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;
public class NettyClientTest {
public static void main(String[] args) {
//启动客户端
ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);
//文件信息{文件大于1024kb方便测试断点续传}
// File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");
File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");
FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());
//发送信息;FILE:测试传输文件请求传输文件
channelFuture.channel().writeAndFlush(fileTransferProtocol);
}
}
在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:
public class FileUtil {
private static final int READ_BYTE_ONCE = 1024;
public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {
File file = new File(fileUrl);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式
randomAccessFile.seek(readPosition);
// byte[] bytes = new byte[1024 * 100];
byte[] bytes = new byte[READ_BYTE_ONCE];
}
所以第一次上传后文件是打不开的如下:
再次上传后文件就可以正常打开了。
最后看下日志输出:
写在后面
参考文章列表
protostuff序列化方式学习 。
netty编程之使用protostuff作为数据传输载体 。