记录背景
最近在学习Netty,阅读了部分源码,记录一下笔记,方便自己回顾和也希望能作为初学的小伙伴们的部分参考。
本次Netty源码小窥探会是一个小合集,因为个人能力有限,介绍程度肯定不会太深奥,个人是基于尚硅谷的Netty课程进行的学习,在最后的源码讲解的基础上加入了自己的理解。有错误请见谅
适用人员
会Netty的基础使用的小伙伴
正文
引入
分析Netty的源码需要跟踪到NioServerSocketChannel的doBind方法,因为对于连接的监听,还是底层自然还是通过NioServerSocketChannel来监听获得NioSocketChannel的,所以这里是必须的;然后是NioEventLoop中的run方法,我们知道事件组就是一个无限循环的过程,也就是说服务端和客户端其实就是在NioEventLoop中循环执行,内部有selector进行事件区分,所以事件组中的内容也需要看。
首先我们来看一下简答配置的,标准的一个Netty服务端搭建案例代码:
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL. 对SSL进行相关配置
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server. 开始和我们之前一样,正常配置服务端
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new LoggingHandler(LogLevel.INFO));
//p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
对于能使用基础使用Netty的开发人员而言,这段代码的难度肯定不大,我们本次的源码介绍就是根据这段代码顺序进行讲接,从而给大家介绍Netty底层的相关设计。
NioEventLoopGroup的子线程数目决定
对于我们整个Netty项目而言,无论是服务端还是客户端其本质上都是一个在内部循环执行的事件组,每当有对应的逻辑处理,就会事件组中拿出一个线程执行,这也是Netty并发和异步的基础。
对于服务端而言,Netty的连接模式是基于主从Reactor多线程,连接请求和一般逻辑请求处理是分开的,所以我们会需要建立两个循环事件组分别处理连接事件和其余事件,在Netty中是用NioEventLoopGroup数据类型来表示循环事件组的,所以我们一般会创建bossGroup来处理连接事件,workerGroup来处理其余事件(在Netty源码中,一般把bossGroup称之为parentGroup,WorkerGroup称之为ChildGroup)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup是个事件循环组,内部会分配子线程,来处理事件,比如bossGroup,内部很多个就绪状态下的子线程,每当有一个客户端来请求连接,就会分配一个子线程去处理连接请求;如果同时来多个,就会将多个空闲就绪子线程分配去分别处理连接请求,那么这里有存在一个问题,NioEventLoopGroup内部的子线程数目是如何决定的呢?
在这里我们可以看到,我们创建bossGroup和WorkerGroup分别使用了有参和无参构造,我们进去查看源码就能知道子线程数据决定的逻辑了。
首先进入无参构造函数
如下图 当我们使用空参构造的时候 内部会传入nThread=0的参数 从而调用内部的有参构造函数
所以实际上我们还是去看有参构造函数
有参构造
从有参构造的结构一直往下,会运行到父类的MultithreadEventLoopGroup方法,代码和调用逻辑如下
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
在MultithreadEventLoopGroup方法的逻辑中我们可以看到,有参无参影响到的就是nThread这个参数,内部实际上就是将有参构造的nThreads参数设置为0,然后跟踪往下走,就会发现方法走到最后构造的时候,会单独判断nThreads是否为0,如果为0就是调用一个默认值,这个默认是在static静态代码块中看到数值为当前设备CPU核数*2
所以总的而言,NioEventLoopGroup内部的子线程数目可以通过构造函数传参数决定,无参构造的话默认为当前设备CPU核数*2,因为连接处理请求一般只会有一次,但是其余事件处理请求会有很多次,所以一般我们将bossGroup进行有参构造,设置一定的量满足需求即可,而对于WorkerGroup我们一般无参构造,充分发挥多核CPU优势,更加高效的处理。
构造方法和EventExecutor
我们上面探寻子线程数目是追踪到了父类的MultithreadEventLoopGroup方法,方法内部也是一个对父类方法的super调用,那我们继续往下追踪父类的构造方法中查看如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
方法很长,我们慢慢看,这次先看前面的关于子线程的创建部分,很明显的看到成员变量children 也能想到这个变量就是我们所关注的关于子线程创建的变量
可以看到,代码中对于children的第一次赋值就是执行了下面代码
this.children = new EventExecutor[nThreads];
我们知道,children就是NioEventLoopGroup的子线程,从这里的赋值我们可以看到,实际上children就是一个new出来创建的EventExceutor类型的数组,数组长度就是有参构造传入的nThread值或者static静态代码块中决定的数值 ,这也说明了为啥传参决定nThread能决定子线程的多少,子线程就是EventExceutor数组统一包含管理,数组的长度nThread就决定了子线程的多少。
我们走到下面for循环中 可以看到里面有对每一个元素的赋值语句 走到这个语句之后,我们可以看到,children确实是EventExecutor数组类型,但是每一个元素是NioEventLoop类型,为啥NioEventLoop类型能装到EventExcutor类型数组中呢?
我们点开EventExecutor类型 可以看到他是一个接口,然后查看他的继承结构可以看到
NioEventLoopGroup是EventExecutor的子接口的子接口的是实现类的子类,所以可以装进去
然后我们可以看到 每一个子线程的创建 都是依赖于newChild方法 方法中传入的第一个参数就是executor执行器
children[i] = newChild(executor, args);
同时也可以看到 在finally代码块中,会通过标志success进行判断是否创建成功,如果不成功就会调用shutdownGracefully方法关闭线程,并且也会拿到数组中的每一个元素EventExecutor e 对其调用awaitTerminated方法进行处理
while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); }
这部分代码的作用是确保事件执行器(EventExecutor)已经完全终止。
具体来说:
-
while (!e.isTerminated())
:这是一个循环,它会一直执行,直到e.isTerminated()
返回true
。isTerminated()
方法用来检查事件执行器是否已经终止。 -
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS)
:这个方法是用来等待事件执行器终止。它接受两个参数,分别是等待的最大时间和时间单位。这里使用Integer.MAX_VALUE
作为等待时间,意味着如果事件执行器没有在自然停止的情况下终止,方法将无限期等待。
这段代码的目的在于:
- 确保在某个事件执行器创建失败后,已经成功创建的事件执行器都能够正确地关闭。
- 在关闭过程中,如果事件执行器因为某些原因未能正常终止,通过
awaitTermination
方法进行等待,确保所有资源都被正确释放,避免资源泄露。
并且在该方法的后面一点 可以看到 对children线程数组进行循环,对每一个元素使用addListener方法添加了监听器,方便后续事件监听
添加完了监听器之后,我们还可以看到,源码将EveentExecutor数组转化为了一个LinkedHashSet集合的形式进行保存,基于LinkedHashSet的特性,可以保证按照数组顺序链式保存,并且去重
readonlyChildren = Collections.unmodifiableSet(childrenSet);
最后,这行代码将childrenSet
转换成了一个不可修改的集合。Collections.unmodifiableSet
方法返回的集合不允许添加、删除或者修改其中的元素。这意味着一旦这个集合被创建,它的内容就不能被改变了。这通常用于安全地发布集合,使得集合的消费者不能更改集合的内容,从而保护数据的一致性和安全性。将结果赋值给自己的成员Set集合变量