Netty(三)NIO-进阶

news2025/1/10 10:35:39

Netty进阶

1. 粘包与半包

1.1 粘包现象

//client端分10次每次发送16字节数据
public void channelActive(ChannelHandlerContext ctx) {
	for (int i = 0; i < 10; i++) {
		ByteBuf buf = ctx.alloc().buffer(16);
		buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
		ctx.writeAndFlush(buf);
	}
}

在服务端输出,可以看到一次就收到了160字节数据,而非10次接收。
粘包现象

1.2 半包现象

//client端一次发送160字节数据
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
    buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);
//server端修改接收缓冲区
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);  //影响底层接收缓冲区(滑动窗口)大小,仅决定netty读取最小单位,实际读取为其整数倍

在服务端输出中可看到数据被分为两节,一节20字节,一节140字节
半包现象

1.3 现象分析

粘包:发送abc def,接收abcdef。原因:

  1. 应用层:接收方ByteBuf设置太大(Netty默认1024)
  2. 滑动窗口:假设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小够大,这256字节数据会缓冲在接收方的滑动窗口中,当滑动窗口缓了多个报文就会粘包
  3. Nagle算法:会造成粘包
    半包:发送abcdef,接收abc def。原因:
  4. 应用层:接收方ByteBuf小于实际发送数据大小
  5. 滑动窗口:假设接收方的窗口只剩128字节,发送方发送256字节,只能先发送128自己二,等待ack后才能发送剩余部分
  6. MSS(链路层MTU去掉tcp报头和ip头部分)限制:当发送数据超过MSS限制后,会将数据切分发送

本质都是因为TCP是流式协议,消息无边界
Nagle算法:为了提高网络利用率,发送足够多的数据,如果发送数据少的话,则进行延时发送:SO_SNDBUF达到MSS或含有FIN;TCP_NODELAY=TRUE,收到ACK,超时时发送。除了以上几种情况则延时发送。
MSS限制:不同设备的MTU不同,以太网MTU是1500,FDDI的MTU是4352,本地回环地址的MTU是65535本地不走网卡,
MSS :是最大段长度,它是MTU刨去 tcp和ip 头后剩余能够作为数据传输的字节数,ipv4tcp头占用 20,ip头占用 20,因此以太网 MSS 的值为1500- 40=1460,TCP在传递大量数据时,会按照 MSS 大小将数据进行分割发送,MSS的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为MSS。

1.4 滑动窗口

TCP以段(Segment)为单位发送一次数据就需要却仍应答ack处理,但是往返时间长性能差,因此引了了窗口概念,窗口大小决定了无需等待应答而可以继续发送数据最大值。
滑动窗口示意
滑动窗口起到一个缓冲区的作用,也能进行流量控制。窗口内的数据才允许发送,当应答未到达前,窗口必须停止滑动,接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收。

1.5 解决方案(短连接,定长数据,分隔符,预设长度)

  1. 短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	log.debug("sending...");
	ByteBuf buffer = ctx.alloc().buffer();
	buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
	ctx.writeAndFlush(buffer);
	// 发完即关
	ctx.close();
}
//调整netty的接收缓冲区
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
  1. 每一条消息采用固定长度,缺点浪费空间
//让所有数据包长度固定,服务端加入
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
//客户端什么时候 flush 都可以
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	log.debug("sending...");
	// 发送内容随机的数据包
	Random r = new Random();
	char c = 'a';
	ByteBuf buffer = ctx.alloc().buffer();
	for (int i = 0; i < 10; i++) {
		byte[] bytes = new byte[8];
		for (int j = 0; j < r.nextInt(8); j++) {
			bytes[j] = (byte) c;
		}
		c++;
		buffer.writeBytes(bytes);
	}
	ctx.writeAndFlush(buffer);
}

缺点:长度定的太大,浪费,长度定的太小,对某些数据包又显得不够

  1. 每一条消息采用分隔符,例如 \n,缺点需要转义
