接上文解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-上
Netty对JDK NIO 原生Selector的优化
首先在NioEventLoop
中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION
,通过系统变量-D io.netty.noKeySetOptimization
指定,默认是开启的,表示需要对JDK NIO原生Selector
进行优化。
public final class NioEventLoop extends SingleThreadEventLoop {
//Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
如果优化开关DISABLE_KEY_SET_OPTIMIZATION
是关闭的,那么直接返回JDK NIO原生的Selector
。
private SelectorTuple openSelector() {
..........SelectorProvider创建JDK NIO 原生Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION) {
//JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
return new SelectorTuple(unwrappedSelector);
}
}
下面为Netty对JDK NIO原生的Selector
的优化过程:
-
获取
JDK NIO原生Selector
的抽象实现类sun.nio.ch.SelectorImpl
。JDK NIO原生Selector
的实现均继承于该抽象类。用于判断由SelectorProvider
创建出来的Selector
是否为JDK默认实现
(SelectorProvider
第三种加载方式)。因为SelectorProvider
可以是自定义加载,所以它创建出来的Selector
并不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就绪的SelectionKey(里面包裹着channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//注册在该Selector上的所有SelectionKey(里面包裹着channel)
protected HashSet<SelectionKey> keys;
// Public views of the key sets
//用于向调用线程返回的keys,不可变
private Set<SelectionKey> publicKeys; // Immutable
//当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
//不可变
publicKeys = Collections.unmodifiableSet(keys);
//只可删除其中元素,不可增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
这里笔者来简单介绍下JDK NIO中的Selector
中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:
image
-
Set<SelectionKey> selectedKeys
类似于我们上篇文章讲解Epoll
时提到的就绪队列eventpoll->rdllist
,Selector
这里大家可以理解为Epoll
。Selector
会将自己监听到的IO就绪
的Channel
放到selectedKeys
中。
这里的
SelectionKey
暂且可以理解为Channel
在Selector
中的表示,类比上图中epitem结构
里的epoll_event
,封装IO就绪Socket的信息。其实SelectionKey
里包含的信息不止是Channel
还有很多IO相关的信息。后面我们在详细介绍。
-
HashSet<SelectionKey> keys:
这里存放的是所有注册到该Selector
上的Channel
。类比epoll中的红黑树结构rb_root
SelectionKey
在Channel
注册到Selector
中后生成。
-
Set<SelectionKey> publicSelectedKeys
相当于是selectedKeys
的视图,用于向外部线程返回IO就绪
的SelectionKey
。这个集合在外部线程中只能做删除操作不可增加元素
,并且不是线程安全的
。 -
Set<SelectionKey> publicKeys
相当于keys
的不可变视图,用于向外部线程返回所有注册在该Selector
上的SelectionKey
这里需要重点关注
抽象类sun.nio.ch.SelectorImpl
中的selectedKeys
和publicSelectedKeys
这两个字段,注意它们的类型都是HashSet
,一会优化的就是这里!!!!
-
判断由
SelectorProvider
创建出来的Selector
是否是JDK NIO原生的Selector
实现。因为Netty优化针对的是JDK NIO 原生Selector
。判断标准为sun.nio.ch.SelectorImpl
类是否为SelectorProvider
创建出Selector
的父类。如果不是则直接返回。不在继续下面的优化过程。
//判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
//如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
通过前面对SelectorProvider
的介绍我们知道,这里通过provider.openSelector()
创建出来的Selector
实现类为KQueueSelectorImpl类
,它继承实现了sun.nio.ch.SelectorImpl
,所以它是JDK NIO 原生的Selector
实现
class KQueueSelectorImpl extends SelectorImpl {
}
-
创建
SelectedSelectionKeySet
通过反射替换掉sun.nio.ch.SelectorImpl类
中selectedKeys
和publicSelectedKeys
的默认HashSet
实现。
为什么要用SelectedSelectionKeySet
替换掉原来的HashSet
呢??
因为这里涉及到对HashSet类型
的sun.nio.ch.SelectorImpl#selectedKeys
集合的两种操作:
-
插入操作: 通过前边对
sun.nio.ch.SelectorImpl类
中字段的介绍我们知道,在Selector
监听到IO就绪
的SelectionKey
后,会将IO就绪
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
集合中,这时Reactor线程
会从java.nio.channels.Selector#select(long)
阻塞调用中返回(类似上篇文章提到的epoll_wait
)。 -
遍历操作:
Reactor线程
返回后,会从Selector
中获取IO就绪
的SelectionKey
集合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor线程
遍历selectedKeys
,获取IO就绪
的SocketChannel
,并处理SocketChannel
上的IO事件
。
我们都知道HashSet
底层数据结构是一个哈希表
,由于Hash冲突
这种情况的存在,所以导致对哈希表
进行插入
和遍历
操作的性能不如对数组
进行插入
和遍历
操作的性能好。
还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。
所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys
集合的插入,遍历
性能,自己用数组
这种数据结构实现了SelectedSelectionKeySet
,用它来替换原来的HashSet
实现。
资料直通车:Linux内核源码技术学习路线+视频教程内核源码
学习直通车:Linux内核源码内存调优文件系统进程管理设备驱动/网络协议栈
SelectedSelectionKeySet
-
初始化
SelectionKey[] keys
数组大小为1024
,当数组容量不够时,扩容为原来的两倍大小。 -
通过数组尾部指针
size
,在向数组插入元素的时候可以直接定位到插入位置keys[size++]
。操作一步到位,不用像哈希表
那样还需要解决Hash冲突
。 -
对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍历方式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
SelectionKey[] keys;
//数组尾部指针
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
/**
* 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
* */
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
//时间复杂度O(1)
keys[size++] = o;
if (size == keys.length) {
//扩容为原来的两倍大小
increaseCapacity();
}
return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
/**
* 采用数组的遍历效率 高于 HashSet
* */
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。
-
Netty通过反射的方式用
SelectedSelectionKeySet
替换掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
这两个集合中原来HashSet
的实现。
-
反射获取
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
-
Java9
版本以上通过sun.misc.Unsafe
设置字段值的方式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
-
通过反射的方式用
SelectedSelectionKeySet
替换掉hashSet
实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//Java8反射替换字段
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
-
将与
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
关联好的Netty优化实现SelectedSelectionKeySet
,设置到io.netty.channel.nio.NioEventLoop#selectedKeys
字段中保存。
//会通过反射替换selector对象中的selectedKeySet保存就绪的selectKey
//该字段为持有selector对象selectedKeys的引用,当IO事件就绪时,直接从这里获取
private SelectedSelectionKeySet selectedKeys;
后续
Reactor线程
就会直接从io.netty.channel.nio.NioEventLoop#selectedKeys
中获取IO就绪
的SocketChannel
-
用
SelectorTuple
封装unwrappedSelector
和wrappedSelector
返回给NioEventLoop
构造函数。到此Reactor
中的Selector
就创建完毕了。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
-
所谓的
unwrappedSelector
是指被Netty优化过的JDK NIO原生Selector。 -
所谓的
wrappedSelector
就是用SelectedSelectionKeySetSelector
装饰类将unwrappedSelector
和与sun.nio.ch.SelectorImpl类
关联好的Netty优化实现SelectedSelectionKeySet
封装装饰起来。
wrappedSelector
会将所有对Selector
的操作全部代理给unwrappedSelector
,并在发起轮询IO事件
的相关操作中,重置SelectedSelectionKeySet
清空上一次的轮询结果。
final class SelectedSelectionKeySetSelector extends Selector {
//Netty优化后的 SelectedKey就绪集合
private final SelectedSelectionKeySet selectionKeys;
//优化后的JDK NIO 原生Selector
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
到这里Reactor的核心Selector就创建好了,下面我们来看下用于保存异步任务的队列是如何创建出来的。
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
//通过用SelectedSelectionKeySet装饰后的unwrappedSelector
this.selector = selectorTuple.selector;
//Netty优化过的JDK NIO远程Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
我们继续回到创建Reactor
的主线上,到目前为止Reactor
的核心Selector
就创建好了,前边我们提到Reactor
除了需要监听IO就绪事件
以及处理IO就绪事件
外,还需要执行一些异步任务,当外部线程向Reactor
提交异步任务后,Reactor
就需要一个队列来保存这些异步任务,等待Reactor线程
执行。
下面我们来看下Reactor
中任务队列的创建过程:
//任务队列大小,默认是无界队列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
-
在
NioEventLoop
的父类SingleThreadEventLoop
中提供了一个静态变量DEFAULT_MAX_PENDING_TASKS
用来指定Reactor
任务队列的大小。可以通过系统变量-D io.netty.eventLoop.maxPendingTasks
进行设置,默认为Integer.MAX_VALUE
,表示任务队列默认为无界队列
。 -
根据
DEFAULT_MAX_PENDING_TASKS
变量的设定,来决定创建无界任务队列还是有界任务队列。
//创建无界任务队列
PlatformDependent.<Runnable>newMpscQueue()
//创建有界任务队列
PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
return Mpsc.newMpscQueue(maxCapacity);
}
Reactor
内的异步任务队列的类型为MpscQueue
,它是由JCTools
提供的一个高性能无锁队列,从命名前缀Mpsc
可以看出,它适用于多生产者单消费者
的场景,它支持多个生产者线程安全的访问队列,同一时刻只允许一个消费者线程读取队列中的元素。
我们知道Netty中的
Reactor
可以线程安全
的处理注册其上的多个SocketChannel
上的IO数据
,保证Reactor线程安全
的核心原因正是因为这个MpscQueue
,它可以支持多个业务线程在处理完业务逻辑后,线程安全的向MpscQueue
添加异步写任务
,然后由单个Reactor线程
来执行这些写任务
。既然是单线程执行,那肯定是线程安全的了。
Reactor对应的NioEventLoop类型继承结构
image.png
NioEventLoop
的继承结构也是比较复杂,这里我们只关注在Reactor
创建过程中涉及的到两个父类SingleThreadEventLoop
,SingleThreadEventExecutor
。
剩下的继承体系,我们在后边随着Netty
源码的深入在慢慢介绍。
前边我们提到,其实Reactor
我们可以看作是一个单线程的线程池,只有一个线程用来执行IO就绪事件的监听
,IO事件的处理
,异步任务的执行
。用MpscQueue
来存储待执行的异步任务。
命名前缀为SingleThread
的父类都是对Reactor
这些行为的分层定义。也是本小节要介绍的对象
SingleThreadEventLoop
Reactor
负责执行的异步任务分为三类:
-
普通任务:
这是Netty最主要执行的异步任务,存放在普通任务队列taskQueue
中。在NioEventLoop
构造函数中创建。 -
定时任务:
存放在优先级队列中。后续我们介绍。 -
尾部任务:
存放于尾部任务队列tailTasks
中,尾部任务一般不常用,在普通任务执行完后 Reactor线程会执行尾部任务。**使用场景:**比如对Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。
SingleThreadEventLoop
负责对尾部任务队列tailTasks
进行管理。并且提供Channel
向Reactor
注册的行为。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
//任务队列大小,默认是无界队列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
//尾部任务队列
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
//尾部队列 执行一些统计任务 不常用
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
@Override
public ChannelFuture register(Channel channel) {
//注册channel到绑定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
}
SingleThreadEventExecutor
SingleThreadEventExecutor
主要负责对普通任务队列
的管理,以及异步任务的执行
,Reactor线程的启停
。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
//parent为Reactor所属的NioEventLoopGroup Reactor线程组
super(parent);
//向Reactor添加任务时,是否唤醒Selector停止轮询IO就绪事件,马上执行异步任务
this.addTaskWakesUp = addTaskWakesUp;
//Reactor异步任务队列的大小
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//用于启动Reactor线程的executor -> ThreadPerTaskExecutor
this.executor = ThreadExecutorMap.apply(executor, this);
//普通任务队列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
//任务队列满时的拒绝策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
到现在为止,一个完整的Reactor架构
就被创建出来了。
Reactor结构.png
3. 创建Channel到Reactor的绑定策略
到这一步,Reactor线程组NioEventLoopGroup
里边的所有Reactor
就已经全部创建完毕。
无论是Netty服务端NioServerSocketChannel
关注的OP_ACCEPT
事件也好,还是Netty客户端NioSocketChannel
关注的OP_READ
和OP_WRITE
事件也好,都需要先注册到Reactor
上,Reactor
才能监听Channel
上关注的IO事件
实现IO多路复用
。
NioEventLoopGroup
(Reactor线程组)里边有众多的Reactor
,那么以上提到的这些Channel
究竟应该注册到哪个Reactor
上呢?这就需要一个绑定的策略来平均分配。
还记得我们前边介绍MultithreadEventExecutorGroup类
的时候提到的构造器参数EventExecutorChooserFactory
吗?
这时候它就派上用场了,它主要用来创建Channel
到Reactor
的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE
。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//从Reactor集合中选择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
}
下面我们来看下具体的绑定策略:
DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
...................省略.................
}
我们看到在newChooser
方法绑定策略有两个分支,不同之处在于需要判断Reactor线程组中的Reactor
个数是否为2的次幂
。
Netty中的绑定策略就是采用round-robin
轮询的方式来挨个选择Reactor
进行绑定。
采用round-robin
的方式进行负载均衡,我们一般会用round % reactor.length
取余的方式来挨个平均的定位到对应的Reactor
上。
如果Reactor
的个数reactor.length
恰好是2的次幂
,那么就可以用位操作&
运算round & reactor.length -1
来代替%
运算round % reactor.length
,因为位运算的性能更高。具体为什么&
运算能够代替%
运算,笔者会在后面讲述时间轮的时候为大家详细证明,这里大家只需记住这个公式,我们还是聚焦本文的主线。
了解了优化原理,我们在看代码实现就很容易理解了。
利用%
运算的方式Math.abs(idx.getAndIncrement() % executors.length)
来进行绑定。
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
利用&
运算的方式idx.getAndIncrement() & executors.length - 1
来进行绑定。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
又一次被Netty对性能的极致追求所折服~~~~
4. 向Reactor线程组中所有的Reactor注册terminated回调函数
当Reactor线程组NioEventLoopGroup
中所有的Reactor
已经创建完毕,Channel
到Reactor
的绑定策略也创建完毕后,我们就来到了创建NioEventGroup
的最后一步。
俗话说的好,有创建就有启动,有启动就有关闭,这里会创建Reactor关闭
的回调函数terminationListener
,在Reactor
关闭时回调。
terminationListener
回调的逻辑很简单:
-
通过
AtomicInteger terminatedChildren
变量记录已经关闭的Reactor
个数,用来判断NioEventLoopGroup
中的Reactor
是否已经全部关闭。 -
如果所有
Reactor
均已关闭,设置NioEventLoopGroup
中的terminationFuture
为success
。表示Reactor线程组
关闭成功。
//记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//关闭future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
//当所有Reactor关闭后 才认为是关闭成功
terminationFuture.setSuccess(null);
}
}
};
//为所有Reactor添加terminationListener
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
我们在回到文章开头Netty服务端代码模板
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
...........省略............
}
}
现在Netty的主从Reactor线程组
就已经创建完毕,此时Netty服务端的骨架已经搭建完毕,骨架如下:
主从Reactor线程组.png
总结
本文介绍了首先介绍了Netty对各种IO模型
的支持以及如何轻松切换各种IO模型
。
还花了大量的篇幅介绍Netty服务端的核心引擎主从Reactor线程组
的创建过程。在这个过程中,我们还提到了Netty对各种细节进行的优化,展现了Netty对性能极致的追求。
好了,Netty服务端的骨架已经搭好,剩下的事情就该绑定端口地址然后接收连接了