Netty中NioEventLoop介绍

news2025/1/18 5:29:11

一、Netty基本介绍

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

        本文主要介绍Netty中的核心类之一的:NioEventLoop类。

二、NioEventLoop继承体系

 三、EventLoop相关接口

        我们先看右边的接口, 部分接口我们在上一篇以及介绍过了,可以看Netty中NioEventLoopGroup介绍,我们从OrderedEventExecutor开始往下看。

一、OrderedEventExecutor

public interface OrderedEventExecutor extends EventExecutor {
}

         OrderedEventExecutor继承了EventExecutor,即拥有线程相关的操作声明。

二、EventLoop

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}

        EventLoop继承了OrderedEventExecutor和EventLoopGroup接口,即拥有线程相关操作即事件循环组的相关行为声明。但并未声明新的接口。

四、EventLoop相关实现

        在介绍EventLoop的实现之前,我们需要了解一些内容:

  1. EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
  2. EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
  3. Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
  4. 一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

         上一篇EventLoopGroup的介绍里写到了MultithreadEventExecutorGroup的初始化和创建EventExecutor

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
	if (nThreads <= 0) {
		throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
	}
 
	if (executor == null) {
		executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
	}
 
	// 初始化线程组
	children = new EventExecutor[nThreads];
 
	for (int i = 0; i < nThreads; i ++) {
		boolean success = false;
		try {
			// 将线程和传入的executor做一个绑定
			// 注意:这里线程组每个元素都绑定了同一个executor
            // newChild是一个抽象方法,依赖子类实现
			children[i] = newChild(executor, args);
			success = true;
		} catch (Exception e) {
			// TODO: Think about if this is a good exception type
			throw new IllegalStateException("failed to create a child event loop", e);
		} finally {
			// 失败执行策略
			if (!success) {
				for (int j = 0; j < i; j ++) {
					children[j].shutdownGracefully();
				}
 
				for (int j = 0; j < i; j ++) {
					EventExecutor e = children[j];
					try {
						while (!e.isTerminated()) {
							e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
						}
					} catch (InterruptedException interrupted) {
						// Let the caller handle the interruption.
						Thread.currentThread().interrupt();
						break;
					}
				}
			}
		}
	}
 
	// 初始化一个EventExecutor选择工厂,轮询获取EventExecutor,chooserFactory的默认实现是DefaultEventExecutorChooserFactory
	// next()方法依赖chooser实现
	chooser = chooserFactory.newChooser(children);
 
	// 声明线程终止的监听器
	final FutureListener<Object> terminationListener = new FutureListener<Object>() {
		@Override
		public void operationComplete(Future<Object> future) throws Exception {
			if (terminatedChildren.incrementAndGet() == children.length) {
				terminationFuture.setSuccess(null);
			}
		}
	};
 
	// 将监听器绑定到线程组的每个线程中
	for (EventExecutor e: children) {
		e.terminationFuture().addListener(terminationListener);
	}
 
	// 初始化线程集合(只读)
	Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
	Collections.addAll(childrenSet, children);
	readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

        创建EventExecutor