//服务端加入,默认以 \n 或 \r\n 作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//客户端发送+\n
    public static StringBuilder makeString(char c, int len) {
        StringBuilder sb = new StringBuilder(len + 2);
        for (int i = 0; i < len; i++) {
            sb.append(c);
        }
        sb.append("\n");
        return sb;
    }
    ByteBuf buf = ctx.alloc().buffer();
 	char c = '0';
	Random r = new Random();
	for (int i = 0; i < 10; i++) {
		StringBuilder sb = makeString(c, r.nextInt(256) + 1);
		c++;
		buf.writeBytes(sb.toString().getBytes());
	}
	ctx.writeAndFlush(buf);

缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误

  1. 每一条消息分为 head 和 body,head 中包含 body 的长度
//在发送消息前,先约定用定长字节表示接下来数据的长度
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));//最大长度,长度偏移,长度占用字节,长度调整,剥离字节数

2. 协议设计与解析

2.1 协议

TCP/IP 中消息传输基于流的方式,没有边界。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。

2.2 redis 协议举例

//模拟客户端给本机redis发送set name aric命令
public static void main(String[] args) {
        final byte[] LINE = {13,10};
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("set".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$4".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("name".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$4".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("aric".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            buf.toString(Charset.defaultCharset());
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

2.3 http 协议举例

ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
	log.debug(httpRequest.getUri());
	//返回响应
	DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(),HttpResponseStatus.OK);
	byte[] bytes = "<h1>hello,world!</h1>".getBytes();
	response.headers().setInt(CONTENT_LENGTH, bytes.length)
	response.content().writeBytes(bytes);
	//写回响应
	ctx.writeAndFlush(response);
	}
});

2.4 自定义协议要素

  • 魔数:先判断是否无效数据包
  • 版本号:可支持协议的升级
  • 序列化算法:消息正文采用哪种序列化方式如:json,protobuf,hessian,jdk
  • 指令类型:登录,注册,单聊,群聊。。。跟业务相关
  • 请求序号:为了双工通信,提供异步能力
  • 正文长度
  • 消息正文
public class MessageCodec extends ByteToMessageCodec<Message> {
    //编码:出站前把msg编码成ByteBuf
    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
        //1. 魔数4字节
        out.writeBytes(new byte[]{1, 2, 3, 4});
        //2. 版本号1字节
        out.writeByte(1);
        //3. 字节序列化算法 jdk 0, json 1
        out.writeByte(0);
        //4. 指令类型1字节
        out.writeByte(message.getMessageType());
        //5. 请求序号4字节
        out.writeInt(message.getSequenceId());
        out.writeByte(0xff);  //用于对齐字节
        //6. 获取内容字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(message);
        byte[] bytes = bos.toByteArray();
        //7. 长度
        out.writeInt(bytes.length);
        //8。 写入内容
        out.writeBytes(bytes);
    }

    //解码:把ByteBuf转化为msg
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        byte length = in.readByte();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        if (serializerType == 0) {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) ois.readObject();
            out.add(message);
        }
    }
}

自定义协议解析

什么时候可以加@Sharable
  • 当handler不保存状态时,就可以安全地在多线程下被共享
  • 对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制。
  • 如果能确保编解码器不会保存状态,可以继承MessageToMessageCodec父类。
@Slf4j
@ChannelHandler.Sharable
/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {}
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {}
}

3. 聊天室案例

3.1 聊天室业务介绍

netty实现聊天室,可以登录,单聊,创建群聊,群聊,加群,退群,退出功能。

3.2 聊天室业务-登录

见ch.pipeline().addLast(LOGIN_HANDLER);方法。

3.3 聊天室业务-单聊

服务器端将 handler 独立出来。ch.pipeline().addLast(CHAT_HANDLER);

3.4 聊天室业务-群聊

ch.pipeline().addLast(GROUP_CREATE_HANDLER); //创建群聊
ch.pipeline().addLast(GROUP_JOIN_HANDLER); //加入群聊
ch.pipeline().addLast(GROUP_MEMBER_HANDLER); //查看群成员
ch.pipeline().addLast(GROUP_QUIT_HANDLER); //退出群聊
ch.pipeline().addLast(GROUP_CHAT_HANDLER); /群聊消息

3.5 聊天室业务-退出

ch.pipeline().addLast(QUIT_HANDLER);

3.6 聊天室业务-空闲检测

连接假死

