Netty核心组件EventLoop源码解析

news2025/1/19 17:12:09

源码解析目标

  • 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现

源码解析

  • 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup = new NioEventLoopGroup(1);,我们先来看看NioEventLoop的UML图

在这里插入图片描述

  • 首先我们看到ScheduledEecutorService接口,这个接口是concurrent包下的一个定时任务接口,EventLoop实现了这个接口,因此可以接受定时任务,所以我们在Debug的时候,能在EventLoop中找到一个scheduledTaskQueue
  • EventLoop接口我们看下源码,如下,从注释中我们了解到,EventLoop中一旦注册了Channel,就会处理该Channel对应的所有I/O操作
/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 *
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
 * implementation details and internals.
 *
 */
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
  • SingleThreadEventExecutor 也是一个比较重要的类,看源码注释,说明了SingleThreadEventExecutor 是一个单个线程的线程池

/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......}
  • 在SingleThreadEventExecutor 类中实现了很多对线程池的操作,例如runAllTask,executer,takeTask,pollTask,看下其中一个构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
//其中taskQueue初始化
 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }
  • 如上,SingleThreadEventExecutor 队列中元素是实现了Runnable接口的对象,线程池中最重要的方法当然是executer方法,EventLoop是SingleThreadEventExecutor 的子类,那么EventLoop 类也可以直接调用executer方法来完成对事件的执行,我们来看源码
 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
  • inEventLoop(); 首先判断EventLoop中线程是否是当前线程,如果是,则直接将task添加到线程池队列中
  • 如果不是则尝试启动一个线程(因为是单个线程的线程池,所以只能且只需要启动一次),之后在将任务添加到队列中去
  • isShutdown() && removeTask(task)) 中逻辑 如果线程已经停止并且删除任务失败,则直接拒绝策略
  • 接着看下addTask的实现
 /**
     * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
     * before.
     */
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
  • 从注释中可以看出,addTask方法会添加一个task任务到队列中,如果当前线程是shutdown的状态,那么直接抛出异常RejectedExecutionException
  • 接着来看executer方法中的startThread(); ,当我们判断当前线程不是EventLoop中的线程的时候会执行这个方法,他是NioEventLoop中的核心方法,如下源码
 private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

  • state == ST_NOT_STARTED 首先通过状态判断是否执行过,保证EventLoop只有一个线程

  • 如果没有启动 用cas的方式STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) 去修改状态为ST_STARTED,直接调用doStartThread方法

  • 如果失败就回滚

  • 接着分析doStartThread方法,首先会调用Executor的execute方法,这个Executor 是我们在创建EventLoopGroup时候创建的是一个ThreadPerTaskExecutor类,如下图是在channel中对应的EventLoop找到的对象信息,该execute方法会将Runable包装成Netty的FastThreadLocalThread

在这里插入图片描述

  • 接着通过Thread.currentThread() 判断线程是否中断
  • updateLastExecutionTime(); 然后设置最后一次执行的事件
  • 核心方法是:SingleThreadEventExecutor.this.run(); 执行单曲NioEventLoop的run方法,等会重点关注
  • 接着完成run方法的事物处理后,在finally中使用cas不断的修改state状态,设置为ST_SHUTTING_DOWN,也就是当loop中run方法结束运行后,关闭线程,最后还会通过不断轮询来二次确认是否关闭,否则不会break跳出
  • 接下来分析EventLoop中的Run方法,我们进入Run方法,就到了我们之前分析过的NioEventLoop中的run方法,此方法做了三件事情,如下源码

@Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
  • NioEventLoop 中的loop轮询是依靠run方法来执行的,在方法中可以看到是一个for循环其中三件事情,如下图中EventLoop部分

    • case SelectStrategy.SELECT: 当事件类似是SELECT 时候, 通过select(wakenUp.getAndSet(false));方法获取感兴趣的事件
    • processSelectedKeys(); 处理选中的事件
    • runAllTasks 执行队列中的任务。
      在这里插入图片描述
  • 上图不管是bossGroup还是WorkerGroup中的EventLoop都是次run方法执行流程

  • select方法实现

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        }
    }
  • 关注点在于 select方法如何体现出非阻塞,如下图中,选择器获取对应事件debug
    在这里插入图片描述

  • 在select中传如参数是1 秒,也就是默认情况下阻塞1秒中,具体的算法: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

  • 其中 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 表示当前时间 - 定时任务的时间,那么timeoutMillis 意思就是,当有定时任务的时候 delayNanos(currentTimeNanos) 时间就部位空,那么定时任务剩余时间 t +0.5秒阻塞的时间,否则就默认1秒中阻塞时间

  • 接着判断: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())

    • 如果1秒(或者t+0.5)后能获取到selectedKeys :selectedKeys != 0
    • 或者select被用户唤醒 :oldWakenUp, wakenUp.get()
    • 或者任务队列中有任务存在 : hasTasks()
    • 或者有定时任务即将被执行 : hasScheduledTasks()
  • 有以上任何情况则跳出循环,否则继续沦陷,直到满足其中一个条件为止

  • 接着processSelectedKeys对获取到的selectKey处理

  • 在接着runAllTasks 执行队列任务

