Netty核心源码分析(五)核心组件EventLoop源码分析

news2024/11/25 2:22:20

文章目录

  • 系列文章目录
  • 一、EventLoop源码分析
    • 1、NioEventLoop源码
    • 2、EventLoop的父接口SingleThreadEventExecutor
      • (1)addTask方法
      • (2)startThread方法
    • 3、NioEventLoop的run方法(核心!)
      • (1)select
    • 4、小结

系列文章目录

Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
Netty核心源码分析(三)业务请求执行关键——ChannelPipeline、ChannelHandler、ChannelHandlerContext源码分析
Netty核心源码分析(四)心跳检测源码分析
Netty核心源码分析(五)核心组件EventLoop源码分析

一、EventLoop源码分析

之前我们简单分析过NioEventLoopGroup的源码。今天我们分析一下EventLoop执行的源码。

1、NioEventLoop源码

首先我们分析一下类继承关系图:
在这里插入图片描述
(1)ScheduledExecutorService接口表示是一个定时任务接口,EventLoop可以接受定时任务。
(2)EventLoop接口:一旦Channel注册了,就处理该Channel对应的所有IO操作。
(3)SingleThreadEventExecutor接口表示这是一个单线程的线程池。
(4)EventLoop是一个单例的线程池,里面含有一个死循环的线程不断地做着三件事情:监听端口、处理端口事件、处理队列事件。每个EventLoop都可以绑定多个Channel,而每个Channel始终只能由一个EventLoop来处理。

2、EventLoop的父接口SingleThreadEventExecutor

SingleThreadEventExecutor是一个单线程的线程池,其中包含着execute方法是EventLoop使用的源头:

// io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
	// 是否是当前线程
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) { // 如果该EventLoop的线程不是当前线程
        startThread(); // 开启线程
        if (isShutdown() && removeTask(task)) {
        	// 如果线程已经停止,并且删除任务失败,执行拒绝策略,默认是抛出异常RejectedExecutionException
            reject();
        }
    }
	// 如果addTaskWakesUp是false,并且任务不是NonWakeupRunnable类型的,尝试唤醒selector,这个时候,阻塞在selector的线程会立即返回
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

(1)addTask方法

// io.netty.util.concurrent.SingleThreadEventExecutor#addTask
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}
// io.netty.util.concurrent.SingleThreadEventExecutor#offerTask
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

(2)startThread方法

首先判断是否启动过了,保证EventLoop只有一个线程,如果没有启动过,尝试使用CAS将state状态改为ST_STARTED,然后调用doStartThread启动

// io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
            	// 启动
                doStartThread();
            } catch (Throwable cause) {
            	// 异常回滚
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}
// io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
    assert thread == null;
    // executor就是创建EventLoopGroup时创建的ThreadPerTaskExecutor类,将runnable包装秤Netty的FastThreadLocalThread
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            // 判断中断状态
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            // 设置最后一次的执行时间
            updateLastExecutionTime();
            try {
            	// this就是NioEventLoop,执行NioEventLoop的run方法,这个方法是个死循环,也是整个EventLoop的核心!
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            	// 使用CAS不断修改state状态,改成ST_SHUTTING_DOWN 
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                    	// cheanup
                        cleanup();
                    } finally {
                    	// 修改状态ST_TERMINATED
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }
						// 回调terminationFuture方法
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

3、NioEventLoop的run方法(核心!)

该方法是一个死循环,也是整个NioEventLoop的核心!

从源码我们可以看出,run方法总共做了三件事:
(1)select获取感兴趣的事件。
(2)processSelectedKeys处理事件。
(3)runAllTasks执行队列中的任务。

// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                	// select
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                	// 处理select keys
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

(1)select

大致的逻辑就是:调用NIO的selector的select方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上再加上0.5秒进行阻塞。当执行execute方法的时候,也就是添加任务的时候,会唤醒selector,防止selector阻塞时间过长。

// io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
			// 阻塞给定时间,默认一秒
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
			// 如果有返回值||select被用户唤醒||任务队列有任务||有定时任务即将被执行,则跳出循环
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

4、小结