原因:

  • 网络故障,底层TCP断开连接,应用程序没有感知到,仍占用资源
  • 公网网络不稳,丢包。客户端和服务端都都不到数据
  • 应用程序线程阻塞,无法进行数据读写
    问题:
  • 假死连接占用资源不能自动释放
  • 向假死连接发送数据,得到反馈为发送超时
    netty服务器端解决
    空闲状态检测器:每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。
//空闲状态检测器,5s没收客户端消息,会触发IdleState#READER_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5,0,0));
//ChannelDuplexHandler可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
	//用来触发特殊事件
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
	IdleStateEvent event = (IdleStateEvent) evt;
	if (event.state() == IdleState.READER_IDLE) {  //触发5s读写空闲事件,断开
		ctx.channel().close();
	}
	super.userEventTriggered(ctx, evt);
	}
});

客户端定时心跳
客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器

4. 代码

https://gitee.com/xuyu294636185/netty-demo.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1050672.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【嵌入式 – GD32开发实战指南(ARM版本)】第2部分 外设篇 - 第1章 温湿度传感器DHT11

1 理论分析 1.1 DHT11介绍 DHT11 数字温湿度传感器是一款含有已校准数字信号输出的温湿度复合传感器。它应用专用的数字模块采集技术和温湿度传感技术,确保产品具有极高的可靠性与卓越的长期稳定性。 DHT11传感器包括一个电阻式感湿元件和一个 NTC 测温元件,并与一个高性能…

2、MQ高级

在昨天的练习作业中&#xff0c;我们改造了余额支付功能&#xff0c;在支付成功后利用RabbitMQ通知交易服务&#xff0c;更新业务订单状态为已支付。 但是大家思考一下&#xff0c;如果这里MQ通知失败&#xff0c;支付服务中支付流水显示支付成功&#xff0c;而交易服务中的订单…

C++核心编程--继承篇

4.6、继承 继承是面向对象三大特征之一 有些类与类之间存在特殊的关系&#xff0c;例如下图中&#xff1a; ​ 我们发现&#xff0c;定义这些类的定义时&#xff0c;都拥有上一级的一些共性&#xff0c;还有一些自己的特性。那么我们遇到重复的东西时&#xff0c;就可以考虑使…

【Java 进阶篇】MySQL多表查询之外连接详解

在MySQL数据库中&#xff0c;多表查询是一种常见且强大的功能&#xff0c;允许您在多个表之间执行联接操作&#xff0c;从而检索、过滤和组合数据。在本篇博客中&#xff0c;我们将深入探讨多表查询的一种类型&#xff0c;即外连接&#xff08;Outer Join&#xff09;&#xff…

ESP8266使用记录(四)

放上最终效果 ESP8266&Unity游戏 整合放进了坏玩具车遥控器里 最终只使用了mpu6050的yaw数据&#xff0c;因为roll值漂移…… 使用了https://github.com/ElectronicCats/mpu6050 整个流程 ESP8266取MPU6050数据&#xff0c;处理后通过udp发送给Unity显示出来 MPU6050_Z…

测试OpenCvSharp库的模板匹配功能

微信公众号“Dotnet讲堂”的文章《c#实现模板匹配&#xff0c;并输出匹配坐标》&#xff08;参考文献1&#xff09;中介绍了采用OpenCVSharp库实现模板匹配功能&#xff0c;也即在目标图片中定位指定图片内容的示例&#xff0c;本文参照参考文献1-4&#xff0c;学习并测试OpenC…

中国312个历史文化名镇及景区空间点位数据集

一部中华史&#xff0c;既是人类创造丰富物质财富的奋头史&#xff0c;又是与自然共生共存的和谐史不仅留存下悠久丰富的人文思想和情怀&#xff0c;还在各处镌刻下可流传的生活场景&#xff0c;历史文化名镇(以下简称:名镇)就是这样真实的历史画卷。“镇”是一方的政治文化中心…

Linux——补充点(页表映射及LWP)

目录 补充点1&#xff1a;进程地址空间堆区管理 补充点2&#xff1a;Linux内核进程上下文切换 补充点3&#xff1a;页表映射 补充点4&#xff1a;两级页表 补充点1&#xff1a;进程地址空间堆区管理 Linux内核通过一个被称为进程描述符的task_struct结构体来管理进程&#…