总结
  • 每次执行execute方法就会向队列中添加任务。当第一次添加时候就启动线程,执行run方法,run方法是EventLoop的核心实现,负责轮询获取事件,处理事件,执行队列中任务
  • 其中调用selector的select方法默认阻塞一秒,有定时任务就t+0.5,t是定时任务剩余时间,当执行execute方法时候,也就是添加任务的时候,唤醒selector,防止selector阻塞事件过长
  • 当selector返回的时候,会调用processSelectedKeys对selectKey进行处理
  • 当processSelectedKeys 方法执行结束,按照ioRatio比例执行runAllTasks方法默认是 IO 任务时间和非 IO 任务时间是相同的代码如下
  final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/378299.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

html2canvas将页面dom元素内容渲染成图片保存至本地

html2canvas:https://html2canvas.hertzen.com/configuration/ github:https://github.com/niklasvh/html2canvas 效果 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compa…

VR直播丨颠覆性技术革命,新型直播已经到来

细数当下最火热的营销手段&#xff0c;首先浮现脑海的无疑是“直播”。前有罗永浩、李佳琦&#xff0c;后有刘畊宏和东方甄选&#xff0c;直播如日中天&#xff0c;俨然成了大众足不出户就能休闲娱乐的重要途径。 而随着虚拟现实在“十四五规划”中被列入“建设数字中国”数字…

一文了解GPU并行计算CUDA

了解GPU并行计算CUDA一、CUDA和GPU简介二、GPU工作原理与结构2.1、基础GPU架构2.2、GPU编程模型2.3、软件和硬件的对应关系三、GPU应用领域四、GPUCPU异构计算五、MPI与CUDA的区别一、CUDA和GPU简介 CUDA&#xff08;Compute Unified Device Architecture&#xff09;&#xf…

Java 常用 API

文章目录一、Math二、System三、Object1. toString() 方法2. equals() 方法四、Arrays1. 冒泡排序2. Arrays 常用方法五、基本类型包装类1. Integer2. int 和 String 相互转换3. 字符串中数据排序4. 自动装箱和拆箱六、日期类1. Date2. SimpleDateFormat3. Calendar4. 二月天一…

(四十七)大白话表锁和行锁互相之间的关系以及互斥规则是什么呢?

今天我们接着讲&#xff0c;MySQL里是如何加表锁的。这个MySQL的表锁&#xff0c;其实是极为鸡肋的一个东西&#xff0c;几乎一般很少会用到&#xff0c;表锁分为两种&#xff0c;一种就是表锁&#xff0c;一种是表级的意向锁&#xff0c;我们分别来看看。 首先说表锁&#xf…

如何使用Arsenal快速部署功能强大的Bug Bounty工具

关于Arsenal Arsenal是一个功能强大且使用简单的Shell脚本&#xff08;Bash&#xff09;&#xff0c;该工具专为漏洞赏金猎人设计&#xff0c;在该工具的帮助下&#xff0c;我们可以轻松在自己环境中安装并部署目前社区中功能最为强大的网络侦查工具、漏洞扫描工具和其他安全研…

企业活动直播如何设置VIP观看席?

阿酷tony / 2023-2-28 / 长沙 / 多图内容企业活动直播如何设置VIP观看席&#xff1f;有意思吧&#xff0c;直播也能设vip席位。在直播间可以分设尊享嘉宾席、特邀VIP以及观众席三个区域&#xff0c;为企业提供多种用户接待模式&#xff0c;不仅能为嘉宾营造尊享VIP体验&#xf…

Git学习(1)pro git阅读

目录 目录&#xff1a; 1. 起步 2. Git 基础 3. Git 分支 4. 服务器上的 Git 5. 分布式 Git 第一章 1.3 Git是什么 1.6运行git前的配置 该开源图书网站 Git - Book (git-scm.com) 目录&#xff1a; 1. 起步 1.1 关于版本控制1.2 Git 简史1.3 Git 是什么&#xff1f;1…

Fedora系统安装KubeVela

话不多说直接看命令 Docker安装 Vela安装需要先安装Docker sudo yum -y install docker只需这行命令便可以自动添加 yum和dnf理论上都能成功&#xff0c;但是很看网速&#xff0c;&#xff0c;&#xff0c;实践证明yum是最好的。 如果发生报错mirrors trieds大概率就是网速超…

