Netty学习——源码篇5 EventLoop

news2025/1/10 0:36:12

1 Reactor线程模型

        Reactor线程模型 中对Reactor的三种线程模型——单线程模型、多线程模型、主从多线程模型做了介绍,这里具体分析Reactor在Netty中的应用。

1.1单线程模型

单线程模型处理流程如下图:

        单线程模型,即Accept的处理和Handler的处理都在同一个线程中。这个模型的弊端是:当其中某个Handler阻塞时,会导致其他所有的Client的Handler都无法执行,并且更严重是,Handler的阻塞也会导致整个服务不能接收新的Client请求(因为Accept也被阻塞了)。 因为这个缺陷,所以单线程Reactor模型在Netty中的应用场景比较少。

1.2 多线程模型

        Netty中Reactor多线程模型的应用如如下图:

        1、设计一个专门的线程Accept,用于监听客户端的TCP连接请求。

        2、客户端的I/O操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定,因此在这个客户端连接中的所有I/O操作都是在同一个线程中完成的。

        3、客户端连接有很多,但是NIO线程数是比较少的,因此一个NIO线程可以同时绑定到多个客户端连接中。

1.3 主从多线程模型

        主从Reactor多线程模型在Netty中的应用,如下图

             一般情况下,Reactor的多线程模型已经适用于大部分业务场景。但如果服务端需要同时处理大量的客户端连接请求,或者需要再客户端连接时增加一些诸如权限的校验等操作,那么单个Accept就很有可能处理不过来,将会造成大量的客户端连接超时。主从Reactor多线程模型将服务端接收客户端的连接请求专门设计为一个独立的连接池。主从Reactor多线程模型和Reactor多线程模型很类似,只是在主动Reactor多线程模型的Accept线程池中获取数据,通过认证鉴权后进行派遣,再分配给Reactor线程池来处理客户端请求。

2 EventLoopGroup与Reactor关联

        不同的设置NioEventLoopGroup的方式对应了不同的Reactor线程模型。

        1、单线程模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

            首先实例化一个NioEventLoopGroup,接着调用bootstrap.group(bossGroup)设置服务端的EventLoopGroup。这里有个疑惑:在启动服务端的Netty程序时,需要设置bossGroup和workerGroup,为什么这里只设置了1个bossGroup?原因很简单,ServerBootstrap重写了group方法,代码如下:

@Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

        因此,当传入一个group时,bossGroup和workerGroup就是同一个NioEventLoopGrouop,并且这个NioEventLoopGroup线程池数量只设置了1个线程,也就是说Netty中的Acceptor和后续的所有客户端连接的I/O操作都是在一个线程中处理的。那么对应到Reactor的线程模型中,这样设置NioEventLoopGroup,就相当于Reactor的单线程模式。

        2、多线程在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(16);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

        从代码中可以看出,只需要将NioEventLoopGroup的参数设置大于1,就是Reactor多线程模型。

        3、主从Reactor模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup,workGroup);

        bossGroup为主线程,而workerGroup中的线程数是CPU核数*2,因此对应到Reactor线程模型中,这样设置的NioevGroup就是主从Reactor多线程模型。

3 EventLoopGroup的实例化

        首先,来看一下EventLoopGroup的类结构图,如下图:

        然后,在通过时序图来了解一下EventLoopGroup初始化的基本过程,如下图:

 

          基本步骤如下:

        1、EventLoopGroup内部维护一个属性为EventExecutor的children的数组,其大小是nThreads,这样就初始化一个线程池。

        2、在实例化NioEventLoopGroup时,如果指定县城池大小,则nThreads就是指定的值,否则是CPU核数 * 2。

        3、在MultithreadEventExecutorGroup中调用newChild()抽象方法来初始化children数组。

        4、newChild()抽象方法实际上是在NioEventLoopGroup中实现的,由它返回一个NioEventLoop实例。

        5、初始化NioEventLoop对象并给属性赋值,具体赋值属性如下:

                (1)provider:就是在NioEventLoopGroup构造器中,调用SelectorProvider的provider()方法获取的SelectorProvider对象。

                (2)selector:就是在NioEventLoop构造器中,调用selector=provider.openSelector()方法获取的Selector对象。