Linux多线程【线程互斥与同步】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 文章目录 &#x1f307;前言&#x1f3d9;️正文1、资源共享问题1.1、多线程并发访问1.2、临界区与临界资源1.3、“锁” 概念引…

mongodb Community 7 安装(linux)

链接&#xff1a;mongodb官网 链接&#xff1a;官方安装文档 一、安装 1.安装依赖 apt-get install gnupg curl2.安装public key cd /usr/localcurl -fsSL https://pgp.mongodb.com/server-7.0.asc | gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor3.把mon…

什么是Local Storage和Session Storage?它们之间有什么区别?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是 Local Storage 和 Session Storage&#xff1f;Local Storage&#xff08;本地存储&#xff09;Session Storage&#xff08;会话存储&#xff09; ⭐ 区别⭐ 示例⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的…

Flutter笔记:手写一个简单的画板工具

Flutter笔记 手写一个简单的画板工具 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_28550263/article/details/133418742 目 录 1…

算法基础之归并排序

一、归并排序的形象理解 原题链接 示例代码 void merge_sort(int q[], int l, int r) {if (l > r) return;int mid l r >> 1;merge_sort(q, l, mid), merge_sort(q, mid 1, r);int k 0, i l, j mid 1;while (i < mid && j < r) //第一处if (q[i]…

基于Spider的全站数据爬取

踩坑 一开始运行的时候会出来很多其他的日志信息&#xff0c;这里我忘了设置settings.py中LOG_LEVELERROR 获取xpath 这里获取xpath比较简单。 首先发现所有的照片文字都是在li标签下的&#xff0c;所以第一步是获取所有的li标签&#xff0c;得到li标签的列表。 li_list re…

【sgTileImage】自定义组件:瓦片图拖拽局部加载、实现以鼠标为中心缩放

特性&#xff1a; 支持缩放瓦片图&#xff0c;定义瓦片图初始缩放比例&#xff0c;以鼠标所在位置为中心缩放支持局部拖拽加载 sgTileImage源码 <template><div :class"$options.name"><div class"sg-ctrl"><label>缩放百分比&l…

电脑怎么用U盘重装系统-电脑用U盘重装Win10系统的步骤

电脑怎么用U盘重装系统&#xff1f;电脑对于当前日常办公生活是特别重要的&#xff0c;但是&#xff0c;随着操作时间的增加&#xff0c;电脑内的操作系统运作可能会变得越来越缓慢了。这时候重装系统成为解决系统问题的有效方法。下面小编给大家介绍利用U盘给电脑重装系统Win1…

Python入门教程48:Pycharm永久镜像源的pip配置方法

国内几个好用的Python镜像服务器地址&#xff1a; 清华大学镜像站&#xff1a;https://pypi.tuna.tsinghua.edu.cn/simple/阿里云镜像站&#xff1a;https://mirrors.aliyun.com/pypi/simple/中科大镜像站&#xff1a;https://pypi.mirrors.ustc.edu.cn/simple/中国科技大学镜…

HTTP 错误 401.3 - Unauthorized 由于 Web 服务器上此资源的访问控制列表(ACL)配置或加密设置,您无权查看此目录或页面。

用IIS 发布网站&#xff0c;不能访问且出现错误&#xff1a;HTTP 错误 401.3 - Unauthorized 由于Web服务器上此资源的访问控制列表(ACL)配置或加密设置。您无权查看此目录或页面 问题截图&#xff1a; 问题描述&#xff1a;HTTP 错误 401.3 - 未经授权&#xff1a;访问由于 A…

anzo capital昂首资本详解MT4和MT5订单执行方式

很多投资者在后台咨询anzo capital昂首资本&#xff0c;MT4和MT5订单执行方式有什么不同&#xff0c;今天一起探讨! MT4平台提供了三种类型的订单执行方式&#xff1a; 第一种是即时执行。当交易者向经纪人发送建立订单的请求时&#xff0c;平台将自动以当前价格录入该订单。…

《YOLOv5:从入门到实战》报错解决 专栏答疑

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。《YOLOv5&#xff1a;从入门到实战》专栏上线后&#xff0c;部分同学在学习过程中提出了一些问题&#xff0c;笔者相信这些问题其他同学也有可能遇到。为了让大家可以更好地学习本专栏内容&#xff0c;笔者特意推出了该篇专…