Netty通信技术进阶一

news2025/1/12 6:59:17

Netty通信技术进阶

    • 1. 概念
    • 2. 线程同步、异步
    • 3. 其他通信技术对比
    • 4. Netty中的Reactor实现
    • 5. Pipeline 和 Handler
      • 5.1 ChannelHandler 分类
    • 6. 入站事件传播
    • 7.inbound/outbound 加载顺序和执行顺序
    • 8. 出站事件传播
    • 9. Code example
      • 9.1 编写服务端
      • 9.2 编写客户端
    • 10. 核心组件
      • 10.1 Bootstrap
      • 10.2 Channel
      • 10.3 EventLoopGroup 和 EventLoop
        • 10.3.1 eventLoopThreads 是多少?
        • 10.3.2 复用Handler

1. 概念

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供非阻塞的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

  • 本质:网络应用程序框架
  • 实现:异步、事件驱动
  • 特性:高性能、可维护、快速开发
  • 用途:开发服务器和客户端

2. 线程同步、异步

线程同步、异步是相对的,在请求或执行过程中,如果会阻塞等待,就是同步操作,反之就是异步操作
在这里插入图片描述

3. 其他通信技术对比

  • Apache Mina:和Netty是同一作者,但是推荐Netty,作者认为Netty是针对Mina的重新打造版本,解决了一些问题并提高了扩展性
  • Sun Grizzly:用得少、文档少,更新少
  • Cindy:生命周期不长
  • Tomcat、Jetty:还没有独立出来,另外他们有自己的网络通信层实现,是为了专门针对servelet容器而做的,不具备通用性

4. Netty中的Reactor实现

Netty线程模型是基于Reactor模型实现的,对Reactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型
在这里插入图片描述
1)Netty抽象出两组线程池:BossGroup和WorkerGroup,每个线程池中都有EventLoop线程(可以是OIO,NIO,AIO); BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup中的线程专门负责处理连接上的读写,EventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环
2)EventLoop表示一个不断循环的执行事件处理的线程,每个EventLoop都包含一个Selector,用于监听注册在其上的Socket网络连接(Channel)
3)每个BossEventLoop中循环执行以下三个步骤:
3.1)select:轮训注册在其上的ServerSocketChannel的accept事件(OP_ACCEPT事件)
3.2)processSelectedKeys:处理accept事件,与客户端建立连接,生成一个SocketChannel,并将其注册到某个Worker
3.3)runAllTasks:再去以此循环处理任务队列中的其他任务
4)每个WorkerEventLoop中循环执行以下三个步骤:
4.1)select:轮训注册在其上的SocketChannel的read/write事件(OP_READ/OP_WRITE事件)
4.2)processSelectedKeys:在对应的SocketChannel上处理read/write事件
4.3)runAllTasks:再去以此循环处理任务队列中的其他任务
5)在以上两个processSelectedKeys步骤中,会使用Pipeline(管道),Pipeline中引用了Channel,即通过Pipeline可以获取到对应的Channel,Pipeline中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)

5. Pipeline 和 Handler

ChannelPipeline 提供了 ChannelHandler 链的容器
在这里插入图片描述
pipeline 中包装的是由ChannelHandlerContext包装的ChannelHandler, 为双向链表, 其中head为netty内置, 无法修改, 我们只需要专注于中间的ChannelHandler, tail不一定存在

5.1 ChannelHandler 分类

对于数据的出站和入站,有着不同的ChannelHandler类型与之对应:

ChannelInboundHandler: 入站事件处理器
ChannelOutBoundHandler: 出站事件处理器
ChannelHandlerAdapter: 提供了一些方法的默认实现,可减少用户对于ChannelHandler的编写
ChannelDuplexHandler: 混合型,既能处理入站事件又能处理出站事件
SimpleChannelInboundHandler: 对ChannelHandlerAdapter的继承(可指定消息泛型)

服务端异步处理数据禁止使用, 因为方法内部读取消息后会自动release掉数据占用的Bytebuffer资源

在这里插入图片描述

6. 入站事件传播

在ChannelInboundHandler中, channelActive中的channelActive方法可将事件向后传递, 另一种写法ctx.fireChannelActive()

