Netty笔记02-组件EventLoop

news2025/1/22 16:46:40

文章目录

  • EventLoop概述
    • EventLoop 的概念
    • EventLoop 的作用
    • EventLoop 的生命周期
    • EventLoopGroup
    • EventLoop 的工作原理
    • 总结
  • 代码示例
    • 💡 优雅关闭
    • 演示 NioEventLoop 处理 io 事件
    • 解决work中的channel读操作耗费时间过长,影响其他channel(客户端)的问题
    • 💡 handler 执行中如何换人?


EventLoop概述

EventLoop 的概念

在 Netty 中,EventLoop 是一个非常核心的概念,它是 Netty 异步事件驱动模型的基础。EventLoop 负责处理 I/O 事件,包括读、写、连接和断开连接等操作。理解 EventLoop 的工作机制对于高效地使用 Netty 至关重要。

EventLoop 是 Netty 中的一个抽象类,它封装了一个线程和一组注册到该线程的 Channel。
EventLoop 的主要职责是处理与 Channel 相关的 I/O 事件,并且负责执行与 Channel 相关联的任务。

EventLoop 的作用

  • 事件处理:EventLoop 负责监听 Channel 上的 I/O 事件,并在这些事件发生时调用相应的处理器。
  • 任务执行:EventLoop 还负责执行与 Channel 关联的任务,这些任务可以是用户提交的业务逻辑或者 Netty 内部的一些操作。

EventLoop 的生命周期

EventLoop 的生命周期通常与它所绑定的 Channel 的生命周期相关。当一个 Channel 被创建时,它会被绑定到一个 EventLoop,并且这个 EventLoop 会负责处理该 Channel 的所有 I/O 事件。当 Channel 关闭时,EventLoop 仍然会存在,但它不再处理与该 Channel 相关的事件。

EventLoopGroup

EventLoopGroup 是 EventLoop 的容器,它管理一组 EventLoop 实例。EventLoopGroup 为 Channel 提供了一个或多个 EventLoop 供其使用。Netty 中常见的 EventLoopGroup 类型包括 NioEventLoopGroup 和 EpollEventLoopGroup。

NioEventLoopGroup:针对 NIO 模型的 EventLoopGroup,适用于大多数操作系统。
EpollEventLoopGroup:针对 Linux 操作系统的 EventLoopGroup,使用 epoll 代替 select,性能更高。

EventLoop 的工作原理

在一个典型的 Netty 服务器中,EventLoop 的工作原理如下:

  1. 初始化:当创建一个 Netty 服务器时,会创建一个或多个 EventLoopGroup。这些 EventLoopGroup 通常包括两个部分:一个 Boss Group 用于接收新的连接,一个 Worker Group 用于处理 I/O 操作。
    • Boss Group:接收客户端的连接请求,并为每个连接分配一个 Worker Group 中的 EventLoop。
    • Worker Group:处理 I/O 操作,包括读、写、连接和断开连接等。
  2. 事件循环:每个 EventLoop 都有一个事件循环,它不断地轮询注册在其上的 Channel,查找感兴趣的 I/O 事件。
    • Select:EventLoop 会调用 select 方法来轮询 Channel,查找感兴趣的事件。
    • 事件处理:当事件发生时,EventLoop 会调用相应的处理器来处理这些事件。
  3. 任务执行:除了处理 I/O 事件之外,EventLoop 还可以执行与 Channel 相关的任务,如用户提交的业务逻辑任务。
    • 任务队列:每个 EventLoop 都有一个任务队列,用于存放待执行的任务。
  4. 事件传播:如果一个 Channel 需要从一个 EventLoop 转移到另一个 EventLoop,Netty 会确保事件传播的一致性。

总结

EventLoop (事件循环对象)本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup (事件循环组)是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

代码示例