每次执行 ececute 方法都是向队列中添加任务。当第一次添加时就启动线程,执行 run 方法,而 run 方法是整个 EventLoop 的核心,就像 EventLoop 的名字一样,Loop Loop ,不停的 Loop ,Loop 做什么呢?做3件事情。

  • 调用 selector 的 select 方法,默认阳塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上0.5秒进行阻塞。当执行 execute 方法的时候,也就是添加任务的时候,唤醒 selecor,防止 selector 阻塞时间过大。
  • 当 selector 返回的时候,会调用 processSelectedKeys 方法对 selectKey 进行处理。
  • 当 processSelectedKeys 方法执行结束后,则按照 ioRatio 的比例执行 runAlITasks 方法,默认是 IO 任务时间和非IO 任务时间是相同的,你也可以根据你的应用特点进行调优 。比如 非IO任务比较多,那么你就将ioRatio 调小一点,这样非 IO 任务就能执行的长一点。防止队列积攒过多的任务。

此时,下图红圈部分源码我们分析完毕。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460202.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网口通讯与串口通讯

目录 一、简介以及数据格式&#xff1a; 二、网口通讯与串口通讯主要区别&#xff1a; 三、工具小助手&#xff1a; 一、简介以及数据格式&#xff1a; 网口通讯&#xff08;Ethernet&#xff09;和串口通讯&#xff08;Serial&#xff09;都是用于数据传输的通信协议。 1、…

Netty简介

1.Netty是什么? 1>.Netty是由JBOSS提供的一个Java开源框架,现在为Github上的独立项目; 2>.Netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发高性能的,高可靠的网络IO程序; 如图:异步与同步 说明: 同步: 在传统的BS开发模式中(左图),浏览器端发送一个请求…

接口策略PBR

实验原理 接口策略路由只对转发的报文起作用,对本地下发的报文(比如本地的Ping报文)不起作用,接口策略路由通过在流行为中配置重定向实现,只对接口入方向的报文生效。缺省情况下,设备按照路由表的下一跳进行报文转发,如果配置了接口策略路由,则设备按照接口策略路由指…

Flowable 流程定义(流程模板)的部署及设计的数据库表

一.简介 我们使用了 Spring Boot 之后&#xff0c;默认情况下流程是会自动部署的&#xff0c;基本上不需要我们额外做什么事情&#xff0c;我们称之为默认部署。 有的时候&#xff0c;我们的流程可能并不是提前设计好的&#xff0c;而是项目启动之后&#xff0c;动态部署的&am…

Verilog阻塞与非阻塞赋值详解

基本概念 关于阻塞赋值&#xff08;&#xff09;和非阻塞赋值&#xff08;<&#xff09;&#xff0c; 阻塞赋值&#xff08;&#xff09;是顺序敏感的&#xff0c;非阻塞赋值&#xff08;<&#xff09;是顺序独立的。阻塞赋值按它们在程序块中列出的顺序顺序执行。当它们…

分库分表,shardingJdbc和Mycat区别

shardingJdbc和Mycat都可以用来分库分表 MyCatshardingJdbc本质第三方应用,中间件代理层jar包是否需要修改代码否是可跨数据库否是是否跨语言是否性能下架&#xff0c;因为多了一层好 sharding-jdbc后续发展为Sharding-Sphere&#xff0c;包含sharding-jdbc、Sharding-Proxy、…

C. Painting the Fence(思维 + 前缀和)

Problem - C - Codeforces You需要油漆一个由n个部分组成的长围栏。不幸的是&#xff0c;它没有被涂漆&#xff0c;所以你决定雇用q名画家来完成这项工作。第i名画家将会油漆所有满足lisxsri的部分x. 不幸的是&#xff0c;你的预算很紧&#xff0c;所以你只能雇用q-2名画家。显…

Java线程池详解,内含实战演练~

本文是向大家介绍线程池的使用和一些注意事项&#xff0c;它能够实现高并发下快速处理业务&#xff0c;能够帮助开发人员深入理解线程池的价值。 1. 简介 线程池是使用池化技术管理和使用线程的一种机制。池化技术&#xff1a;提前准备一些资源&#xff0c;在需要时可以重复使…

200颗卫星!武大“东方慧眼”星座项目发布

本文转自武汉大学官微 4月24日&#xff0c;是“中国航天日”&#xff0c;“东方慧眼”智能遥感星座项目在武汉大学宣布正式启动。 针对当前我国卫星遥感存在“成本高、效率低、不稳定、应用少”等诸多问题&#xff0c;“东方慧眼”智能遥感卫星星座计划通过卫星星座组网观测、…

