NettyのEventLoopChannel

news2024/11/26 21:18:45

Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf

本篇主要介绍Netty的EventLoop和Channel组件。

1、Netty入门案例

        服务器端的创建,主要分为以下步骤:

  • 创建serverBootstrap对象。
  • 配置服务器的线程模型。可以指定两个线程模型,parentGroup是专门负责接收连接的线程模型,childGroup是处理读写事件的工作线程模型。

      

  • 设置 Channel 类型,这里使用NioServerSocketChannel,是基于NIO实现的,还有其他的实现如下:

  • 配置 Channel 初始化器,使用ChannelInitializer初始化NioSocketChannel,在这里我们配置了处理字符串编码以及打印字符串。
  • 绑定端口。
  ServerBootstrap serverBootstrap = new ServerBootstrap();
        //接受连接的线程  or 工作线程
        serverBootstrap.group(new NioEventLoopGroup())
                //服务器ServerSocketChannel的实现 是NIO 还是 BIO
                .channel(NioServerSocketChannel.class)
                .childHandler(
                        //初始化与客户端进行数据读写的通道,并且添加别的handler 在连接建立后回调
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel channel) throws Exception {
                                //处理编码(解码 )
                                channel.pipeline().addLast(new StringDecoder());
                                //自己的业务逻辑,比如打印字符串
                                channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println(msg);
                                    }
                                });

                            }
                        }
                )
                .bind(8080);

        客户端的创建步骤与服务器端创建的步骤类似,但是在connect方法后需要加上.sync(),以及调用writeAndFlush()方法进行收发数据。(为什么会这样后面会说明)

 new Bootstrap()
                .group(new NioEventLoopGroup())
                //客户端SocketChannel的实现
                .channel(NioSocketChannel.class)
                //初始化与服务器进行数据读写的通道,并且添加别的handler 在连接建立后回调
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //字符输出编码
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost",8080))
                .sync()//阻塞方法 连接建立成功后才会执行
                .channel()
                .writeAndFlush("netty");//收发数据,会调用ch.pipeline().addLast(new StringEncoder());所添加的处理器

        启动程序看一下效果(先启动服务器,再启动客户端)

2、EventLoop

        下面我们开始介绍Netty中的第一个组件,配置服务器的线程模型时设置的EventLoop。

        EventLoop类关系图:

        我们首先看下EventLoop的NIO实现:NioEventLoopGroup的构造方法:

        可以看到它有多个重载的构造方法:

        在构造方法中可以指定线程的数量,如果使用的是无参构造方法,那么默认传递的线程数是0,但是会把0传递给其他的构造方法:

        最后调用父类的构造:

        在父类的构造中,会判断传入的线程参数是否为0。

        目前很显然条件成立,就会获取成员变量DEFAULT_EVENT_LOOP_THREADS 并且再次作为参数调用父类的构造。成员变量DEFAULT_EVENT_LOOP_THREADS 会在静态代码块中被赋值。

        如果能获取到"io.netty.eventLoopThreads" key对应的value,就以该值为准,否则线程数是当前cpu核心数*2

//NioEventLoopGroup构造方法指定线程数 如果不指定为CPU可运行核心数 * 2
NioEventLoopGroup loopGroup = new NioEventLoopGroup(2);

         一个 EventLoop由一个单独的线程驱动,它不断轮询 I/O 事件并执行相应的任务:

log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());

        并且EventLoop的内部维护了一个任务队列,可以提交任务到这个队列中,由 EventLoop的线程顺序执行。

        向EventLoop提交任务:

//执行普通任务
loopGroup.next().submit(() -> log.debug("run..."));
//执行定时任务
loopGroup.next().scheduleAtFixedRate((Runnable) () -> log.debug("run with schedule..."),0,1, TimeUnit.SECONDS);

        下面通过一个案例来加深对EventLoop的理解:

        改造最初的入门案例中的代码,主要体现在.group方法,这次传递了两个参数,将负责连接的EventLoop和负责读写的EventLoop分离开,并且给负责读写的EventLoop设置了两个线程。

 //boss只负责接受连接 1个线程 worker负责读写 2个线程
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))

        服务端的代码和之前的相同,我们先启动一个服务端,再启动三个客户端:

        三个连接(channel)由两个EventLoop轮询处理,并且每个连接(channel)和EventLoop是绑定的,一个EventLoop可以负责多条消息的处理。

        如图所示:

        同时EventLoop也有不同的实现:

        案例进一步细化,我们可以再设置一个EventLoop 专门处理其他任务:

