NioEventLoop介绍
NioEventLoop继承SingleThreadEventLoop,核心是一个单例线程池,可以理解为单线程,这也是Netty解决线程并发问题的最根本思路--同一个channel连接上的IO事件只由一个线程来处理,NioEventLoop中的单例线程池轮询事件队列,有新的IO事件或者用户提交的task时便执行对应的handler逻辑进行处理;
NioEventLoop循环执行三件事:
- 响应selector中的IO事件
- 检查任务队列中是否有用户提交的任务
- 检查定时任务是否到期,到期则移交至任务队列中
首先,一个NioEventLoop聚合一个selector对象,这个selector对象就是JDK NIO中的selector对象(Netty可以配置选择是否对JDK 中的selector优化,优化主要是对selectionkeys集合优化,后续详细解释),通过代码可以看到,NioEventLoop的构造器里完成了selector的创建----(具体的selector创建Netty通过继承了jdk的SelectorProvider来实现的)
,先看一下NioEventLoop的构造方法,NioEventLoop只提供了一个构造方法--
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
构造函数中主要完成了selector的创建,选择器的实现策略,任务队列的创建,
先大概说一下run()方法的逻辑--
- 调用selector.select方法获取就绪IO事件的个数
- 判断是否有task---非定时任务
- 更新下一次定时任务的执行时间
- 处理selectedKeys---处理IO事件
- 执行任务---
- 获取到期的定时任务
- 根据配置控制任务异步任务执行时间
- 空轮训问题的处理
看一下具体实现--
首先获取IO就绪IO事件的个数
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
calculateStrategy方法 --
- 如果当前有任务,则返回selectNow()方法的值---就绪的selected keys 个数
- 如果没有任务则返回-1;
然后看一下strategy为-1 的时候--
//返回-1的时候表示没有任务,此时计算
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//如果还是没有任务就需要重新计算一下就绪IO事件的个数,所以第一步在没有任务的时候直接将strategy赋值为-1是为了给处理定时任务留机会;
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
//他的更新只是为了阻止不必要的选择器唤醒,所以lazySet的使用是可以的(没有比赛条件)
nextWakeupNanos.lazySet(AWAKE);
}
taskQueue中没有任务的时候获取定时任务中最近要生效的任务时间,然后再执行一次select方法;
之后根据ioRation(ioRation默认为50)来处理channel的IO事件和执行taskQueue中的任务;这里分三种情况:
- ioRation为100的时候,处理所有的IO事件并执行taskQueue中的所有任务;
if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } }
- ioRation小于100并且有就绪的IO事件的时候,先处理所有的就绪IO事件,然后以处理IO事件的时间作为基准分配异步任务的执行时间
else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
- ioRation小于100且没有就绪IO事件的时候只执行一个异步任务
else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
最后,判断是否发生了select是否发生了空轮训--
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
至此,一次Eventloop循环就处理完了,总结一下---
- Eventloop是Netty运行的核心逻辑,主要处理三件事--IO读写事件,用户提交的异步任务,处理JDK中的空轮训问题;
- 核心逻辑体现在run()方法中;run()方法首先根据异步任务队列中是否有任务需要执行来决定是否需要处理定时任务;
- 如果有异步任务需要处理则同时获取就绪IO事件的个数;如果没有异步任务则计算定时任务的处理时间---处理完定时任务如果还是没有任务提交则轮询IO事件
- 根据配置控制时间执行IO事件和异步任务;