使用背景
在Nacos的配置更新和Apollo的配置更新中,我们都可以看到长轮询(长连接)的身影。长连接在建立后可以持续通信,避免频繁地建立和断开连接,从而节约系统资源、减少系统开销。同时,长连接可以保证实时性,及时推送服务实例的上下线情况和配置信息,使客户端能够及时地感知到变化。
Nacos主要是通过AsyncContext来实现长连接的,而Apollo是通过DeferredResult来实现的。AsyncContext是Servlet 3.0提供的异步请求API,DeferredResult是SpringMvc对于AsyncContext的封装,两者都是异步请求的实现方式。
AsyncContext Demo
/**
 * Demo endpoint: switches the request into asynchronous mode and hands the actual
 * work to {@link TestTask}, which writes the response and completes the context.
 *
 * The listener is registered BEFORE the task is started, so no lifecycle event
 * (complete / timeout / error) can fire before the listener is attached.
 *
 * @param request the current servlet request
 */
@RequestMapping("/t1")
public void t1(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();

    // Attach the lifecycle listener first; each callback only logs the event name.
    asyncContext.addListener(new AsyncListener() {
        @Override
        public void onComplete(AsyncEvent event) throws IOException {
            System.out.println("onComplete...");
        }

        @Override
        public void onTimeout(AsyncEvent event) throws IOException {
            System.out.println("onTimeout...");
        }

        @Override
        public void onError(AsyncEvent event) throws IOException {
            System.out.println("onError...");
        }

        @Override
        public void onStartAsync(AsyncEvent event) throws IOException {
            System.out.println("onStartAsync...");
        }
    });

    // Start the work only once the listener is in place.
    asyncContext.start(new TestTask(asyncContext));
}
/**
 * Demo task that simulates slow work, writes "hello world" to the response and then
 * completes the asynchronous context.
 *
 * The context is completed in a {@code finally} block so that an interrupt or an I/O
 * failure cannot leave the request open until the container's async timeout fires.
 */
public static class TestTask implements Runnable {
    private final AsyncContext asyncContext;

    /**
     * @param asyncContext the async context whose response is written and which is completed
     */
    public TestTask(AsyncContext asyncContext) {
        this.asyncContext = asyncContext;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(6000);
            asyncContext.getResponse().getWriter().write("hello world");
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            System.out.println("TestTask interrupted: " + e);
        } catch (IOException e) {
            System.out.println("TestTask failed to write response: " + e);
        } finally {
            // Always release the request, whether or not the write succeeded.
            asyncContext.complete();
        }
    }
}
AsyncContext
源码解析
开启异步调用
Request#startAsync(ServletRequest,ServletResponse),开启异步调用,并设置异步超时时长(取自Connector的asyncTimeout,默认30s)。
/**
 * Puts this request into asynchronous mode using the supplied request/response pair.
 *
 * If async is not supported, a warning is logged and an IllegalStateException is thrown.
 * Otherwise the AsyncContextImpl is created on first use (an existing one is reused),
 * marked as started, and given the connector's async timeout.
 *
 * @param request  the request object to associate with the async context
 * @param response the response object to associate with the async context
 * @return the async context of this request
 * @throws IllegalStateException if isAsyncSupported() returns false
 */
@Override
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}
// Create the context lazily; a previously created one is reused.
if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}
// The last argument is true only when the caller passed exactly the objects returned by
// getRequest() and getResponse().getResponse().
asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
// The timeout comes from the connector configuration.
asyncContext.setTimeout(getConnector().getAsyncTimeout());
return asyncContext;
}
AsyncContextImpl#setStarted,执行ASYNC_START对应的action,并且调用各监听器的fireOnStartAsync方法。
/**
 * Marks this async context as started and notifies the registered listeners.
 *
 * Under asyncContextLock it: sends ASYNC_START to the Coyote request, records the context,
 * request and response, increments the context's in-progress async counter, creates the
 * AsyncEvent, and fires onStartAsync on a copy of the listener list (the original list is
 * cleared before the callbacks run).
 *
 * @param context                 the web application context the request belongs to
 * @param request                 the servlet request for this async cycle
 * @param response                the servlet response for this async cycle
 * @param originalRequestResponse stored as hasOriginalRequestAndResponse
 */