// 创建EventLoop对象,并绑定executor
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
	return new NioEventLoop(this, executor, (SelectorProvider) args[0],
		((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

         我们在这里接着往下跟NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
	// 初始化
	super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
	if (selectorProvider == null) {
		throw new NullPointerException("selectorProvider");
	}
	if (strategy == null) {
		throw new NullPointerException("selectStrategy");
	}
	provider = selectorProvider;
	// 创建一个selector的二元组
	final SelectorTuple selectorTuple = openSelector();
	selector = selectorTuple.selector;
	unwrappedSelector = selectorTuple.unwrappedSelector;
	selectStrategy = strategy;
}

        进入super方法

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
	super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
	// 创建收尾队列
	tailTasks = newTaskQueue(maxPendingTasks);
}

        接着进入super

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
	super(parent);
	this.addTaskWakesUp = addTaskWakesUp;
	this.maxPendingTasks = Math.max(16, maxPendingTasks);
	// 初始化子线程
	this.executor = ThreadExecutorMap.apply(executor, this);
	taskQueue = newTaskQueue(this.maxPendingTasks);
	rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

        进入ThreadExecutorMap.apply方法

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
	ObjectUtil.checkNotNull(executor, "executor");
	ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
	return new Executor() {
		@Override
		public void execute(final Runnable command) {
			// 这里调用了NioEventLoopGroup所包含的executor的execute()
			executor.execute(apply(command, eventExecutor));
		}
	};
}

        进入apply方法

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
	ObjectUtil.checkNotNull(command, "command");
	ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
	// 这里包装了一个runnable,记录当前执行线程,并在执行完成后删除
	return new Runnable() {
		@Override
		public void run() {
			setCurrentEventExecutor(eventExecutor);
			try {
				command.run();
			} finally {
				setCurrentEventExecutor(null);
			}
		}
	};
}

        我们再回去看看NioEventLoop的openSelector()方法,我们先了解下SelectorTuple类

private static final class SelectorTuple {
	final Selector unwrappedSelector;
	final Selector selector;

	SelectorTuple(Selector unwrappedSelector) {
		this.unwrappedSelector = unwrappedSelector;
		this.selector = unwrappedSelector;
	}

	SelectorTuple(Selector unwrappedSelector, Selector selector) {
		this.unwrappedSelector = unwrappedSelector;
		this.selector = selector;
	}
}

        SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:

private SelectorTuple openSelector() {
	final Selector unwrappedSelector;
	try {
		// 根据provider创建出个NIo的原生selector
		unwrappedSelector = provider.openSelector();
	} catch (IOException e) {
		throw new ChannelException("failed to open a new selector", e);
	}
	// 若禁用了keyset优化功能,则直接返回NIo原生的selector,优化就是将selector中的三个set集合变为三个数组
	// 因为数组是顺序存放的,要比随机存放的集合执行效率高
	if (DISABLE_KEY_SET_OPTIMIZATION) {
		return new SelectorTuple(unwrappedSelector);
	}

	// 此处优化逻辑省略...
}

        NioEventLoop的父类是一个Executor,所以我们在看看execute()方法:

@Override
public void execute(Runnable task) {
	if (task == null) {
		throw new NullPointerException("task");
	}
	// 判断当前线程是不是EventLoop中成员变量的executor线程
	boolean inEventLoop = inEventLoop();
	// 将任务添加到队列
	addTask(task);
	if (!inEventLoop) {
		// 启动线程(成员变量中的execute)
		startThread();
		if (isShutdown()) {
			boolean reject = false;
			try {
				if (removeTask(task)) {
					reject = true;
				}
			} catch (UnsupportedOperationException e) {
				// The task queue does not support removal so the best thing we can do is to just move on and
				// hope we will be able to pick-up the task before its completely terminated.
				// In worst case we will log on termination.
			}
			if (reject) {
				reject();
			}
		}
	}

	if (!addTaskWakesUp && wakesUpForTask(task)) {
		wakeup(inEventLoop);
	}
}

总结:

        NioEventLoopGroup创建CUP核心数两倍的EventLoop数组,NioEventLoopGroup内部还包含了一个Executor成员变量。

        随后对EventLoop数组进行初始化,传入NioEventLoopGroup的executor成员变量,EventLoop内部也有一个executor成员变量,EventLoop对内部的executor变量进行初始化,并在其executor的execute()方法调用NioEventLoopGroup的成员变量executor的execute()方法。

        也就是说EventLoop的executor调用execute()方法的时候,会调用NioEventLoopGroup的Executor的execute方法来执行具体的操作。

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/597465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么 AIGC 和大模型创业者都在安利向量数据库?

从目前 VC 的投资数据来看&#xff0c;大家对 AI 的关注点主要有三个&#xff1a;一个是基础大模型 LLM&#xff0c;第二个是具体某个场景的应用&#xff08;包括小模型&#xff09;&#xff0c;第三个就属基础模型与应用层之间的中间层了&#xff08;开发者工具和数据库等&…

