前言:在使用Netty 时不管是服务端还是客户端都需要 new NioEventLoopGroup 对象进行工作,NioEventLoopGroup的作用是什么呢;
1 NioEventLoopGroup 类图:
从类名字来看它是一个Nio 流的事件轮询器组,既然是一组顾名思义这个组里应该存放了同一种类型的字对象;从类图看它是继承Executor 它是Java 线程池的顶级类,说明它是具有创建线程执行线程的能力;
NioEventLoopGroup 对于任务的处理:
package com.example.nettydemo.netty.task;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class EventLoopTask {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 可以处理 io 事件,普通任务,定时任务
EventLoopGroup group = new NioEventLoopGroup(2);
// 可以处理,普通任务,定时任务
// EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
Future future = group.next().submit(() -> {
log.debug("普通任务");
return "success";
});
// 异步结果获取 nio 获取结果
future.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> stringFuture) throws Exception {
log.debug("结果:{}", stringFuture.get());
}
});
// 同步调用 main 线程获取结果
log.debug("结果:{}", future.get());
// 定时任务 延迟2s 之后 以1s 的频率执行任务
group.next().scheduleAtFixedRate(() -> {
log.debug("定时任务");
}, 2, 1, TimeUnit.SECONDS);
// group.shutdownGracefully();
}
}
代码比价简单直接看下执行结果:
2 NioEventLoopGroup的初始化 :
2.1 当创建 NioEventLoopGroup 时 会看到其嵌了很多层的构造方法:
NioEventLoopGroup 类:
public NioEventLoopGroup() {
// 核心的线程数量此时给的时0
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 线程的执行器Executor 给的是null
this(nThreads, (Executor)null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
// SelectorProvider 根据当前的环境获得 SelectorProvider的单例
// 如果是window 环境 最终会给到WindowsSelectorProvider 对象
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
// DefaultSelectStrategyFactory.INSTANCE 默认的选择策略
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
// RejectedExecutionHandlers.reject() 拒绝的处理器,直接抛出了异常
super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
}
SelectorProvider.provider()可以参考:window–Select.open()
选择策略类 DefaultSelectStrategyFactory.INSTANCE:
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() {
}
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}
拒绝策略类 RejectedExecutionHandlers:
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
此时我们可以看到在构造 NioEventLoopGroup所用到的几个参数:
- super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()}
- nThreads: 处理任务的核心线程数 此时为 0
- executor: 线程的执行器 此时为 null
- selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider
- selectStrategyFactory: 默认的选择器策略,此时为 DefaultSelectStrategyFactory 对象
- RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException
执行到此这里有个关键代码:SelectorProvider.provider(),可以看到此时已经获取到了selector 对象并当做参数向下传递:
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
// SelectorProvider 根据当前的环境获得 SelectorProvider的单例
// 如果是window 环境 最终会给到WindowsSelectorProvider 对象
this(nThreads, threadFactory, SelectorProvider.provider());
}
最后super 的方法会 进入其父类MultithreadEventLoopGroup 继续完成初始化:
2.2 MultithreadEventLoopGroup 类的初始化:
// 核心线程数的获取,可以理解为当前电脑cpu的核心数量*2;
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 继续进入父类方法,如果核心数为0,则取DEFAULT_EVENT_LOOP_THREADS ;
// 如果自己在new NioEventLoopGroup(核心线程数),传入了则使用自定义的核心线程数
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里看到一个关键点 NioEventLoopGroup 中线程数量的定义: 核心线程数的获取,可以理解为当前电脑cpu的核心数量*2;
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
2.3 MultithreadEventExecutorGroup类的初始化:
// 分组内的 多个事件执行器
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren;
private final Promise<?> terminationFuture;
// 事件执行器 选择工厂
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
// 已经完成工作的个数初始为0
this.terminatedChildren = new AtomicInteger();
// 异步线程任务的执行(任务队列的单线程事件执行器)
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
} else {
if (executor == null) {
// 初始化 线程工厂类
// (对要创建的线程,优先级,是否守护线程,线程前准,线程分组 赋值)
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
// NioEventLoopGroup 子事件处理个数 数组初始化
this.children = new EventExecutor[nThreads];
int j;
// 创建 nThreads 个数的 EventExecutor 事件执行器
for(int i = 0; i < nThreads; ++i) {
boolean success = false;
boolean var18 = false;
try {
var18 = true;
// 子事件 初始化
// 每个子事件都放入了NioEventLoopGroup 父类对象
// 每个子事件都有构建线程并且执行任务的能力
// 每个子事件都有integer 最大值的任务队列
// 每个子事件 都初始化了一个Selector 并且将Selector 中已经准备好的
// select 从set 结构优化到了数组结构
this.children[i] = this.newChild((Executor)executor, args);
success = true;
var18 = false;
} catch (Exception var19) {
// 子事件初始化异常 抛出异常 并在finally 中将已经创建的子事件进行关闭
throw new IllegalStateException("failed to create a child event loop", var19);
} finally {
if (var18) {
if (!success) {
int j;
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var20) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
if (!success) {
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var22) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 事件选择工厂赋值 如果NioEventLoopGroup分组下的时间处理是2的倍数
// 使用位运算取得下次可以处理时间的某个EventExecutor
// 否则使用取模的方式取得下次可以处理时间的某个EventExecutor
// (isPowerOfTwo(executors.length) ? new PowerOfTwoEventExecutorChooser(executors)
// : new GenericEventExecutorChooser(executors));
this.chooser = chooserFactory.newChooser(this.children);
//
FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
// 当NioEventLoopGroup分组下的所有EventExecutor 都完成了任务,
// 则设置NioEventLoopGroup 的任务执行成功数据
MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
}
}
};
EventExecutor[] var24 = this.children;
j = var24.length;
// 为每一个 EventExecutor 添加 terminationListener 事件回调
// 当每个EventExecutor 任务完成之后,都进入改监听方法
for(int var26 = 0; var26 < j; ++var26) {
EventExecutor e = var24[var26];
e.terminationFuture().addListener(terminationListener);
}
// 将NioEventLoopGroup 分组下的所有EventExecutor 进行复制并赋值
Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
Collections.addAll(childrenSet, this.children);
this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
到这里可以看到MultithreadEventExecutorGroup 一些关键点的初始化:
关键点1:线程工厂类的初始化
if (executor == null) {
// 初始化 线程工厂类
// (对要创建的线程,优先级,是否守护线程,线程前准,线程分组 赋值)
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
关键点2:NioEventLoopGroup 中每个NioEventLoop 的创建:
声明NioEventLoopGroup 的NioEventLoop 数量:
this.children = new EventExecutor[nThreads];
每个NioEventLoop 的初始化:
this.children[i] = this.newChild((Executor)executor, args);
关键点3:下一个执行任务的EventExecutor获取的定义:
this.chooser = chooserFactory.newChooser(this.children);
改类中一些属性赋值的具体方法实现:
(1)MultithreadEventExecutorGroup 类中 newDefaultThreadFactory():
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(this.getClass());
}
// 设置改工厂创建线程类名称,非守护线程,线程的优先级为5
public DefaultThreadFactory(Class<?> poolType) {
this((Class)poolType, false, 5);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
// 获取线程池的类名称
public static String toPoolName(Class<?> poolType) {
ObjectUtil.checkNotNull(poolType, "poolType");
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
return Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1)) ? Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1) : poolName;
}
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
// threadGroup 线程进行分组(取的当前线程的线程组),方便追踪
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
// 线程属性赋值
// 线程id
this.nextId = new AtomicInteger();
ObjectUtil.checkNotNull(poolName, "poolName");
if (priority >= 1 && priority <= 10) {
// 线程名字
this.prefix = poolName + '-' + poolId.incrementAndGet() + '-';
// 线程是否守护线程
this.daemon = daemon;
// 线程的优先级
this.priority = priority;
// 线程的分组
this.threadGroup = threadGroup;
} else {
throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
}
(2) new ThreadPerTaskExecutor(this.newDefaultThreadFactory()):
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
// threadFactory 赋值
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = (ThreadFactory)ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
public void execute(Runnable command) {
this.threadFactory.newThread(command).start();
}
}
this.newChild((Executor)executor, args): 子事件的初始化:
executor :线程工厂类newDefaultThreadFactory
args 数组:
selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider
selectStrategyFactory: 默认的选择器策略,此时为 DefaultSelectStrategyFactory 对象
RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException
(3)这里进入到一个关键点 NioEventLoopGroup 类中的newChild 具体 NioEventLoop的初始化方法:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory)args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2], queueFactory);
}
进入NioEventLoop 类的构造方法:
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256;
private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier = new IntSupplier() {
public int get() throws Exception {
return NioEventLoop.this.selectNow();
}
};
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
// 参数: executor 线程工厂类newDefaultThreadFactory
// selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider
// selectStrategyFactory: 默认的选择器策略,此时为 DefaultSelectStrategyFactory 对象
// RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException
// queueFactory null
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
// 调用父类方法,完成任务队列的赋值
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
// selectorProvider 赋值将 WindowsSelectorProvider 赋值
this.provider = (SelectorProvider)ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// 选择策略赋值
this.selectStrategy = (SelectStrategy)ObjectUtil.checkNotNull(strategy, "selectStrategy");
// netty 自己优化的Selector SelectorTuple
SelectorTuple selectorTuple = this.openSelector();
// 赋值 selector
this.selector = selectorTuple.selector;
// unwrappedSelector 中既有selector又有存放被优化后的 selectionKeys
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
到此可以看到NioEventLoop 的初始化完成了,不过在NioEventLoop 初始化中有些关键点需要注意:
关键点1 :newTaskQueue(queueFactory):队列创建,可以看到改队列的大小是integer 的最大值,并且队列的消费是多生产者一个消费者,此消费模型以为这每个NioEventLoop 实际上只有一个线程在进行任务的处理
// 创建多个线程充当生产者,只有一个线程充当消费者
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
// queueFactory 为null
// 进入 newTaskQueue0(DEFAULT_MAX_PENDING_TASKS) 创建队列
// 队列长度为int 的最大值
return queueFactory == null ? newTaskQueue0(DEFAULT_MAX_PENDING_TASKS) : queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() : PlatformDependent.newMpscQueue(maxPendingTasks);
}
public static <T> Queue<T> newMpscQueue() {
return PlatformDependent.Mpsc.newMpscQueue();
}
super 方法调用 SingleThreadEventLoop :
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private final Queue<Runnable> tailTasks;
// executor 线程工厂类newDefaultThreadFactory
// addTaskWakesUp false
// taskQueue 多生产者一个消费者的队列
// tailTaskQueue多生产者一个消费者的队列
// rejectedExecutionHandler :拒绝的策略直接抛出了RejectedExecutionException
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 赋值任务队列
this.tailTasks = (Queue)ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
super 方法进入SingleThreadEventExecutor :
// executor 线程工厂类newDefaultThreadFactory
// addTaskWakesUp false
// taskQueue 多生产者一个消费者的队列
// rejectedExecutionHandler :拒绝的策略直接抛出了RejectedExecutionException
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
// 为AbstractEventExecutor 每个childEvent 赋值其父类NioEventLoopGroup 对象
super(parent);
this.threadLock = new CountDownLatch(1);
this.shutdownHooks = new LinkedHashSet();
this.state = 1;
// 线程异步任务执行
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
this.addTaskWakesUp = addTaskWakesUp;
// 最大任务数
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// executor 赋值
this.executor = ThreadExecutorMap.apply(executor, this);
// 队列赋值
this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
// 拒绝策略
this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
关键点2 : netty 中对于Selector 的改造 SelectorTuple openSelector():通过此方法可以看到 对于要遍历的 selectKey 底层是用数组进行了替换
private SelectorTuple openSelector() {
final AbstractSelector unwrappedSelector;
try {
// 通过 WindowsSelectorProvider 初始化:WindowsSelector
unwrappedSelector = this.provider.openSelector();
} catch (IOException var7) {
throw new ChannelException("failed to open a new selector", var7);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
} else {
// 获取SelectorImpl 的类
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
} catch (Throwable var2) {
return var2;
}
}
});
if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 构建数组类型的 selectedKeySet 替换掉 SelectorImpl 中原有的
// Set<SelectionKey> selectedKeys 和 Set<SelectionKey> publicSelectedKeys
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1L && publicSelectedKeysFieldOffset != -1L) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
} else {
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
} else {
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
}
}
} catch (NoSuchFieldException var7) {
return var7;
} catch (IllegalAccessException var8) {
return var8;
}
}
});
if (maybeException instanceof Exception) {
this.selectedKeys = null;
Exception e = (Exception)maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
} else {
// 赋值成功后将优化后的 selectedKeySet 赋值给 selectedKeys
this.selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 返回优化后的selector
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
} else {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable)maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
}
}
至此NioEventLoopGroup 对象初始化完毕,代码嵌套内容还是比较深的;
3 NioEventLoopGroup 初始化的总结:
- NioEventLoopGroup 中在设置核心线程数之后,会创建对应核心线程数的事件处理器,如果没有设置“io.netty.eventLoopThreads” 参数 ,则核心线程数为当前电脑cpu的核心数量*2;
- 每个事件处理都拥有一个自己的Selector 事件选择器,并且拥有创建线程执行任务的能力;
- 每个事件处理器中都会有一个线程来执行任务,模式是多生产者,一个消费者模型,如果任务无法及时处理会被放入到队列长度为integer最大值的任务队列;
- NioEventLoopGroup 初始化的bossGroup 后面会用来处理客户端的accept 时间,而wokerGroup 会用来处理客户端的读事件,以及服务端的写事件;