EventLoopGroup eventLoop = new DefaultEventLoopGroup();

         改造.childHandler:

  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    /**
                     * 工序有多道,合在一起就是 pipeline,
                     * pipeline 负责发布事件(读、读取完成...)传播给每个 handler,
                     * handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
                     * @param ch
                     * @throws Exception
                     */
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = ((ByteBuf) msg);
                                log.debug(byteBuf.toString(Charset.defaultCharset()));
                                //责任链模式,将消息传递个下一个处理器
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(eventLoop,"handle2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = ((ByteBuf) msg);
                                log.debug(byteBuf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })

        启动一个客户端:

        相当于链式调用:

小结:

  • 在创建EventLoopGroup实例时,可以指定线程数,如果没有指定,默认使用cpu核心数*2。
  • 调用ServerBootstrap的.group时,可以将EventLoop细化成为专门处理连接以及负责读写的。
  • 可以利用pipeline()的.addLast,链式组合多个EventLoopGroup,实现不同的功能。
  • 每一个EventLoop可以轮询处理多个Channel事件,但是会和Channel绑定,线程间相互独立。

3、Channel

        Channel在Netty中代表一个可以执行I/O操作(如读、写、连接、绑定等)的对象。它可以是一个套接字连接、文件、管道等。

        Netty支持多种类型的Channel,包括NioSocketChannel(用于客户端连接)、NioServerSocketChannel(用于服务端绑定)以及其他专门用途的Channel如EmbeddedChannel等。

        Channel的I/O操作都是异步的,会返回一个ChannelFuture对象,用于表示操作的结果:

 ChannelFuture channelFuture = new Bootstrap()
                 // 。。。。。。
                .connect(new InetSocketAddress("localhost", 8080));

        并且每个Channel都有一个与之关联的ChannelPipeline。Pipeline中包含多个ChannelHandler,每个Handler负责特定的处理逻辑。

为什么在案例代码中,客户端写出数据之前必须要调用.sync()方法?

        因为Channel的I/O操作都是异步的,是主线程调用了.connect() 方法,但是建立连接是在NioSocketChannel所在线程。.sync()方法的作用就是让主线程在此处阻塞,等到NioSocketChannel所在线程建立完成连接,主线程才会继续向下执行。(如果不使用.sync()方法,主线程会在连接没有建立完成的时候继续执行后续代码,服务端无法正常接受消息。)

        与此类似的还有.close()方法,如果我们想在断开连接后执行一段自己的逻辑:

        客户端代码:

        //... new BootStrap...
        //接受控制台输入,q则断开连接
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String str = scanner.nextLine();
                if ("q".equals(str)){
                    //关闭channel连接
                    channel.close();
                    break;
                }
                channel.writeAndFlush(str);
            }
        },"input").start();

        可以将执行断开后逻辑的代码放在input 线程的channel.close();后,或者主线程中吗?

        答案是否定的:

  1. 如果放在主线程中,那么input 线程和主线程是并行执行的,无法控制先后顺序。
  2. 如果放在input 线程的channel.close(); 后,也是不行的。因为channel.close(); 方法也是异步调用,由NioSocketChannel所在线程负责关闭连接:

        解决该问题有两个方案:

        方案一的思路和解决.connect() 方法异步调用的类似,都是使用.sync()方法阻塞主线程,等待input 线程中的channel.close(); 执行完成后再由主线程处理连接关闭后释放资源

ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("连接关闭后释放资源");

        第二种方案是通过调用ChannelFuture的.addListener 方法,添加一个监听器,由nioEventLoopGroup所在线程关闭连接后,处理连接关闭后释放资源(释放资源和处理后续都是nioEventLoopGroup同一线程。):