public void setStarted(Context context, ServletRequest request,
ServletResponse response, boolean originalRequestResponse) {
synchronized (asyncContextLock) {
// Tell the protocol processor that async processing is starting.
this.request.getCoyoteRequest().action(
ActionCode.ASYNC_START, this);
this.context = context;
context.incrementInProgressAsyncCount();
this.servletRequest = request;
this.servletResponse = response;
this.hasOriginalRequestAndResponse = originalRequestResponse;
this.event = new AsyncEvent(this, request, response);
// Iterate over a snapshot; the live list is emptied before callbacks are invoked.
List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners);
listeners.clear();
if (log.isDebugEnabled()) {
log.debug(sm.getString("asyncContextImpl.fireOnStartAsync"));
}
for (AsyncListenerWrapper listener : listenersCopy) {
try {
listener.fireOnStartAsync(event);
} catch (Throwable t) {
// A failing listener is logged; the loop continues unless handleThrowable rethrows.
ExceptionUtils.handleThrowable(t);
log.warn(sm.getString("asyncContextImpl.onStartAsyncError",
listener.getClass().getName()), t);
}
}
}
}
Request#action
,执行对应的请求标识。hook对象是Http11Processor
。
/**
 * Forwards an action to the processor hook, if one is attached.
 * When {@code param} is null, this request object itself is passed as the parameter.
 * Does nothing when no hook is set.
 *
 * @param actionCode the action to perform
 * @param param      action-specific argument; may be null
 */
public void action(ActionCode actionCode, Object param) {
if (hook != null) {
if (param == null) {
hook.action(actionCode, this);
} else {
hook.action(actionCode, param);
}
}
}
AbstractProcessor#action,根据ActionCode执行对应的方法。
/**
 * Handles an action code sent by the request/response layer.
 *
 * Only the cases shown in this excerpt are handled; any other action code falls through
 * the switch without effect.
 *
 * @param actionCode the action to perform
 * @param param      action-specific argument (cast to AsyncContextCallback for ASYNC_START)
 */
