Netty核心功能以及线程模型

news2024/11/17 16:46:15

目录

Netty核心功能以及线程模型

Netty初探

Netty的使用场景:

Netty通讯示例

Netty线程模型

Netty模块组件


Netty核心功能以及线程模型

Netty初探

NIO 的类库和 API 繁杂, 使用麻烦: 需要熟练掌握Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等。

开发工作量和难度都非常大: 例如客户端面临断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。

Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持

Netty的使用场景:

1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现。各进程节点之间的内部通信。Rocketmq底层也是用的Netty作为基础通信组件。

2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。

3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

Netty通讯示例

Netty的maven依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.35.Final</version>
</dependency>

Server端代码:

/**
 * @Description: TODO
 * @Author: etcEriksen
 * @Date: 2023/3/1
 **/
public class NettyServer {


    public static void main(String[] args) {
        //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来配置参数 设置两个线程组
            bootstrap.group(bossGroup,workerGroup)
                    //使用NioServerSocketChannel作为服务器管道的实现
                    .channel(NioServerSocketChannel.class)
                    //初始化服务器连接队列的大小,服务器端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
                    //多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中进行等待
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //创建通道初始化对象,设置初始化参数
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }) ;
            System.out.println("netty server start ... ");
            //绑定一个端口并且同步,生成了一个ChannelFuture异步对象,通过isDone()等方法可以进行判断异步事件的执行情况
            //启动服务器(并且绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            //给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            //通过sync方法进行同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync() ;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }



    }


}

Server端处理器代码:

/**
 * 自定义Handler处理器,需要进行继承netty规定好的某一个HandlerAdpter
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    /**
     * 读取客户端发送的数据
     * @param ctx 上下文对象,含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程" + Thread.currentThread().getName());
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }


    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

Client端代码:

public class NettyClient {


    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //加入处理器
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }



}

Client端处理器代码:

/**
 * @Description: TODO
 * @Author: etcEriksen
 * @Date: 2023/3/1
 **/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


}

看完代码,我们发现Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来,让你可以专注业务的开发,而不需写一大堆类似NIO的网络处理操作。

Netty线程模型

模型解释:

1) Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写

2) BossGroup和WorkerGroup类型都是NioEventLoopGroup

3) NioEventLoopGroup 相当于一个事件循环线程组, 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop

4) 每个NioEventLoop都有一个selector , 用于监听注册在其上的socketChannel的网络通讯

5) 每个Boss NioEventLoop线程内部循环执行的步骤有 3 步

  • 处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
  • 将NioSocketChannel注册到某个worker NIOEventLoop上的selector
  • 处理任务队列的任务 , 即runAllTasks

6) 每个worker NIOEventLoop线程循环执行的步骤

  • 轮询注册到自己selector上的所有NioSocketChannel 的read, write事件,这些事件都被注册到Selector上的rdList集合中。事件发生后,操作系统会使用中断处理,这些事件加入到rdList集合中。
  • 处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
  • runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在 pipeline 中的流动处理

7) 每个worker NIOEventLoop处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler 处理器用来处理 channel 中的数据

Netty模块组件

【Bootstrap、ServerBootstrap】:

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

【Future、ChannelFuture】:

正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。

但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

【Channel】:

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

1)当前网络连接的通道的状态(例如是否打开?是否已连接?)

2)网络连接的配置参数 (例如接收缓冲区大小)

3)提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。

4)调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。

5)支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。

下面是一些常用的 Channel 类型:

NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接。
这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

【Selector】:

Netty是基于Selector对象实现的IO多路复用,通过Selector一个线程就可以监听到多个连接的Channel事件。当向一个Selector中注册Channel后,Selector内部的机制就可以不断的查询这些注册的Channel是否有已经就绪的IO事件(例如:可读,可写,网络连接完成等),这样程序就可以很简单的使用一个线程高效的管理多个Channel

【NioEventLoop】:

NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行 I/O 任务和非 I/O 任务:

I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。

非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

【NioEventLoopGroup】:

NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

【ChannelHandler】:

ChannelHandler是一个接口,处理IO事件或者拦截IO操作,并且将转发到其ChannelPipeline(业务处理链)中下一个处理程序。

ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler 用于处理入站 I/O 事件。read事件

ChannelOutboundHandler 用于处理出站 I/O 操作。write事件

或者使用以下适配器类:

ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。 read事件

ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。write事件

【ChannelHandlerContext】:

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

【ChannelPipline】:

保存了ChannelHandler的List集合,用于处理或拦截Channel的入站事件(read)和出站事件(write)操作。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。

在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下: 

 一个Channel包含了一个ChannelPipeline,而ChannelPipeline又维护了一个由ChannelHandlerContext组成的双向链表,并且每一个ChannelHandlerContext中又关联着一个ChannelHandler

read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380974.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wepack4配置入门

一、 webpack 简介 1.1 webpack 是什么 webpack是一种前端资源构建工具&#xff0c;一个静态模块打包器(module bundler)。 在 webpack 看来, 前端的所有资源文件(js/json/css/img/less/...)都会作为模块处理。 它将根据模块的依赖关系进行静态分析&#xff0c;打包生成对应的…

机器学习100天(三十二):032 KD树的构造和搜索

机器学习100天,今天讲的是:KD树的构造和搜索! 《机器学习100天》完整目录:目录 在 K 近邻算法中,我们计算测试样本与所有训练样本的距离,类似于穷举法。如果数据量少的时候,算法运行时间没有大的影响,但是如果数据量很大,那么算法运行的时间就会很长。这在实际的应用…

GFD563A101 3BHE046836R0101

GFD563A101 3BHE046836R0101关于高端涂布机张力控制系统方案的介绍高端涂布机张力控制系统方案涂布机是将具有某种功效的胶&#xff0c;或者油墨类物质均匀粘连在塑料薄膜、铝箔、纺织品等表面的机械设备。本系统从放卷到收卷共采用七台变频器&#xff0c;其中收放卷采用闭环张…

Databend 开源周报第 81 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.com 。Whats New探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。Accepted RFCsrfc: 查询结果缓存…

Cartesi 举办的2023 黑客马拉松

Cartesi 是具有 Linux 运行时的特定于应用程序的Rollups执行层。Cartesi 的特定应用程序 Optimistic Rollup 框架使区块链堆栈足够强大&#xff0c;开发人员可以构建计算密集型和以前不可能的去中心化实例。Cartesi 的 RISC-V 虚拟机支持 Linux 运行时环境&#xff0c;允许像你…

DevOps 学习笔记(二)| 使用 Harbor

