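Previous articles in this series:
"Tomcat Source Code: Loading of the Bootstrap and Catalina Startup Classes"
"Tomcat Source Code: Container Lifecycle Management and Event Listeners"
"Tomcat Source Code: StandardServer and StandardService"
"Tomcat Source Code: The Container Interface"
"Tomcat Source Code: StandardEngine, StandardHost, StandardContext, StandardWrapper"
"Tomcat Source Code: Pipeline and Valve"
"Tomcat Source Code: The Connector Layer, Executor, and Connector"
"Tomcat Source Code: ProtocolHandler and Endpoint"
"Tomcat Source Code: Acceptor, Poller, and PollerEvent"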
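Preface
The previous article introduced the Acceptor and the Poller: the Acceptor listens for socket connections and hands them to the Poller, which calls the processSocket method to handle them.
As explained in the earlier article on connectors, once the Endpoint receives a socket connection it creates a SocketProcessor task and submits it to the thread pool. The run method of SocketProcessor then calls the Processor component to parse the application-layer protocol. The starting point of all this is the processSocket method, so that is where we begin.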
Contents
Preface
I. SocketProcessor
1. processSocket
2. createSocketProcessor
3. doRun
3.1 isHandshakeComplete
3.2 ConnectionHandler
3.3 registerReadInterest, registerWriteInterest
II. ConnectionHandler
1. Constructor
2. process
2.1 removeWaitingProcessor, addWaitingProcessor
2.2 longPoll
2.3 release
III. Http11Processor
1. Creating the Http11Processor
1.1 createProcessor
1.2 Constructor
2. process
3. service
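I. SocketProcessor
1. processSocket
The method first tries to take an existing SocketProcessorBase object from processorCache (a member of AbstractEndpoint introduced in an earlier article, of type SynchronizedStack<SocketProcessorBase<S>>). If the cache yields nothing, it calls createSocketProcessor to create one; otherwise it calls reset on the cached object to replace its socket wrapper and event.
Once the SocketProcessor is ready, the method checks whether dispatch is true and whether the executor is non-null. If both hold, the task is submitted to the thread pool; otherwise its run method is called directly on the current thread.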
public abstract class AbstractEndpoint<S,U> {
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
}
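The reuse-or-create step and the dispatch decision above can be sketched in isolation. This is only a minimal illustration under stated assumptions: ReusableTask and ProcessSocketSketch are hypothetical names, connections and events are plain strings, and an ArrayDeque guarded by synchronized stands in for Tomcat's SynchronizedStack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Hypothetical stand-in for a reusable socket task; not Tomcat's SocketProcessorBase.
class ReusableTask implements Runnable {
    private String connection;
    private String event;
    private final Deque<ReusableTask> cache;
    private final StringBuilder log;

    ReusableTask(String connection, String event, Deque<ReusableTask> cache, StringBuilder log) {
        this.cache = cache;
        this.log = log;
        reset(connection, event);
    }

    // Re-arm a cached task with a new connection and event.
    void reset(String connection, String event) {
        this.connection = connection;
        this.event = event;
    }

    @Override
    public void run() {
        try {
            log.append(connection).append(':').append(event).append(';');
        } finally {
            // Clear state and return the task to the cache, as doRun does in its finally block.
            connection = null;
            event = null;
            synchronized (cache) {
                cache.push(this);
            }
        }
    }
}

class ProcessSocketSketch {
    final Deque<ReusableTask> cache = new ArrayDeque<>();
    final StringBuilder log = new StringBuilder();
    int created = 0;

    boolean processSocket(String connection, String event, boolean dispatch, Executor executor) {
        if (connection == null) {
            return false;
        }
        ReusableTask task;
        synchronized (cache) {
            task = cache.poll(); // null when the cache is empty
        }
        if (task == null) {
            task = new ReusableTask(connection, event, cache, log);
            created++;
        } else {
            task.reset(connection, event);
        }
        if (dispatch && executor != null) {
            executor.execute(task); // hand over to the pool
        } else {
            task.run();             // run inline on the caller's thread
        }
        return true;
    }

    public static void main(String[] args) {
        ProcessSocketSketch s = new ProcessSocketSketch();
        s.processSocket("conn-1", "OPEN_READ", false, null);
        // Runnable::run is a same-thread Executor, used here to keep the demo deterministic.
        s.processSocket("conn-2", "OPEN_READ", true, Runnable::run);
        System.out.println(s.log + " created=" + s.created);
    }
}
```

In the demo the second call reuses the task that the first call returned to the cache, so only one task object is ever created.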
2. createSocketProcessor
As the code shows, this method simply creates a new SocketProcessor object.
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
protected SocketProcessorBase<NioChannel> createSocketProcessor(
SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
}
The SocketProcessor constructor calls the parent constructor via super, and the parent class SocketProcessorBase in turn uses the reset method to assign the two member variables socketWrapper and event.
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
}
public abstract class SocketProcessorBase<S> implements Runnable {
protected SocketWrapperBase<S> socketWrapper;
protected SocketEvent event;
public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
reset(socketWrapper, event);
}
public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
Objects.requireNonNull(event);
this.socketWrapper = socketWrapper;
this.event = event;
}
}
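3. doRun
The run method is implemented in the parent class SocketProcessorBase. After synchronizing on socketWrapper and checking that it is not closed, it delegates the actual work to doRun.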
public abstract class SocketProcessorBase<S> implements Runnable {
public final void run() {
synchronized (socketWrapper) {
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
}
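doRun is implemented in the subclass SocketProcessor. The method is long, but it boils down to two steps: first it assigns handshake based on the state of socketWrapper and event, then it takes a different branch depending on the value of handshake.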
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
protected void doRun() {
Poller poller = NioEndpoint.this.poller;
if (poller == null) {
socketWrapper.close();
return;
}
try {
int handshake = -1;
try {
if (socketWrapper.getSocket().isHandshakeComplete()) {
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
event = SocketEvent.OPEN_READ;
}
} catch (IOException x) {
handshake = -1;
if (logHandshake.isDebugEnabled()) {
logHandshake.debug(sm.getString("endpoint.err.handshake",
socketWrapper.getRemoteAddr(), Integer.toString(socketWrapper.getRemotePort())), x);
}
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && processorCache != null) {
processorCache.push(this);
}
}
}
}
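The two steps of doRun can be condensed into two small pure functions. The following is a sketch rather than Tomcat code: HandshakeSketch, Event and Action are hypothetical names, and handshakeResult stands in for whatever the channel's handshake(...) call would return. Only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

// Illustrative only: HandshakeSketch, Event and Action are hypothetical names, not Tomcat types.
class HandshakeSketch {
    enum Event { OPEN_READ, OPEN_WRITE, STOP, DISCONNECT, ERROR }
    enum Action { PROCESS_EVENT, FAIL_AND_CANCEL, REGISTER_READ, REGISTER_WRITE, NONE }

    // Step 1: derive the handshake value from the channel state and the event.
    static int resolve(boolean handshakeComplete, Event event, int handshakeResult) {
        if (handshakeComplete) {
            return 0;
        }
        if (event == Event.STOP || event == Event.DISCONNECT || event == Event.ERROR) {
            return -1;
        }
        return handshakeResult;
    }

    // Step 2: pick the branch that matches the handshake value.
    static Action decide(int handshake) {
        if (handshake == 0) {
            return Action.PROCESS_EVENT;      // hand the event to the handler
        } else if (handshake == -1) {
            return Action.FAIL_AND_CANCEL;    // report CONNECT_FAIL and cancel the key
        } else if (handshake == SelectionKey.OP_READ) {
            return Action.REGISTER_READ;      // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return Action.REGISTER_WRITE;     // handshake needs to write
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        int[] values = {0, -1, SelectionKey.OP_READ, SelectionKey.OP_WRITE};
        for (int h : values) {
            System.out.println(h + " -> " + decide(h));
        }
    }
}
```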
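3.1 isHandshakeComplete
isHandshakeComplete is defined in NioChannel and overridden in its subclass SecureNioChannel. Following the standard (non-TLS) request flow, we look at the NioChannel implementation, which always returns true. The handshake method called later behaves the same way: in NioChannel it always returns 0.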
public class NioChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
public boolean isHandshakeComplete() {
return true;
}
public int handshake(boolean read, boolean write) throws IOException {
return 0;
}
}
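3.2 ConnectionHandler
Given the two NioChannel methods above, handshake is always 0 in doRun for this flow. In that branch, a null event is treated as a read event by default, and the event is handled by calling the Handler's process method. The call returns a SocketState value, state; if state is SocketState.CLOSED, poller.cancelledKey(getSelectionKey(), socketWrapper) is called to cancel the key and close the connection.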
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
getHandler here returns the ConnectionHandler that is created in the AbstractProtocol constructor and set on the endpoint. We cover that class below.
public AbstractProtocol(AbstractEndpoint<S, ?> endpoint) {
this.endpoint = endpoint;
ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
setHandler(cHandler);
getEndpoint().setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
3.3 registerReadInterest, registerWriteInterest
If handshake equals SelectionKey.OP_READ or SelectionKey.OP_WRITE, doRun calls socketWrapper.registerReadInterest() or socketWrapper.registerWriteInterest() respectively, which re-registers the event of interest through the poller.
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
public void registerReadInterest() {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.debug.registerRead", this));
}
getPoller().add(this, SelectionKey.OP_READ);
}
public void registerWriteInterest() {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.debug.registerWrite", this));
}
getPoller().add(this, SelectionKey.OP_WRITE);
}
}
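To summarize, SocketProcessor acts as a unit of work in the thread pool that carries the requests passed on by the poller, and within its processing steps on the worker thread it hands the task over to ConnectionHandler.
II. ConnectionHandler
1. Constructor
The ConnectionHandler constructor receives the protocol object, which is an Http11NioProtocol in our case.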
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
private final AbstractProtocol<S> proto;
public ConnectionHandler(AbstractProtocol<S> proto) {
this.proto = proto;
}
}
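2. process
The process method is fairly complex, so we only pick out the standard HTTP/1.1 flow that this article is about.
It first takes the processor currently bound to the connection. If there is one, it is removed from waitingProcessors; if not, the method tries the recycledProcessors cache, and if that is empty too it calls createProcessor to create one. With a processor in hand, it calls the processor's process method and stores the result in state, which determines what happens next.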
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// ...
S socket = wrapper.getSocket();
Processor processor = (Processor) wrapper.takeCurrentProcessor();
// ...
try{
// ...
if (processor != null) {
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
return SocketState.CLOSED;
}
// ...
if (processor == null) {
processor = recycledProcessors.pop();
}
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
// ...
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
// ...
}
} while (state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
release(processor);
processor = null;
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
} else if (state == SocketState.UPGRADED) {
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.SUSPENDED) {
} else {
}
if (processor != null) {
wrapper.setCurrentProcessor(processor);
}
return state;
}
release(processor);
return SocketState.CLOSED;
}
}
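The lookup order (processor bound to the connection, then the recycled cache, then a fresh instance) together with the two ways a processor is put back can be sketched as follows. All names here (ProcessorLookupSketch, Proc, acquire, keep, release) are hypothetical, and a HashMap keyed by a connection name stands in for the wrapper's current-processor slot.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hypothetical names, not Tomcat's ConnectionHandler.
class ProcessorLookupSketch {
    static final class Proc {
        final int id;
        Proc(int id) { this.id = id; }
    }

    private final Map<String, Proc> current = new HashMap<>();   // processor bound to a connection
    private final Deque<Proc> recycled = new ArrayDeque<>();     // idle processors awaiting reuse
    int created = 0;

    // Lookup order: bound processor, then recycled cache, then create a new one.
    Proc acquire(String connection) {
        Proc p = current.remove(connection);
        if (p == null) {
            p = recycled.poll();
        }
        if (p == null) {
            p = new Proc(++created);
        }
        return p;
    }

    // Request still in progress: keep the processor bound to the connection.
    void keep(String connection, Proc p) {
        current.put(connection, p);
    }

    // Request finished: return the processor to the cache for any later connection.
    void release(Proc p) {
        recycled.push(p);
    }

    public static void main(String[] args) {
        ProcessorLookupSketch h = new ProcessorLookupSketch();
        Proc first = h.acquire("conn-1");
        h.keep("conn-1", first);
        Proc again = h.acquire("conn-1");
        h.release(again);
        Proc reused = h.acquire("conn-2");
        System.out.println((first == again) + " " + (again == reused) + " created=" + h.created);
    }
}
```

The keep path loosely corresponds to the branches where the excerpt calls wrapper.setCurrentProcessor(processor), and the release path to the OPEN branch where the processor is recycled.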
2.1 removeWaitingProcessor, addWaitingProcessor
removeWaitingProcessor and addWaitingProcessor both operate on the waitingProcessors collection.
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
public void removeWaitingProcessor(Processor processor) {
waitingProcessors.remove(processor);
}
public void addWaitingProcessor(Processor processor) {
waitingProcessors.add(processor);
}
}
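2.2 longPoll
longPoll checks whether the processor is asynchronous. In this flow isAsync() is false by default, so socket.registerReadInterest() is called. That method was covered in the previous article and in section 3.3 above, so we will not repeat it here.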
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
if (!processor.isAsync()) {
socket.registerReadInterest();
}
}
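2.3 release
As the name suggests, this method releases the processor's resources. The processor is recycled first; if it is not an upgrade processor, it is pushed into the recycledProcessors cache for later requests, otherwise it is only removed from waitingProcessors.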
private void release(Processor processor) {
if (processor != null) {
processor.recycle();
if (processor.isUpgrade()) {
getProtocol().removeWaitingProcessor(processor);
} else {
recycledProcessors.push(processor);
if (getLog().isDebugEnabled()) {
getLog().debug("Pushed Processor [" + processor + "]");
}
}
}
}
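ConnectionHandler does not do much work itself either; it simply hands the request over to the processor component.
III. Http11Processor
1. Creating the Http11Processor
1.1 createProcessor
As shown above, ConnectionHandler#process calls createProcessor to create the processor object. Since we are following the standard HTTP/1.1 flow, the implementation class is Http11Processor. After the Http11Processor is constructed, several properties are set on it, including the previously created Adapter object.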
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
protected Processor createProcessor() {
Http11Processor processor = new Http11Processor(this, getEndpoint());
processor.setAdapter(getAdapter());
processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
processor.setDisableUploadTimeout(getDisableUploadTimeout());
processor.setRestrictedUserAgents(getRestrictedUserAgents());
processor.setMaxSavePostSize(getMaxSavePostSize());
return processor;
}
}
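1.2 Constructor
The Http11Processor constructor first calls the constructor of its parent class AbstractProcessor via super, which creates the request (org.apache.coyote.Request) and response (org.apache.coyote.Response) objects with new. The subclass then creates httpParser (HttpParser) and uses these two objects to build inputBuffer (Http11InputBuffer) and outputBuffer (Http11OutputBuffer), to which it adds the input and output filters.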
public Http11Processor(AbstractHttp11Protocol<?> protocol, AbstractEndpoint<?, ?> endpoint) {
super(endpoint);
this.protocol = protocol;
httpParser = new HttpParser(protocol.getRelaxedPathChars(), protocol.getRelaxedQueryChars());
inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpRequestHeaderSize(),
protocol.getRejectIllegalHeader(), httpParser);
request.setInputBuffer(inputBuffer);
outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpResponseHeaderSize(),
protocol.getSendReasonPhrase());
response.setOutputBuffer(outputBuffer);
inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new IdentityOutputFilter());
inputBuffer.addFilter(
new ChunkedInputFilter(protocol.getMaxTrailerSize(), protocol.getAllowedTrailerHeadersInternal(),
protocol.getMaxExtensionSize(), protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new ChunkedOutputFilter());
inputBuffer.addFilter(new VoidInputFilter());
outputBuffer.addFilter(new VoidOutputFilter());
inputBuffer.addFilter(new BufferedInputFilter(protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new GzipOutputFilter());
pluggableFilterIndex = inputBuffer.getFilters().length;
}
public abstract class AbstractProcessor extends AbstractProcessorLight implements ActionHook {
protected final AbstractEndpoint<?, ?> endpoint;
protected final Request request;
protected final Response response;
public AbstractProcessor(AbstractEndpoint<?, ?> endpoint) {
this(endpoint, new Request(), new Response());
}
protected AbstractProcessor(AbstractEndpoint<?, ?> endpoint, Request coyoteRequest, Response coyoteResponse) {
this.endpoint = endpoint;
asyncStateMachine = new AsyncStateMachine(this);
request = coyoteRequest;
response = coyoteResponse;
response.setHook(this);
request.setResponse(response);
request.setHook(this);
userDataHelper = new UserDataHelper(getLog());
}
}
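2. process
The process method runs a do-while loop and handles each case according to the current conditions. The important branches are the calls to the dispatch method and the service method.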
public abstract class AbstractProcessorLight implements Processor {
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
state = SocketState.CLOSED;
}
if (isAsync()) {
state = asyncPostProcess();
}
if (dispatches == null || !dispatches.hasNext()) {
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);
return state;
}
}
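Ignoring the dispatches iterator and the loop itself, the order of the else-if chain above can be sketched as a small routing function. This is a simplified illustration, not Tomcat code: ProcessRoutingSketch and its Event enum are hypothetical, and asyncOrUpgrade collapses the isAsync() || isUpgrade() || state == ASYNC_END test into a single flag.

```java
// Illustrative only: a hypothetical condensation of the branch order, not AbstractProcessorLight.
class ProcessRoutingSketch {
    enum Event { OPEN_READ, OPEN_WRITE, DISCONNECT, CONNECT_FAIL, STOP }

    // Returns a label for the branch taken for one pass through the loop body.
    static String route(Event status, boolean asyncOrUpgrade) {
        if (status == Event.DISCONNECT) {
            return "no-op";        // empty branch; state keeps its initial value
        } else if (asyncOrUpgrade) {
            return "dispatch";     // in-progress async or upgraded request
        } else if (status == Event.OPEN_WRITE) {
            return "LONG";         // nothing to write for a standard request
        } else if (status == Event.OPEN_READ) {
            return "service";      // standard HTTP request
        } else if (status == Event.CONNECT_FAIL) {
            return "logAccess";
        } else {
            return "CLOSED";
        }
    }

    public static void main(String[] args) {
        for (Event e : Event.values()) {
            System.out.println(e + " sync=" + route(e, false) + " async=" + route(e, true));
        }
    }
}
```

The order matters: DISCONNECT is checked before the async test, so a disconnect is never routed to dispatch, while any other event on an async or upgraded connection is.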
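As its Javadoc states, the dispatch method processes in-progress requests that are no longer in standard HTTP mode; it is currently used for Servlet 3.0 async requests and HTTP upgrade connections.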
/**
* Process an in-progress request that is not longer in standard HTTP mode. Uses currently include Servlet 3.0 Async
* and HTTP upgrade connections. Further uses may be added in the future. These will typically start as HTTP
* requests.
*
* @param status The event to process
*
* @return The state the caller should put the socket in when this method returns
*
* @throws IOException If an I/O error occurs during the processing of the request
*/
protected abstract SocketState dispatch(SocketEvent status) throws IOException;
In contrast to dispatch, the service method handles standard HTTP requests. It is implemented in Http11Processor.
/**
* Service a 'standard' HTTP request. This method is called for both new requests and for requests that have
* partially read the HTTP request line or HTTP headers. Once the headers have been fully read this method is not
* called again until there is a new HTTP request to process. Note that the request type may change during
* processing which may result in one or more calls to {@link #dispatch(SocketEvent)}. Requests may be pipe-lined.
*
* @param socketWrapper The connection to process
*
* @return The state the caller should put the socket in when this method returns
*
* @throws IOException If an I/O error occurs during the processing of the request
*/
protected abstract SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException;
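3. service
service starts with some preparation by calling setSocketWrapper, which internally calls inputBuffer.init(socketWrapper) and outputBuffer.init(socketWrapper) among other things, and then enters a while loop. Inside the loop, a try-catch block calls inputBuffer.parseRequestLine (which parses the request line) and sets a few states and properties.
It then calls prepareRequestProtocol() to handle the protocol part of the request line. The header-based preparation, such as evaluating host, transfer-encoding and content-length and adding the matching InputFilter instances to the Http11InputBuffer, happens in the prepareRequest() call that follows header parsing (not shown in the excerpt below).
Finally it calls the Adapter to process the request, that is, getAdapter().service(request, response).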
public class Http11Processor extends AbstractProcessor {
public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !endpoint.isPaused()) {
// Parsing the request header
try {
if (!inputBuffer.parseRequestLine(keptAlive)) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
// Process the Protocol component of the request line
// Need to know if this is an HTTP 0.9 request before trying to
// parse headers.
prepareRequestProtocol();
}
// Has an upgrade been requested?
// ...
// Process the request in the adapter
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
if (keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
}
}
}
}
}
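The overall shape of the loop (parse a request line, hand the request to an adapter, continue while the connection is kept alive) can be illustrated with a heavily simplified sketch. Everything here is an assumption made for illustration: ServiceLoopSketch and its Adapter interface are hypothetical, requests are reduced to their request lines, and keep-alive is decided by the protocol version alone, whereas the real method also considers headers, error state, async mode, upgrades and sendfile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a toy keep-alive loop, not Http11Processor.service.
class ServiceLoopSketch {
    // Hypothetical callback standing in for the adapter that receives the parsed request.
    interface Adapter {
        void service(String method, String uri, String protocol);
    }

    // Handles pipelined request lines until input runs out, a line is malformed,
    // or a request does not keep the connection alive. Returns the number handled.
    static int service(List<String> requestLines, Adapter adapter) {
        boolean keepAlive = true;
        int handled = 0;
        int index = 0;
        while (keepAlive && index < requestLines.size()) {
            // Request line format: METHOD SP REQUEST-TARGET SP HTTP-VERSION
            String[] parts = requestLines.get(index++).split(" ");
            if (parts.length != 3) {
                break; // malformed request line: stop processing
            }
            String protocol = parts[2];
            // Simplification: HTTP/1.1 connections are persistent by default, HTTP/1.0 ones are not.
            keepAlive = "HTTP/1.1".equals(protocol);
            adapter.service(parts[0], parts[1], protocol);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int n = service(
                Arrays.asList("GET /a HTTP/1.1", "GET /b HTTP/1.0", "GET /c HTTP/1.1"),
                (method, uri, protocol) -> seen.add(method + " " + uri));
        System.out.println(n + " " + seen);
    }
}
```

In the demo the third request is never reached, because the HTTP/1.0 request before it ends the keep-alive loop.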
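To summarize, the Processor receives the socket from the Endpoint, reads the byte stream and parses it into Tomcat Request and Response objects, and finally passes the request to the Adapter for further processing.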