【Netty源码系列(二)】解读EventLoopGroup

news2025/1/14 18:37:11

【Netty源码系列文章中源码出自4.1.84.Final版本】

文章目录

  • 1. EventLoopGroup接口类
  • 2. NioEventLoopGroup创建过程
    • 2.1 Executor实现机制
    • 2.2 EventLoop对象创建(newChild()方法)

本篇文章主要看一下 EventLoopGroup的源码,了解一下它的创建过程。

	EventLoopGroup bossGroup = new NioEventLoopGroup(2);
	EventLoopGroup workerGroup = new NioEventLoopGroup();

1. EventLoopGroup接口类

我们先进入EventLoopGroup类中,看一下这个类的情况。有关这个类的信息都在源码中注释了

/**
 * 这是一个接口类,继承自EventExecutorGroup 
 * 它的作用就是允许将 事件循环中每个加工过的Channel对象注册进来
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * 枚举返回下一个EventLoop对象
     */
    @Override
    EventLoop next();

    /**
     * 使用EventLoop注册Channel,当注册完成时,返回通知对象ChannelFuture 
     */
    ChannelFuture register(Channel channel);

    /**
     * 使用EventLoop注册Channel,一旦注册完成,也会返回通知对象ChannelFuture,但是这里传参变为ChannelPromise,
     * 这个对象内部也维护了一个Channel对象
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * 这个方法已经弃用,上面的方法已经包含这个方法功能
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

ChannelFuture接口类,异步返回Channel的I/O结果。

2. NioEventLoopGroup创建过程

new NioEventLoopGroup(2)

这里传入线程数量参数,就会根据传入的值进行创建,不传时,默认按系统cpu的核数*2进行创建。

进入NioEventLoopGroup类中,一直debug,可以看到如下图所示的过程。
在这里插入图片描述
此时,我们进入super(...)这个方法中,进入到MultithreadEventLoopGroup类里面。

	 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
	     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
	 }

这时,我们看到一个三目运算,而DEFAULT_EVENT_LOOP_THREADS的值,在这个类加载时,就已经进行赋值操作,正是获取系统可用cpu核数的2倍,如下。

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

我们接着进入super(...)方法中,可以看到下面的this(...)方法,看一下注释,说创建一个新的实例,我们再点击进入到真正的方法中MultithreadEventExecutorGroup(...)

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        //检查参数nThreads
        checkPositive(nThreads, "nThreads");
		//创建executor对象
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
		//创建EventExecutor数组
        children = new EventExecutor[nThreads];
		//枚举将生成的EventExecutor对象放入数组中
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            	//这里是核心,用来生成EventExecutor对象,后面会讲
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
		//枚举设置监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
		//将EventExecutor对象放入一个只读集合中
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

这个方法干了这几件事,见下图。我们先整体有个了解,再对其中几个重要的步骤进行深入。

在这里插入图片描述

2.1 Executor实现机制

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

我们知道,之前参数executor传入的是null值,在这里会进行创建。

newDefaultThreadFactory() 创建默认线程工厂,并为线程设置一些属性,我们进入这个方法中。

	//MultithreadEventExecutorGroup类

    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

下一步:

	//DefaultThreadFactory类,实现ThreadFactory接口

    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }

这里新增两个参数,
daemon:守护进程,默认传false
priority:优先级,默认传的是5,即Thread.NORM_PRIORITY

下一步:

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

toPoolName(poolType)方法,是获取对应的池名称。

下一步:

    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, null);
    }

新增一个参数,
ThreadGroup:线程组,默认传null

下一步:

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
    	//检查池名称不为null,否则抛异常
        ObjectUtil.checkNotNull(poolName, "poolName");
		//检查优先级范围
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
		//poolId,是AtomicInteger原子对象,用来拼接线程名称前缀
		//后面对三个属性进行赋值
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }

到这里,只是对线程工厂的线程进行属性的设置;

回到开始的地方:

	executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

我们进入ThreadPerTaskExecutor这个类中,看一下。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

这个类里,只是把刚刚设置好的线程工厂对象ThreadFactory赋值给ThreadPerTaskExecutor中的属性,这里并没有去调用execute()方法;

真正调用执行的是这个方法:newChild(executor, args)

	//MultithreadEventExecutorGroup类

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
		...
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            	//真正去调用执行的方法
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            }
            ...
     }

2.2 EventLoop对象创建(newChild()方法)

我们点击newChild()方法,发现这个一个抽象方法,这里我们看一下它的实现方法,因为创建的是NioEventLoopGroup对象,所以我们选择NioEventLoopGroup这个类的重写方法,点进去看一下:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        ...
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

这里对之前传入的多个参数,根据不同的参数类型,进行强制转换;然后将转换后的参数传递到NioEventLoop类的构造方法中,创建一个NioEventLoop对象。我们进入它的构造方法中,看一下:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

NioEventLoop构造方法中又调用了它的父类方法,校验参数,获取选择器并赋值。我们进入super(...)方法中。

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

这里也是干了同样的事情,调用父类方法,校验参数。我们再点击super()方法。

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        //重点!!!
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

到了这一步,我们会发现不一样了,又出现executor这个对象了,而且通过ThreadExecutorMap.apply(executor, this)这个方法又重新赋值给Executor,我们前面说过,之前创建Executor的时候,只是设置了一些属性参数,并没有真正去调用创建线程的方法,现在这里又出现了,会是这里吗?我们点击apply()方法进去看看。

    /**
     * Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
     * when called from within the {@link Runnable} during execution.
     */
    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Executor() {
            @Override
            public void execute(final Runnable command) {
                executor.execute(apply(command, eventExecutor));
            }
        };
    }

