Netty—EventLoop

news2025/1/10 10:56:37

文章目录

  • 一、EventLoopGroup 是什么?🤔️
  • 二、NioEventLoop 有哪些重要组成部分?🔍
  • 三、NioEventLoop 的 thread 在何时启动?
  • 三、 run() 方法中线程在干嘛?

一、EventLoopGroup 是什么?🤔️

EventLoop,即"事件循环对象",本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理 Channel 上源源不断的 io 事件。

EventLoop它的继承关系比较复杂

  • 继承java.util.concurrent.ScheduledExecutorService ,因此包含了线程池中所有方法
  • 继承netty自身的OrderedEventExecutor,它提供了boolean inEventLoop(Thread thread);方法判断一个线程是否属于此 EventLoop,EventLoopGroup parent();方法查看自己属于哪个 EventLoopGroup。

EventLoopGroup,即“事件循环组”,是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 channel 上的io事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)。

EventLoopGroup继承 netty 自己的 EventExecutorGroup,实现了 Iterable 接口提供遍历 EventLoop 的能力,另有 next() 方法获取集合中下一个 EventLoop。
在这里插入图片描述

二、NioEventLoop 有哪些重要组成部分?🔍

接下来我们通过源码查看 NioEventLoop 有哪些部分组成~

通过查看 NioEventLoop 类,我们发现其维护了 Selector!可是为什么是两个selector呢?这就是netty优化selector之处,在后面会为大家详细讲解!

public final class NioEventLoop extends SingleThreadEventLoop {
		
  	//......
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
    private Selector unwrappedSelector;
}

我们查看NioEventLoop的父类io.netty.util.concurrent.SingleThreadEventExecutor,维护了一个线程和任务队列!

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
		//......

    private final Queue<Runnable> taskQueue;

    private volatile Thread thread;
}

其父类io.netty.util.concurrent.AbstractScheduledEventExecutor中维护了一个任务队列!

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
		//......

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}

NioEventLoop主要由selector、线程和任务队列组成,NioEventLoop既会处理io事件,也可以处理普通任务和定时任务!

NioEventLoop 中的 selector 何时创建?

我们发现在EventLoop的构造方法中就已经将Selector创建!

在这里插入图片描述

NioEventLoop 为何有两个 selector 成员呢?

原生的selector中对于selectedKeys的实现是采用Set结构,想必大家一定知道set的遍历性能并不高!netty使用反射的方式将其读取出来替换成数组结构去维护~

在这里插入图片描述

三、NioEventLoop 的 thread 在何时启动?

当然是在执行 execute() 方法时创建咯,我们通过debug的方式来查看服务端这边NioEventLoop的nio线程启动过程!

以上通过bind()方法一直点下去,直至 io.netty.bootstrap.AbstractBootstrap#doBind0

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

此时调用了channel所关联的eventLoop的execute()方法,我们查看io.netty.util.concurrent.SingleThreadEventExecutor#execute

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
  	// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
    addTask(task);
    if (!inEventLoop) {
       // inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
        startThread();
        if (isShutdown()) {
          // 如果已经 shutdown,做拒绝逻辑
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
      // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
        wakeup(inEventLoop);
    }
}

在这里插入图片描述

此时会先调用inEventLoop(),判断当前线程是否是eventLoop线程。显然在此时eventLoop线程是null。inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread

在这里插入图片描述

进入startThread() 方法,其内部通过状态位控制线程只会启动一次。其内部使用执行器创建一个线程,并将其赋给eventLoop的thread属性,调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环!

在这里插入图片描述

三、 run() 方法中线程在干嘛?

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

protected void run() {
    for (;;) {
        try {
            try {
                // calculateStrategy 的逻辑如下:
                // 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
                // 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:

                    case SelectStrategy.SELECT:
                        // 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
                        // 进行 select 阻塞,并设置唤醒状态为 false
                        boolean oldWakenUp = wakenUp.getAndSet(false);
                        
                        // 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
                        // 下面的 select 方法不会阻塞
                        // 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
                        // 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
                        // 才能执行,让 select 方法无谓阻塞
                        select(oldWakenUp);

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio 默认是 50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // ioRatio 为 100 时,总是运行完所有非 IO 任务
                    runAllTasks();
                }
            } else {                
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 记录 io 事件处理耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 运行非 IO 任务,一旦超时会退出 runAllTasks
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

io.netty.channel.DefaultSelectStrategy#calculateStrategy,判断是否有任务,没有任务,返回 SelectStrategy.SELECT,看若有任务则调用selectNow()拿取

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

接着进入io.netty.channel.nio.NioEventLoop#select 方法

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 计算等待时间,没有 scheduledTask,超时时间为 1s;有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 如果超时,退出循环
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
            // wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // select 有限时阻塞
            // 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
            int selectedKeys = selector.select(timeoutMillis);
            // 计数加 1
            selectCnt ++;

            // 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
               	// 线程被打断,退出循环
                // 记录日志
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 如果超时,计数重置为 1,下次循环就会 break
                selectCnt = 1;
            } 
            // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
            // 这是为了解决 nio 空轮询 bug
            else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // 重建 selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            // 记录日志
        }
    } catch (CancelledKeyException e) {
        // 记录日志
    }
}