4 执行任务者EventLoop

        NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop又继承自SingleThreadEventExecutor。SingleThreadEventExecutor是Netty对本地线程的抽象,它内部有一个Thread属性,实际上就是存储了一个本地Java 线程。因此可以简单的认为,一个NioEventLoop对象其实就是一个和特定的线程进行绑定,并且在NioEventLoop声明周期内,其绑定的线程都不会再改变,NioEventLoop的类层次结构图如下:

        NioEventLoop的类层次结构比较复杂,只需要关注重点即可。首先看NioEventLoop的继承关系:NioEventLoop继承SingleThreadEventLoop, SingleThreadEventLoop继承SingleThreadEventExecutor,SingleThreadEventExecutor继承AbstractScheduledEventExecutor。

        在AbstractScheduledEventExecutor,Netty实现了NioEventLoop的Schedule功能,即通过调用一个NioEventLoop实例的schedule方法来运行一些定时任务。而在SingleThreadEventLoop中,又实现了任务队列的功能。通过它,可以调用一个NioEventLoop实例的execute()方法向任务队列中添加一个Task,并由NioEventLoop进行调度执行。

        通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是作为任务队列,执行taskQueue中的人物,例如用户调用eventLoop.schedule提交的定时任务也是由这个线程执行的。

4.1 NioEventLoop的实例化过程

        先了解一下EventLoop实例化的运行时序图,如下:

        从上图可以看出,SingleThreadEventExecutor有一个名为thread的Thread类型属性,这个属性就是与SingleThreadEventExecutor关联的本地线程。来看thread是在哪里被赋值的,代码如下:

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

         前面分析过,SingleThreadEventExecutor启动时会调用doStartThread方法,然后调用executor.execute方法,将当前线程赋值给thread。在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法,因为NioEventLoop实现了这个方法,所以根据多态性,其实调用的是NioEventLoop.run方法。

4.2 EventLoop与Channel关联

        在Netty中,每个Channel都有且仅有一个EventLoop与之关联,它们的关联过程如下图:

               

        从上图可以看到,当调用AbstractChannel.register()方法后,就完成了Channel和EventLoop的关联,register方法的具体实现如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        在register方法中,会将一个EventLoop赋值给AbstractChannel内部的eventLoop属性,这句代码就完成了EventLoop与Channel的关联过程。

4.3 EventLoop 启动

        前面已经介绍NioEventLoop本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。

        按照这个思路,只需要找到在哪里调用了SingleThreadExecutor中thread属性的start方法就可以知道在哪里启动这个线程了。前面分析过,其实thread.start()被封装在SingleThreadExecutor.startThread()方法中,代码如下:

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

        STATE_UPDATER是SingleThreadExecutor内部维护的一个属性,它的作用是标识当前的Thread的状态。在初始化的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用startThread方法时,就会进入if语句内,进而调用thread.start方法。而这个关键的startThread方法又是在哪调用的呢?用方法调用关系反向查找功能,就恢复阿贤,startTahread方法是在SingleThreadEventExecutor的execute方法中调用的,代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

        既然如此,现在只需要找到第一次调用SingleThreadEventExecutor的execute方法的位置即可。前面在提到注册Channel的过程中,会在AbstractChannel的register方法中调用eventLoop.execute方法,在EventLoop中进行Channel注册代码的执行,AbstractChannel的register方法的关键代码如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           //删除判断代码

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   //删除异常处理代码
                }
            }
        }

        很明显,从Boostrap的bind方法一路跟踪到AbstractChannel的register方法,整个代码都是在主线程中运行的,因此上面的eventLoop.inEventLoop()返回值为false,于是进入else分支,在这个分支中调用eventLoop.execute方法,而NioEventLoop没有实现execute方法,因此调用的是SingleThreadEventExecutor的execute方法,关键代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

       由于 inEventLoop ==false,因此直行道else分支就调用startThread方法来启动SingleThreadEventExecutor内部关联的Java本地线程。用一句话总结:当EventLoop的execute方法第一次被调用时,会触发startThread方法的调用,进而启动EventLoop所对应的Java本地线程。

        完整的EventLoop启动时序图如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1542797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】链表习题之环形链表的约瑟夫问题

