【Netty源码系列文章中源码出自4.1.84.Final
版本】
文章目录
- 1. EventLoopGroup接口类
- 2. NioEventLoopGroup创建过程
- 2.1 Executor实现机制
- 2.2 EventLoop对象创建(newChild()方法)
本篇文章主要看一下
EventLoopGroup
的源码,了解一下它的创建过程。
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup();
1. EventLoopGroup接口类
我们先进入EventLoopGroup
类中,看一下这个类的情况。有关这个类的信息都在源码中注释了
/**
* 这是一个接口类,继承自EventExecutorGroup
* 它的作用就是允许将 事件循环中每个加工过的Channel对象注册进来
*/
public interface EventLoopGroup extends EventExecutorGroup {
/**
* 枚举返回下一个EventLoop对象
*/
@Override
EventLoop next();
/**
* 使用EventLoop注册Channel,当注册完成时,返回通知对象ChannelFuture
*/
ChannelFuture register(Channel channel);
/**
* 使用EventLoop注册Channel,一旦注册完成,也会返回通知对象ChannelFuture,但是这里传参变为ChannelPromise,
* 这个对象内部也维护了一个Channel对象
*/
ChannelFuture register(ChannelPromise promise);
/**
* 这个方法已经弃用,上面的方法已经包含这个方法功能
*/
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
ChannelFuture
接口类,异步返回Channel的I/O结果。
2. NioEventLoopGroup创建过程
new NioEventLoopGroup(2)
这里传入线程数量参数,就会根据传入的值进行创建,不传时,默认按系统cpu的核数*2
进行创建。
进入NioEventLoopGroup
类中,一直debug,可以看到如下图所示的过程。
此时,我们进入super(...)
这个方法中,进入到MultithreadEventLoopGroup
类里面。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这时,我们看到一个三目运算,而DEFAULT_EVENT_LOOP_THREADS
的值,在这个类加载时,就已经进行赋值操作,正是获取系统可用cpu核数的2倍,如下。
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
我们接着进入super(...)
方法中,可以看到下面的this(...)
方法,看一下注释,说创建一个新的实例
,我们再点击进入到真正的方法中MultithreadEventExecutorGroup(...)
。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//检查参数nThreads
checkPositive(nThreads, "nThreads");
//创建executor对象
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//创建EventExecutor数组
children = new EventExecutor[nThreads];
//枚举将生成的EventExecutor对象放入数组中
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//这里是核心,用来生成EventExecutor对象,后面会讲
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//枚举设置监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//将EventExecutor对象放入一个只读集合中
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
这个方法干了这几件事,见下图。我们先整体有个了解,再对其中几个重要的步骤进行深入。
2.1 Executor实现机制
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
我们知道,之前参数executor
传入的是null值,在这里会进行创建。
newDefaultThreadFactory()
创建默认线程工厂,并为线程设置一些属性,我们进入这个方法中。
//MultithreadEventExecutorGroup类
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
下一步:
//DefaultThreadFactory类,实现ThreadFactory接口
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
这里新增两个参数,
daemon
:守护进程,默认传false
priority
:优先级,默认传的是5,即Thread.NORM_PRIORITY
下一步:
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
toPoolName(poolType)
方法,是获取对应的池名称。
下一步:
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, null);
}
新增一个参数,
ThreadGroup
:线程组,默认传null
下一步:
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
//检查池名称不为null,否则抛异常
ObjectUtil.checkNotNull(poolName, "poolName");
//检查优先级范围
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
//poolId,是AtomicInteger原子对象,用来拼接线程名称前缀
//后面对三个属性进行赋值
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
到这里,只是对线程工厂的线程进行属性的设置;
回到开始的地方:
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
我们进入ThreadPerTaskExecutor
这个类中,看一下。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
这个类里,只是把刚刚设置好的线程工厂对象ThreadFactory
赋值给ThreadPerTaskExecutor
中的属性,这里并没有去调用execute()
方法;
真正调用执行的是这个方法:newChild(executor, args)
//MultithreadEventExecutorGroup类
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//真正去调用执行的方法
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
}
...
}
2.2 EventLoop对象创建(newChild()方法)
我们点击newChild()
方法,发现这个一个抽象方法,这里我们看一下它的实现方法,因为创建的是NioEventLoopGroup
对象,所以我们选择NioEventLoopGroup这个类的重写方法,点进去看一下:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
...
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
这里对之前传入的多个参数,根据不同的参数类型,进行强制转换;然后将转换后的参数传递到NioEventLoop类的构造方法中,创建一个NioEventLoop
对象。我们进入它的构造方法中,看一下:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
NioEventLoop构造方法中又调用了它的父类方法,校验参数,获取选择器并赋值。我们进入super(...)
方法中。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
这里也是干了同样的事情,调用父类方法,校验参数。我们再点击super()方法。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//重点!!!
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
到了这一步,我们会发现不一样了,又出现executor
这个对象了,而且通过ThreadExecutorMap.apply(executor, this)
这个方法又重新赋值给Executor,我们前面说过,之前创建Executor的时候,只是设置了一些属性参数,并没有真正去调用创建线程的方法,现在这里又出现了,会是这里吗?我们点击apply()
方法进去看看。
/**
* Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
我们看到这里调用execute()这个方法,apply(command, eventExecutor)
这个方法通过command.run()
生成Runnable对象,然后执行器去调用。
到这里我们大体了解EventLoopGroup
的整个创建过程,本文有误的地方,烦请留言指正,谢谢!