源码解析目标
- 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现
源码解析
- 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup = new NioEventLoopGroup(1);,我们先来看看NioEventLoop的UML图
- 首先我们看到ScheduledEecutorService接口,这个接口是concurrent包下的一个定时任务接口,EventLoop实现了这个接口,因此可以接受定时任务,所以我们在Debug的时候,能在EventLoop中找到一个scheduledTaskQueue
- EventLoop接口我们看下源码,如下,从注释中我们了解到,EventLoop中一旦注册了Channel,就会处理该Channel对应的所有I/O操作
/**
* Will handle all the I/O operations for a {@link Channel} once registered.
*
* One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
* implementation details and internals.
*
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
- SingleThreadEventExecutor 也是一个比较重要的类,看源码注释,说明了SingleThreadEventExecutor 是一个单个线程的线程池
/**
* Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......}
- 在SingleThreadEventExecutor 类中实现了很多对线程池的操作,例如runAllTask,executer,takeTask,pollTask,看下其中一个构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
//其中taskQueue初始化
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
- 如上,SingleThreadEventExecutor 队列中元素是实现了Runnable接口的对象,线程池中最重要的方法当然是executer方法,EventLoop是SingleThreadEventExecutor 的子类,那么EventLoop 类也可以直接调用executer方法来完成对事件的执行,我们来看源码
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- inEventLoop(); 首先判断EventLoop中线程是否是当前线程,如果是,则直接将task添加到线程池队列中
- 如果不是则尝试启动一个线程(因为是单个线程的线程池,所以只能且只需要启动一次),之后在将任务添加到队列中去
- isShutdown() && removeTask(task)) 中逻辑 如果线程已经停止并且删除任务失败,则直接拒绝策略
- 接着看下addTask的实现
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
- 从注释中可以看出,addTask方法会添加一个task任务到队列中,如果当前线程是shutdown的状态,那么直接抛出异常RejectedExecutionException
- 接着来看executer方法中的startThread(); ,当我们判断当前线程不是EventLoop中的线程的时候会执行这个方法,他是NioEventLoop中的核心方法,如下源码
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
-
state == ST_NOT_STARTED 首先通过状态判断是否执行过,保证EventLoop只有一个线程
-
如果没有启动 用cas的方式STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) 去修改状态为ST_STARTED,直接调用doStartThread方法
-
如果失败就回滚
-
接着分析doStartThread方法,首先会调用Executor的execute方法,这个Executor 是我们在创建EventLoopGroup时候创建的是一个ThreadPerTaskExecutor类,如下图是在channel中对应的EventLoop找到的对象信息,该execute方法会将Runable包装成Netty的FastThreadLocalThread
- 接着通过Thread.currentThread() 判断线程是否中断
- updateLastExecutionTime(); 然后设置最后一次执行的事件
- 核心方法是:SingleThreadEventExecutor.this.run(); 执行单曲NioEventLoop的run方法,等会重点关注
- 接着完成run方法的事物处理后,在finally中使用cas不断的修改state状态,设置为ST_SHUTTING_DOWN,也就是当loop中run方法结束运行后,关闭线程,最后还会通过不断轮询来二次确认是否关闭,否则不会break跳出
- 接下来分析EventLoop中的Run方法,我们进入Run方法,就到了我们之前分析过的NioEventLoop中的run方法,此方法做了三件事情,如下源码
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
-
NioEventLoop 中的loop轮询是依靠run方法来执行的,在方法中可以看到是一个for循环其中三件事情,如下图中EventLoop部分
- case SelectStrategy.SELECT: 当事件类似是SELECT 时候, 通过select(wakenUp.getAndSet(false));方法获取感兴趣的事件
- processSelectedKeys(); 处理选中的事件
- runAllTasks 执行队列中的任务。
-
上图不管是bossGroup还是WorkerGroup中的EventLoop都是次run方法执行流程
-
select方法实现
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}
-
关注点在于 select方法如何体现出非阻塞,如下图中,选择器获取对应事件debug
-
在select中传如参数是1 秒,也就是默认情况下阻塞1秒中,具体的算法: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
-
其中 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 表示当前时间 - 定时任务的时间,那么timeoutMillis 意思就是,当有定时任务的时候 delayNanos(currentTimeNanos) 时间就部位空,那么定时任务剩余时间 t +0.5秒阻塞的时间,否则就默认1秒中阻塞时间
-
接着判断: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())
- 如果1秒(或者t+0.5)后能获取到selectedKeys :selectedKeys != 0
- 或者select被用户唤醒 :oldWakenUp, wakenUp.get()
- 或者任务队列中有任务存在 : hasTasks()
- 或者有定时任务即将被执行 : hasScheduledTasks()
-
有以上任何情况则跳出循环,否则继续沦陷,直到满足其中一个条件为止
-
接着processSelectedKeys对获取到的selectKey处理
-
在接着runAllTasks 执行队列任务
总结
- 每次执行execute方法就会向队列中添加任务。当第一次添加时候就启动线程,执行run方法,run方法是EventLoop的核心实现,负责轮询获取事件,处理事件,执行队列中任务
- 其中调用selector的select方法默认阻塞一秒,有定时任务就t+0.5,t是定时任务剩余时间,当执行execute方法时候,也就是添加任务的时候,唤醒selector,防止selector阻塞事件过长
- 当selector返回的时候,会调用processSelectedKeys对selectKey进行处理
- 当processSelectedKeys 方法执行结束,按照ioRatio比例执行runAllTasks方法默认是 IO 任务时间和非 IO 任务时间是相同的代码如下
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);