👑个人主页:啊Q闻 🎇收录专栏:《数据结构》 🎉道阻且长,行则将至 前言 今天这道题目时牛客上的题目,名为环形链表的约瑟夫问题,很有趣的的一道题目 环形链表的约瑟…

图文详解电容的9大功能

电容是电路设计中最为普通常用的器件,也常常在高速电路中扮演重要角色。 电容的用途非常多,主要有如下几种: 1. 隔直流: 作用是阻止直流通过而让交流通过。 2. 旁路(去耦): 为交流电路中某…

干货详解如何通过代理IP使用 Puppeteer?

Puppeteer 在全球拥有数百万用户,堪称最流行的无头浏览器之一。对于任何与自动浏览相关的任务来说,该工具都是不可或缺的。在接下来的段落中,我们将了解如何在 Puppeteer 中使用代理以及在设置过程中使用哪些技巧。 一、Puppeteer中的代理IP是…

【数据结构】带头双向链表的实现

👑个人主页:啊Q闻 🎇收录专栏:《数据结构》 🎉道阻且长,行则将至 前言 带头双向链表是链表的一种,相较于单链表的实现,其更为简单 一.初识带头双向循环链表 带头…

宁波ISO14068碳中和,ISO14068认证,ISO14068辅导

ISO 14068是国际标准化组织(ISO)📝发布的关于碳中和的标准✒️,也被称为“碳中和国际标准”。该标准🧰定义了碳中和的📱概念,包括组织或产品👠通过自身减排、边界内🫧碳清…

力扣-20 有效的括号详解 Java

目录 1.题目分析 2.基础知识储备 2.1 哈希表 2.2 栈的存取 3. 逻辑概要 4.源码 示例 1.题目分析 为了对比都是从内而外,一个个匹配,全部匹配成功即为有效字符 2.基础知识储备 2.1 哈希表 简单来说,keyvalue存储 ,通过key…

探索LLaMA模型:架构创新与Transformer模型的进化之路

引言 在人工智能和自然语言处理领域,预训练语言模型的发展一直在引领着前沿科技的进步。Meta AI(前身为Facebook)在2023年2月推出的LLaMA(Large Language Model Meta AI)模型引起了广泛关注。LLaMA模型以其独特的架构…

【微服务】Spring Boot 版本升级到 2.7.18

前言 目前项目上扫描出一些 Java 依赖的代码漏洞&#xff0c;需要对现有依赖版本升级&#xff0c;记录一下遇到的问题。 <spring-boot.version>2.3.2.RELEASE</spring-boot.version> <spring-cloud.version>Hoxton.SR9</spring-cloud.version> <s…

饼图渲染的关键

1) 创建一个DOM对象,有自定义的高和宽. 2) 引入Echarts软件包并导入到对应文件内 npm i Echarts import 文件.js script src.../文件 3) 初始化一个对象 4) 对象的方法实现饼图渲染 data内的数据,且当一个对象已经渲染一遍,再执行这个,会对setOption的参数进行更新,其…

ctfshow web入门 反序列化

254 分析代码&#xff1a; 如果用户名和密码参数都存在&#xff0c;脚本会创建一个 ctfShowUser 类的实例 $user。 接着&#xff0c;调用 $user->login($username, $password) 方法尝试登录。如果登录成功&#xff08;即用户名和密码与类中的默认值匹配&#xff09;&#…

MyBatis 入门笔记

课程地址 Mybatis 是一个优秀的持久层框架&#xff0c;用于简化 JDBC 操作 快速入门 POJO Plain Old Java Object 建表 create database mybatis; use mybatis; drop table if exists tb_user;create table tb_user(id int primary key auto_increment,username varchar(2…