@Override
public final void action(ActionCode actionCode, Object param) {
switch (actionCode) {
// 'Normal' servlet support
case COMMIT: {
if (!response.isCommitted()) {
try {
// Validate and write response headers
prepareResponse();
} catch (IOException e) {
handleIOException(e);
}
}
break;
}
// Servlet 3.0 asynchronous support
case ASYNC_START: {
// Hand the callback to the async state machine.
asyncStateMachine.asyncStart((AsyncContextCallback) param);
break;
}
case ASYNC_COMPLETE: {
clearDispatches();
// A socket event is dispatched only when the state machine returns true.
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
case ASYNC_DISPATCH: {
// Same pattern as ASYNC_COMPLETE: dispatch only when the state machine returns true.
if (asyncStateMachine.asyncDispatch()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
case ASYNC_DISPATCHED: {
asyncStateMachine.asyncDispatched();
break;
}
}
}
AsyncStateMachine#asyncStart
,状态机执行开启请求。
/**
 * Transitions the state machine from DISPATCHED to STARTING.
 *
 * On success the generation counter is incremented, the callback is stored and the
 * start time is recorded in lastAsyncStart.
 *
 * @param asyncCtxt callback of the async context being started
 * @throws IllegalStateException if the current state is not DISPATCHED
 */
synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
if (state == AsyncState.DISPATCHED) {
generation.incrementAndGet();
updateState(AsyncState.STARTING);
// Note: In this instance, caller is responsible for calling
// asyncCtxt.incrementInProgressAsyncCount() as that allows simpler
// error handling.
this.asyncCtxt = asyncCtxt;
lastAsyncStart = System.currentTimeMillis();
} else {
throw new IllegalStateException(
sm.getString("asyncStateMachine.invalidAsyncState",
"asyncStart()", state));
}
}
运行异步
AsyncContextImpl#start
,执行ASYNC_RUN
的请求。
/**
 * Submits a runnable for execution as part of this async cycle.
 *
 * The runnable is wrapped in a RunnableWrapper together with the context and the Coyote
 * request, and the wrapper is passed to the Coyote request with the ASYNC_RUN action.
 *
 * @param run the work to execute
 */
@Override
public void start(final Runnable run) {
if (log.isDebugEnabled()) {
logDebug("start ");
}
// NOTE(review): check() is not shown here; presumably validates that the context is usable.
check();
Runnable wrapper = new RunnableWrapper(run, context, this.request.getCoyoteRequest());
this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper);
}
AsyncStateMachine#asyncRun
,调用内部的线程池来执行。一般建议引入业务线程池来执行,不影响服务器的处理效率。
/**
 * Hands a runnable to the processor for execution.
 *
 * Allowed only in the STARTING, STARTED or READ_WRITE_OP states. While the runnable is
 * submitted, the thread context class loader is switched to the class loader of this
 * class and restored afterwards in a finally block. When Constants.IS_SECURITY_ENABLED
 * is set, the class loader is read and written through privileged actions.
 *
 * @param runnable the work to execute
 * @throws IllegalStateException if the state machine is in any other state
 */
synchronized void asyncRun(Runnable runnable) {
if (state == AsyncState.STARTING || state == AsyncState.STARTED ||
state == AsyncState.READ_WRITE_OP) {
// Execute the runnable using a container thread from the
// Connector's thread pool. Use a wrapper to prevent a memory leak
ClassLoader oldCL;
// Remember the current context class loader so it can be restored.
if (Constants.IS_SECURITY_ENABLED) {
PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
oldCL = AccessController.doPrivileged(pa);
} else {
oldCL = Thread.currentThread().getContextClassLoader();
}
try {
// Switch to this class's own class loader for the duration of the submission.
if (Constants.IS_SECURITY_ENABLED) {
PrivilegedAction<Void> pa = new PrivilegedSetTccl(
this.getClass().getClassLoader());
AccessController.doPrivileged(pa);
} else {
Thread.currentThread().setContextClassLoader(
this.getClass().getClassLoader());
}
processor.execute(runnable);
} finally {
// Always restore the caller's context class loader.
if (Constants.IS_SECURITY_ENABLED) {
PrivilegedAction<Void> pa = new PrivilegedSetTccl(
oldCL);
AccessController.doPrivileged(pa);
} else {
Thread.currentThread().setContextClassLoader(oldCL);
}
}
} else {
throw new IllegalStateException(
sm.getString("asyncStateMachine.invalidAsyncState",
"asyncRun()", state));
}
}
完成异步
AsyncContextImpl#complete
/**
 * Completes the asynchronous cycle by sending the ASYNC_COMPLETE action to the Coyote
 * request. No action parameter is supplied (null is passed).
 */
public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
// NOTE(review): check() is not shown here; presumably validates that the context is usable.
check();
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
AbstractProcessor#processSocketEvent
/**
 * Forwards a socket event to the current socket wrapper.
 * Does nothing when no socket wrapper is available.
 *
 * @param event    the socket event to process
 * @param dispatch passed through unchanged to SocketWrapperBase#processSocket
 */
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}
AbstractEndpoint#processSocket
,调用线程池处理Socket
/**
 * Processes a socket event, either on the executor or on the calling thread.
 *
 * A SocketProcessorBase is taken from processorCache when one is available (and reset for
 * this socket/event), otherwise a new one is created. It is executed on the executor
 * when {@code dispatch} is true and an executor exists; otherwise it runs inline.
 *
 * @param socketWrapper the socket to process; null yields {@code false}
 * @param event         the socket event to process
 * @param dispatch      whether to hand the work to the executor
 * @return {@code true} if the processor was submitted or run; {@code false} if the wrapper
 *         was null, the executor rejected the task, or another Throwable was caught
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
// Prefer a recycled processor from the cache.
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
// No dispatch requested or no executor: run on the current thread.
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
DeferredResult Demo
/** Shared pool for the demo's background work; created once instead of once per request. */
private static final ExecutorService T3_EXECUTOR = Executors.newCachedThreadPool();