[oeasy]python0096_游戏娱乐行业_雅达利_米洛华_四人赛马_影视结合游戏

游戏娱乐行业 回忆上次内容 游戏机行业从无到有 雅达利 公司 一枝独秀并且带领 行业 发展起来 雅达利公司 优秀员工 乔布斯 在 朋友 帮助下完成了《pong》 Jobs 黑了 Woz 一部分收入 然后拿着钱 去印度禅修了 游戏行业 会如何继续 呢&#xff1f;?&#x1f914; 灵修 乔布…

常见损失函数Loss Function的选择(regression problem)

损失函数Loss Function的设计是机器学习模型的核心问题&#xff0c;一般情况下函数式子会分成两项&#xff1a;衡量预估值和目标间的差距、正则项式。其中正则项式子一般用于衡量模型的复杂度&#xff0c;可以避免模型过拟合&#xff08;奥卡姆剃刀原理&#xff09;。 另一部分…

【Node.js】MySQL数据库的第三方模块(mysql)

mysql安装操作MySQL数据库的第三方模块&#xff08;mysql&#xff09;通过第三方模块&#xff08;mysql2&#xff09;连接到MySQL数据库mysql插入数据mysql插入数据的便捷方式mysql更新数据mysql更新数据的便捷方式mysql删除数据安装操作MySQL数据库的第三方模块&#xff08;my…

Direct IO

目录 一、基本介绍 二、使用方法与Demo 三、O_DIRECT 与 O_SYNC 一、基本介绍 如上图所示&#xff0c;普通的 IO 读写&#xff0c;会先将内容保存在缓冲区中&#xff0c;文件落盘需要调用 fflush 、fsync 等方法。 而 DirectIO 是无缓冲 IO&#xff0c;&#xff0c;使用无缓…

>>数据管理:DAMA简介「考试和续期」

关于DAMA,这里就不再多做描述,可以参考以前写的一些简介或官方介绍。下面就考试再做一些详细介绍。 1 区别 CDGA:数据治理工程师(Certified Data Governance Associate),“DAMA中国”组织的数据治理方面的职业认证考试。 CDGP:数据治理专家(Certified Data Governa…

数学小课堂:无穷小(用动态和极限的眼光看世界)

文章目录 引言I 极限1.1 柯西对极限的认知1.2 极限准确的定义1.3 数列极限的定义1.4 函数极限的定义1.5 无穷小(特殊的极限)1.6 定量和逆向思维1.7 认知升级的过程引言 身处于渐变世界的人类,难以理解瞬间突变。 老师的作用,就是用大白话,把数学语言所写的知识,翻译成大…

基于ANN以使用有监督和无监督的学习将其分为不同的类别或识别模式(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 人工神经网络&#xff08;ANN&#xff09;在包括技术或统计在内的每一个分支中都变得越来越有用&#xff0c;以分析一些社会或非…

【C++容器】vector、map、hash_map、unordered_map四大容器的性能分析【2023.02.28】

摘要 vector是标准容器对数组的封装&#xff0c;是一段连续的线性的内存。map底层是二叉排序树。hash_map是C11之前的无序map&#xff0c;unordered_map底层是hash表&#xff0c;涉及桶算法。现对各个容器的查询与”插入“性能做对比分析&#xff0c;方便后期选择。 测试方案…

QT-自定义滑动式日期选择

QT-自定义滑动式日期选择前言一、效果演示二、注意说明二、关键程序1.SliderDateTime.cpp2.Slider.cpp四、程序链接前言 1、使用鼠标滑动的方式选择指定的日期时间&#xff0c;并且获取当前选中的时间&#xff0c;整体样式看来十分舒服&#xff0c;更加适用触摸屏的方式。 2、…

Python进阶-----面向对象1.0(对象和类的介绍、定义)

目录 前言&#xff1a; 面向过程和面向对象 类和对象 Python中类的定义 &#xff08;1&#xff09;类的定义形式 &#xff08;2&#xff09;深层剖析类对象 前言&#xff1a; 感谢各位的一路陪伴&#xff0c;我学习Python也有一个月了&#xff0c;在这一个月里我收获满满…

理解Spring中的依赖注入和控制反转

依赖注入&#xff08;Dependency Injection&#xff09;是一种面向对象编程的设计模式&#xff0c;用于解决对象之间的依赖关系。它的基本思想是将对象的创建和管理工作交给容器来完成&#xff0c;而不是在应用程序中手动创建和管理对象&#xff0c;从而达到松耦合、易维护、易…