AI视频成工具D-ID介绍(AI数字人常用工具)

Studio D-id&#xff0c;​一个AI视频成工具&#xff0c;用AI创作的数字人,输入人物关键词就可一键生成人物&#xff0c;上传要说的文本或音频则可以开口说话了。 关于AI语音&#xff0c;有很多选项可供选择。目前&#xff0c;Studio D-id支持多达66种语言。然后&#xff0c;您…

蚂蚁集团发布2022可持续发展报告:科研投入204.6亿,实现3年翻倍

蚂蚁集团6月1日对外发布《2022年可持续发展报告》&#xff08;下简称‘报告’&#xff09;&#xff0c;披露2022年度相关工作进展。《报告》显示&#xff0c;2022年&#xff0c;蚂蚁持续攻坚前沿和底层数字科技&#xff0c;年度科研投入204.6亿元。较2019年&#xff0c;科研投入…

19c rac部署-ssh版本太高的问题

客户有个环境需要部署19c&#xff0c;安装的oracle linux 7.9.&#xff0c;OpenSSH_7.4p1版本适合安装19c集群&#xff0c;但接到环境时&#xff0c;发现openssh已升级到9.0了&#xff0c;理由是等保需要&#xff0c;哎&#xff0c;不抱怨自己解决问题 [INS-32070] Could not r…

外贸网站建设中经常遇到的问题有哪些?

企业外贸网站是企业展示自我&#xff0c;提高品牌知名度的重要工具。在外贸网站建设中&#xff0c;我们也会遇到各种问题&#xff0c;例如&#xff0c;外贸网站设计、 SEO优化、制作周期以及质量等&#xff0c;都是我们在建设外贸网站中需要考虑的问题。 建设一个外贸网站需要多…

BFT最前线 | iOS版ChatGPT周下载突破50万人次;英伟达市值突破万亿美元创造芯片公司历史;华为开发者大会即将发布

原创 | 文 BFT机器人 AI视界 TECHNOLOGY NEWS 01 联合国教育部聚焦生成式AI 联合国科教文组织提出发展路线图 近日&#xff0c;为应对生成式人工智能技术的迅速发展&#xff0c;联合国教科文组织就此议题召开了首次全球教育部长会议。40多位部长分享了将这些工具融入教育的…

10.ES6模块化规范(关键字 import,from,as,export的用法)

导入其他模块成员要使用关键字 import &#xff0c;导出需要使用关键字 export 我们明确一个概念&#xff0c;只有js与js之间需要使用import与export&#xff0c;如果是在html中引入js是不需要用import的&#xff0c;你导入的方式是直接srcxxx.js 目录 1 默认导入导出 2 …

CMake构建Makefile深度解析:从底层原理到复杂项目

CMake构建深度解析&#xff1a;从底层原理到复杂项目实践 一、CMake构建后的项目结构解析&#xff08;Analysis of the Project Structure After CMake Build&#xff09;1.1 CMake构建后的目录结构&#xff08;Directory Structure After CMake Build&#xff09;1.2 构建生成…

分布式锁的应用场景与分布式锁实现(一):传统锁处理并发及传统锁的问题

分布式锁 代码已同步至GitCode&#xff1a;https://gitcode.net/ruozhuliufeng/distributed-project.git ​ 在应用开发中&#xff0c;特别是Web工程开发&#xff0c;通常都是并发编程&#xff0c;不是多进程就是多线程。这种场景下极其容易出现线程并发性问题&#xff0c;此时…

新能源汽车充电桩的建设及优化分析

安科瑞虞佳豪 新能源汽车充电桩在经历了几年的发展之后&#xff0c;总体情况是在持续走好的&#xff0c;并且充电桩的建设相较于以往有了很大的普及度和安全度&#xff0c;这对新能源汽车车主是一个好事&#xff0c;也鼓励了更多人选择买新能源汽车&#xff0c;但这并不是说新…