/**
 * Demo endpoint: returns a {@link DeferredResult} immediately and completes it from a
 * background thread after a simulated delay.
 *
 * If the background thread is interrupted, the interrupt flag is restored and the
 * deferred result is completed with the exception, so the request does not stay open
 * until the async timeout.
 *
 * @return the deferred result that will eventually carry the response body
 */
@RequestMapping("/t3")
public DeferredResult<String> t3() {
    final DeferredResult<String> dr = new DeferredResult<>();
    T3_EXECUTOR.execute(() -> {
        try {
            Thread.sleep(6000);
            dr.setResult("成功");
        } catch (InterruptedException e) {
            // Preserve the interrupt status and fail the request explicitly.
            Thread.currentThread().interrupt();
            dr.setErrorResult(e);
        }
    });
    return dr;
}
DeferredResult
源码解析
处理请求
DispatcherServlet#doDispatch
,获取WebAsyncManager
/**
 * Excerpt of the dispatch entry point; only the initial setup is shown, the rest of the
 * body is elided.
 *
 * @param request  the current HTTP request
 * @param response the current HTTP response
 * @throws Exception declared by the method signature
 */
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
// Obtain the WebAsyncManager associated with this request.
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
// ...
}
WebAsyncUtils#getAsyncManager(ServletRequest)
,如果不存在就创建一个WebAsyncManager
/**
 * Returns the WebAsyncManager bound to the given request, creating one if necessary.
 *
 * The manager is looked up in the request attribute WEB_ASYNC_MANAGER_ATTRIBUTE. If the
 * attribute is missing or is not a WebAsyncManager, a new manager is created and stored
 * under that attribute.
 *
 * @param servletRequest the current request
 * @return the existing or newly created manager, never null
 */
public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
WebAsyncManager asyncManager = null;
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
if (asyncManagerAttr instanceof WebAsyncManager) {
asyncManager = (WebAsyncManager) asyncManagerAttr;
}
if (asyncManager == null) {
asyncManager = new WebAsyncManager();
servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
}
return asyncManager;
}
RequestMappingHandlerAdapter#handleInternal
,处理请求。进一步的封装WebAsyncManager
。第一次处理asyncManager.hasConcurrentResult()
,返回false。
/**
 * Invokes the handler method for the request and applies cache-related response headers.
 *
 * When synchronizeOnSession is set and a session already exists, the invocation runs
 * inside a synchronized block on the session mutex; otherwise it runs unsynchronized.
 * Afterwards, if the response has no Cache-Control header yet, either
 * cacheSecondsForSessionAttributeHandlers is applied (handler declares session
 * attributes) or prepareResponse is called.
 *
 * @param request       the current HTTP request
 * @param response      the current HTTP response
 * @param handlerMethod the handler method to invoke
 * @return the value returned by invokeHandlerMethod
 * @throws Exception propagated from request checking or handler invocation
 */
