Netty进阶
1.黏包半包
1.1.黏包
服务端代码
public class HelloWorldServer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("server error !", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端代码
public class HelloWorldClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 成功后,触发active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 连接建立后,模拟发送数据,每次发送 16个字节 一共发送 10 次
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
// 写入channel
channel.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("client error!");
} finally {
worker.shutdownGracefully();
}
}
}
半包
需要对服务端 和客户端 的代码稍微修改下
// 设置每次接收缓冲区的大小,所以但是客户端每次发送的是16个字节 所以可以模拟半包情况
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
// 注意 如果不生效的话,建议服务端也设置响应的缓冲区大小
// 设置发送方缓冲区大小
bootstrap.option(ChannelOption.SO_SNDBUF, 10);
1.2.滑动窗口
TCP以一个段(
segment
)为单位,每次发送一个段就需要进行一次确认应答(ACK
),为了保证消息传输过程的稳定性,但是这样做的缺点就是会导致包的往返时间越长,性能就越差。
为了解决这个问题,引入窗口的概念,窗口的大小决定了无需等待应答而可以继续发送数据的最大值
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 图中深色的部门表示即将要发送的数据,高亮的部分就是窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果1001 - 2000 这个段的数据ACK回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才允许接收
1.3.黏包半包现象分析
- 黏包
- 现象
- 发送 abc def 接收 abdcef
- 原因
- 应用层:接收方ByteBuf设置太大(Netty默认1024)
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但是由于接收方处理不及时,且窗口大小足够大,这256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口缓冲了多个报文就会黏包
- Nagle算法:会造成黏包
- 现象
- 半包
- 现象:发送 abcefg 接收方 abc efg
- 原因
- 应用层:接收方ByteBuf 设置容量大小,小于实际发送的数据量
- 滑动窗口:假设接收方的窗口只剩下了,128byte,发送方的报文大小是 256 byte,这时就会放不下,只能先发送 128 byte数据,然后等待ack确认后,才能发送剩下的部门,这时就造成了半包。
- MSS限制:当发送的数据超过了MSS的限制后,会将数据切割,然后分批发送,就会造成半包
- 为什么在数据传输截断存在数据分割呢?一个
TCP报文的有效数据(净荷数据)
是有大小容量限制的,这个报文有效数据的大小就被称为**MSS(Mixinum Segment Size) 最大报文字段长度**
。具体MSS的值会在三次握手阶段进行协商,但是最大长度不会超过**1460
**个字节
- 为什么在数据传输截断存在数据分割呢?一个
出现黏包半包的主要原因就是 TCP的消息没有边界
1.4.黏包半包解决
1.4.1.短链接(解决黏包)
客户端发送完后立马进行断开
短链接并不能半包问题
短链接虽然能解决黏包问题,但是缺点也是很明显的
- 连接建立开销高,因为需要进行握手等操作。
- 频繁的连接管理会增加服务器负担。
- 可能导致资源浪费,如 TCP 连接的建立和释放。
- 存在网络拥塞风险,特别是在高并发情况下。
- 难以维护状态,增加开发和维护的复杂性。
public class HelloWorldClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) {
// 短链接发发送
for (int i = 0; i < 10; i++) {
shortLinkedSend();
}
}
/**
* 短链接发送 测试
*/
private static void shortLinkedSend() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
// 设置发送方缓冲区大小
bootstrap.option(ChannelOption.SO_SNDBUF, 10);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 成功后,触发active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 连接建立后,模拟发送数据
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
// 发送数据
ctx.writeAndFlush(buffer);
// 主动断开链接
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("client error!");
} finally {
worker.shutdownGracefully();
}
}
}
1.4.2.定长解码器
- 固定长度限制:消息长度必须是固定的,这限制了处理可变长度消息的能力。
- 资源浪费:对于短消息,会浪费网络带宽和系统资源。
- 消息边界问题:无法处理不符合固定长度的消息,可能导致解码器阻塞或消息边界错误。
- 不适用于多种消息类型:无法处理多种长度不同的消息类型。
- 性能影响:对于长消息,可能会影响性能。
客户端代码
public static void main(String[] args) {
fixedLengthDecoder();
}
/**
* 定长解码器 测试
*/
private static void fixedLengthDecoder () {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
// 设置发送方缓冲区大小
bootstrap.option(ChannelOption.SO_SNDBUF, 10);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 成功后,触发active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 连接建立后,模拟发送数据
ByteBuf buffer = ctx.alloc().buffer(16);
for (int i = 0; i < 10; i++) {
String s = "hello," + new Random().nextInt(100000000);
logger.error("send data:{}", s);
buffer.writeBytes(fillString(16, s));
}
// 发送数据
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("client error!");
} finally {
worker.shutdownGracefully();
}
}
/**
* 编写要给方法 给定一个长度,和数值,
* 例如长度 16 数值 abc 剩下的填充*
*/
private static byte[] fillString(int length, String value) {
if (value.length() > length) {
return value.substring(0, length).getBytes();
}
StringBuilder sb = new StringBuilder(value);
for (int i = 0; i < length - value.length(); i++) {
sb.append("*");
}
return sb.toString().getBytes();
}
服务端
服务端的代码没有太大改动
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 在打印日志前添加了定长解码器
// 添加定长解码器 16 消息长度必须发送方 和 接收方一致
// 注意顺序,必须要先解码,然后才能打印日志
channel.pipeline().addLast(new FixedLengthFrameDecoder(16));
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
1.4.3.行解码器(分隔符)
\r \r\n
客户端
这里的客户端 代码 和上面一致,我们只针对客户端消息代码进行修改
// 每次发送消息的结尾加上换行符
String s = "hello," + new Random().nextInt(100000000) + "\n";
服务端
用的不多
// 添加行解码器,设置每次接收的数据大小
// 注意顺序,必须要先解码,然后才能打印日志
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
1.4.4.LTC解码器
LengthFieldBasedFrameDecoder
方法的工作原理以及各个参数的含义:
- maxFrameLength(最大帧长度):这个参数指定了一个帧的最大长度。当接收到的帧长度超过这个限制时,解码器会抛出一个异常。设置一个适当的最大帧长度可以防止你的应用程序受到恶意或错误消息的影响。
- lengthFieldOffset(长度字段偏移量):这个参数表示长度字段的偏移量,也就是在接收到的字节流中,长度字段从哪里开始的位置。通常,这个偏移量是相对于字节流的起始位置而言的。
- lengthFieldLength(长度字段长度):这个参数指定了长度字段本身所占用的字节数。在接收到的字节流中,长度字段通常是一个固定长度的整数,用来表示消息的长度。
- lengthAdjustment(长度调整值):在某些情况下,长度字段可能包括了消息头的长度,而不是整个消息的长度。这个参数允许你进行一些调整,以便准确地计算出消息的实际长度。
- initialBytesToStrip(要剥离的初始字节数):在解码器将帧传递给处理器之前,会先从帧中剥离一些字节。通常,这些字节是长度字段本身,因为处理器只需要处理消息的有效负载部分。这个参数告诉解码器要剥离的初始字节数。
假设有一个网络协议,它的消息格式如下:
- 消息长度字段占据前4个字节。
- 长度字段之后是实际的消息内容。
现在假设你收到了一个包含以上格式的字节流。你希望用Netty的LengthFieldBasedFrameDecoder
来解码这个消息。
在这种情况下,你需要设置以下参数:
lengthFieldOffset
: 偏移量为0,因为长度字段从消息的开头开始。lengthFieldLength
: 长度字段本身是4个字节。lengthAdjustment
: 在这种情况下,长度字段表示的是消息内容的长度,不包括长度字段本身,所以这个值是0。initialBytesToStrip
: 需要剥离长度字段本身,也就是4个字节。(因为用4个字节表示了字段的长度)
假设你收到的字节流如下:
[消息长度字段] [消息内容]
[0, 0, 0, 5] [72, 101, 108, 108, 111]
- 长度字段
[0, 0, 0, 5]
表示消息长度为5个字节。 - 后面的5个字节
[72, 101, 108, 108, 111]
则是实际的消息内容,代表着 “Hello”。
LengthFieldBasedFrameDecoder
将会将这个字节流解析成一条消息,其中包含了 “Hello” 这个字符串。
测试
public class TestLengthFiledDecoder {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) {
// 创建一个 EmbeddedChannel 并添加一个 LengthFieldBasedFrameDecoder
// 该解码器会根据长度字段的值来解码数据
// EmbeddedChannel 是一个用于测试的 Channel 实现
EmbeddedChannel channel = new EmbeddedChannel(
/*
* maxFrameLength: 最大帧长度
* lengthFieldOffset: 长度字段的偏移量
* lengthFieldLength: 长度字段的长度
* lengthAdjustment: 长度字段的值表示的长度与整个帧的长度之间的差值(如果消息后面再加上一个长度字段,那么这个字段的值就是lengthAdjustment
* sendInfo("Netty",buffer);后面再加上一个长度字段,那么这个字段的值就是lengthAdjustment) 不加会报错
* initialBytesToStrip: 解码后的数据需要跳过的字节数
*/
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
new LoggingHandler(LogLevel.DEBUG)
);
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 4 个字节内容的长度 实际内容
sendInfo("Hello,World111111111111111111111111111111111", buffer);
sendInfo("Hello", buffer);
sendInfo("Netty",buffer);
// 模拟写入数据
channel.writeInbound(buffer);
}
private static void sendInfo(String s, ByteBuf buffer) {
byte[] bytes = s.getBytes();
// 写入内容 大端模式 写入长度 4 个字节
int length = bytes.length;
buffer.writeInt(length);
buffer.writeBytes(bytes);
}
}