HTTP劫持是什么?如何防止网站被劫持呢?

HTTP劫持&#xff08;HTTP hijacking&#xff09;是一种网络攻击技术&#xff0c;攻击者通过各种手段截取用户的HTTP请求或响应&#xff0c;篡改其内容或重定向到恶意服务器&#xff0c;从而实施恶意活动。这种攻击可能导致用户信息泄露、身份盗窃、篡改网页内容或植入恶意代码…

鼎盛合充气泵方案——便携车载充气泵方案

便携车载充气泵主要使用在汽车轮胎充气及车胎检测上&#xff0c;是一个气压精度测量产品。充气泵方案则是通过马达运转工作而进行设计&#xff0c;利用芯片和气压传感器所做的一个智能化便携车载充气泵方案。 便携车载充气泵方案的使用范围其实不仅仅是汽车轮胎&#xff0c;它在…

Android Studio Flamingo编译项目问题记录

系统版本&#xff1a;macOS 13.4 Android Studio Flamingo | 2022.2.1 Patch 2 下载地址&#xff1a;Download Android Studio & App Tools - Android DevelopersAndroid Studio provides app builders with an integrated development environment (IDE) optimized for …

protobuf笔记

protoc -Ipb/protos -Ipb/protos/third/github.com pb/protos/custom/*.proto -I 指定需要import的gogo.proto文件路径&#xff0c; protoc查找过程为 -I后面的路径和import的路径拼接在一起。 -Ipb/protos 指定proto源文件路径-Ipb/protos/third/github.com 指定第三方proto&…

el-select如何改变样式 (:popper-append-to-body=“false“)

在使用el-select的时候&#xff0c;其样式会按照Elementui自带的默认样式为基准&#xff1b; 但往往开发过程中&#xff0c;下拉框的样式可能并不是我们想要的&#xff1b;这是我遇到过的一个案例&#xff0c;开发需求上与elementui默认样式大相径庭&#xff1b; 如何进行修改呢…

JAVA基础 - CLASSLOADER双亲委派机制?

类的生命周期 在JAVA中数据类型分为基本数据类型和引用数据类型。基本数据类型&#xff0c;由虚拟机预先定义&#xff0c;引用数据类型则需要进行类加载。 JAVA将引用数据类型分为&#xff1a;类、接口、数组和泛型参数&#xff0c;而「泛型参数」在编译时期会被擦除&#xff…

web前端 --- javascript(01)-- 介绍、变量和数据类型

JavaScript w3c&#xff1a;三层分离 结构层&#xff1a;HTML 表示层&#xff1a;CSS 行为层&#xff1a;JavaScript 介绍 &#xff08;1&#xff09;作用&#xff1a; 数据校验网页特效数据交互服务器端编程&#xff08;NodeJS&#xff09; &#xff08;2&#xff09;javas…

开源赋能 普惠未来|UBSICE诚邀您参与2023开放原子全球开源峰会

UBSICE&#xff08;Unified Basic Service Infrastructure Community Edition&#xff09;是一个轻量级“面向领域”的高可用、高性能、业务连续性的微服务架构技术底座。UBSICE特有的“微服务容器”不仅是一个微服务的运行容器&#xff0c;还通过“容器控制器”管理其他微服务…

Linux超全整理Linux性能分析工具汇总

出于对Linux操作系统的兴趣&#xff0c;以及对底层知识的强烈欲望&#xff0c;因此整理了这篇文章。本文也可以作为检验基础知识的指标&#xff0c;另外文章涵盖了一个系统的方方面面。如果没有完善的计算机系统知识&#xff0c;网络知识和操作系统知识&#xff0c;文档中的工具…

大数据存储方式有哪些?

写在前面 本文隶属于专栏《大数据从 0 到 1》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和文献引用请见《大数据从 0 到 1》 正文 数据常用的存储介质为磁盘和磁带。…