The NioEndpoint component
Tomcat's NioEndpoint implements the I/O multiplexing model.
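Workflow
Using Java's multiplexer (Selector):
- Create a Selector, register the events of interest on it, then call select and wait for those events to occur
- When an event of interest occurs, for example a channel becomes readable, create a new thread to read the data from the Channel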
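The two steps above can be sketched with the plain JDK NIO API. This is a minimal, self-contained example (the class and method names are made up for illustration): one Selector watches a server socket for OP_ACCEPT and the accepted connection for OP_READ. For brevity it reads in the selecting thread instead of starting a new one.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectorDemo {
    // Accepts one connection and returns the first chunk of data it sends,
    // using a single Selector for both the accept and the read event.
    static String acceptAndReadOnce(String payload) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);                     // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client that connects and sends the payload
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

                while (true) {
                    selector.select(); // wait until at least one registered event is ready
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            SocketChannel ch = server.accept();
                            if (ch != null) {
                                ch.configureBlocking(false);
                                ch.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel ch = (SocketChannel) key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(256);
                            int n = ch.read(buf);
                            ch.close();
                            return new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                        }
                    }
                }
            }
        }
    }
}
```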
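NioEndpoint consists of five components: LimitLatch, Acceptor, Poller, SocketProcessor and Executor.
Some articles online say the Poller runs on multiple threads, but in the Tomcat version I use, 9.0.39, one Acceptor corresponds to only one Poller.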
LimitLatch
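A shared lock implemented on top of AQS.
It is the connection controller: it caps the maximum number of connections, which defaults to 8192 in NIO mode.
It can be configured with the maxConnections parameter, for example in Spring Boot: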
server:
  tomcat:
    maxConnections: 10000
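When the number of connections reaches the maximum, the accepting thread blocks until a downstream component finishes with a connection and decrements the count. After the maximum is reached, the OS still accepts client connections, but the application no longer accepts them. Core code: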
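import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class LimitLatch {

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                // Over the limit: roll back the increment and make the caller wait
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;

    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }

    // A thread calls this to get permission to accept a new connection; it may block
    public void countUpOrAwait() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Call this to release a connection permit; previously blocked threads may be woken up
    public long countDown() {
        sync.releaseShared(0);
        long result = getCount();
        return result;
    }

    public long getCount() {
        return count.get();
    }
}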
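A user thread calls LimitLatch#countUpOrAwait to acquire the latch; if it cannot, the thread is parked in the AQS queue. How does AQS know whether to block the thread or not? That is decided by the user of AQS, namely the inner class Sync, which overrides AQS#tryAcquireShared(): it increments the connection count, and if the new count does not exceed limit the thread gets the latch and 1 is returned; otherwise the increment is rolled back and -1 is returned, so AQS parks the thread.
If a user thread has been parked in the AQS queue, Sync also decides when it is woken up: Sync overrides AQS#tryReleaseShared(), so when a connection has been fully handled (countDown is called), a new connection can be accepted again and the previously blocked thread is woken up.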
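To see the blocking and wake-up behaviour described above, here is a minimal sketch that copies the LimitLatch idea into a hypothetical DemoLimitLatch class (not Tomcat's class) and checks that, with a limit of 2, a third caller stays parked in the AQS queue until countDown() is called.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, stripped-down copy of the LimitLatch idea, for experimentation only
class DemoLimitLatch {
    private final AtomicLong count = new AtomicLong();
    private final long limit;
    private final Sync sync = new Sync();

    DemoLimitLatch(long limit) { this.limit = limit; }

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            if (count.incrementAndGet() > limit) { // over the limit: undo, ask AQS to park caller
                count.decrementAndGet();
                return -1;
            }
            return 1;                              // permit granted
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            count.decrementAndGet();
            return true;                           // true lets AQS wake queued threads
        }
    }

    void countUpOrAwait() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    long countDown() { sync.releaseShared(0); return count.get(); }
    long getCount() { return count.get(); }

    // Demo: with limit 2, a third caller blocks until one permit is released
    static boolean thirdCallerWaitsForRelease() throws InterruptedException {
        DemoLimitLatch latch = new DemoLimitLatch(2);
        latch.countUpOrAwait();
        latch.countUpOrAwait();
        Thread third = new Thread(() -> {
            try {
                latch.countUpOrAwait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        third.start();
        third.join(200);                       // it should still be parked after 200 ms
        boolean blockedWhileFull = third.isAlive();
        latch.countDown();                     // "a connection was closed"
        third.join(5000);
        return blockedWhileFull && !third.isAlive() && latch.getCount() == 2;
    }
}
```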
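A connection is released when it is closed, for example on a connection timeout or a request error. Because HTTP connections are persistent (keep-alive), the TCP connection is not released as soon as one request has been handled, so the count only drops when the connection itself is closed.
LimitLatch limits the number of connections the application accepts, while the OS-level limit is the accept queue. LimitLatch applies first; only when the application cannot keep up do connections pile up in the operating system's queue, and the size of that queue is controlled by acceptCount: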
server:
  tomcat:
    acceptCount: 10000
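Look at the second parameter of the socket's bind method, backlog: the value passed by the caller is acceptCount.
backlog is the size of the queue of sockets whose TCP connection is already established but that have not yet been accept()ed (note: this is not the SYN queue). According to the Java ServerSocket documentation, a connection that arrives when this queue is full is refused, i.e. the client gets ECONNREFUSED ("Connection refused"). On Linux with the default net.ipv4.tcp_abort_on_overflow = 0, the kernel drops the packets instead of sending a reset, so clients typically see timeouts and retries rather than an immediate refusal.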
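A small sketch of the "established but not yet accepted" queue, using only the JDK (the class name is hypothetical): the client's connect completes before the server ever calls accept(), because the kernel finishes the handshake and parks the connection in the backlog queue. The backlog value 100 mirrors Tomcat's default acceptCount; the OS treats it as a hint and may adjust it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class BacklogDemo {
    // Returns true if the client was connected before accept() was called,
    // and accept() then picked that queued connection up.
    static boolean connectedBeforeAccept() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Second argument = backlog, the role acceptCount plays in initServerSocket()
            server.bind(new InetSocketAddress("127.0.0.1", 0), 100);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                boolean connectedWithoutAccept = client.isConnected(); // handshake done by kernel
                try (SocketChannel accepted = server.accept()) {       // dequeue it now
                    return connectedWithoutAccept && accepted != null;
                }
            }
        }
    }
}
```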
Acceptor
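The Acceptor implements Runnable, so it can run in its own thread; in an infinite loop it calls accept to receive new connections. As soon as a new connection request arrives, accept returns a Channel object, which is then handed to the Poller for processing.
A port can be bound by only one ServerSocketChannel, so this ServerSocketChannel is shared among multiple Acceptor threads. It is a field of the Endpoint, and the Endpoint initializes it and binds the port. Several Acceptors may call accept at the same time, since accept is thread-safe.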
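The following sketch (hypothetical class, JDK API only) starts several "acceptor" threads that all block in accept() on one shared, blocking ServerSocketChannel, which is what sharing the Endpoint's server socket allows. Closing the channel makes the blocked accept() calls throw, which is how the threads exit here.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SharedAcceptorsDemo {
    // Starts `acceptors` threads calling accept() on ONE ServerSocketChannel, connects
    // `clients` clients, and returns how many connections were accepted in total.
    static int run(int acceptors, int clients) throws Exception {
        AtomicInteger accepted = new AtomicInteger();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // blocking mode by default
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < acceptors; i++) {
                Thread t = new Thread(() -> {
                    while (true) {
                        try (SocketChannel ch = server.accept()) { // safe to call concurrently
                            accepted.incrementAndGet();
                        } catch (IOException e) {
                            return; // channel closed (AsynchronousCloseException etc.): stop
                        }
                    }
                }, "demo-acceptor-" + i);
                t.start();
                threads.add(t);
            }
            List<SocketChannel> conns = new ArrayList<>();
            for (int i = 0; i < clients; i++) {
                conns.add(SocketChannel.open(server.getLocalAddress()));
            }
            long deadline = System.currentTimeMillis() + 5000;
            while (accepted.get() < clients && System.currentTimeMillis() < deadline) {
                Thread.sleep(10); // wait until every client has been accepted
            }
            for (SocketChannel c : conns) {
                c.close();
            }
            server.close(); // wakes the blocked accept() calls with an exception
            for (Thread t : threads) {
                t.join(5000);
            }
        }
        return accepted.get();
    }
}
```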
org.apache.tomcat.util.net.NioEndpoint#bind performs the initialization:
/**
 * Initialize the endpoint.
 */
@Override
public void bind() throws Exception {
    initServerSocket();
    setStopLatch(new CountDownLatch(1));
    // Initialize SSL if needed
    initialiseSsl();
    selectorPool.open(getName());
}
initServerSocket creates and binds the server socket:
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        // The second argument is the backlog, i.e. acceptCount
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    // Blocking mode: the Acceptor thread blocks in accept()
    serverSock.configureBlocking(true); //mimic APR behavior
}
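- The getAcceptCount() argument of bind is the length of the OS wait queue. When the application-level connection count has reached its maximum, the OS can still accept connections, and the maximum number it will keep waiting is this queue length. It is configured with the acceptCount parameter and defaults to 100.
- The ServerSocketChannel accepts new connections via accept(), which returns a SocketChannel; the SocketChannel is then wrapped in a PollerEvent object, and the PollerEvent is pushed into the Poller's queue. This is a typical producer-consumer pattern: the Acceptor and Poller threads communicate through that queue.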
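The Acceptor-to-Poller hand-off can be sketched as a queue plus Selector#wakeup. The MiniPoller class below is hypothetical and much simpler than Tomcat's Poller: the producer only enqueues the channel and wakes the selector, and only the polling thread touches the Selector, registering queued channels before it selects. Keeping registration on the polling thread also avoids the fact that, on older JDKs, register() can block while another thread is inside select().

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical mini-Poller: producers only enqueue; the polling thread owns the Selector
class MiniPoller {
    final Selector selector;
    private final ConcurrentLinkedQueue<SocketChannel> events = new ConcurrentLinkedQueue<>();

    MiniPoller() throws IOException {
        selector = Selector.open();
    }

    // Producer side (Acceptor thread): hand the channel over and wake a blocked select()
    void register(SocketChannel ch) {
        events.offer(ch);
        selector.wakeup();
    }

    // Consumer side (Poller thread): drain pending registrations, then select
    int pollOnce(long timeoutMs) throws IOException {
        SocketChannel ch;
        while ((ch = events.poll()) != null) {
            ch.configureBlocking(false);
            ch.register(selector, SelectionKey.OP_READ);
        }
        return selector.select(timeoutMs);
    }

    // Demo: a connection with pending data is reported as ready by the poller
    static int demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                MiniPoller poller = new MiniPoller();
                client.write(ByteBuffer.wrap(new byte[] {42}));
                poller.register(accepted);            // what the Acceptor would do
                int ready = 0;
                for (int i = 0; i < 50 && ready == 0; i++) {
                    ready = poller.pollOnce(100);     // one turn of the Poller loop
                }
                poller.selector.close();
                return ready;
            }
        }
    }
}
```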
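The concrete logic: Acceptor#run passes each accepted socket to endpoint.setSocketOptions(socket), and NioEndpoint implements that method as follows: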
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    NioSocketWrapper socketWrapper = null;
    try {
        // Allocate channel and wrapper
        NioChannel channel = null;
        if (nioChannels != null) {
            channel = nioChannels.pop();
        }
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(bufhandler);
            }
        }
        NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
        channel.reset(socket, newWrapper);
        connections.put(socket, newWrapper);
        socketWrapper = newWrapper;
        // Set socket properties
        // Disable blocking, polling will be used
        // (read/write readiness is detected by polling the Selector)
        socket.configureBlocking(false);
        // Apply the configured socket properties
        socketProperties.setProperties(socket.socket());
        // Read timeout: the connection is closed automatically when it expires
        socketWrapper.setReadTimeout(getConnectionTimeout());
        // Write timeout: the connection is closed automatically when it expires
        socketWrapper.setWriteTimeout(getConnectionTimeout());
        // Maximum number of requests on one connection (maxKeepAliveRequests, default 100)
        socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        // Register the socket with the Poller's selector to poll for read events
        poller.register(channel, socketWrapper);
        return true;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        if (socketWrapper == null) {
            destroySocket(socket);
        }
    }
    // Tell to close the socket if needed
    return false;
}
Here the NIO channel is wrapped (in a NioChannel and a NioSocketWrapper).
Note that setReadTimeout and setWriteTimeout are both set to connectionTimeout here.
Poller
It is essentially a Selector, and it also runs in its own thread.
Starting the Poller in NioEndpoint
In org.apache.tomcat.util.net.NioEndpoint#startInternal we can see that in this version the Poller runs on a single thread:
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
startAcceptorThread();
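The Poller keeps track of the registered Channels through its Selector. In an infinite loop it keeps checking whether the Channels have data ready; as soon as a Channel becomes readable, it creates a SocketProcessor task object and hands it to the Executor.
The kernel creates a channel for every accepted connection; that channel is the SocketChannel returned by accept in the Acceptor. The Poller then uses Selector#select to learn which of these channels the kernel has made ready, which is why every channel must first be registered with the Poller.
The Poller also maintains a queue (of PollerEvent objects): it takes channels from this queue and registers them on its Selector, then polls the Selector for each channel's events and processes them.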
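Responsibilities
- The Poller keeps querying the kernel for the state of its Channels through its internal Selector; as soon as a Channel is readable, it creates a SocketProcessor task and hands it to the Executor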
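The responsibility above, turning ready keys into tasks for a thread pool, might look roughly like this in plain NIO. The names are hypothetical, the Runnable only stands in for SocketProcessor, and setting interestOps(0) before dispatching loosely mirrors what unreg() does in processKey, so the same channel is not handed to two workers at once.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DispatchDemo {
    // One pass of a Poller-style loop: each ready key becomes a task for the pool
    static void dispatchReady(Selector selector, ExecutorService pool,
                              BlockingQueue<String> results) throws IOException {
        if (selector.select(1000) == 0) {
            return;
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (!key.isValid() || !key.isReadable()) {
                continue;
            }
            key.interestOps(0); // stop reporting this channel while a worker owns it
            SocketChannel ch = (SocketChannel) key.channel();
            pool.execute(() -> { // stand-in for a SocketProcessor task
                ByteBuffer buf = ByteBuffer.allocate(64);
                try {
                    int n = ch.read(buf);
                    results.add(new String(buf.array(), 0, Math.max(n, 0), StandardCharsets.UTF_8));
                } catch (IOException e) {
                    results.add("error: " + e);
                }
            });
        }
    }

    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);
                client.write(ByteBuffer.wrap("GET /".getBytes(StandardCharsets.UTF_8)));
                BlockingQueue<String> results = new LinkedBlockingQueue<>();
                dispatchReady(selector, pool, results);
                return results.poll(5, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```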
Poller#run
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // If we are here, means we have other stuff to do
                    // Do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }
        // Either we timed out or we woke up, process events first
        if (keyCount == 0) {
            hasEvents = (hasEvents | events());
        }
        Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (socketWrapper == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, socketWrapper);
            }
        }
        // Process timeouts
        timeout(keyCount, hasEvents);
    }
    getStopLatch().countDown();
}
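Recap of the steps in setSocketOptions shown earlier:
(1) Try to get a channel from the SynchronizedStack nioChannels
(2) If none was obtained, create a SocketBufferHandler object
(3) Create a NioChannel object from the SocketBufferHandler
(4) Create a NioSocketWrapper object from the NioChannel and the NioEndpoint
(5) Set the SocketChannel and the NioSocketWrapper as properties of the NioChannel (channel.reset)
(6) Set the NioSocketWrapper's properties and register it with the Poller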
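Steps (1) to (3) follow an object-pooling pattern: reuse a recycled object if one is available, otherwise allocate a new one. A minimal sketch, assuming a simple synchronized stack with a size cap (the class is hypothetical; Tomcat's SynchronizedStack is its own implementation). A recycled object must be reset before it is reused, which is the role channel.reset(...) plays in setSocketOptions.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Hypothetical object cache in the spirit of nioChannels
class RecyclingStack<T> {
    private final ArrayDeque<T> stack = new ArrayDeque<>();
    private final int limit;

    RecyclingStack(int limit) {
        this.limit = limit;
    }

    // Returns a recycled object, or null when the cache is empty
    synchronized T pop() {
        return stack.pollFirst();
    }

    // Returns false when the cache is full; the caller then simply drops the object
    synchronized boolean push(T obj) {
        if (stack.size() >= limit) {
            return false;
        }
        stack.addFirst(obj);
        return true;
    }

    // Reuse if possible, otherwise create a new instance
    T popOrCreate(Supplier<T> factory) {
        T obj = pop();
        return obj != null ? obj : factory.get();
    }
}
```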
org.apache.tomcat.util.net.NioEndpoint.Poller#timeout
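- On its loop, the Poller walks over the SocketChannels it manages and checks whether they have timed out. A timed-out SocketChannel is closed, which releases its slot in the connection count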
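The decision rule inside Poller#timeout can be isolated as plain logic. In this sketch the Conn class is a hypothetical stand-in for the interestOps, lastRead/lastWrite and read/write timeouts that NioSocketWrapper tracks. As in the Tomcat code, only connections currently waiting for a read or write are checked, a timeout of 0 or less disables the check, and a read timeout takes precedence over a write timeout.

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutScanDemo {
    // Hypothetical per-connection state, mirroring what NioSocketWrapper tracks
    static final class Conn {
        final String name;
        final boolean waitingForRead;   // OP_READ set in interestOps
        final boolean waitingForWrite;  // OP_WRITE set in interestOps
        final long lastRead, lastWrite, readTimeout, writeTimeout;

        Conn(String name, boolean waitingForRead, boolean waitingForWrite,
             long lastRead, long lastWrite, long readTimeout, long writeTimeout) {
            this.name = name;
            this.waitingForRead = waitingForRead;
            this.waitingForWrite = waitingForWrite;
            this.lastRead = lastRead;
            this.lastWrite = lastWrite;
            this.readTimeout = readTimeout;
            this.writeTimeout = writeTimeout;
        }
    }

    // Names of the connections that would be closed at time `now` (milliseconds)
    static List<String> expired(List<Conn> conns, long now) {
        List<String> out = new ArrayList<>();
        for (Conn c : conns) {
            boolean readTimeout = c.waitingForRead && c.readTimeout > 0
                    && now - c.lastRead > c.readTimeout;
            boolean writeTimeout = !readTimeout && c.waitingForWrite && c.writeTimeout > 0
                    && now - c.lastWrite > c.writeTimeout;
            if (readTimeout || writeTimeout) {
                out.add(c.name);
            }
        }
        return out;
    }
}
```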
protected void timeout(int keyCount, boolean hasEvents) {
    long now = System.currentTimeMillis();
    // This method is called on every loop of the Poller. Don't process
    // timeouts on every loop of the Poller since that would create too
    // much load and timeouts can afford to wait a few seconds.
    // However, do process timeouts if any of the following are true:
    // - the selector simply timed out (suggests there isn't much load)
    // - the nextExpiration time has passed
    // - the server socket is being closed
    if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
        return;
    }
    int keycount = 0;
    try {
        for (SelectionKey key : selector.keys()) {
            keycount++;
            try {
                NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper == null) {
                    // We don't support any keys without attachments
                    cancelledKey(key, null);
                } else if (close) {
                    key.interestOps(0);
                    // Avoid duplicate stop calls
                    socketWrapper.interestOps(0);
                    cancelledKey(key, socketWrapper);
                } else if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                        (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                    boolean readTimeout = false;
                    boolean writeTimeout = false;
                    // Check for read timeout
                    if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        long delta = now - socketWrapper.getLastRead();
                        long timeout = socketWrapper.getReadTimeout();
                        if (timeout > 0 && delta > timeout) {
                            readTimeout = true;
                        }
                    }
                    // Check for write timeout
                    if (!readTimeout && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                        long delta = now - socketWrapper.getLastWrite();
                        long timeout = socketWrapper.getWriteTimeout();
                        if (timeout > 0 && delta > timeout) {
                            writeTimeout = true;
                        }
                    }
                    if (readTimeout || writeTimeout) {
                        key.interestOps(0);
                        // Avoid duplicate timeout calls
                        socketWrapper.interestOps(0);
                        socketWrapper.setError(new SocketTimeoutException());
                        if (readTimeout && socketWrapper.readOperation != null) {
                            if (!socketWrapper.readOperation.process()) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else if (writeTimeout && socketWrapper.writeOperation != null) {
                            if (!socketWrapper.writeOperation.process()) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.ERROR, true)) {
                            cancelledKey(key, socketWrapper);
                        }
                    }
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(key, (NioSocketWrapper) key.attachment());
            }
        }
    } catch (ConcurrentModificationException cme) {
        // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
        log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
    }
    // For logging purposes only
    long prevExp = nextExpiration;
    nextExpiration = System.currentTimeMillis() +
            socketProperties.getTimeoutInterval();
    if (log.isTraceEnabled()) {
        log.trace("timeout completed: keys processed=" + keycount +
                "; now=" + now + "; nextExpiration=" + prevExp +
                "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
                "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
    }
}
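Timeouts are judged from readTimeout and writeTimeout, and as we saw above both of them are in fact connectionTimeout.
Note that this also applies while a thread-pool worker is processing a request: when reading, for example reading the request body of a POST from the input stream, or when writing to the output stream, the timeout is also connectionTimeout. If connectionTimeout is set too large, then with large request bodies or slow clients a connection stays occupied for a long time and the worker thread is not released, so new requests cannot be handled, throughput drops and response times grow. A typical exception when such a read times out: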
Caused by: org.apache.catalina.connector.ClientAbortException: java.net.SocketTimeoutException
at org.apache.catalina.connector.InputBuffer.realReadBytes(InputBuffer.java:340)
at org.apache.catalina.connector.InputBuffer.checkByteBufferEof(InputBuffer.java:632)
at org.apache.catalina.connector.InputBuffer.readByte(InputBuffer.java:350)
at org.apache.catalina.connector.CoyoteInputStream.read(CoyoteInputStream.java:84)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.PushbackInputStream.read(PushbackInputStream.java:139)
at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodArgumentResolver$EmptyBodyCheckingHttpInputMessage.<init>(AbstractMessageConverterMethodArgumentResolver.java:325)
at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodArgumentResolver.readWithMessageConverters(AbstractMessageConverterMethodArgumentResolver.java:194)
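The mechanism behind such a timeout can be sketched as a "blocking" read on a non-blocking channel, bounded by a temporary Selector. This is a simplified illustration of the idea behind pool.read(to, socket, selector, getReadTimeout()), not Tomcat's selector-pool code; the class and method names are hypothetical. The demo shows a silent client triggering SocketTimeoutException, then a successful read once a byte arrives.

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class TimedReadDemo {
    // Reads at least one byte into `to`, waiting at most timeoutMs.
    // Precondition: `ch` is in non-blocking mode (required by register()).
    static int readWithTimeout(SocketChannel ch, ByteBuffer to, long timeoutMs) throws IOException {
        int n = ch.read(to);
        if (n > 0) return n;
        if (n == -1) throw new EOFException();
        try (Selector selector = Selector.open()) { // closing it deregisters the channel
            ch.register(selector, SelectionKey.OP_READ);
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                long left = deadline - System.currentTimeMillis();
                if (left <= 0) {
                    throw new SocketTimeoutException("read timed out after " + timeoutMs + " ms");
                }
                if (selector.select(left) > 0) {
                    n = ch.read(to);
                    if (n == -1) throw new EOFException();
                    if (n > 0) return n;
                    selector.selectedKeys().clear();
                }
            }
        }
    }

    // Demo: a silent client triggers the timeout; once it sends a byte, the read succeeds
    static String demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(16);
                String first;
                try {
                    readWithTimeout(accepted, buf, 200);
                    first = "no timeout";
                } catch (SocketTimeoutException e) {
                    first = "timeout";
                }
                client.write(ByteBuffer.wrap(new byte[] {7}));
                int n = readWithTimeout(accepted, buf, 2000);
                return first + "," + n;
            }
        }
    }
}
```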
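Writing works the same way: if the send buffer is full, write blocks, and if it blocks for longer than connectionTimeout a SocketTimeoutException is thrown as well, so that resources are released in time. The blocking read and write paths of NioSocketWrapper pass getReadTimeout() and getWriteTimeout() to the selector pool: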
private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
    int nRead;
    NioChannel socket = getSocket();
    if (socket == NioChannel.CLOSED_NIO_CHANNEL) {
        throw new ClosedChannelException();
    }
    if (block) {
        Selector selector = null;
        try {
            selector = pool.get();
        } catch (IOException x) {
            // Ignore
        }
        try {
            nRead = pool.read(to, socket, selector, getReadTimeout());
        } finally {
            if (selector != null) {
                pool.put(selector);
            }
        }
    } else {
        nRead = socket.read(to);
        if (nRead == -1) {
            throw new EOFException();
        }
    }
    return nRead;
}
@Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
    NioChannel socket = getSocket();
    if (socket == NioChannel.CLOSED_NIO_CHANNEL) {
        throw new ClosedChannelException();
    }
    if (block) {
        long writeTimeout = getWriteTimeout();
        Selector selector = null;
        try {
            selector = pool.get();
        } catch (IOException x) {
            // Ignore
        }
        try {
            pool.write(from, socket, selector, writeTimeout);
            // Make sure we are flushed
            do {
            } while (!socket.flush(true, selector, writeTimeout));
        } finally {
            if (selector != null) {
                pool.put(selector);
            }
        }
        // If there is data left in the buffer the socket will be registered for
        // write further up the stack. This is to ensure the socket is only
        // registered for write once as both container and user code can trigger
        // write registration.
    } else {
        int n = 0;
        do {
            n = socket.write(from);
            if (n == -1) {
                throw new EOFException();
            }
        } while (n > 0 && from.hasRemaining());
    }
    updateLastWrite();
}
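Handing the task to the thread pool (Poller#processKey; processSocket creates the SocketProcessor and submits it to the Executor):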
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
    try {
        if (close) {
            cancelledKey(sk, socketWrapper);
        } else if (sk.isValid() && socketWrapper != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (socketWrapper.getSendfileData() != null) {
                    processSendfile(sk, socketWrapper, false);
                } else {
                    unreg(sk, socketWrapper, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (socketWrapper.readOperation != null) {
                            if (!socketWrapper.readOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (socketWrapper.writeOperation != null) {
                            if (!socketWrapper.writeOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk, socketWrapper);
                    }
                }
            }
        } else {
            // Invalid key
            cancelledKey(sk, socketWrapper);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, socketWrapper);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
SocketProcessor
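The Poller creates SocketProcessor tasks and hands them to the thread pool. SocketProcessor implements Runnable and defines the task executed by the Executor's threads; its main job is to call the Http11Processor component to process the request: Http11Processor reads the data from the Channel to build the ServletRequest object.
Http11Processor does not read the Channel directly. Tomcat supports both the synchronous non-blocking I/O and the asynchronous I/O models, which use different Channel classes in the Java API, such as SocketChannel and AsynchronousSocketChannel. To hide these differences from Http11Processor, Tomcat defines a wrapper class called SocketWrapper, and Http11Processor only calls SocketWrapper's methods to read and write data.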
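A miniature version of the SocketWrapper idea, under stated assumptions: MiniSocketWrapper, its two implementations and MiniProcessor are hypothetical names, not Tomcat classes. The processor only sees the interface, so the same parsing code works whether the bytes come from a synchronous NIO channel or from an AsynchronousByteChannel such as AsynchronousSocketChannel. The demo feeds it through a Pipe, whose source end is also a ReadableByteChannel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

// The only view of the connection that the "processor" code gets
interface MiniSocketWrapper {
    int read(ByteBuffer to) throws IOException;
}

// Backed by a synchronous NIO channel such as SocketChannel
class NioMiniWrapper implements MiniSocketWrapper {
    private final ReadableByteChannel ch;
    NioMiniWrapper(ReadableByteChannel ch) { this.ch = ch; }
    public int read(ByteBuffer to) throws IOException { return ch.read(to); }
}

// Backed by an asynchronous channel such as AsynchronousSocketChannel; waits on the Future
class AsyncMiniWrapper implements MiniSocketWrapper {
    private final AsynchronousByteChannel ch;
    AsyncMiniWrapper(AsynchronousByteChannel ch) { this.ch = ch; }
    public int read(ByteBuffer to) throws IOException {
        try {
            return ch.read(to).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }
}

class MiniProcessor {
    // Stand-in for Http11Processor: reads the request line through the wrapper only
    static String readRequestLine(MiniSocketWrapper wrapper) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(256);
        StringBuilder sb = new StringBuilder();
        while (wrapper.read(buf) > 0) {
            buf.flip();
            sb.append(StandardCharsets.US_ASCII.decode(buf));
            buf.clear();
            int eol = sb.indexOf("\r\n");
            if (eol >= 0) return sb.substring(0, eol);
        }
        return sb.toString();
    }

    static String demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
            sink.write(ByteBuffer.wrap(
                    "GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n".getBytes(StandardCharsets.US_ASCII)));
            return readRequestLine(new NioMiniWrapper(source));
        }
    }
}
```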
Executor
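The thread pool runs the SocketProcessor tasks. SocketProcessor's run method calls Http11Processor to read and parse the request data. As we know, Http11Processor encapsulates the application-layer protocol: it calls the container to obtain the response and then writes the response out through the Channel.
It is a thread pool customized by Tomcat, and it creates the worker threads that do the real work, i.e. execute SocketProcessor#run: parse the request, process it through the container, and finally call the Servlet.
So from the point of view of concurrency, configuring this thread pool properly is very important:
maxThreads="1000"  maximum number of worker threads, i.e. the maximum number of requests processed concurrently
minSpareThreads="100"  minimum number of threads kept alive, created at initialization
maxSpareThreads="500"  once more threads than this have been created, Tomcat closes the socket threads it no longer needs (this attribute belongs to older Tomcat versions; it is not listed for the Tomcat 9 Connector or Executor)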
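Tomcat's customized pool differs from a plain java.util.concurrent.ThreadPoolExecutor in one notable way: with an unbounded work queue the JDK executor never grows beyond its core size, whereas Tomcat wants to grow towards maxThreads before queueing. The sketch below shows a simplified version of that trick, similar in spirit to Tomcat's TaskQueue but with hypothetical class names, and ignoring idle threads, which the real implementation accounts for: the queue refuses work while the pool can still grow, so the executor adds a worker instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Refuses to queue while the pool is below its maximum size
class GrowFirstQueue extends LinkedBlockingQueue<Runnable> {
    private volatile ThreadPoolExecutor parent;

    void setParent(ThreadPoolExecutor parent) {
        this.parent = parent;
    }

    @Override
    public boolean offer(Runnable r) {
        if (parent != null && parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false; // makes ThreadPoolExecutor.execute() start a new worker instead
        }
        return super.offer(r);
    }
}

class PoolDemo {
    // Submits `tasks` long-running tasks and reports how many threads the pool created
    static int threadsCreated(int minSpare, int maxThreads, int tasks) throws InterruptedException {
        GrowFirstQueue queue = new GrowFirstQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minSpare, maxThreads, 60, TimeUnit.SECONDS, queue);
        queue.setParent(pool);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy, like a slow request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }
}
```

With a plain LinkedBlockingQueue, the same four tasks would run on a single thread and three of them would wait in the queue.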
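Tomcat's high-concurrency design
High concurrency means handling a large number of requests quickly. The thread model must be designed so that the CPU stays busy and threads block as little as possible, because while a thread is blocked the CPU sits idle. Each kind of work gets a number of threads matched to its load. NioEndpoint, for example, has three jobs: accepting connections, detecting I/O events and processing requests, and the key is to give each of these its own, separately sized set of threads:
- A dedicated thread group runs the Acceptor, and the number of Acceptors is configurable
- A dedicated thread group runs the Poller, and the number of Pollers is configurable (in older versions; in 9.0.39 a single Poller thread is started, as shown above)
- Request processing runs in a dedicated thread pool, whose size is also configurable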
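Summary
I/O models exist to bridge the speed gap between memory and external devices.
- Blocking vs non-blocking refers to whether the application's call returns immediately or waits when it initiates an I/O operation
- Synchronous vs asynchronous refers to who initiates the copy of data from kernel space to application space when the application communicates with the kernel: the kernel on its own (asynchronous) or the application (synchronous)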
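The blocking vs non-blocking distinction is easy to observe with a SocketChannel. In this minimal sketch (hypothetical class name) no data has been sent yet, so a read in non-blocking mode returns 0 right away, whereas the same read in blocking mode would wait for data.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingReadDemo {
    // Returns the result of a non-blocking read on a connection with no pending data
    static int readWithNoDataPending() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false); // in blocking mode this read would wait
                return accepted.read(ByteBuffer.allocate(16));
            }
        }
    }
}
```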
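The main job of Tomcat's Endpoint component is handling I/O, and NioEndpoint implements the I/O multiplexing model with the Java NIO API. The threads that read and write data do not block waiting for I/O themselves; that waiting is delegated to the Selector.
When a client sends an HTTP request, the connection is first received in Acceptor#run by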
socket = endpoint.serverSocketAccept();
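The connection is then passed to the thread named Poller, which detects I/O events. The Poller thread keeps calling select to pick out the channels for which the kernel has already copied data from the NIC into kernel space (i.e. the kernel has the data ready), and hands them to a thread named Catalina-exec. That thread's work includes the kernel copying the data from kernel space into user space, and the exec thread blocks during this copy; once it completes, user space (the exec thread) has the data and can parse it and run the business logic.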