protected ModelAndView handleInternal(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ModelAndView mav;
checkRequest(request);
// Execute invokeHandlerMethod in synchronized block if required.
if (this.synchronizeOnSession) {
// getSession(false): never creates a session just for synchronization.
HttpSession session = request.getSession(false);
if (session != null) {
Object mutex = WebUtils.getSessionMutex(session);
synchronized (mutex) {
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No HttpSession available -> no mutex necessary
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No synchronization on session demanded at all...
mav = invokeHandlerMethod(request, response, handlerMethod);
}
// Only touch caching headers when the handler has not set Cache-Control itself.
if (!response.containsHeader(HEADER_CACHE_CONTROL)) {
if (getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) {
applyCacheSeconds(response, this.cacheSecondsForSessionAttributeHandlers);
}
else {
prepareResponse(response);
}
}
return mav;
}
/**
 * Prepares and invokes the handler method, with support for asynchronous results.
 *
 * Steps: build the invocable method (argument resolvers, return value handlers, data
 * binder factory, parameter name discoverer), initialise the model container, configure
 * the request's WebAsyncManager, and invoke the method. If the manager already holds a
 * concurrent result, the stored container is restored and the invocable method is
 * replaced by a wrapper around that result before invocation.
 *
 * @param request       the current HTTP request
 * @param response      the current HTTP response
 * @param handlerMethod the handler method to invoke
 * @return the resulting ModelAndView, or null when concurrent handling has started
 * @throws Exception propagated from preparation or invocation
 */
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
// Configure the async infrastructure for this request.
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
// A concurrent result is already stored: reuse the saved container and invoke a wrapper
// around the result instead of the original handler method.
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer);
// Async handling was started during invocation: nothing to render in this pass.
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
WebAsyncManager#hasConcurrentResult
,判断结果是否等于RESULT_NONE
。默认就是RESULT_NONE
// Holds the concurrent result; initialised to the RESULT_NONE sentinel.
private volatile Object concurrentResult = RESULT_NONE;
/**
 * Reports whether a concurrent result is currently stored.
 *
 * @return {@code true} if concurrentResult is not the RESULT_NONE sentinel (identity comparison)
 */
public boolean hasConcurrentResult() {
return (this.concurrentResult != RESULT_NONE);
}
ServletInvocableHandlerMethod#invokeAndHandle
,如果返回值是DeferredResult
,会调用DeferredResultMethodReturnValueHandler
来处理返回值。
/**
 * Invokes the handler method and passes its return value to the configured return value
 * handlers.
 *
 * The request is marked as handled, and no return value handler is called, when:
 * the return value is null and the request is not modified, a response status is set,
 * or the container is already marked handled; or the return value is non-null and a
 * response status reason is present.
 *
 * @param webRequest   the current request
 * @param mavContainer the model-and-view container for this request
 * @param providedArgs arguments passed through to invokeForRequest
 * @throws Exception propagated from invocation or from return value handling
 */
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);
if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}
// From here on the return value is handed to a return value handler.
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
// Trace-log the failure with the formatted return value, then rethrow unchanged.
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
返回值处理器
DeferredResultMethodReturnValueHandler#handleReturnValue
,如果返回值是ListenableFuture
或者CompletionStage
,进行转换成DeferredResult
。
/**
 * Handles a deferred-style return value by starting deferred result processing.
 *
 * A null return value marks the request as handled. A DeferredResult is used as is;
 * a ListenableFuture or CompletionStage is first adapted to a DeferredResult.
 *
 * @param returnValue  the value returned by the handler method; may be null
 * @param returnType   the declared return type (not used in this method body)
 * @param mavContainer the model-and-view container, passed on as processing context
 * @param webRequest   the current request
 * @throws IllegalStateException if the return value is of none of the three supported types
 * @throws Exception propagated from startDeferredResultProcessing
 */
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
DeferredResult<?> result;
if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
else if (returnValue instanceof ListenableFuture) {
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
else if (returnValue instanceof CompletionStage) {
result = adaptCompletionStage((CompletionStage<?>) returnValue);
}
else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
// Hand the (possibly adapted) DeferredResult to the request's WebAsyncManager.
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
WebAsyncManager#startDeferredResultProcessing,设置超时时间,并注册超时处理器、异常处理器、完成处理器以及结果处理器,然后开启异步处理。
/**
 * Starts asynchronous processing for a DeferredResult.
 *
 * Applies the DeferredResult's own timeout (if any), builds the interceptor chain,
 * registers timeout / error / completion handlers on the async web request, starts
 * async processing, and finally installs a result handler on the DeferredResult that
 * stores the post-processed result and dispatches.
 *
 * @param deferredResult    the deferred result to process; must not be null
 * @param processingContext context objects stored for the later dispatch
 * @throws Exception declared by the signature; raised e.g. by applyBeforeConcurrentHandling
 */
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
// A timeout set on the DeferredResult overrides the one already on the async request.
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
// Chain order: the DeferredResult's own interceptor, the registered ones, then the timeout interceptor.
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
// On timeout: let interceptors react; an exception from them becomes the result.
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
// On error: skipped while error handling is already in progress; otherwise the error is
// set on the DeferredResult unless triggerAfterError returns false.
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
}
}
});
this.asyncWebRequest.addCompletionHandler(()
-> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
// Result handler: post-process the value, store it and dispatch.
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
/**
 * Resets the concurrent-result state and starts async mode on the underlying request.
 *
 * While holding this manager's monitor, the result is reset to RESULT_NONE, the
 * processing context is stored and errorHandlingInProgress is cleared; then
 * asyncWebRequest.startAsync() is called outside the lock.
 *
 * @param processingContext context objects to keep for the later dispatch
 */
private void startAsyncProcessing(Object[] processingContext) {
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
this.errorHandlingInProgress = false;
}
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
}
StandardServletAsyncWebRequest#startAsync
,调用的是tomcat的异步请求实现。
/**
 * Starts Servlet asynchronous processing for the wrapped request.
 *
 * Requires that the request supports async and that async processing has not already
 * completed. If async is already started, the call is a no-op. Otherwise the servlet
 * container's startAsync is invoked, this object is registered as listener on the
 * resulting context, and the configured timeout (if any) is applied.
 *
 * @throws IllegalStateException via Assert.state when a precondition fails
 */