closeFuture.addListener((ChannelFutureListener) future -> log.debug("连接关闭后释放资源"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

10.Docker Compose容器编排

文章目录 Compose简介安装和卸载步骤核心概念compose文件两要素 使用步骤Compose常用命令微服务测试本地编码打包编写Dockerfile文件构建镜像 不使用Compose调试使用Compose调试WordPress测试验证增量更新 Compose简介 ​ docker建议我们每一个容器中只运行一个服务,因为docke…

Misc之图片隐写

前几天忙高数和c考试去了。。。Web毫无进展&#xff0c;学学这个放松一下 一、工具准备 这里目前使用的工具为kali上的工具和安装在电脑上的Winhex&#xff0c;010editor&#xff0c;Stegsolve 二、png图片隐写 这里我就直接用题目学习了&#xff0c;也是参考了csdn上大佬的…

05通讯录管理系统——添加联系人

功能描述&#xff1a;实现添加联系人功能&#xff0c;联系人上限为1000人&#xff0c;联系人信息包括姓名、性别、年龄、联系电话、家庭住址。 添加联系人实现步骤&#xff1a; 1.设计联系人结构体 2.设计通讯录结构体 3.main函数中创建通讯录 4.封装添加联系人函数 5.测…

软考系统规划与管理师伴读脑图第9章

周末发系统规划与管理师的试听视频&#xff0c;占用了发送次数&#xff0c;所以上周的脑图推迟了今天发出。 不知不觉已经发到了第9章&#xff0c;感叹这就是坚持积累下来的力量&#xff0c;其实考试也是一样的道理。

《骑行健身:“柳叶刀”研究揭示的健康与经济双赢策略》

在这个物价飞涨、经济压力日益加重的时代&#xff0c;普通人如何在不增加额外负担的情况下提升生活质量&#xff1f;《柳叶刀》的最新研究为我们揭开了一个意想不到的秘密&#xff1a;坚持健身&#xff0c;尤其是骑行&#xff0c;竟等同于每年为自己赚取了一笔不小的财富。这一…

多叉树的DFS深度优先遍历,回溯法的基础算法之一

一、前言 多叉树一般用于解决回溯问题。 想必大家都学过二叉树&#xff0c;以及二叉树的深度优先遍历和广度优先遍历&#xff0c;我们思考&#xff1a;能不能将二叉树的DFS转化为多叉树的DFS&#xff1f; 二、多叉树的结构 多叉树的本质&#xff0c;就是一棵普通的树&#x…

C语言数据存储大小端问题

大小端 什么是大小端 大端模式&#xff08;Big-endian&#xff09;&#xff0c;是指数据的高字节&#xff0c;保存在内存的低地址中&#xff0c;而数据的低字节&#xff0c;保存在内存的高地址中; 小端模式&#xff08;Little-endian&#xff09;&#xff0c;是指数据的高字…

mcms-5.2.8环境部署

1 数据库 1.1 新建数据库 1.2 导入数据表 2 tomcat配置 2.1 在IDEA中tomcat环境并配置 首先添加tomcat服务器并配置 配置Artifacts&#xff08;这里配置不正确的话&#xff0c;在运行时会报错&#xff1a;Error during artifact deployment. See server log for details.&am…

日常销售数据分析为什么重要?三个维度全面分析日常销售数据

在当今电子商务的浪潮席卷全球的时代&#xff0c;网店如雨后春笋般涌现&#xff0c;并且竞争日趋激烈。在这样一个充满挑战与机遇的环境中&#xff0c;如何洞察市场动向&#xff0c;把握消费者需求&#xff0c;实现销售业绩的稳步增长&#xff0c;成为每一位电商运营者必须面对…

【1990-2023】上市公司高新技术企业数据(Excel+stata)+do代码

数据简介&#xff1a;根据《上市公司资质认定信息文件》 数据进行整理。筛选“认定项目类型” 为“高新技术企业”&#xff1b;筛选“认定对象身份”为“上市公司本身”&#xff0c;根据“认定时间”和“有效期限”判断当年是否为高新技术企业。有效期限通常为3年&#xff0c;缺…

4.类,方法,对象

1.1.2. 面向对象程序设计的三大特征 1.1.2.1. 封装 面向对象编程核心思想之一就是将数据和对数据的操作封装在一起&#xff0c;形成一般的概念&#xff0c;比如类的概念。 1.1.2.2. 继承 继承体现了一种先进的编程模式。子类可以继承父类的属性和方法。 1.1.2.3. 多态 多…

CMake从安装到精通

目录 引言 1. CMake的安装 2. CMake的原理 3. CMake入门 3.1 CMakeLists.txt与注释 3.2 版本指定与工程描述 3.3 生成可执行程序 3.4 定义变量与指定输出路径 3.5 指定C标准 3.6 搜索文件 3.7 包含头文件 4. CMake进阶 4.1 生成动静态库 4.2 链接动静态库 4.…

【图像分割】DSNet: A Novel Way to Use Atrous Convolutions in Semantic Segmentation

DSNet: A Novel Way to Use Atrous Convolutions in Semantic Segmentation 论文链接&#xff1a;http://arxiv.org/abs/2406.03702 代码链接&#xff1a;https://github.com/takaniwa/DSNet 一、摘要 重新审视了现代卷积神经网络&#xff08;CNNs&#xff09;中的atrous卷积…

计算机组成原理(四)Cache存储器

文章目录 Cache存储器的基本原理cache命中率、平均访问时间、效率地址映射全相联映射直接映射组相联映射 查找算法cache 存储器替换策略cache 存储器-写操作策略习题 Cache存储器的基本原理 Cache是一种高速缓冲寄存器&#xff0c;是为了解决CPU和主存之间速度不匹配而采用的一…

检索增强生成(RAG)的挑战与优化措施

如何理解检索增强生成&#xff08;RAG&#xff09; 简单来说&#xff0c;RAG就是让LLM通过外部知识源获取额外信息&#xff0c;从而生成更准确、更符合上下文的答案&#xff0c;并减少错误信息&#xff08;或称为“幻觉”&#xff09;的产生。 我们都知道&#xff0c;最先进的…

计数排序(Counting Sort)

计数排序&#xff08;Counting Sort&#xff09; 计数排序是一个非基于比较的排序算法&#xff0c;该算法于1954年由 Harold H. Seward 提出。它的优势在于在对一定范围内的整数排序时&#xff0c;快于任何比较排序算法。排序思路: 1.找出待排序数组最大值2.定义一个索引最大…

Python学习打卡:day08

day8 笔记来源于&#xff1a;黑马程序员python教程&#xff0c;8天python从入门到精通&#xff0c;学python看这套就够了 目录 day858、数据容器(序列)的切片序列的常用操作——切片 59、序列的切片课后练习60、集合的定义和操作集合的定义集合的操作添加新元素移除元素从集合…

NATAPP-内网穿透工具----下载与配置

NATAPP-内网穿透工具 基于ngrok的国内高速内网穿透服务&#xff0c;natapp提供了一种便利的方式&#xff0c;使得开发和测试过程更加高效&#xff0c;尤其是在需要进行远程调试或展示时。无论是进行web开发、微信和支付宝的本地开发调试&#xff0c;还是简单地从外部网络访问家…

如何根据使用场景选购3D扫描仪?

三维扫描建模是指通过专业的三维扫描仪对产品进行三维数据的采集&#xff0c;快速获取物体精确的3D数据&#xff0c;实现1:1复刻原物体&#xff0c;扫描后所得的数字化3D模型以obj、fbx、glb、gltf等格式保存。 积木易搭自主研发多款三维扫描设备&#xff0c;拥有多项国家专利&…

初学者必看的web前端开发学习路线,干货满满!

初学者必看的web前端开发学习路线,干货满满&#xff01; 随着互联网的深入发展,前端工程师这个岗位在市场上的需求&#xff0c;薪资也是很可观的。前端很火&#xff0c;想自学前端的人也很多。包括一些学生、上班族、以前的UI&#xff0c;java&#xff0c;或完全零基础&#xf…