/**
 * 通道准备就绪
 * @param ctx
 * @throws Exception
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	// ctx.fireChannelActive();
    super.channelActive(ctx);
}

7.inbound/outbound 加载顺序和执行顺序

InboundHandler是按照Pipleline的加载顺序(addLast),顺序执行
OutboundHandler是按照Pipeline的加载顺序(addLast),逆序执行
如果想让所有的OutboundHandler都能被执行到,可以选择把OutboundHandler放在最后一个有效的InboundHandler之前
有一种做法是通过addFirst加载所有OutboundHandler,再通过addLast加载所有InboundHandler;另外也推荐:使用addLast先加载所有OutboundHandler,然后加载所有InboundHandler(注意考虑加载顺序和执行顺序)
在这里插入图片描述

8. 出站事件传播

在outboundhandler中最好不要再通过Channel写数据,会导致事件再次从尾部流动到头部,造成类似递归问题
可以在事件向前传播出去之后通过ChannelHandlerContext写数据

9. Code example

在这里插入图片描述

9.1 编写服务端

定义NioEventLoopGroup时可在构造方法指定线程数量, 默认构造器的线程数量为cpu核数的2倍
Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2))
从源码中可以看出, 也可以在启动时指定io.netty.eventLoopThreads的线程数

public static void main(String[] args) {
    NettyServer server = new NettyServer();
    server.start(9999);
}

private void start(int port) {
    // 定义reactor线程组
    EventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
    EventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
    // 业务线程池
    EventExecutorGroup business = new UnorderedThreadPoolEventExecutor(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("business"));
    // 基于netty引导整个服务端程序的启动
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {

                //当客户端 SocketChannel初始化时回调该方法,添加handler
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 日志和超时
                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                    pipeline.addLast(new ServerReadIdleHandler());
                    // 编码
                    pipeline.addLast(new LengthFieldPrepender(4));
                    pipeline.addLast(new ProtoStuffEncoder());
                    // 解码
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                    pipeline.addLast("protostuffdecoder", new ProtoStuffDecoder());
                    // 执行业务
                    pipeline.addLast(business, "tcptesthandler", new TcpStickHalfHandler());
                }
            });
    // 绑定端口并启动
    try {
        ChannelFuture future = serverBootstrap.bind(port).sync();
        // 监听端口的关闭 sync阻塞
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //清理一些资源
        worker.shutdownGracefully();
        boss.shutdownGracefully();
    }

}

自定义ChannelInboundHandler, 继承ChannelInboundHandlerAdapter, 也做一些数据的解码, 业务处理等操作

public class TcpStickHalfHandler extends ChannelInboundHandlerAdapter {

    int count = 0;
	/**
	 * 通道准备就绪
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
	    super.channelActive(ctx);
	}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UserInfo data = (UserInfo) msg;
        count++;
        log.info("服务端收到的第{}个数据:{}", count, data);
        super.channelRead(ctx, msg);
    }
}

自定义ChannelOutboundHandler, 继承ChannelOutboundHandlerAdapter, 需要注意的是ctx.writeAndFlush和ctx.channel().wirte的区别
前者在是此handler往依次往前执行(pipeline双向链表), 后者是最后一个tail往前执行, initChannel中addLast时顺序错误可能会数据错误

public class ServerOutboundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.info("ServerOutboundHandler  write ");
        super.write(ctx, msg, promise);
        
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes("append".getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(buffer);
    }
    
}

9.2 编写客户端

其中ChannelHandler的编写和服务端的用法一致, 编码解码流程相反

public static void main(String[] args) {
    NettyClient client = new NettyClient();
    client.start("127.0.0.1", 9999);
}

public void start(String host, int port) {
    // 定义线程组,
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 超时
                    pipeline.addLast(new ClientWriterIdleHandler());
                    // 编码
                    pipeline.addLast(new LengthFieldPrepender(4));
                    pipeline.addLast(new ProtoStuffEncoder());
                    // 解码
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                    pipeline.addLast(new ProtoStuffDecoder());
                    // 解码器
                    pipeline.addLast(new ClientInboundHandler());

                }
            });
    //连接服务端
    try {
        ChannelFuture future = bootstrap.connect(host, port).sync();
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

10. 核心组件

10.1 Bootstrap

Bootstrap是引导的意思,它的作用是配置整个Netty程序,将各个组件都串起来,最后绑定端口、启动Netty服务
Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器,区别在于

1、ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的
2、引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap则需要两个
在这里插入图片描述

10.2 Channel

Netty中的Channel是与网络套接字相关的,可以理解为是socket连接,在客户端与服务端连接的时候就会建立一个Channel,它负责基本的IO操作,比如:bind()、connect(),read(),write() 等
不同协议、不同的I/O类型的连接都有不同的 Channel 类型与之对应
主要作用:

  1. 通过Channel可获得当前网络连接的通道状态。
  2. 通过Channel可获得网络连接的配置参数(缓冲区大小等)。
  3. Channel提供异步的网络I/O操作,比如连接的建立、数据的读写、端口的绑定等。

在这里插入图片描述

10.3 EventLoopGroup 和 EventLoop

Netty是基于事件驱动的,比如:连接注册,连接激活;数据读取;异常事件等等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环/EventExecutor)
在Netty 中每个Channel 都会被分配到一个 EventLoop。一个 EventLoop 可以服务于多个 Channel。每个EventLoop 会占用一个 Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件。
EventLoopGroup 是用来生成 EventLoop 的,包含了一组EventLoop(可以初步理解成Netty线程池)
在这里插入图片描述

10.3.1 eventLoopThreads 是多少?

核心线程数默认:cpu核数*2, 核心线程数在创建时可通过构造函数指定
对于boss group,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动

10.3.2 复用Handler

每个客户端Channel创建后初始化时,均会向与该Channel绑定的Pipeline中添加handler,此种模式下,每个Channel享有的是各自独立的Handler
如果复用的handler对象不加@Sharable注解会报错, 另外存在线程安全问题, 内部全局变量线程安全问题要自己处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/423875.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟直播需要哪些设备?如何搭建虚拟直播团队?

虚拟直播不止是新兴的娱乐途径 &#xff0c;还是新的商业模式 。虚拟直播的出现&#xff0c;是互联网娱乐趋势的变化&#xff0c;带来了更加丰富多彩的娱乐形式&#xff0c;同时也优化了传统直播模式下的人力物力成本&#xff0c;使直播行业更加效率及智能。 科技不断发展&…

JDBC(数据库连接)

MYSQL 数据库总结&#xff1a; http://t.csdn.cn/Ka9Vm JDBC是使用Java语言操作关系型数据库的一套API。 将mysql-connector-j-8.0.32jar复制粘贴到一个新建的目录里&#xff0c;然后右键mysql-connector-j-8.0.32jar&#xff0c;添加为库。 DriverManager 一个工厂类&…

2023易派客工业品展览会在苏州开幕

展厅面积达5.3万平方米&#xff0c;500多家重要工业领军企业参展&#xff0c;20组企业签署购销意向协议&#xff0c;签约金额超82亿元 ​ 4月13日&#xff0c;“2023易派客工业品展览会”在苏州国际博览中心开幕。展会以“绿色智造融通赋能”为主题&#xff0c;500多家重要工业…

CART分类树算法

1. CART分类树算法的最优特征选择方法 我们知道&#xff0c;在ID3算法中我们使用了信息增益来选择特征&#xff0c;信息增益大的优先选择。在C4.5算法中&#xff0c;采用了信息增益比来选择特征&#xff0c;以减少信息增益容易选择特征值多的特征的问题。但是无论是ID3还是C4.…

FreeRTOS中临界段的保护(笔记)

目录临界段的定义Cortex-M内核快速关开关中断的指令关中断开中断进入临界段的宏退出临界段的宏进入临界段&#xff0c;不带中断保护&#xff0c; 不能嵌套进入临界段&#xff0c;带中断保护版本&#xff0c;可以嵌套退出临界段&#xff0c;不带中断保护版本&#xff0c;不能嵌套…

【数据结构与算法】堆的实现(附源码)

目录 一.堆的概念及结构 二.接口实现 A.初始化 Heapinit 销毁 Heapdestroy B.插入 Heappush 向上调整 AdjustUp 1.Heappush 2.AdjustUp C.删除 Heappop 向下调整 AdjustDown D.堆的判空 Heapempty 堆顶数据 Heaptop 堆的大小 Heapsize 三.源码 Heap.h He…

Windows通过RDP异地远程桌面Ubuntu【内网穿透】

文章目录前言1. ubuntu安装XRDP2.局域网测试连接3. Ubuntu安装cpolar内网穿透4.cpolar公网地址测试访问5.固定域名公网地址前言 XRDP是一种开源工具&#xff0c;它允许用户通过Windows RDP访问Linux远程桌面。 除了Windows RDP外&#xff0c;xrdp工具还接受来自其他RDP客户端(…

文心一格,百度AI作画产品

文章目录AIGC什么是AI作画&#xff1f;Prompt文心一格使用方法注册账号使用AI绘图AIGC的未来发展结语AIGC AIGC&#xff08;AI Generated Content&#xff09;是指利用人工智能生成内容。是利用人工智能来生成你所需要的内容&#xff0c;GC的意思是创作内容。与之相对应的概念中…

ElasticSearch索引文档写入和近实时搜索

一、基本概念 1.Segments In Lucene 众所周知&#xff0c;ElasticSearch存储的基本单元Shard&#xff0c;ES中一个Index可能分为多个Shard&#xff0c;事实上每个Shard都是一个Lucence的Index&#xff0c;并且每个Lucene Index由多个Segment组成&#xff0c;每个Segment事实上…

【JS运算】分组求和/平均值(reduce函数)

对于数组求和的问题&#xff0c;使用reduce函数能够最快的解决 如果你还不会reduce函数&#xff0c;可以看这一篇&#xff1a; reduce函数的使用 思路 reduce函数对相同group的值进行迭代求和 将分组的总和除以组里的个数得到平均值&#xff0c;然后存储起来 Sum函数&#x…

Linux ubuntu更新meson版本

问题描述 在对项目源码用meson进行编译时&#xff0c;可能出现以下错误 meson.build:1:0: ERROR: Meson version is 0.45.1 but project requires > 0.58.0. 或者 meson_options.txt:1:0: ERROR: Unknown type feature. 等等&#xff0c;原因是meson版本跟设置的不适配。 …

Linux 学习总结(92)—— Linux 高效率使用技巧

1、跳转目录优雅顺滑 1.1 bd 命令 快速回到 Bash 中的特定父目录&#xff0c;而不是多余地键入 cd ../../..。如果在此路径中/home/radia/work/python/tkinter/one/two并且想快速转到目录 python&#xff0c;只需键入: bd python或者仅输入目录的前几个字母&#xff0c;如匹…

锁子甲 bulid+sim

链接: youtube 分析&#xff1a;洒一堆点——copy 模型——点和模型符合一定规律 点和点的距离符合上述图中的关系 &#xff08;横纵&#xff09; 横向 但是我们要横向10个点够了&#xff1a; 用modulo 除余 纵向 这里用除法向上取整 /10 eg &#xff1a; 0-9 得0 10-19 得1…

【逗号你真的懂吗?】C++与JAVA中逗号的区别

文章目录一、先上结论二、C中的逗号逗号运算符和逗号表达式三、JAVA中的逗号四、实战验证情况一&#xff1a;在定义&#xff08;或声明&#xff09;变量时利用逗号CJAVA情况二&#xff1a;在for循环条件中使用逗号CJAVA情况三&#xff1a;在函数形参参数列表中使用逗号CJAVA情况…

WPF_Application

文章目录Application1 Application类1.1 定义1.2 示例1.3 附注2 Application常用属性2.1 Current2.2 Dispatcher3 总结Application 1 Application类 1.1 定义 该类封装了一个WPF应用程序。 该类派生自DispatcherObject&#xff0c;实现了IQueryAmbient接口。 1.2 示例 以…

ICLR Spotlight | 卷积网络上的首个BERT/MAE预训练,ResNet也能用

“删除-再恢复” 形式的自监督预训练可追溯到 2016 年&#xff0c;早于 18 年的 BERT 与 21 年的 MAE。然而在长久的探索中&#xff0c;这种 BERT/MAE 式的预训练算法仍未在卷积模型上成功&#xff08;即大幅超过有监督学习&#xff09;。本篇 ICLR Spotlight 工作 “Designing…

PPP协议相关的知识

这只是我自己在学习时的总结&#xff0c;对于我有用的知识点&#xff0c;希望可以和大家分享&#xff0c;主要学习的文章如下&#xff0c;如有兴趣也可以去了解一下其他作者写的ppp协议的知识点。 PPP协议详解https://blog.csdn.net/m0_49864110/article/details/124987932?o…

Spring Boot 之四:使用Feign实现微服务间的交互

系列目录&#xff08;持续更新。。。&#xff09; Spring Cloud&#xff1a;什么是微服务 Spring Cloud之一&#xff1a;注册与发现-Eureka工程的创建 Spring Cloud之二&#xff1a;服务提供者注册到Eureka Server Spring Cloud之三&#xff1a;Eureka Server添加认证 Spr…

Talk预告 | 清华大学交叉信息研究院助理教授赵行:基于视觉感知的自动驾驶运动预测

本期为TechBeat人工智能社区第481期线上Talk&#xff01; 北京时间3月15日(周三)20:00&#xff0c;清华大学交叉信息研究院助理教授——赵行的Talk将准时在TechBeat人工智能社区开播&#xff01; 他与大家分享的主题是: “基于视觉感知的自动驾驶运动预测”&#xff0c;届时将…

Dish - TS:减轻时间序列预测中分布偏移的一般范式

摘要 时间序列预测中的分布偏移&#xff08;TSF&#xff09;指的是序列分布随时间发生变化&#xff0c;这很大程度上阻碍了TSF模型的性能。现有针对时间序列中分布偏移的研究主要限于分布的量化&#xff0c;更重要的是忽略了回视窗口和预测窗口&#xff08;horizon windows&am…