深入拆解Tomcat&Jetty
专栏地址: 极客时间-深入拆解Tomcat & Jetty
IO 多路复用
当用户线程发起 I/O 操作后,网络数据读取操作会经历两个步骤:
- 用户线程等待内核将数据从网卡拷贝到内核空间。
- 内核将数据从内核空间拷贝到用户空间。
IO 多路复用 是 Linux 五种 IO 模型中的一种,逻辑如下图:
这时用户线程读取数据分为了两步:
- 间断的发起 select 调用询问内核数据是否已经准备好
- 在数据就绪后发起 read 系统调用,注意
在数据从内核空间拷贝到用户空间时,线程依然是阻塞的
多路复用体现在一次 select 调用可以查询多个 数据通道(channel) 上的数据是否已经准备好
Tomcat 如何实现多路复用模型
对于 Java 的多路复用器的使用,无非是两步:
- 创建一个 Selector,在它身上注册各种感兴趣的事件,然后调用 select 方法,等待感兴趣的事情发生。
- 感兴趣的事情发生了,比如可以读了,这时便创建一个新的线程从 Channel 中读数据。
Tomcat 的 NioEndpoint 组件虽然实现比较复杂,但基本原理就是上面两步。它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor 和 Executor 共 5 个组件,它们的工作过程如下图所示:
- LimitLatch:连接控制器,它负责控制最大连接数,NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝(这里的拒绝指的是应用层意义上的拒绝,操作系统依然会接收 socket 连接,直到等待队列满)。
- Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。
- Poller 的本质是一个 Selector,也跑在单独线程里。Poller 在内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。
- Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。
LimitLatch(org.apache.tomcat.util.threads.LimitLatch)
部分核心代码如下:
public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared() {
long newCount = count.incrementAndGet();
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
private final AtomicLong count;
private volatile long limit;
//线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
public void countUpOrAwait() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
public long countDown() {
sync.releaseShared(0);
long result = getCount();
return result;
}
}
- 内部类 Sync 继承了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。
- 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。而这个方法实际会调用拓展类所重写的 tryAcquireShared 方法,它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。
- 如果用户线程被阻塞到了 AQS 的队列,同样是由 Sync 内部类决定唤醒,Sync 重写了 AQS 的 tryReleaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。
Acceptor(org.apache.tomcat.util.net.Acceptor)
Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。
一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。初始化过程如下:
// org.apache.tomcat.util.net.NioEndpoint.initServerSocket()
serverSock = ServerSocketChannel.open();
// getAcceptCount() Acceptor负责从ACCEPT队列中取出连接,当Acceptor处理不过来时,连接就堆积在ACCEPT队列中,默认100
serverSock.socket().bind(addr, getAcceptCount());
serverSock.configureBlocking(true);
ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的“生产者 - 消费者”模式,Acceptor 与 Poller 线程之间通过 Queue 通信。
NioEndpoint.start->
startInternal()->
startAcceptorThread() {new Thread(acceptor, threadName).start()} ->
Acceptor.run(){ socket = endpoint.serverSocketAccept(); endpoint.setSocketOptions(socket); } ->
NioEndpoint.setSocketOptions(socke){ NioSocketWrapper socketWrapper = new Nio2SocketWrapper(channel, this); poller.register(); } ->
NioEndpoint.register(socketWrapper){ addEvent(new PollerEvent(socketWrapper, OP_REGISTER)); } ->
NioEndpoint.events.offer(event){ Poller.events.offer(event); }
Poller(org.apache.tomcat.util.net.NioEndpoint.Poller)
Poller 本质是一个 Selector,它内部维护一个 Queue,这个 Queue 定义如下:
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行(Tomcat9只有一个线程在运行,NioEndpoint#startInternal()
),每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。
- Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。
- Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。
SocketProcessor(org.apache.tomcat.util.net.NioEndpoint.SocketProcessor)
Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,(这里是 SocketProcessorBase 实现了 Runnable 接口,在 run 方法里调用了抽象方法 doRun,SocketProcessor 继承了它并重写了 doRun 方法
),用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象(Http11Processor#service()
)。
这里请注意:Http11Processor 并不是直接读取 Channel 的
。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。
Executor
Executor 是 Tomcat 定制版的线程池,它负责创建真正干活的工作线程,干什么活呢?就是执行 SocketProcessor 的 run 方法,也就是解析请求并通过容器来处理请求,最终会调用到 Servlet。
如何实现高并发
高并发就是能快速地处理大量的请求,需要合理设计线程模型让 CPU 忙起来,尽量不要让线程阻塞,因为一阻塞,CPU 就闲下来了。另外就是有多少任务,就用相应规模的线程数去处理。我们注意到 NioEndpoint 要完成三件事情:接收连接、检测 I/O 事件以及处理请求,那么最核心的就是把这三件事情分开,用不同规模的线程数去处理,比如用专门的线程组去跑 Acceptor,并且 Acceptor 的个数可以配置;用专门的线程组去跑 Poller,Poller 的个数也可以配置;最后具体任务的执行也由专门的线程池来处理,也可以配置线程池的大小。
这其中比较核心的就是把检测IO事件这一操作由少量selector集中处理,避免大量线程占用cpu时间在轮询IO事件上
。
Java中自身的NIO到底是同步非阻塞,还是IO多路复用
NIO API 可以不用 Selector,就是同步非阻塞。使用了 Selector 就是 IO 多路复用
Tomcat 该组件虽然是叫 NioEndpoint,但使用了 Selector,所以其实是 IO 多路复用
如何理解 IO 操作模型中的同步异步,阻塞非阻塞
同步异步:
- 同步可以理解为线程在请求 IO 数据后是直接返回,还是阻塞等待数据从网卡(或者其他地方)拷贝到内存空间再到用户空间
- 异步可以理解为线程在请求 IO 数据后直接返回,但是在请求时注册了一个回调函数,内核将数据准备好后通过回调函数通知
阻塞和非阻塞主要是看发起I/O操作时,内核空间没有数据可读时,线程是否会阻塞等待,直到有数据到来
- 阻塞:调用 read() 时,如果内核空间中没有数据可读,线程就让出 cpu 阻塞等待,直到内核把数据拷贝到用户空间,唤醒线程,read()调用返回
- 非阻塞:调用 read() 时,如果内核空间没有可读数据,线程立刻返回,直到再次调用read(),内核空间有数据可读时,阻塞等待内核把数据拷贝到 read() 函数指定的buff中,唤醒线程,read()调用返回
关于 IO 的几篇文章推荐:
【1】https://time.geekbang.org/column/article/100307 Tomcat如何实现IO多路复用
【2】https://time.geekbang.org/column/article/103959 内核如何阻塞与唤醒进程
【3】https://mp.weixin.qq.com/s/LYbJxorhsyoWWtP6OR6-eQ 一顿饭的事儿,搞懂Linux5种IO模型