我们看到这里调用execute()这个方法,apply(command, eventExecutor)这个方法通过command.run()生成Runnable对象,然后执行器去调用。

到这里我们大体了解EventLoopGroup的整个创建过程,本文有误的地方,烦请留言指正,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二、react的组件-state-props-setState

目标 理解组件和组件的创建、以及能够根据实际场景去划分合理的组件。理解并且能够灵活的应用组件中的state、props。可以使用组件去实现各种前端交互。 知识点 组件的定义&#xff1a;组件能够表示一块视图的结构表现以及逻辑交互&#xff0c;并且可以重复利用。如何创建组件…

【ROS】机械人开发四--ROS常用概念与Launch文件

机械人开发四--ROS常用概念与Launch文件一、ROS常用概念1.1 ROS 的结构1.2 ROS 话题通信1.3 海龟仿真器 仿真 例程二、Launch文件2.1 功能2.2 文件语法2.3 参数服务器2.4 节点分组与重命名标签一、ROS常用概念 1.1 ROS 的结构 ROS 中有一些很重要的基础概念&#xff1a;节点&…

四、ref与DOM-findDomNode-unmountComponentAtNode

目标 理解react的框架使用中&#xff0c;真实dom存在的意义。 使用真实dom和使用虚拟dom的场景。 灵活掌握并能够合理使用操作真实dom的方法。 知识点 react中提供了ref这个属性来获取原生的dom节点&#xff0c;使用方式&#xff1a;在虚拟dom中添加ref属性&#xff0c;即可…

笔试强训(三十七)

目录一、选择题二、编程题2.1 mkdir2.1.1 题目2.1.2 题解2.2 数据库连接池2.2.1 题目2.2.2 题解一、选择题 &#xff08;1&#xff09;下面关于源端口地址和目的端口地址的描述中&#xff0c;正确的是&#xff08;A&#xff09; A.在TCP/UDP传输段中&#xff0c;源端口地址和目…

从内部失衡到外部失衡-中国视角下的宏观经济

从内部失衡到外部失衡 – 潘登同学的宏观经济学笔记 文章目录从内部失衡到外部失衡 -- 潘登同学的宏观经济学笔记国际金融复习全球失衡与储蓄过剩利用拉姆齐模型进行分析数值模拟外部失衡与国际收支危机国际支付工具的作用资产价格泡沫国际收支危机亚洲金融危机中国在亚洲金融危…

智能家居项目开发准备工作

智能家居代码机构——简单工厂模式&#xff1a; 什么是设计模式&#xff1f;百度百科解释&#xff1a; 软件设计模式&#xff08;Design pattern&#xff09;&#xff0c;又称设计模式&#xff0c;是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设…

好心情精神心理平台:精神疾病怎样才算「治好」?医生和患者眼中的标准不一样!

精神疾病恢复到什么程度才算「治好」了&#xff1f; 很多患者朋友认为&#xff0c;症状消失就代表病好了&#xff0c;就可以停药了。 不是我吓唬你&#xff0c;如果你见症状好转就停药&#xff0c;那病情出现反复是必然结果。 实现疾病症状的消除&#xff0c;这只是达到了「临…

[附源码]java毕业设计驾校管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[附源码]java毕业设计基于的疫苗预约系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