万里牛和金蝶云星空接口打通对接实战

万里牛和金蝶云星空接口打通对接实战 源系统:万里牛 万里牛作为行业领先的全渠道零售云服务商&#xff0c;成立于2011年&#xff0c;核心成员来自于阿里巴巴、信雅达等知名企业&#xff0c;是业内最早的SaaSERP服务商&#xff0c;致力于为企业提供全渠道零售一站式解决方案。万…

Gemma开源AI指南

近几个月来&#xff0c;谷歌推出了 Gemini 模型&#xff0c;在人工智能领域掀起了波澜。 现在&#xff0c;谷歌推出了 Gemma&#xff0c;再次引领创新潮流&#xff0c;这是向开源人工智能世界的一次变革性飞跃。 与前代产品不同&#xff0c;Gemma 是一款轻量级、小型模型&…

Web安全基础入门+信息收集篇

教程介绍 学习信息收集&#xff0c;针对域名信息,解析信息,网站信息,服务器信息等&#xff1b;学习端口扫描&#xff0c;针对端口进行服务探针,理解服务及端口对应关系&#xff1b;学习WEB扫描&#xff0c;主要针对敏感文件,安全漏洞,子域名信息等&#xff1b;学习信息收集方法…

AIGC、3D模型、轻量化、格式转换、可视化、数字孪生引擎...

老子云3D可视化快速开发平台&#xff0c;集云压缩、云烘焙、云存储云展示于一体&#xff0c;使3D模型资源自动输出至移动端PC端、Web端&#xff0c;能在多设备、全平台进行展示和交互&#xff0c;是全球领先、自主可控的自动化3D云引擎。 平台架构 平台特性 基于 HTML5 和 Web…

踏青智能伙伴,尽享户外乐趣

春风拂面&#xff0c;花香四溢&#xff0c;正是踏青赏花的好时节。想要尽情享受户外的美好时光吗&#xff1f;华为手环8将是你户外的好搭子&#xff01;它不仅拥有精准的天气预报功能&#xff0c;还能播放你喜爱的音乐&#xff0c;记录户外步行轨迹&#xff0c;并实现遥控拍照&…

[深度学习]yolov8+pyqt5搭建精美界面GUI设计源码实现一

【简单介绍】 基于YOLOv8与PyQt5的精美界面GUI设计&#xff0c;旨在为用户提供一个直观、易用且功能强大的目标检测平台。通过结合YOLOv8的先进目标检测能力与PyQt5的丰富界面设计元素&#xff0c;我们打造了一款高效、稳定的软件产品。 在界面设计上&#xff0c;我们注重用户…

【机器学习】基于北方苍鹰算法优化的BP神经网络分类预测(NGO-BP)

目录 1.原理与思路2.设计与实现3.结果预测4.代码获取 1.原理与思路 【智能算法应用】智能算法优化BP神经网络思路【智能算法】北方苍鹰优化算法&#xff08;NGO)原理及实现 2.设计与实现 数据集&#xff1a; 数据集样本总数2000 多输入单输出&#xff1a;样本特征24&#x…

语音转文字——sherpa ncnn语音识别离线部署C++实现

简介 Sherpa是一个中文语音识别的项目&#xff0c;使用了PyTorch 进行语音识别模型的训练&#xff0c;然后训练好的模型导出成 torchscript 格式&#xff0c;以便在 C 环境中进行推理。尽管 PyTorch 在 CPU 和 GPU 上有良好的支持&#xff0c;但它可能对资源的要求较高&#x…

【4月】CDA Club 第2期数据分析组队打卡学习活动开启!

活动名称 CDA Club 第2期数据分析组队打卡学习活动 活动介绍 本次打卡活动由CDA俱乐部旗下学术部主办。目的是通过数据分析科普内容&#xff0c;为数据分析爱好者提供学习和交流的机会。方便大家利用碎片化时间在线学习&#xff0c;以组队打卡的形式提升学习效果&#xff0c…