Netty进阶
1. 粘包与半包
1.1 粘包现象
//client端分10次每次发送16字节数据
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buf);
}
}
在服务端输出,可以看到一次就收到了160字节数据,而非10次接收。
1.2 半包现象
//client端一次发送160字节数据
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);
//server端修改接收缓冲区
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); //影响底层接收缓冲区(滑动窗口)大小,仅决定netty读取最小单位,实际读取为其整数倍
在服务端输出中可看到数据被分为两节,一节20字节,一节140字节
1.3 现象分析
粘包:发送abc def,接收abcdef。原因:
- 应用层:接收方ByteBuf设置太大(Netty默认1024)
- 滑动窗口:假设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小够大,这256字节数据会缓冲在接收方的滑动窗口中,当滑动窗口缓了多个报文就会粘包
- Nagle算法:会造成粘包
半包:发送abcdef,接收abc def。原因: - 应用层:接收方ByteBuf小于实际发送数据大小
- 滑动窗口:假设接收方的窗口只剩128字节,发送方发送256字节,只能先发送128自己二,等待ack后才能发送剩余部分
- MSS(链路层MTU去掉tcp报头和ip头部分)限制:当发送数据超过MSS限制后,会将数据切分发送
本质都是因为TCP是流式协议,消息无边界
Nagle算法:为了提高网络利用率,发送足够多的数据,如果发送数据少的话,则进行延时发送:SO_SNDBUF达到MSS或含有FIN;TCP_NODELAY=TRUE,收到ACK,超时时发送。除了以上几种情况则延时发送。
MSS限制:不同设备的MTU不同,以太网MTU是1500,FDDI的MTU是4352,本地回环地址的MTU是65535本地不走网卡,
MSS :是最大段长度,它是MTU刨去 tcp和ip 头后剩余能够作为数据传输的字节数,ipv4tcp头占用 20,ip头占用 20,因此以太网 MSS 的值为1500- 40=1460,TCP在传递大量数据时,会按照 MSS 大小将数据进行分割发送,MSS的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为MSS。
1.4 滑动窗口
TCP以段(Segment)为单位发送一次数据就需要却仍应答ack处理,但是往返时间长性能差,因此引了了窗口概念,窗口大小决定了无需等待应答而可以继续发送数据最大值。
滑动窗口起到一个缓冲区的作用,也能进行流量控制。窗口内的数据才允许发送,当应答未到达前,窗口必须停止滑动,接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收。
1.5 解决方案(短连接,定长数据,分隔符,预设长度)
- 短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 发完即关
ctx.close();
}
//调整netty的接收缓冲区
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
- 每一条消息采用固定长度,缺点浪费空间
//让所有数据包长度固定,服务端加入
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
//客户端什么时候 flush 都可以
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
缺点:长度定的太大,浪费,长度定的太小,对某些数据包又显得不够
- 每一条消息采用分隔符,例如 \n,缺点需要转义
//服务端加入,默认以 \n 或 \r\n 作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//客户端发送+\n
public static StringBuilder makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
sb.append(c);
}
sb.append("\n");
return sb;
}
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
StringBuilder sb = makeString(c, r.nextInt(256) + 1);
c++;
buf.writeBytes(sb.toString().getBytes());
}
ctx.writeAndFlush(buf);
缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误
- 每一条消息分为 head 和 body,head 中包含 body 的长度
//在发送消息前,先约定用定长字节表示接下来数据的长度
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));//最大长度,长度偏移,长度占用字节,长度调整,剥离字节数
2. 协议设计与解析
2.1 协议
TCP/IP 中消息传输基于流的方式,没有边界。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。
2.2 redis 协议举例
//模拟客户端给本机redis发送set name aric命令
public static void main(String[] args) {
final byte[] LINE = {13,10};
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aric".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
buf.toString(Charset.defaultCharset());
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}
}
2.3 http 协议举例
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
log.debug(httpRequest.getUri());
//返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(),HttpResponseStatus.OK);
byte[] bytes = "<h1>hello,world!</h1>".getBytes();
response.headers().setInt(CONTENT_LENGTH, bytes.length)
response.content().writeBytes(bytes);
//写回响应
ctx.writeAndFlush(response);
}
});
2.4 自定义协议要素
- 魔数:先判断是否无效数据包
- 版本号:可支持协议的升级
- 序列化算法:消息正文采用哪种序列化方式如:json,protobuf,hessian,jdk
- 指令类型:登录,注册,单聊,群聊。。。跟业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
public class MessageCodec extends ByteToMessageCodec<Message> {
//编码:出站前把msg编码成ByteBuf
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
//1. 魔数4字节
out.writeBytes(new byte[]{1, 2, 3, 4});
//2. 版本号1字节
out.writeByte(1);
//3. 字节序列化算法 jdk 0, json 1
out.writeByte(0);
//4. 指令类型1字节
out.writeByte(message.getMessageType());
//5. 请求序号4字节
out.writeInt(message.getSequenceId());
out.writeByte(0xff); //用于对齐字节
//6. 获取内容字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();
//7. 长度
out.writeInt(bytes.length);
//8。 写入内容
out.writeBytes(bytes);
}
//解码:把ByteBuf转化为msg
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
byte length = in.readByte();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
if (serializerType == 0) {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
out.add(message);
}
}
}
什么时候可以加@Sharable
- 当handler不保存状态时,就可以安全地在多线程下被共享
- 对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制。
- 如果能确保编解码器不会保存状态,可以继承MessageToMessageCodec父类。
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {}
}
3. 聊天室案例
3.1 聊天室业务介绍
netty实现聊天室,可以登录,单聊,创建群聊,群聊,加群,退群,退出功能。
3.2 聊天室业务-登录
见ch.pipeline().addLast(LOGIN_HANDLER);方法。
3.3 聊天室业务-单聊
服务器端将 handler 独立出来。ch.pipeline().addLast(CHAT_HANDLER);
3.4 聊天室业务-群聊
ch.pipeline().addLast(GROUP_CREATE_HANDLER); //创建群聊
ch.pipeline().addLast(GROUP_JOIN_HANDLER); //加入群聊
ch.pipeline().addLast(GROUP_MEMBER_HANDLER); //查看群成员
ch.pipeline().addLast(GROUP_QUIT_HANDLER); //退出群聊
ch.pipeline().addLast(GROUP_CHAT_HANDLER); /群聊消息
3.5 聊天室业务-退出
ch.pipeline().addLast(QUIT_HANDLER);
3.6 聊天室业务-空闲检测
连接假死
原因:
- 网络故障,底层TCP断开连接,应用程序没有感知到,仍占用资源
- 公网网络不稳,丢包。客户端和服务端都都不到数据
- 应用程序线程阻塞,无法进行数据读写
问题: - 假死连接占用资源不能自动释放
- 向假死连接发送数据,得到反馈为发送超时
netty服务器端解决
空闲状态检测器:每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。
//空闲状态检测器,5s没收客户端消息,会触发IdleState#READER_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5,0,0));
//ChannelDuplexHandler可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
//用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) { //触发5s读写空闲事件,断开
ctx.channel().close();
}
super.userEventTriggered(ctx, evt);
}
});
客户端定时心跳
客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
4. 代码
https://gitee.com/xuyu294636185/netty-demo.git