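[Netty Source Code] NioEventLoop Source Analysis, Part 2
1. NioEventLoop inheritance hierarchy
NioEventLoop extends SingleThreadEventLoop and runs its tasks on a single thread, one at a time. The parent class SingleThreadEventLoop therefore maintains a task queue, Queue<Runnable> tailTasks.
NioEventLoop also indirectly extends AbstractScheduledEventExecutor, so it can accept scheduled tasks as well.
2. When the Selector is created
1. The Selector is assigned in the constructor
NioEventLoop has exactly one constructor, which takes parameters. Its Selector member fields are assigned when that constructor runs.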
io.netty.channel.nio.NioEventLoop#NioEventLoop
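2. Two member variables of type Selector
Every NioEventLoop holds two Selector fields, selector and unwrappedSelector.
The constructor calls openSelector(). Part of that method is excerpted here: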
final Selector unwrappedSelector;
try {
// Create the Selector through the native JDK API
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
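// If the key-set optimization is disabled, return the native Selector as-is. The native Selector keeps its selected keys in a Set, which is slower to iterate.
// The flag defaults to false, i.e. the key-set optimization is enabled.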
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
.......
By default, DISABLE_KEY_SET_OPTIMIZATION is false:
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
So by default, execution continues past that check:
......
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// Force the fields to be accessible via reflection
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// Replace the native selected-key sets
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
.......
In this code Netty uses reflection (or Unsafe on Java 9+) to replace two fields of the native Selector, selectedKeys and publicSelectedKeys. The replacement object, a SelectedSelectionKeySet, stores the keys in an array:
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
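The native key collections, by contrast, are Sets.
This is how Netty optimizes the native Selector. The first Selector field, selector, iterates over the selected keys more efficiently. The second, unwrappedSelector, is the native Selector that backs it, because many capabilities still have to be provided by the native Selector.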
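To make the array-versus-Set difference concrete, here is a minimal sketch of an array-backed key set. ArrayKeySet is a hypothetical, simplified stand-in and not Netty's SelectedSelectionKeySet: it appends into a flat array without hashing or duplicate checks, which is reasonable only under the assumption that each ready key is added once per select round.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical, simplified stand-in for an array-backed selected-key set. */
class ArrayKeySet<E> extends AbstractSet<E> {
    private Object[] keys = new Object[4];
    private int size;

    /** Appends without hashing or a duplicate check: amortized O(1). */
    @Override
    public boolean add(E key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double the capacity
        }
        keys[size++] = key;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    /** Clears the used slots so the array can be reused for the next select round. */
    public void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    /** Iterates over the flat array in insertion order. */
    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayKeySet<String> readyKeys = new ArrayKeySet<>();
        for (int i = 0; i < 6; i++) {
            readyKeys.add("key-" + i);
        }
        System.out.println(readyKeys.size() + " keys, first = " + readyKeys.iterator().next());
        readyKeys.reset();
        System.out.println("after reset: " + readyKeys.size());
    }
}
```

Iteration here is a plain index walk over contiguous memory, whereas a hash-based Set has to walk its buckets and allocate an iterator over nodes.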
3. When does NioEventLoop start the NIO thread?
Here we set breakpoints in IDEA and follow the call stack:
public static void main(String[] args) {
EventLoop next = new NioEventLoopGroup().next();
next.execute(new Runnable() {
@Override
public void run() {
System.out.println("task executed");
}
});
}
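The call stack shows that the main thread ends up in io.netty.util.concurrent.SingleThreadEventExecutor#addTask, after which the nioEventLoopGroup-2-1 thread calls run to execute the task.
When a task is submitted to a NioEventLoop, the execute method of its parent class SingleThreadEventExecutor is called. That method in turn calls addTask, which puts the task on the taskQueue.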
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
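// Only enqueue the task here; it is dequeued and run later
addTask(task);
// Called from the main thread here, so inEventLoop is false and !inEventLoop is true
if (!inEventLoop) {
// Start the event loop thread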
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
// Update the thread state
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// Keep following the call
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
This submits a task to the executor (the thread pool), and that task calls io.netty.channel.nio.NioEventLoop#run:
private void doStartThread() {
assert thread == null;
// Submit a task to the executor
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// Call NioEventLoop.run(): this starts the NIO loop, which listens for events and polls tasks from the queue to execute
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
// Close the selector
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
The run method loops forever, waiting for and processing events:
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// Process the selected keys
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
From this we can see that the first call to the EventLoop's execute method starts the NIO thread. Later calls do not start it again, because a state flag guards the start.
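The start-once behavior can be sketched with nothing but the JDK. LazyStartExecutor below is a hypothetical, stripped-down illustration (no shutdown handling, no rejection) and not Netty's class. It only shows how a compare-and-set on a state field lets many execute calls share one lazily created worker thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical single-thread executor that starts its worker lazily, exactly once. */
class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    void execute(Runnable task) {
        taskQueue.add(task); // enqueue first
        startThread();       // does nothing on every call after the first
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the worker thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread worker = new Thread(this::runLoop, "lazy-worker");
            worker.setDaemon(true); // let the JVM exit when main finishes
            worker.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run(); // block until a task is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits the given number of tasks and returns how many worker threads were created. */
    static int runDemo(int tasks) {
        LazyStartExecutor executor = new LazyStartExecutor();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await(); // wait until the worker has run every task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.threadsStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("threads started: " + runDemo(3));
    }
}
```

However many tasks are submitted, the counter stays at one, which is the same guarantee the state check in startThread gives the event loop.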
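4. Does the NIO thread block ordinary tasks?
A NioEventLoop is single-threaded: the same thread runs the NIO loop and, from time to time, ordinary tasks. So can NIO hold up those ordinary tasks?
Selector.select blocks, so let's first look at how Netty deals with that.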
io.netty.channel.nio.NioEventLoop#select
/**
* Core idea: when there is no task to run, select blocks for up to 1s; when a task arrives, the selector is woken up to run it.
* @param oldWakenUp
* @throws IOException
*/
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// Compute the select timeout from the deadline of the next scheduled task.
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // A scheduled task is already due, or the maximum wait time has passed
if (selectCnt == 0) {
// Non-blocking; returns 0 if no channel is ready
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// The select below has a timeout, so it never blocks indefinitely.
// It can also be woken up by another thread while it is blocked.
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
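The timeoutMillis expression at the top of the loop in select is easy to misread, so here is the arithmetic on its own. SelectTimeout is a hypothetical helper written for this illustration. The formula is copied from the code above, and the sample deadlines are made-up values.

```java
/** Hypothetical helper that isolates the timeout arithmetic used in select. */
class SelectTimeout {
    /** Converts the remaining nanoseconds to milliseconds, rounding half up via the +0.5 ms offset. */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        // 1 s until the deadline: block for 1000 ms.
        System.out.println(timeoutMillis(1_000_000_000L, now)); // 1000
        // 1.4 ms rounds down to 1 ms, 1.6 ms rounds up to 2 ms.
        System.out.println(timeoutMillis(1_400_000L, now));     // 1
        System.out.println(timeoutMillis(1_600_000L, now));     // 2
        // Less than 0.5 ms left: the result is 0, so the loop uses selectNow() instead of blocking.
        System.out.println(timeoutMillis(400_000L, now));       // 0
    }
}
```

A result of zero or less is what sends the loop into the non-blocking selectNow() branch, so a scheduled task that is almost due never waits behind a blocking select.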
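We can see that selector.select(timeoutMillis) is given a timeout. In addition, the NIO thread is started only on the first call to the NioEventLoop's execute method. Later calls do not start it again, but they do wake up Selector.select.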
io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
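// Only enqueue the task here; it is dequeued and run later
addTask(task);
// Called from the main thread here, so inEventLoop is false and !inEventLoop is true
if (!inEventLoop) {
// Start the event loop thread
// On later calls this does nothing
startThread();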
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// Wake up the Selector
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Note that so far we have been tracing the source in SingleThreadEventExecutor, the parent class of NioEventLoop. The subclass NioEventLoop overrides the parent's wakeup method:
io.netty.channel.nio.NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
// Wake up the selector so that ordinary tasks can run
selector.wakeup();
}
}
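The effect of the compareAndSet guard can be observed with a plain JDK Selector. WakeupGuard below is a hypothetical sketch, not Netty code: one thread blocks in select with a 10 s timeout, another calls the guarded wakeup three times. Only the first call is expected to reach Selector.wakeup(), and the blocked select should return long before the timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a wakeup that is guarded by an atomic flag. */
class WakeupGuard {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private final AtomicInteger wakeupCalls = new AtomicInteger();

    WakeupGuard(Selector selector) {
        this.selector = selector;
    }

    /** Calls Selector.wakeup() only for the caller that flips the flag from false to true. */
    void wakeup() {
        if (wakenUp.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    /** Returns {elapsed milliseconds of the blocked select, number of real wakeup calls}. */
    static long[] demo() {
        try (Selector selector = Selector.open()) {
            WakeupGuard guard = new WakeupGuard(selector);
            Thread submitter = new Thread(() -> {
                try {
                    Thread.sleep(100); // give the other thread time to block in select
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                for (int i = 0; i < 3; i++) {
                    guard.wakeup(); // only the first call reaches Selector.wakeup()
                }
            });
            submitter.start();
            long start = System.nanoTime();
            selector.select(10_000); // would block for 10 s without the wakeup
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
            submitter.join();
            return new long[] {elapsedMillis, guard.wakeupCalls.get()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println("select returned after ~" + result[0] + " ms, real wakeups: " + result[1]);
    }
}
```

The flag exists because Selector.wakeup() is comparatively expensive, so a burst of task submissions should pay for it once rather than once per task.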
5. When are the events on the Selector fetched?
Back in the run method, the logic lives in the switch block.
io.netty.channel.nio.NioEventLoop#run
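selectNowSupplier is an implementation of the IntSupplier interface. Its get method calls Selector.selectNow(), which immediately fetches the I/O events currently ready on the Selector.
hasTasks() checks whether the current Queue holds any tasks.
Now look at the strategy method:
If there are tasks, it fetches the I/O events currently on the Selector and returns immediately. If there are none, it returns SelectStrategy.SELECT, the matching case is entered, and select(wakenUp.getAndSet(false)) blocks with a timeout while waiting for I/O events on the Selector.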
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
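The two outcomes of the strategy method can be tried in isolation. StrategySketch below is a hypothetical reproduction that uses java.util.function.IntSupplier in place of Netty's own IntSupplier interface, and it assumes a negative sentinel of -1 for SELECT so that it cannot collide with a real key count.

```java
import java.util.function.IntSupplier;

/** Hypothetical reproduction of the select strategy decision. */
class StrategySketch {
    /** Assumed sentinel meaning "go and block in select". Negative, so it never equals a key count. */
    static final int SELECT = -1;

    /** With pending tasks, poll without blocking; otherwise ask the loop to block in select. */
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    static String describe(int strategy) {
        if (strategy == SELECT) {
            return "block in select(timeout)";
        }
        return "skip blocking; " + strategy + " key(s) already selected";
    }

    public static void main(String[] args) {
        IntSupplier selectNow = () -> 2; // pretend selectNow() found 2 ready keys
        System.out.println(describe(calculateStrategy(selectNow, false)));
        System.out.println(describe(calculateStrategy(selectNow, true)));
    }
}
```

Because a non-negative return value matches none of the cases in the switch, the loop falls to default and goes straight on to process keys and run tasks.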
When the Queue holds tasks, the loop both processes the selected keys and runs all the tasks:
try {
// Process the selected keys
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
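6. How Netty handles the NIO empty-polling bug
1. What is the NIO empty-polling bug?
Even when no channel is ready (for example, with no client connections), Selector.select(), which should block, keeps returning immediately, and the resulting busy loop drives CPU usage to 100%.
2. How Netty handles it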
io.netty.channel.nio.NioEventLoop#select
for (;;) {
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
}
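This is only an excerpt, but it shows that Netty keeps a counter, selectCnt. When select returns before its timeout has elapsed and selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD, Netty calls selectRebuildSelector(selectCnt) to create a new selector.
SELECTOR_AUTO_REBUILD_THRESHOLD defaults to 512. Users can change it through the io.netty.selectorAutoRebuildThreshold system property: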
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
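The counting rule can be exercised without a real Selector. PrematureReturnCounter below is a hypothetical model of the check in the excerpt: it is fed synthetic timestamps and reports when a rebuild would be triggered. The timestamps and the simulate helper are invented for the illustration.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the premature-return counter; timestamps are supplied by the caller. */
class PrematureReturnCounter {
    private final int rebuildThreshold;
    private int selectCnt;
    private int rebuilds;

    PrematureReturnCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Records one select return with nothing selected; returns true if a rebuild is due. */
    boolean onSelectReturned(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            // The full timeout elapsed: a normal return, so restart the count.
            selectCnt = 1;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many early returns in a row: treat it as the empty-polling bug.
            selectCnt = 1;
            rebuilds++;
            return true;
        }
        return false;
    }

    /** Feeds the counter a run of early returns and reports how many rebuilds were triggered. */
    static int simulate(int threshold, int prematureReturns) {
        PrematureReturnCounter counter = new PrematureReturnCounter(threshold);
        for (int i = 0; i < prematureReturns; i++) {
            // select returned 1 microsecond into a 1000 ms timeout
            counter.onSelectReturned(0L, 1_000L, 1_000L);
        }
        return counter.rebuilds;
    }

    public static void main(String[] args) {
        System.out.println("511 early returns -> rebuilds: " + simulate(512, 511));
        System.out.println("512 early returns -> rebuilds: " + simulate(512, 512));
    }
}
```

With the default threshold the 512th consecutive early return is the first one that triggers a rebuild, which matches the >= comparison in the excerpt.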
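7. How Netty balances the time spent on NIO events and on ordinary tasks
The for loop runs on a single thread, yet it has to both call processSelectedKeys to handle I/O events on the Selector and call runAllTasks to run ordinary tasks.
Netty therefore has to balance the time given to each.
io.netty.channel.nio.NioEventLoop#ioRatio is an int field whose default value is 50: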
private volatile int ioRatio = 50;
io.netty.channel.nio.NioEventLoop#run
for (;;) {
// ioRatio defaults to 50, so the else branch is taken
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
// Record when I/O event processing starts
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// current time - I/O start time = time spent on I/O
final long ioTime = System.nanoTime() - ioStartTime;
// I/O time * (100 - ioRatio) / ioRatio = time budget for ordinary tasks
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
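A few worked numbers make the ratio easier to read. IoRatioBudget is a hypothetical helper that applies the same formula as the else branch above. The 8 ms I/O time is a made-up input.

```java
/** Hypothetical helper that applies the ioRatio formula from the else branch. */
class IoRatioBudget {
    /** Time budget for ordinary tasks, given the time just spent on I/O. Requires 0 < ioRatio < 100. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume 8 ms was spent in processSelectedKeys
        // ioRatio 50: tasks get as long as I/O took.
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000
        // ioRatio 80: I/O gets 80% of the cycle, tasks get 20%, i.e. 2 ms.
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000
        // ioRatio 20: tasks get four times the I/O time, i.e. 32 ms.
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000
    }
}
```

An ioRatio of 100 is handled by the separate if branch, which calls runAllTasks() with no time limit, so the formula never runs with a zero budget.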