【环境配置】Window上Git clone 如何提高速度

步骤一&#xff1a;得到ip 在下列网站上 https://www.ipaddress.com/ 分别搜索&#xff1a; github.global.ssl.fastly.netgithub.com然后记录得到的IP地址 步骤二&#xff1a; 修改host 在Windows中&#xff0c;先进入&#xff1a;C:\Windows\System32\drivers\etc 目录 …

MySQL: 数据类型之整数型、浮点数、时间日期

目录 前言&#xff1a; 数据类型&#xff1a; 整数型&#xff1a; 浮点数与定点数&#xff1a; 浮点数&#xff1a; 定点数&#xff1a; 日期与时间&#xff1a; DATATIME: DATE&#xff1a; TIMESTAMP: ​编辑 YEAR: TIME: 前言&#xff1a; 前面的几篇写了如何创…

css-设置单行文本溢出省略号,使用overflow:hidden属性之后的出现的问题几解决办法。

1 设置单行文本溢出后出现省略号 必要&#xff1a;需要设置固定宽度&#xff0c;不允许换行 width: 200px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; display: -webkit-box; -webkit-line-clamp: 1; -webkit-box-orient: vertical; 2 设置N行文本…

每天一道大厂SQL题【Day24】华泰证券真题实战(六)

文章目录 每天一道大厂SQL题【Day24】华泰证券真题实战(六)每日语录第24题&#xff1a;需求列表 思路分析答案获取加技术群讨论文末SQL小技巧 后记 每天一道大厂SQL题【Day24】华泰证券真题实战(六) 大家好&#xff0c;我是Maynor。相信大家和我一样&#xff0c;都有一个大厂梦…

如何生成以及校验token

1️⃣ What is token&#xff1f; token是令牌的意思&#xff0c;作用就像“通关令牌”一样&#xff0c;持有token的请求会被“放行”&#xff0c;不持有token的请求可以被拦截&#xff08;可以设置白名单使不被拦截&#xff0c;例如登陆请求&#xff09;。 token是由…

Dockere-Compose迁移Gitea部署

Dockere-Compose迁移Gitea部署 ps: 江湖不是打打杀杀&#xff0c;江湖是人情事故。 解释&#xff1a; Gitea&#xff1a;类似于Git的代码版本管理工具。Docker&#xff1a;Docker-Compose&#xff1a; Docker命令&#xff1a; 查看镜像&#xff1a;docker images 删除镜像…

web自动化测试入门篇07 ——— 用例编写技巧

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;【Austin_zhai】 &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xff0c;分享行业相关最新信息。…

idea中导入spring源码错误during working with external system: java.lang.AssertionError

标题:idea中导入spring源码错误during working with external system: java.lang.AssertionError 1.Spring源码编译环境 spring 5.3.1 JDK环境:1.8 Spring版本:5.3.1版本 开发工具:IntelliJ IDEA 2019.10 编译工具:Gradle-6.4-bin 操作系统:windows 10 注&#xff1a; sprin…

分支和循环语句(2)

文章目录 3.2 for循环3.2.1 for语句的语法3.2.2 for循环中的break和continue3.2.3 for语句的循环控制变量3.2.4 一些for循环的变种3.2.5 一道笔试题 3.3 do while循环3.3.1 do语句的语法3.3.2 do语句的特点3.3.3 do while循环中的break和continue 3.4 练习3.4.1 计算 n的阶乘3.…

数据可视化工具汇总:数字孪生产品的得力助手

数字孪生技术是一项快速发展的新兴技术&#xff0c;已经在许多领域得到广泛应用。数字孪生技术不仅可以提供完整的虚拟模型&#xff0c;还可以模拟物理系统的行为。在数字孪生技术的推动下&#xff0c;越来越多的数字孪生产品开始涌现出来&#xff0c;为不同的领域提供支持和解…

2023年保理产品研究报告

第一章 行业概况 1.1 概述 保理&#xff08;Factoring&#xff09;&#xff0c;全称保付代理&#xff0c;又称托收保付&#xff0c;是一种通过将企业的应收账款出售给专业金融机构&#xff08;保理公司&#xff09;来获得资金的融资方式。保理业务通常包括两种类型&#xff1…