@Override
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncComplete(), "Async processing has already completed");
// Already in async mode: nothing to do.
if (isAsyncStarted()) {
return;
}
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
异步设置结果
DeferredResult#setResult
,设置结果集。
/**
 * Sets the value for this DeferredResult by delegating to setResultInternal.
 *
 * @param result the value to set
 * @return the value returned by setResultInternal
 */
public boolean setResult(T result) {
return setResultInternal(result);
}
/**
 * Stores the result and, if a result handler is already registered, invokes it.
 *
 * The set-or-expired check is performed once without the lock and again under it.
 * The handler reference is taken and cleared under the lock, while the handler itself
 * is invoked after the synchronized block has been left.
 *
 * @param result the result to store
 * @return {@code false} if a result was already set or this instance has expired;
 *         {@code true} otherwise (also when no handler is registered yet)
 */
private boolean setResultInternal(Object result) {
// Immediate expiration check outside of the result lock
if (isSetOrExpired()) {
return false;
}
DeferredResultHandler resultHandlerToUse;
synchronized (this) {
// Got the lock in the meantime: double-check expiration status
if (isSetOrExpired()) {
return false;
}
// At this point, we got a new result to process
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
// No result handler set yet -> let the setResultHandler implementation
// pick up the result object and invoke the result handler for it.
return true;
}
// Clear the handler field so this handler reference is only used once from here.
this.resultHandler = null;
}
resultHandlerToUse.handleResult(result);
return true;
}
在WebAsyncManager#startDeferredResultProcessing
有设置过结果处理器。
/**
 * Excerpt (earlier part of the body elided): installs the result handler on the
 * DeferredResult.
 *
 * When a result is delivered, the handler lets the interceptor chain post-process it and
 * then calls setConcurrentResultAndDispatch with the outcome. A Throwable raised while
 * pre-processing or installing the handler is itself passed to
 * setConcurrentResultAndDispatch.
 *
 * @param deferredResult    the deferred result to process
 * @param processingContext context objects stored for the later dispatch
 * @throws Exception declared by the method signature
 */
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
// ...
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
WebAsyncManager#setConcurrentResultAndDispatch,先保存结果(concurrentResult = result;),再执行this.asyncWebRequest.dispatch()重新分派请求。
/**
 * Stores the concurrent result once and dispatches the request again.
 *
 * Under this manager's monitor the result is stored only if none has been stored yet
 * (otherwise the call returns immediately); errorHandlingInProgress is set when the
 * result is a Throwable. If the async request has already completed, nothing is
 * dispatched; otherwise asyncWebRequest.dispatch() is called.
 *
 * @param result the result value or a Throwable
 */
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
// First writer wins: a later result is ignored.
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
this.asyncWebRequest.dispatch();
}
AsyncContextImpl#dispatch(String)
,进行转发请求。使用AsyncRunnable
重新发起请求。
/**
 * Dispatches to the given path using the servlet context of the current request.
 *
 * @param path the path to dispatch to
 */