文章目录1. 上传镜像到 Harbor2. 拉取 Harbor 镜像3. 使用 Jenkins 操作 Harbor1. 上传镜像到 Harbor 首先在CI/CD 服务器中配置 Docker mkdir -p /etc/docker/ cd /etc/docker/ vim daemon.json其中的 IP 地址为Harbor 服务器的 IP 地址 {"xxx": "xxxx"…

IGH主站通信测试csp模式(DC同步 preemrt)连通一从站并实现控制

IGH主站通信测试 linuxcnc配置基础机器人控制LinuxCNC与EtherCAT介绍&&PDO&SDO&#xff0c;搭建环境步骤 需要配置IGH主站的查看这篇文章 linux系统学习笔记7——一次性安装igh-ethercat主站 CSP模式 DC同步方式 preemrt实时补丁 直接上代码&#xff0c;这…

YOLOV7模型调试记录

先前的YOLOv7模型是pytorch重构的&#xff0c;并非官方提供的源码&#xff0c;而在博主使用自己的数据集进行实验时发现效果并不理想&#xff0c;因此生怕是由于源码重构导致该问题&#xff0c;此外还需进行对比实验&#xff0c;因此便从官网上下载了源码&#xff0c;进行调试运…

二叉树——二叉搜索树中的插入操作

二叉搜索树中的插入操作 链接 给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和要插入树中的值 value &#xff0c;将值插入二叉搜索树。 返回插入后二叉搜索树的根节点。 输入数据 保证 &#xff0c;新值和原始二叉搜索树中的任意节点值都不同。 注意&#xff0c…

配置二层远程端口镜像案例

实验拓扑&#xff1a; 实验需求&#xff1a; 如图1所示&#xff0c;某公司行政部通过SwitchA与外部Internet通信&#xff0c;监控设备Server通过SwitchB与SwitchA相连。 现在希望Server能够远程对行政部访问Internet的流量进行监控。 操作步骤&#xff1a; 配置观察端口 # 在…

C/C++开发,无可避免的多线程(篇一).跨平台并行编程姗姗来迟

一、编译环境准备 在正式进入c/c多线程编程系列之前&#xff0c;先来搭建支持多线程编译的编译环境。 1.1 MinGW&#xff08;win&#xff09; 进入Downloads - MinGW-w64下载页面&#xff0c;选择MinGW-w64-builds跳转下载&#xff0c; 再次进行跳转&#xff1a; 然后进入下载页…

Fiddler抓包之Fiddler过滤器(Filters)调试

Filters&#xff1a;过滤器&#xff0c;帮助我们过滤请求。 如果需要过滤掉与测试项目无关的抓包请求&#xff0c;更加精准的展现抓到的请求&#xff0c;而不是杂乱的一堆&#xff0c;那功能强大的 Filters 过滤器能帮到你。 2、Filters界面说明 fiddler中的过滤 说明&#…

新增2000w播放、单月涨粉80w!13秒短视频竟成B站热门

知识区自从被设立为一级分区后&#xff0c;B站就成了大家口中的“互联网大学”&#xff0c;有什么不懂的知识就习惯上B站搜一搜、查一查。根据B站官方出具的创作者报告数据显示&#xff0c;除了众多自发原创知识作品的UP主以外&#xff0c;还有超过300位名师学者加入B站&#x…

【亲测可用】BEV Fusion (MIT) 环境配置

CUDA环境 首先我们需要打上对应版本的显卡驱动&#xff1a; 接下来下载CUDA包和CUDNN包&#xff1a; wget https://developer.download.nvidia.com/compute/cuda/11.6.2/local_installers/cuda_11.6.2_510.47.03_linux.run sudo sh cuda_11.6.2_510.47.03_linux.runwget htt…

pytorch-softmax解决分类问题,用fashion-mnist为例子,再走一遍数据获取到模型预测的流程。深度了解分类指标的递进关系

softmax回归 线性回归模型适用于输出为连续值的情景。在另一类情景中&#xff0c;模型输出可以是一个像图像类别这样的离散值。对于这样的离散值预测问题&#xff0c;我们可以使用诸如softmax回归在内的分类模型。和线性回归不同&#xff0c;softmax回归的输出单元从一个变成了…

当ChatGPT遇见Python

在如火如荼的 ChatGPT 大潮当中&#xff0c;已经衍生出了各种各样的周边产品。Python 作为著名的万金油工具&#xff0c;怎么能没有它的身影呢。今天我们就介绍两种通过 Python 调用 ChatGPT 的方法&#xff0c;一起来看看吧~chatgpt-wrapper这是一个开源在 GitHub 上的项目&am…

leetcode 236. 二叉树的最近公共祖先

给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08;一个节点也可以是它自己的祖…

华为机试题:HJ86 求最大连续bit数(python)

文章目录&#xff08;1&#xff09;题目描述&#xff08;2&#xff09;Python3实现&#xff08;3&#xff09;知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方…

Linux下 C/C++ NTP网络时间协议详解

NTP&#xff08;Network Time Protocol&#xff0c;网络时间协议&#xff09;是由RFC 1305定义的时间同步协议。它是通过网络在计算机系统之间进行时钟同步的网络协议。NTP 在公共互联网上通常能够保持时间延迟在几十毫秒以内的精度&#xff0c;并在理想条件下&#xff0c;它能…

Molecule:使用Jetpack Compose构建StateFlow流

Molecule:使用Jetpack Compose构建StateFlow流 看下面的jetpack compose片段&#xff1a; Composable fun MessageCard(message: Message) {Column {Text(text message.author)Text(text message.body)} }这段代码最有趣的部分是它实际上是reactive。其反应性为 通过Composa…