@Slf4j
public class Test01EventLoop {
    public static void main(String[] args) {
        // 1. 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup(2);
        //NioEventLoopGroup是功能最全面的,既能处理io事件,也能向其提交普通任务,定时任务
//        EventLoopGroup group = new DefaultEventLoopGroup();
        // DefaultEventLoopGroup不能处理io事件,只能处理普通任务,定时任务

        //创建NioEventLoopGroup时,如果不指定线程数,底层代码使用NettyRuntime.availableProcessors()创建线程数
//        System.out.println(NettyRuntime.availableProcessors());//16

        // 2. 获取下一个事件循环对象
        //当创建的线程数为2时,使用group.next()依次获取对象(类似轮询的效果)
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        // 3. 执行普通任务,作用:可以用来执行耗时较长的任务
//        group.next().submit(() ->{
//            try {
//                Thread.sleep(1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            log.debug("ok");
//        });
//        group.next().execute(() -> {
//            try {
//                Thread.sleep(1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            log.debug("ok");
//        });
        //submit()和execute()效果相同

        // 4. 执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ok");
        }, 0, 1, TimeUnit.SECONDS);

        log.debug("main");
    }
}

💡 优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

演示 NioEventLoop 处理 io 事件

服务器端两个 nio worker 工人

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                // boss 和 worker
                // 细分1:
                //  boss 只负责 ServerSocketChannel 上 accept 事件
                //  worker 只负责 socketChannel 上的读写
                //group参数1:boss不需要指定线程数,因为ServerSocketChannel只会跟一个EventLoop进行绑定,
                //              又因为服务器只有一个,所以只会占用一个线程,不用指定线程数。
                //group参数1:work线程指定为两个
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            /**
                             * @param ctx
                             * @param msg ByteBuf类型
                             * @throws Exception
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

细分1:

  • boss 只负责 ServerSocketChannel 上 accept 事件
  • worker 只负责 socketChannel 上的读写

客户端

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        // 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1. 连接到服务器
                // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
                .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后

        // 2.1 使用 sync 方法同步处理结果
        channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
        Channel channel = channelFuture.channel();
        log.debug("{}", channel);
        channel.writeAndFlush("hello, world");
        channel.writeAndFlush("hello, world");
        channel.writeAndFlush("hello, world");
        System.out.println();
    }
}

从服务器端代码可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
在这里插入图片描述

建立连接后,服务器会始终拿同一个EventLoop(线程)处理客户端所有的请求
在这里插入图片描述

解决work中的channel读操作耗费时间过长,影响其他channel(客户端)的问题

创建一个独立的 EventLoopGroup,将耗时的代码放到一个额外的组中线程处理

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        // 细分2:创建一个独立的 EventLoopGroup,将耗时的代码放到一个额外的组中线程处理
        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                // boss 和 worker
                // 细分1:
                //  boss 只负责 ServerSocketChannel 上 accept 事件
                //  worker 只负责 socketChannel 上的读写
                //group参数1:boss不需要指定线程数,因为ServerSocketChannel只会跟一个EventLoop进行绑定,
                //              又因为服务器只有一个,所以只会占用一个线程,不用指定线程数。
                //group参数1:work线程指定为两个
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
//                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//                            /**
//                             * @param ctx
//                             * @param msg ByteBuf类型
//                             * @throws Exception
//                             */
//                            @Override
//                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                                ByteBuf buf = (ByteBuf) msg;
//                                log.debug(buf.toString(Charset.defaultCharset()));
//                            }
//                        });

                        
                        //如果读操作耗费的时间很上,会影响其他客户端的读写操作,
                        // 一个work管理多个channel,如果其中一个耗时过长则会影响其他channel的读写操作
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                            /**
                             * @param ctx
                             * @param msg ByteBuf类型
                             * @throws Exception
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg); // 让消息传递给下一个handler,如果不添加则消息不会传递给handler2中
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override                                         // ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

细分2:创建一个独立的 EventLoopGroup,将耗时的代码放到一个额外的组中线程处理
以下两个客户端依次发送消息
在这里插入图片描述
可以看到,nio 工人和 非 nio 工人也分别绑定了 channel
在这里插入图片描述

💡 handler 执行中如何换人?

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();//返回下一个handler的eventLoop(eventLoop继承了EventExecutor)
    
     //executor.inEventLoop():当前handler中的线程是否和executor(eventLoop)是同一个线程
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
         //下一个handler线程创建一个线程执行next.invokeChannelRead(m);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
        //目的是切换线程
    }
}

全部文章:
Netty笔记01-Netty的基本概念与用法
Netty笔记02-组件EventLoop
Netty笔记03-组件Channel
Netty笔记04-组件Future & Promise
Netty笔记05-组件Handler & Pipeline

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2137430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开源链动 2+1 模式 AI 智能名片与 S2B2C 商城小程序在用户运营中的应用

摘要&#xff1a; 本文深入探讨了用户运营中不同用户阶段的特点及策略&#xff0c;引入“开源链动 21 模式 AI 智能名片 S2B2C 商城小程序”&#xff0c;分析其在用户运营各个阶段的作用和价值&#xff0c;旨在为企业提供更高效的用户运营方案&#xff0c;实现用户价值的最大化…

Spring 框架——@Async 注解

目录 1.同步调用与异步调用1.1.同步调用1.2.异步调用1.3.总结 2.注解 Async 介绍2.1.用在方法上2.2.用在类上 3.使用演示3.1.在启动类或者配置类上增加 EnableAsync 注解3.2.在异步方法上增加 Async 注解3.3.调用异步方法3.4.测试3.5.其它说明 4.注意事项4.1.Async 注解失效的常…

【Qt绘图】—— 运用Qt进行绘图

目录 &#xff08;一&#xff09;基本概念 &#xff08;二&#xff09;绘制各种形状 2.1 绘制线段 2.2 绘制矩形 2.3 绘制圆形 2.4 绘制文本 2.5 设置画笔 2.6 设置画刷 &#xff08;三&#xff09;绘制图片 3.1 绘制简单图片 3.2 平移图片 3.3 缩放图片 3.4…

Linux 手动安装Ollama

Linux 离线安装Ollama 前言 不知道为什么 在阿里云服务器上 执行curl -fsSL https://ollama.com/install.sh | sh一键安装 非常慢 所以只能手动装了 1.到 https://ollama.com/install.sh 下载安装执行文件 修改其中 下载和安装部分代码 if curl -I --silent --fail --location…

Python数据分析-Numpy快速入门

一、什么是Numpy 二、 创建 Numpy ndarray对象 三、数组中的维度 1.各种维度数组 2.检查维度数 3.创建更高维度的数组 四、数组索引 1.访问数组元素 2.访问2-D数组元素 其他维度的同理 3.负索引 五、数据裁剪&#xff1a;要头不要尾 1.裁剪数组 demo&#xff1a; 2.负裁…

构建基于 Feign 的微服务:从 Eureka 到负载均衡的实践 --day05

目录 步骤1&#xff1a;创建父工程feign-1步骤2&#xff1a;改造服务提供者使用 RequestMapping使用 GetMapping 步骤3&#xff1a;改造服务消费者为Feign客户端&#xff08;1&#xff09;添加Feign依赖&#xff08;2&#xff09;添加EnableFeignClients注解&#xff08;3&…

YoloV10 训练自己的数据集(推理,转化,C#部署)

目录 一、下载 三、开始训练 train.py detect.py export.py 超参数都在这个路径下 四、C#读取yolov10模型进行部署推理 如下程序是用来配置openvino 配置好引用后就可以生成dll了 再创建一个控件&#xff0c;作为显示 net framework 4.8版本的 再nuget工具箱里下载 …

thinkphp6开发的通用网站系统源码

thinkphp6开发的通用网站系统源码。 基于ThinkPHP6框架开发的通用后台权限管理系统&#xff0c;底层采用国内最流行的ThinkPHP6框架&#xff0c; 支持内容管理、文章管理、用户管理、权限管理、角色管理等功能。 代码下载

Cookie和Session的对比

Cookie和Sesion 一、cookie和session创建对象 2024/9/15 10:23:59 你想了解的是如何在某种编程语言中创建和管理 cookies 和 sessions 吗&#xff1f;如果是的话&#xff0c;具体是哪个语言或框架呢&#xff1f; 2024/9/15 10:24:09 二、创建对象是客户端还是服务器 2024/…

20Kg载重30分钟续航多旋翼无人机技术详解

一、机架与结构设计 1. 材料选择&#xff1a;为了确保无人机能够承载20Kg的负载&#xff0c;同时实现30分钟的续航&#xff0c;其机架材料需选用轻质高强度的材料&#xff0c;如碳纤维或铝合金。这些材料不仅具有良好的承重能力&#xff0c;还能有效减轻无人机的整体重量&…

一步一步搭建AI智能体应用

您可以在百炼控制台以零代码的方式快速创建智能体应用&#xff0c;并将RAG&#xff08;Retrieval-Augmented Generation&#xff0c;检索增强生成&#xff09;以及插件能力集成进来。应用创建完成后&#xff0c;您可以通过控制台或API的方式来使用。 以下均以 大模型应用指代 智…

微信小程序使用 ==== 粘性布局

目录 Chrome杀了个回马枪 position:sticky简介 你可能不知道的position:sticky 深入理解粘性定位的计算规则 粘性定位其他特征 代码实现 微信小程序在scroll-view中使用sticky Chrome杀了个回马枪 position:sticky早有耳闻也有所了解&#xff0c;后来&#xff0c;Chro…

通过API接口获取下来的数据需要怎样应用?

在当今数字化时代&#xff0c;通过API接口获取数据已成为企业获取、处理和分析信息的重要手段。API接口不仅能够提高数据交互的效率&#xff0c;还能促进数据的安全性和灵活性。以下是如何将通过API接口获取的数据有效应用的一些方法和策略。 数据整合与分析 企业可以通过API接…

QPS和TPS的区别简单理解

QPS&#xff08;Queries Per Second&#xff09; QPS是指每秒查询率&#xff0c;它是衡量服务器处理能力的一个指标&#xff0c;表示服务器在一秒钟内能够响应的查询次数。这个指标通常用于数据库或服务器的性能测试&#xff0c;反映了服务器在规定时间内处理流量的能力。QPS …

浮动元素详解

浮动元素 代码实现&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>浮动元素</title><style>#container1 {width: 400px;height: 50px;background-color: lightgrey;border: 1px solid;}#contai…

世界杯足球赛网站---附源码73185

摘 要 随着互联网的普及和足球运动的全球性影响力&#xff0c;建立一个专门的世界杯足球赛网站成为了与球迷互动和传播赛事信息的重要途径。本论文聚焦于世界杯足球赛网站的设计与实现&#xff0c;旨在探讨如何利用现代技术为球迷提供一个全方位的足球赛事体验。 通过对 Spring…

上传头像,访问本地图片

文件大坑&#xff1a; web项目&#xff1a;首先不能直接访问本地资源&#xff0c;只能够访问服务器上的资源。 所以我想就储存数据到服务器&#xff0c;但是这样有个问题就是&#xff0c;当重新启动程序时&#xff0c;服务器上的所有文件会被重新编译&#xff0c;导致之前的文…

HarmonyOS开发实战( Beta5.0)自动生成动态路由实践

鸿蒙HarmonyOS开发往期必看&#xff1a; HarmonyOS NEXT应用开发性能实践总结 最新版&#xff01;“非常详细的” 鸿蒙HarmonyOS Next应用开发学习路线&#xff01;&#xff08;从零基础入门到精通&#xff09; 介绍 本示例将介绍如何使用装饰器和插件&#xff0c;自动生成动…

【UE5】使用2DFlipbook图作为体积纹理,实现实时绘制体积纹理

这是一篇对“Creating a Volumetric Ray Marcher-Shader Bits”的学习心得 文章时间很早&#xff0c;因此这里针对UE5对原文做出兼容性修正&#xff08;为避免累赘不做出注明。链接如上&#xff0c;有需要自行学习&#xff09; 以及最后对Custom做可能的蓝图移植&#xff0c;做…

Qt中样式表常用的属性名称定义

Qt中&#xff0c;用好样式表&#xff0c;不但可以做出意想不到的酷炫效果&#xff0c;有时候也能减轻开发量&#xff0c;可能由于你不了解某些样式使用&#xff0c;想破脑袋通过代码实现的效果&#xff0c;反倒不如别人用样式&#xff0c;一两句样式脚本就搞定。 Qt中&#xff…