@Override
public void dispatch(String path) {
check();
dispatch(getRequest().getServletContext(), path);
}
/**
 * Prepares an asynchronous dispatch to {@code path} within the given servlet context.
 *
 * Under asyncContextLock: rejects a second dispatch, records the original request URI,
 * context path, servlet path, path info and query string as request attributes (only if
 * ASYNC_REQUEST_URI is not set yet), creates an AsyncRunnable for the dispatcher, sends
 * ASYNC_DISPATCH to the Coyote request and clears the stored servlet request/response.
 *
 * @param servletContext the context used to obtain the request dispatcher
 * @param path           the path to dispatch to
 * @throws IllegalStateException if a dispatch has already been set up
 * @throws UnsupportedOperationException if the dispatcher is not an AsyncDispatcher
 */
@Override
public void dispatch(ServletContext servletContext, String path) {
synchronized (asyncContextLock) {
if (log.isDebugEnabled()) {
logDebug("dispatch ");
}
check();
// Only one dispatch may be pending.
if (dispatch != null) {
throw new IllegalStateException(
sm.getString("asyncContextImpl.dispatchingStarted"));
}
// Record the original request details once; not overwritten if already present.
if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI());
request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
request.setAttribute(ASYNC_PATH_INFO, request.getPathInfo());
request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
}
final RequestDispatcher requestDispatcher = servletContext.getRequestDispatcher(path);
if (!(requestDispatcher instanceof AsyncDispatcher)) {
throw new UnsupportedOperationException(
sm.getString("asyncContextImpl.noAsyncDispatcher"));
}
final AsyncDispatcher applicationDispatcher =
(AsyncDispatcher) requestDispatcher;
final ServletRequest servletRequest = getRequest();
final ServletResponse servletResponse = getResponse();
// Store the dispatch as a runnable, then notify the processor.
this.dispatch = new AsyncRunnable(
request, applicationDispatcher, servletRequest, servletResponse);
this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null);
clearServletRequestResponse();
}
}
二次处理
RequestMappingHandlerAdapter#invokeHandlerMethod
,核心在于asyncManager.hasConcurrentResult()
判断为true,invocableMethod = invocableMethod.wrapConcurrentResult(result);
,处理方法被替换成新方法。
/**
 * Prepares and invokes the handler method; shown again for the re-dispatched pass.
 *
 * When the WebAsyncManager already holds a concurrent result, the saved
 * ModelAndViewContainer (element 0 of the stored context) is restored, the result is
 * cleared from the manager, and the invocable method is replaced by a wrapper around the
 * result, so invokeAndHandle processes the stored result instead of calling the
 * controller method.
 *
 * @param request       the current HTTP request
 * @param response      the current HTTP response
 * @param handlerMethod the handler method to invoke
 * @return the resulting ModelAndView, or null when concurrent handling has started
 * @throws Exception propagated from preparation or invocation
 */
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
// Stored result present: take it, restore the saved container, and swap the method.
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
// Clear it so the result is consumed only once.
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
ServletInvocableHandlerMethod#wrapConcurrentResult
,封装
/**
 * Creates a ConcurrentResultHandlerMethod around the given result, with a
 * ConcurrentResultMethodParameter built from the same result as its return type.
 *
 * @param result the stored concurrent result (value or Throwable)
 * @return the wrapping handler method
 */
ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}
ServletInvocableHandlerMethod.ConcurrentResultHandlerMethod
,封装了Callable
的处理器。
// Reflective handle on Callable#call, used as the "handler method" of the wrapper below.
private static final Method CALLABLE_METHOD = ClassUtils.getMethod(Callable.class, "call");
/**
 * Builds a handler method whose bean is a Callable that yields the stored result.
 *
 * The Callable rethrows the result when it is an Exception, wraps any other Throwable in
 * a NestedServletException, and otherwise returns the result as is. The return value
 * handlers of the enclosing ServletInvocableHandlerMethod are reused when present.
 *
 * @param result     the stored concurrent result (value or Throwable)
 * @param returnType the method parameter describing the return type
 */
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
super((Callable<Object>) () -> {
if (result instanceof Exception) {
throw (Exception) result;
}
else if (result instanceof Throwable) {
throw new NestedServletException("Async processing failed", (Throwable) result);
}
return result;
}, CALLABLE_METHOD);
if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
}
this.returnType = returnType;
}
ServletInvocableHandlerMethod#invokeAndHandle
,获取返回值。
/**
 * Invokes the (possibly wrapped) handler method and passes its return value to the
 * configured return value handlers.
 *
 * A null return value ends processing early (request marked handled) when the request is
 * not modified, a response status is set, or the container is already handled. A
 * non-null return value ends processing early when a response status reason is present.
 * Otherwise the return value is given to returnValueHandlers.
 *
 * @param webRequest   the current request
 * @param mavContainer the model-and-view container for this request
 * @param providedArgs arguments passed through to invokeForRequest
 * @throws Exception propagated from invocation or from return value handling
 */
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
// Obtain the return value by invoking the underlying method.
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);
if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
// Trace-log the failure with the formatted return value, then rethrow unchanged.
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
InvocableHandlerMethod#invokeForRequest
,就会调用Callable方法,获取结果。
/**
 * Resolves the method arguments for the current request and invokes the method.
 *
 * @param request      the current request
 * @param mavContainer the model-and-view container; may be null
 * @param providedArgs arguments passed on to getMethodArgumentValues
 * @return the raw value returned by doInvoke
 * @throws Exception propagated from argument resolution or from the invoked method
 */