netty通过轮训执行次数是否到达预值判断是否发生空轮训bug,采用更换 selector 的方式去解决问题!

我们回到io.netty.channel.nio.NioEventLoop#run 方法中,通过以上分析我们知道一般只有在有任务或者有事件的时候才结束阻塞。那会不会出现任务的处理时间过长而耽误事件的处理呢?

					 	// ioRatio 默认是 50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // ioRatio 为 100 时,总是运行完所有非 IO 任务
                    runAllTasks();
                }
            } else {                
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 记录 io 事件处理耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 运行非 IO 任务,一旦超时会退出 runAllTasks
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }

ioRatio 参数就是为了控制处理io事件 和 处理任务之间的时间比例!

我们来详细看看处理io事件的 io.netty.channel.nio.NioEventLoop#processSelectedKeys 方法

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet 
        // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

默认都会进入我们优化后的 io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized 方法

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;
				// 拿到 channel
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

拿到channel后判断是不是AbstractNioChannel,已知该类是所有Nio channel的父类。如果是则进入方法 io.netty.channel.nio.NioEventLoop#processSelectedKey,区分当前事件处理相关IO事件操作……

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 当 key 取消或关闭时会导致这个 key 无效
    if (!k.isValid()) {
        // 无效时处理...
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 可读或可接入事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
            // 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/984788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纯源码程序的执行

QT Creator本身是个IDE安装的时候根据自己需要配置的又有对应的编译器&#xff0c;因此编写普通的程序也不再话下。 选择Non-Qt Project工程&#xff0c;并在右侧根据自己的需要选择C应用还是C应用 新工程中工程管理文件和代码如下&#xff1a; 执行结果如下

驱动开发--day2

实现三盏灯的控制&#xff0c;编写应用程序测试 head.h #ifndef __HEAD_H__ #define __HEAD_H__#define LED1_MODER 0X50006000 #define LED1_ODR 0X50006014 #define LED1_RCC 0X50000A28#define LED2_MODER 0X50007000 #define LED2_ODR 0X50007014#endif mychrdev.c #inc…

浅谈数据治理中的智能数据目录

在数字化转型的战略实施中&#xff0c;很多企业都在搭建自己的业务、数据及人工智能的中台。在同这些企业合作和交流中&#xff0c;越来越体会到数据目录是中台建设的核心和基础。为了更好地提供数据服务&#xff0c;发挥数据价值&#xff0c;用户需要先理解数据和信任数据。 企…

c高级day2(9.7)shell脚本

作业: 写一个1.sh脚本&#xff0c;将以下内容放到脚本中&#xff1a; 在家目录下创建目录文件&#xff0c;dir 在dir下创建dir1和dir2 把当前目录下的所有文件拷贝到dir1中&#xff0c; 把当前目录下的所有脚本文件拷贝到dir2中 把dir2打包并压缩为dir2.tar.xz 再把dir2…

基于docker环境的tomcat开启远程调试

背景&#xff1a; Tomcat部署在docker环境中&#xff0c;使用rancher来进行管理&#xff0c;需要对其进行远程调试。 操作步骤&#xff1a; 1.将容器中的catalina.sh映射出来&#xff0c;便于对其修改&#xff0c;添加远程调试相关参数。 注意&#xff1a;/data/produce2201…

【计算机网络】HTTP(下)

本文承接上文的代码进行改造&#xff0c;上文链接&#xff1a;HTTP上 文章目录 1. 实现网站跳转实现 自己的网站跳转 2. 请求方法(get) && 响应方法(post)GET方法POST方法GET与POST的应用场景 3. HTTP状态码在自己设计的代码中发现4043开头的状态码(重定向状态码)永久…

“交叉轮”轮融资后,哪吒汽车能否脚踏“风火轮”续写逆袭故事?

2023年的新能源汽车江湖&#xff0c;烟波浩渺的水面下暗潮汹涌。 从特斯拉年初打响降价第一枪&#xff0c;降价潮至今未见尾声。9月刚至&#xff0c;小鹏汽车、零跑汽车又推出了调价政策。 这一背景下&#xff0c;车企内卷加剧是必然。年初&#xff0c;哪吒汽车联合创始人、C…

2019 ICPC香港站 G. Game Design

Problem - G - Codeforces 问题描述&#xff1a;怪物只能在树叶子节点出生&#xff0c;向上走&#xff0c;可以花费一个值在一个节点建防御塔&#xff0c;防御塔会阻碍怪物向上走。最小花费可以让根节点无法被怪物走到的一个造塔方法算是一个方案。现给定方案数&#xff0c;让…

MaskVO: Self-Supervised Visual Odometry with a Learnable Dynamic Mask 论文阅读

论文信息 题目&#xff1a;MaskVO: Self-Supervised Visual Odometry with a Learnable Dynamic Mask 作者&#xff1a;Weihao Xuan, Ruijie Ren, Siyuan Wu, Changhao Chen 时间&#xff1a;2022 来源&#xff1a; IEEE/SICE International Symposium on System Integration …

Kubernetes(k8s)部署高可用多主多从的Redis集群

Kubernetes部署高可用多主多从的Redis集群 环境准备准备Kubernetes准备存储类 部署redis准备一个命名空间命令创建yaml文件创建&#xff08;推荐&#xff09; 准备redis配置文件准备部署statefulset的资源清单文件执行文件完成部署初始化集群 环境准备 准备Kubernetes 首先你…

【Go基础】编译、变量、常量、基本数据类型、字符串

面试题文档下链接点击这里免积分下载 go语言入门到精通点击这里免积分下载 编译 使用 go build 1.在项目目录下执行 2.在其他路径下编译 go build &#xff0c;需要再后面加上项目的路径&#xff08;项目路径从GOPATH/src后开始写起&#xff0c;编译之后的可执行文件就保存再…

chrome_elf.dll丢失怎么办?修复chrome_elf.dll文件的方法

Chrome是目前最受欢迎的网络浏览器之一&#xff0c;然而有时用户可能会遇到Chrome_elf.dll丢失的问题。该DLL文件是Chrome浏览器的一个重要组成部分&#xff0c;负责启动和管理程序的各种功能。当Chrome_elf.dll丢失时&#xff0c;用户可能无法正常启动Chrome或执行某些功能。本…

log4qt库的使用

log4qt库的使用 一,什么是log4qt?二,log4qt的下载三,如何集成log4qt?1.在vs2022中集成log4qt的方法:模块一:配置log4qt的步骤步骤一,将下好的log4qt库进行解压,然后再库文件中,新建build和Log4Qt文件夹步骤二,打开cmake,有两个填写路径的位置.步骤三,点击cmake的configure按钮…

(2023,Diffusion)稳定扩散模型是不稳定的

Stable Diffusion is Unstable 公众号&#xff1a;EDPJ&#xff08;添加 VX&#xff1a;CV_EDPJ 进交流群获取资料&#xff09; 0. 摘要 最近&#xff0c;文本到图像模型一直蓬勃发展。 尽管它们具有强大的生成能力&#xff0c;但我们的研究发现这一生成过程缺乏稳健性。 具体…

【C++】平衡二叉搜索树的模拟实现

&#x1f307;个人主页&#xff1a;平凡的小苏 &#x1f4da;学习格言&#xff1a;命运给你一个低的起点&#xff0c;是想看你精彩的翻盘&#xff0c;而不是让你自甘堕落&#xff0c;脚下的路虽然难走&#xff0c;但我还能走&#xff0c;比起向阳而生&#xff0c;我更想尝试逆风…

【数据结构】设计环形队列

环形队列是一种线性数据结构&#xff0c;其操作表现基于 FIFO&#xff08;先进先出&#xff09;原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。 环形队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普通队列里&#xff0c;一旦一个队列…

使用Caffeine实现帖子的缓存来优化网站的运行速度

导入依赖 <!-- https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine --><dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>3.1.7</version>…

【产品应用】一体化伺服电机在全自动咖啡研磨机中的应用

在现代社会中&#xff0c;咖啡已经成为人们生活中不可或缺的一部分。为了满足咖啡爱好者对于口感和品质的追求&#xff0c;全自动咖啡研磨机应运而生。 而其中的一体化伺服电机作为关键组件&#xff0c;发挥着重要的作用。本文将探讨一体化伺服电机在全自动咖啡研磨机中的应用&…

Magisk隐藏外挂解决方案

自2008年1.0测试版发布以来&#xff0c;安卓系统已经发展了近16年&#xff0c;凭借着优秀的开源生态&#xff0c;安卓系统飞速成长&#xff0c;已经成了当下手机系统中的龙头。据研究机构 Canalys 报告称&#xff0c;今年第一季度安卓设备的整体市场份额占比达到 78 %。 开源生…