详解VSCode中C++工程配置

安装MinGW-w64及环境变量配置 下载MinGW-w64 可以通过官网直接进行下载在线安装包&#xff0c;然后在本地进行安装即可&#xff08;但是基本都会由于网络超时等各种原因终止&#xff09;。 因此这里建议直接下载 MinGW-w64 库解压&#xff0c;然后在系统中指定环境变量即可。…

低资源场景下的命名实体识别

Overview 低资源下的命名实体识别主要分为两个方面&#xff0c;一种是in-domain下的N-way-K-shot类型的少样本&#xff0c;一种是cross-domain下现在资源丰富的sourc-domain上进行微调&#xff0c;之后再迁移到低资源的target-domain进一步微调。 基于prompt的方法在少样本分…

C语言学习-数组(4)

目录 思维导图&#xff1a; 1. 一维数组的创建和初始化 1.1 数组的创建 1.2 数组的初始化 1.3 一维数组的使用 1.4 一维数组在内存中的存储 2. 二维数组的创建和初始化 2.1 二维数组的创建 2.2 二维数组的初始化 2.3 二维数组的使用 2.4 二维数组在内存中的存…

C# async / await 的使用方法

目录 一、简介 二、异步等待返回结果 三、异步方法的返回类型 四、await foreach 五、Task.Delay 结束 一、简介 await 运算符暂停对其所属的 async 方法的求值&#xff0c;直到其操作数表示的异步操作完成。 异步操作完成后&#xff0c;await 运算符将返回操作的结果&a…

【Xilinx】Zynq\MPSoc\Versal不同速度等级下的ARM主频

【Xilinx】Zynq\MPSoc\Versal不同速度等级下的ARM主频一、Zynq&#xff08;A9&#xff09;二、MPSoC(A53R5)三、Versal(A72R5F)最近有很多人在选型的时候&#xff0c;问到ARM主频的问题&#xff0c;不知道去哪里找这个参数。 授人以鱼不如授人以渔&#xff0c;基本的通用方法是…

【面试题】 TypeScript 前端面试题 由浅到深

给大家推荐一个实用面试题库 1、前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 基本类型介绍 1.Boolean&#xff0c;Number&#xff0c;String 声明:类型 类型对应变量 let flag:boolean true let …

【现代密码学原理】——哈希函数(学习笔记)

&#x1f4d6; 前言&#xff1a;我们在登录QQ有时会遇到密码忘记的问题&#xff0c;那么思考一下&#xff0c;为什么腾讯公司不直接把密码发还给用户而是要求设置新密码呢。其实&#xff0c;不保存密码&#xff0c;是为了更好地对密码保密&#xff0c;换言之&#xff0c;腾讯的…

力扣(LeetCode)106. 从中序与后序遍历序列构造二叉树(C++)

递归 如图&#xff0c;后序序列按照左右根遍历&#xff0c;所以根在最后。逆着后序遍历的顺序&#xff0c;按照根右左递归建树就可以复原这棵树。后序序列&#xff0c;可以确定根的位置 postrootpostrootpostroot 和根结点的值。我们在中序序列找到根结点的值&#xff0c;就确定…

《深度学习进阶 自然语言处理》第四章:Embedding层和负采样介绍

文章目录4.1 word2vec的改进一4.1.1 Embedding层4.2 word2vec的改进二4.2.1 中间层之后的计算问题4.2.2 从多分类到二分类4.2.3 负采样总结之前文章链接&#xff1a; 开篇介绍&#xff1a;《深度学习进阶 自然语言处理》书籍介绍 第一章&#xff1a;《深度学习进阶 自然语言处…

想知道有没有拍照转文字的软件?这3款工具职场人士必备

你们在工作上有没有遇到这种情况&#xff0c;就是领导突然甩一份纸质文件给你&#xff0c;并要求整理成电子版&#xff0c;供其他同事查阅。回想我当初刚踏入职场时&#xff0c;没有什么工作经验&#xff0c;只会对照着内容手动码字输出&#xff0c;但是太浪费时间了&#xff0…

湖北银行冲刺上市:不良率高于行业均值,有公司欠5亿元未能追回

撰稿|汤汤 来源|贝多财经 最近&#xff0c;湖北银行正式向A股递交申请材料&#xff0c;准备在上海证券交易所上市。 据贝多财经了解&#xff0c;湖北银行股份有限公司&#xff08;下称“湖北银行”&#xff09;于2022年11月4日在证监会预披露招股书&#xff0c;计划在上交所…