public Object invokeForRequest(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(request, mavContainer, providedArgs);
if (logger.isTraceEnabled()) {
logger.trace("Arguments: " + Arrays.toString(args));
}
return doInvoke(args);
}
/**
 * Invokes the bridged handler method reflectively on the handler bean.
 *
 * Kotlin suspending functions are invoked through CoroutinesUtils; everything else goes
 * through Method#invoke. An IllegalArgumentException from the reflective call is
 * converted to an IllegalStateException with a formatted message. An
 * InvocationTargetException is unwrapped: a RuntimeException, Error or Exception target
 * is rethrown as is, any other Throwable is wrapped in an IllegalStateException.
 *
 * @param args the resolved method arguments
 * @return the value returned by the invoked method; may be null
 * @throws Exception the unwrapped target exception, or an IllegalStateException as described
 */
@Nullable
protected Object doInvoke(Object... args) throws Exception {
Method method = getBridgedMethod();
try {
if (KotlinDetector.isSuspendingFunction(method)) {
return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
}
return method.invoke(getBean(), args);
}
catch (IllegalArgumentException ex) {
// assertTargetBean runs before the generic IllegalStateException below is thrown.
assertTargetBean(method, getBean(), args);
String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
throw new IllegalStateException(formatInvokeError(text, args), ex);
}
catch (InvocationTargetException ex) {
// Unwrap for HandlerExceptionResolvers ...
Throwable targetException = ex.getTargetException();
if (targetException instanceof RuntimeException) {
throw (RuntimeException) targetException;
}
else if (targetException instanceof Error) {
throw (Error) targetException;
}
else if (targetException instanceof Exception) {
throw (Exception) targetException;
}
else {
throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
}
}
}
总结一下
- 控制器返回DeferredResult,返回值处理器DeferredResultMethodReturnValueHandler会开启异步处理。
- 另一个线程处理结束后会执行setResult方法,保存结果并重新分派(dispatch)请求。
- 二次处理时,结果被封装成Callable对象,通过反射调用获取返回值,再交给返回值处理器处理。