参考资料:
《Tomcat源码解析系列(十一)ProtocolHandler》
《Tomcat源码解析系列(十二)NioEndpoint》
前文:
《Tomcat源码:启动类Bootstrap与Catalina的加载》
《Tomcat源码:容器的生命周期管理与事件监听》
《Tomcat源码:StandardServer与StandardService》
《Tomcat源码:Container接口》
《Tomcat源码:StandardEngine、StandardHost、StandardContext、StandardWrapper》
《Tomcat源码:Pipeline与Valve》
《Tomcat源码:连接器与Executor、Connector》
前言
前文中我们介绍了连接器与其入口Connector以及线程池Executor,而在Connector中则是通过protocolHandler的init与start启动了整个连接器,本文我们就来介绍下protocolHandler与其关键组件Endpoint的内容,以及线程池Executor是如何发挥其作用的。
目录
前言
一、ProtocolHandler
1、构造方法
1.1、Http11NioProtocol与其抽象父类
1.2、 AbstractProtocol与成员变量的赋值
2、生命周期方法
2.1、init方法
2.2、start方法
二、Endpoint
1、成员变量设置
2、init方法
2.1、bindOnInit
2.2、initServerSocket
3、start方法
3.1、startInternal
3.2、getExecutor
3.3、initializeConnectionLatch
3.4、 Poller与Acceptor
一、ProtocolHandler
1、构造方法
1.1、Http11NioProtocol与其抽象父类
在 Connector 的构造方法中,用反射创建了一个 Http11NioProtocol 对象,下面我们进入Http11NioProtocol的源码进行分析。
Http11NioProtocol构造方法里,第一步创建了NioEndpoint对象,该对象即我们上文中介绍的Endpoint组件,用来实现TCP/IP协议的(《Tomcat源码:连接器与Executor、Connector》)。第二步则是向上调用了父类AbstractHttp11JsseProtocol的构造方法。
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
public Http11NioProtocol() {
this(new NioEndpoint());
}
public Http11NioProtocol(NioEndpoint endpoint) {
super(endpoint);
}
}
回溯到抽象父类AbstractHttp11JsseProtocol和AbstractHttp11Protocol中的构造方法中发现继续往上调用。需要注意的是这里AbstractHttp11Protocol中为连接设置了超时时间,而这个超时时间正是通过endpoint对象实现的(endpoint实际上就是限制了传输层的时间)。
public abstract class AbstractHttp11JsseProtocol<S> extends AbstractHttp11Protocol<S> {
public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S, ?> endpoint) {
super(endpoint);
}
}
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
public AbstractHttp11Protocol(AbstractEndpoint<S, ?> endpoint) {
super(endpoint);
// 设置超时时间
setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
}
public void setConnectionTimeout(int timeout) {
endpoint.setConnectionTimeout(timeout);
}
}
1.2、 AbstractProtocol与成员变量的赋值
最后我们来到了抽象类 AbstractProtocol中,可以看到该类中有2个成员变量endpoint与handler,这里会将传过来的NioEndpoint对象赋值给endpoint。然后创建一个ConnectionHandler对象,赋值给handler,同时设置为NioEndpoint对象的属性,最后设置了Endpoint 对象的两个属性,setConnectionLinger 和 setTcpNoDelay 方法就是调用 Endpoint 对象的相关方法。
这里从注释里可以看出endpoint是用来处理低级网络I/O的,且必须要和ProtocolHandler的具体实现相匹配,比如使用了NIO模式的ProtocolHandler,那么也需要NIO模式的Endpoint。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
/**
* Endpoint that provides low-level network I/O - must be matched to the ProtocolHandler implementation
* (ProtocolHandler using NIO, requires NIO Endpoint etc.).
*/
private final AbstractEndpoint<S, ?> endpoint;
private Handler<S> handler;
public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) {
this.endpoint = endpoint;
ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
setHandler(cHandler);
getEndpoint().setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
protected void setHandler(Handler<S> handler) {
this.handler = handler;
}
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
private final AbstractProtocol<S> proto;
public ConnectionHandler(AbstractProtocol<S> proto) {
this.proto = proto;
}
}
2、生命周期方法
ProtocolHandler 的init 和 start 方法由子类AbstractProtocol与AbstractHttp11Protocol 实现/重载了。
2.1、init方法
AbstractHttp11Protocol中init方法有2部分是处理升级协议设置的,如果涉及到websocket或者HTTP2可能涉及,我们这里还是以主流的HTTP1.1的流程来分析,因此跳过下直接来看核心逻辑super.init(),调用抽象父类AbstractProtocol中的实现。
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
public void init() throws Exception {
// 升级协议设置
for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
configureUpgradeProtocol(upgradeProtocol);
}
super.init();
// Http2相关配置
for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
if (upgradeProtocol instanceof Http2Protocol) {
((Http2Protocol) upgradeProtocol).setHttp11Protocol(this);
}
}
}
}
来到AbstractProtocol里中的init方法,我们发现其实就是调用成员endpoint的init方法。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
public void init() throws Exception {
// ...
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length() - 1));
endpoint.setDomain(domain);
endpoint.init();
}
}
2.2、start方法
start方法则是由AbstractProtocol类实现的,这里有2块重要的部分一个是调用endpoint的start方法,另一块则是启动异步线程调用AsyncTimeout的run方法。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
// The timeout thread.
private AsyncTimeout asyncTimeout = null;
public void start() throws Exception {
// ...
endpoint.start();
// Start timeout thread
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
}
AsyncTimeout 是AbstractProtocol的内部类,结合上面的注释我们可以了解到这是一个用来处理超时线程的类,可以看出 startAsyncTimeout 方法的作用是定期调用 waitingProcessors 里的 Processor 对象的 timeoutAsync 方法来处理一些超时的请求。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
private final Set<Processor> waitingProcessors = Collections
.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
protected class AsyncTimeout implements Runnable {
private volatile boolean asyncTimeoutRunning = true;
@Override
public void run() {
while (asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
long now = System.currentTimeMillis();
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(now);
}
}
}
}
}
到这里我们大致了解了ProtocolHandler的初始化与init、start方法的基本内容,可以了解到ProtocolHandler内包含了endpoint、handler与adapter这几个组件,而生命周期方法实际上就是调用了endpoint的init与start方法。
二、Endpoint
1、成员变量设置
在上文中,ProtocolHandler的构造方法最终创建了一个NioEndpoint对象和一个ConnectionHandler对象,并将ConnectionHandler赋值给了NioEndpoint的成员变量handlr。
从源码可以看出这个成员变量handlr是在NioEndpoint的抽象父类AbstractEndpoint中定义的。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) {
// ...
getEndpoint().setHandler(cHandler);
}
protected AbstractEndpoint<S, ?> getEndpoint() {
return endpoint;
}
}
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>{}
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U> {}
public abstract class AbstractEndpoint<S,U> {
private Handler<S> handler = null;
public void setHandler(Handler<S> handler ) { this.handler = handler; }
}
2、init方法
ProtocolHandler的init方法最终调用了NioEndpoint的inti方法,而该方法的实现在其抽象父类 AbstractEndpoint 里,然后又回溯到了AbstractEndpoint中,可以看到这里通过bindWithCleanup()方法实现核心逻辑。
2.1、bindOnInit
从bindOnInit的注解可以看出这里的意思是控制Endpoint是否在初始化时绑定端口,通过前文我们知道Endpoint是传输层的实现,因此需要创建ServerSocketChannel 并进行监听端口,而这个参数正是控制端口监听的时机。
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U> {
public void init() throws Exception {
testServerCipherSuitesOrderSupport();
super.init();
}
}
public abstract class AbstractEndpoint<S,U> {
/**
* Controls when the Endpoint binds the port. <code>true</code>, the default
* binds the port on {@link #init()} and unbinds it on {@link #destroy()}.
* If set to <code>false</code> the port is bound on {@link #start()} and
* unbound on {@link #stop()}.
*/
private boolean bindOnInit = true;
public void init() throws Exception {
if (bindOnInit) {
bindWithCleanup();
bindState = BindState.BOUND_ON_INIT;
}
// ...
}
}
bindWithCleanup方法继续调用bind方法。
public abstract class AbstractEndpoint<S,U> {
private void bindWithCleanup() throws Exception {
try {
bind();
} catch (Throwable t) {
// ...
}
}
}
bind方法的实现通过子类AbstractJsseEndpoint完成,这里最重要的就是第一步initServerSocket();
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
/**
* Initialize the endpoint.
*/
@Override
public void bind() throws Exception {
initServerSocket();
setStopLatch(new CountDownLatch(1));
// Initialize SSL if needed
initialiseSsl();
}
}
2.2、initServerSocket
initServerSocket方法内部逻辑可以看出来就是创建一个ServerSocketChannel 并设置了一些相关属性,然后绑定地址和端口进行监听。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
/**
* Server socket "pointer".
*/
private volatile ServerSocketChannel serverSock = null;
private boolean useInheritedChannel = false;
public boolean getUseInheritedChannel() { return useInheritedChannel; }
protected void initServerSocket() throws Exception {
if (getUseInheritedChannel()) {
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
} else {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
}
serverSock.configureBlocking(true); //mimic APR behavior
}
}
可以看出Endpoint的init方法目的就是创建ServerSocketChannel并监听请求。
3、start方法
start方法由于AbstractEndpoint类实现,其内部调用startInternal方法。
public abstract class AbstractEndpoint<S,U> {
public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
bindWithCleanup();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}
}
3.1、startInternal
startInternal方法由NioEndpoint实现,这里先创建了三个SynchronizedStack对象来保存缓存,分别赋值给 processorCache,eventCache和nioChannels,这三个属性都使用来复用的,分别复用SocketProcessorBase对象,PollerEvent对象和NioChannel对象。其中 processorCache 在 AbstractEndpoint 里声明,其他两个在 NioEndpoint 里声明。然后就是一些方法调用,下面我们简单介绍下这些方法的作用。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
protected volatile boolean running = false;
// Cache for SocketProcessor objects
protected SynchronizedStack<SocketProcessorBase<S>> processorCache;
// Cache for poller events
private SynchronizedStack<PollerEvent> eventCache;
// Bytebuffer cache, each channel holds a set of buffers
// (two, except for SSL holds four)
private SynchronizedStack<NioChannel> nioChannels;
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
if (socketProperties.getProcessorCache() != 0) {
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
}
if (socketProperties.getEventCache() != 0) {
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
}
if (socketProperties.getBufferPool() != 0) {
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
}
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
initializeConnectionLatch();
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
startAcceptorThread();
}
}
}
3.2、getExecutor
getExecutor这里会获取成员变量executor,这个变量的设置实际上是通过我们前文中介绍的ConnectorCreateRule(《Tomcat源码:连接器与Executor、Connector》)调用ProtocolHandler的setExecutor方法。
public class ConnectorCreateRule extends Rule {
private static void setExecutor(Connector con, Executor ex) throws Exception {
// 获取Connector中得ProtocolHandler,调用其setExecutor方法
Method m = IntrospectionUtils.findMethod(con.getProtocolHandler().getClass(),"setExecutor",new Class[] {java.util.concurrent.Executor.class});
if (m!=null) {
m.invoke(con.getProtocolHandler(), new Object[] {ex});
}else {
log.warn(sm.getString("connector.noSetExecutor", con));
}
}
}
ProtocolHandler的setExecutor方法内再调用endpoint.setExecutor(executor);为endpoint变量设置线程池,因此这里的getExecutor可以获取到在standardservice中创建的线程池。
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
public void setExecutor(Executor executor) {
endpoint.setExecutor(executor);
}
}
public abstract class AbstractEndpoint<S,U> {
protected volatile boolean internalExecutor = true;
private Executor executor = null;
public void setExecutor(Executor executor) {
this.executor = executor;
this.internalExecutor = (executor == null);
}
public Executor getExecutor() { return executor; }
}
如果这里获取到的线程池为空则会自己创建一个,可以看到这里其实和我们之前介绍的StandardThreadExecutor中创建线程池的内容是一致的。
public abstract class AbstractEndpoint<S,U> {
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
}
3.3、initializeConnectionLatch
initializeConnectionLatch 方法初始换了 connectionLimitLatch 属性,这个属性是用来限制 tomcat 的最大连接数的,可以看到这里默认大小是8*1024。
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) {
return null;
}
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
private int maxConnections = 8*1024;
public int getMaxConnections() { return this.maxConnections; }
3.4、 Poller与Acceptor
接下来启动了异步线程执行poller类的run方法。
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
最后是startAcceptorThread方法,可以看到这里创建了Acceptor对象并启动了异步线程执行run方法。
protected void startAcceptorThread() {
acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
Endpoint方法的最后两步启动了两个异步线程Acceptor与Poller ,这两个组件构成了 tomcat 的线程模型,是非常重要的组件。其中Acceptor线程用于处理客户端连接,而Poller处理这些连接通道上的读写事件。
至此我们大致介绍完了Endpoint组件的基本内容,其中内容主要是创建ServerSocketChannel监听网络连接,创建/获取连接池,设置最大连接数,并启动线程模型Poller与Acceptor,他们是连接处理的关键,我们会在后续的文章中进行介绍。