Netty基本介绍,参考 Netty与网络编程
1.1 什么是粘包拆包
例如:发送 ABC, DEF两个报文
-
收到ABCDEF一个报文,发生了粘包
-
收到AB,C,DEF三个报文,ABC发生了拆包
-
收到AB,CD,EF三个报文,即发生了拆包又发生了粘包
1.2 看一个粘包半包样例
- 客户端每次把消息“ABC,DEF,GHI,JKL,MNO\n" 发生一百次给服务端
- 服务端将每次收到的消息输出,并记录收到的次数,然后将消息返回客户端
我们看下面服务端输出的结果:
- 服务端一共接收到两次消息,说明消息被合并了,发生了粘包
- 第一次输出的消息最后一行只有"ABC,",这说明发生了拆包,一个完整的消息被拆分了才会出现这种情况
输出结果:
Server Accept[ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,] and the counter is:1
Server Accept[DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
] and the counter is:2
代码:
EchoClient.java
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
final Bootstrap b = new Bootstrap();;/*客户端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
.remoteAddress(new InetSocketAddress(host,port))/*配置要连接服务器的ip地址和端口*/
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999,"127.0.0.1").start();
}
}
EchoClientHandler.java
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端读取到网络数据后的处理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
+"] and the counter is:"+counter.incrementAndGet());
}
/*** 客户端被通知channel活跃后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "ABC,DEF,GHI,JKL,MNO"
+ System.getProperty("line.separator");
//发送100次
for(int i=0;i<100;i++){
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoServer.java
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
EchoServer echoServer = new EchoServer(9999);
System.out.println("服务器即将启动");
echoServer.start();
System.out.println("服务器关闭");
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);/*添加到该子channel的pipeline的尾部*/
}
});
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("服务器启动完成,等待客户端的连接和数据.....");
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
}
}
}
EchoServerHandler.java
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
1.3 解决方法
TCP粘包/半包发生的原因:TCP是流式协议,消息无边界,消息发送的节点为了加快转发速度,会等到缓冲区满再发送,那么他可能会对消息拆包或者合并包满足缓冲区再发送
解决问题的方法也很简单:找出消息边界,确定了消息的边界,我们就能找到完整的消息
方式 | 确定消息边界 | 优点 | 缺点 | 推荐 |
---|---|---|---|---|
TCP连接改成短连接 | 每个链接发送一个消息 | 简单 | 效率低下 | 不推荐 |
固定长度 | 按长度取消息 | 简单 | 空间浪费(消息太短需要填充) | 不推荐 |
分隔符 | 分隔符之间就是一个消息 | 空间不浪费 | 分隔符需要转义 | 推荐 |
消息头和消息体 | 根据消息头确定一个消息 | 精确且不用转义 | 实现相对复杂 | 推荐 |
下面我们依次看一下上面几种方法如何实现:
1.4 短连接(不推荐)
每一次发送都创建一个新的连接,就可以避免粘包和拆包问题,但是这种方式效率低下,TCP三次握手四次挥手非常消耗性能,不推荐。
1.5 固定长度方式实现
客户端(服务端同样处理)
实现ChannelInitializer类,添加下面两行代码new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length())表示固定长度消息
将其作为handler交给Bootstrap来处理消息
1.6 分割符
1.6.1 换行符作分隔符(客户端(服务端同样处理)):
实现ChannelInitializer类,添加下面两行代码new LineBasedFrameDecoder(1024)表示固定长度消息,
同样需要将ChannelInitializerImp交给Bootstrap来处理消息
1.6.2 自定义分隔符:
实现ChannelInitializer类,添加下面两行代码new DelimiterBasedFrameDecoder(1024, delimiter)表示固定长度消息,
同样需要将ChannelInitializerImp交给Bootstrap来处理消息
1.7 消息头和消息体
自己重新定义一种消息头和消息体,根据消息头来确定消息边界,就可以知道是否发生粘包和拆包
1.8 总结:
Netty对粘包和拆包的处理进行了封装,开发者使用很方便,比较推荐的实现方式是1.6 和1.7。
- 如果消息中没有什么特殊字符,可以采用1.6的方式,特殊字符当成分割符,实现简单方便。
- 如果消息中不确定有哪些特殊字符,可以采用1.7的方式,但是实现复杂一点,相当于自己定义一个应用层协议。
备注:各种解决方案的代码后续分享会出来