【源码解析】SpringBoot使用DeferredResult实现长轮询的原理分析

news2025/1/23 13:52:52

使用背景

Nacos配置更新和Apollo的配置更新,我们可以看到长轮询(长连接)的身影。长连接的实现可以节约系统资源,长连接可以在连接建立后持续通信,避免频繁地建立和断开连接,减少系统开销。使用长连接可以保证连接的实时性,及时推送服务实例的上下线情况和配置信息,保证客户端能够及时地感知到变化。

Nacos主要是通过AsyncContext 来实现长连接的。而Apollo是通过DeferredResult来实现的。DeferredResultSpringMvc对于AsyncContext的封装,都是SpringMvc异步请求的实现方式。

AsyncContext Demo

    @RequestMapping("/t1")
    public void t1(HttpServletRequest request) {
        AsyncContext asyncContext = request.startAsync();
        asyncContext.start(new TestTask(asyncContext));
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("onComplete...");
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                System.out.println("onTimeout...");
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                System.out.println("onError...");
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                System.out.println("onStartAsync...");
            }
        });
    }

    public static class TestTask implements Runnable {
        private AsyncContext asyncContext;

        public TestTask(AsyncContext asyncContext) {
            this.asyncContext = asyncContext;
        }

        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(6000);
            asyncContext.getResponse().getWriter().write("hello world");
            asyncContext.complete();
        }
    }

AsyncContext 源码解析

开启异步调用

Request#startAsync(ServletRequest,ServletResponse),异步调用的开启。设置延时时长,默认30s。

    @Override
    public AsyncContext startAsync(ServletRequest request,
            ServletResponse response) {
        if (!isAsyncSupported()) {
            IllegalStateException ise =
                    new IllegalStateException(sm.getString("request.asyncNotSupported"));
            log.warn(sm.getString("coyoteRequest.noAsync",
                    StringUtils.join(getNonAsyncClassNames())), ise);
            throw ise;
        }

        if (asyncContext == null) {
            asyncContext = new AsyncContextImpl(this);
        }

        asyncContext.setStarted(getContext(), request, response,
                request==getRequest() && response==getResponse().getResponse());
        asyncContext.setTimeout(getConnector().getAsyncTimeout());

        return asyncContext;
    }

AsyncContextImpl#setStarted,执行action。并且执行监听器的fireOnStartAsync方法。

    public void setStarted(Context context, ServletRequest request,
            ServletResponse response, boolean originalRequestResponse) {

        synchronized (asyncContextLock) {
            this.request.getCoyoteRequest().action(
                    ActionCode.ASYNC_START, this);

            this.context = context;
            context.incrementInProgressAsyncCount();
            this.servletRequest = request;
            this.servletResponse = response;
            this.hasOriginalRequestAndResponse = originalRequestResponse;
            this.event = new AsyncEvent(this, request, response);

            List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners);
            listeners.clear();
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("asyncContextImpl.fireOnStartAsync"));
            }
            for (AsyncListenerWrapper listener : listenersCopy) {
                try {
                    listener.fireOnStartAsync(event);
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.warn(sm.getString("asyncContextImpl.onStartAsyncError",
                            listener.getClass().getName()), t);
                }
            }
        }
    }

Request#action,执行对应的请求标识。hook对象是Http11Processor

    public void action(ActionCode actionCode, Object param) {
        if (hook != null) {
            if (param == null) {
                hook.action(actionCode, this);
            } else {
                hook.action(actionCode, param);
            }
        }
    }

AbstractProcessor#action,根据状态值执行对应的方法。

    @Override
    public final void action(ActionCode actionCode, Object param) {
        switch (actionCode) {
        // 'Normal' servlet support
        case COMMIT: {
            if (!response.isCommitted()) {
                try {
                    // Validate and write response headers
                    prepareResponse();
                } catch (IOException e) {
                    handleIOException(e);
                }
            }
            break;
        }
        // Servlet 3.0 asynchronous support
        case ASYNC_START: {
            asyncStateMachine.asyncStart((AsyncContextCallback) param);
            break;
        }
        case ASYNC_COMPLETE: {
            clearDispatches();
            if (asyncStateMachine.asyncComplete()) {
                processSocketEvent(SocketEvent.OPEN_READ, true);
            }
            break;
        }
        case ASYNC_DISPATCH: {
            if (asyncStateMachine.asyncDispatch()) {
                processSocketEvent(SocketEvent.OPEN_READ, true);
            }
            break;
        }
        case ASYNC_DISPATCHED: {
            asyncStateMachine.asyncDispatched();
            break;
        }
        }
    }

AsyncStateMachine#asyncStart,状态机执行开启请求。

    synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
        if (state == AsyncState.DISPATCHED) {
            generation.incrementAndGet();
            updateState(AsyncState.STARTING);
            // Note: In this instance, caller is responsible for calling
            // asyncCtxt.incrementInProgressAsyncCount() as that allows simpler
            // error handling.
            this.asyncCtxt = asyncCtxt;
            lastAsyncStart = System.currentTimeMillis();
        } else {
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState",
                            "asyncStart()", state));
        }
    }

运行异步

AsyncContextImpl#start,执行ASYNC_RUN的请求。

    @Override
    public void start(final Runnable run) {
        if (log.isDebugEnabled()) {
            logDebug("start      ");
        }
        check();
        Runnable wrapper = new RunnableWrapper(run, context, this.request.getCoyoteRequest());
        this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper);
    }

AsyncStateMachine#asyncRun,调用内部的线程池来执行。一般建议引入业务线程池来执行,不影响服务器的处理效率。

    synchronized void asyncRun(Runnable runnable) {
        if (state == AsyncState.STARTING || state ==  AsyncState.STARTED ||
                state == AsyncState.READ_WRITE_OP) {
            // Execute the runnable using a container thread from the
            // Connector's thread pool. Use a wrapper to prevent a memory leak
            ClassLoader oldCL;
            if (Constants.IS_SECURITY_ENABLED) {
                PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
                oldCL = AccessController.doPrivileged(pa);
            } else {
                oldCL = Thread.currentThread().getContextClassLoader();
            }
            try {
                if (Constants.IS_SECURITY_ENABLED) {
                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                            this.getClass().getClassLoader());
                    AccessController.doPrivileged(pa);
                } else {
                    Thread.currentThread().setContextClassLoader(
                            this.getClass().getClassLoader());
                }

                processor.execute(runnable);
            } finally {
                if (Constants.IS_SECURITY_ENABLED) {
                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                            oldCL);
                    AccessController.doPrivileged(pa);
                } else {
                    Thread.currentThread().setContextClassLoader(oldCL);
                }
            }
        } else {
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState",
                            "asyncRun()", state));
        }

    }

完成异步

AsyncContextImpl#complete

    public void complete() {
        if (log.isDebugEnabled()) {
            logDebug("complete   ");
        }
        check();
        request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
    }

AbstractProcessor#processSocketEvent

    protected void processSocketEvent(SocketEvent event, boolean dispatch) {
        SocketWrapperBase<?> socketWrapper = getSocketWrapper();
        if (socketWrapper != null) {
            socketWrapper.processSocket(event, dispatch);
        }
    }

AbstractEndpoint#processSocket,调用线程池处理Socket

    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

DeferredResult Demo

    @RequestMapping("/t3")
    public DeferredResult<String> t3() {
        final DeferredResult<String> dr = new DeferredResult<>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                Thread.sleep(6000);
                dr.setResult("成功");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return dr;
    }

DeferredResult源码解析

处理请求

DispatcherServlet#doDispatch,获取WebAsyncManager

    protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
        HttpServletRequest processedRequest = request;
        HandlerExecutionChain mappedHandler = null;
        boolean multipartRequestParsed = false;
        WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
        // ...
    }

WebAsyncUtils#getAsyncManager(ServletRequest),如果不存在就创建一个WebAsyncManager

	public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
		WebAsyncManager asyncManager = null;
		Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
		if (asyncManagerAttr instanceof WebAsyncManager) {
			asyncManager = (WebAsyncManager) asyncManagerAttr;
		}
		if (asyncManager == null) {
			asyncManager = new WebAsyncManager();
			servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
		}
		return asyncManager;
	}

RequestMappingHandlerAdapter#handleInternal,处理请求。进一步的封装WebAsyncManager。第一次处理asyncManager.hasConcurrentResult(),返回false。

	protected ModelAndView handleInternal(HttpServletRequest request,
			HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

		ModelAndView mav;
		checkRequest(request);

		// Execute invokeHandlerMethod in synchronized block if required.
		if (this.synchronizeOnSession) {
			HttpSession session = request.getSession(false);
			if (session != null) {
				Object mutex = WebUtils.getSessionMutex(session);
				synchronized (mutex) {
					mav = invokeHandlerMethod(request, response, handlerMethod);
				}
			}
			else {
				// No HttpSession available -> no mutex necessary
				mav = invokeHandlerMethod(request, response, handlerMethod);
			}
		}
		else {
			// No synchronization on session demanded at all...
			mav = invokeHandlerMethod(request, response, handlerMethod);
		}

		if (!response.containsHeader(HEADER_CACHE_CONTROL)) {
			if (getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) {
				applyCacheSeconds(response, this.cacheSecondsForSessionAttributeHandlers);
			}
			else {
				prepareResponse(response);
			}
		}

		return mav;
	}


	@Nullable
	protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
			HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

		ServletWebRequest webRequest = new ServletWebRequest(request, response);
		try {
			WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
			ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

			ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
			if (this.argumentResolvers != null) {
				invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
			}
			if (this.returnValueHandlers != null) {
				invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
			}
			invocableMethod.setDataBinderFactory(binderFactory);
			invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

			ModelAndViewContainer mavContainer = new ModelAndViewContainer();
			mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
			modelFactory.initModel(webRequest, mavContainer, invocableMethod);
			mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

			AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
			asyncWebRequest.setTimeout(this.asyncRequestTimeout);

			WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
			asyncManager.setTaskExecutor(this.taskExecutor);
			asyncManager.setAsyncWebRequest(asyncWebRequest);
			asyncManager.registerCallableInterceptors(this.callableInterceptors);
			asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

			if (asyncManager.hasConcurrentResult()) {
				Object result = asyncManager.getConcurrentResult();
				mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
				asyncManager.clearConcurrentResult();
				LogFormatUtils.traceDebug(logger, traceOn -> {
					String formatted = LogFormatUtils.formatValue(result, !traceOn);
					return "Resume with async result [" + formatted + "]";
				});
				invocableMethod = invocableMethod.wrapConcurrentResult(result);
			}

			invocableMethod.invokeAndHandle(webRequest, mavContainer);
			if (asyncManager.isConcurrentHandlingStarted()) {
				return null;
			}

			return getModelAndView(mavContainer, modelFactory, webRequest);
		}
		finally {
			webRequest.requestCompleted();
		}
	}

WebAsyncManager#hasConcurrentResult,判断结果是否等于RESULT_NONE。默认就是RESULT_NONE

	private volatile Object concurrentResult = RESULT_NONE;
	
	public boolean hasConcurrentResult() {
		return (this.concurrentResult != RESULT_NONE);
	}

ServletInvocableHandlerMethod#invokeAndHandle,如果返回值是DeferredResult,会调用DeferredResultMethodReturnValueHandler来处理返回值。

	public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {

		Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
		setResponseStatus(webRequest);

		if (returnValue == null) {
			if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
				disableContentCachingIfNecessary(webRequest);
				mavContainer.setRequestHandled(true);
				return;
			}
		}
		else if (StringUtils.hasText(getResponseStatusReason())) {
			mavContainer.setRequestHandled(true);
			return;
		}

		mavContainer.setRequestHandled(false);
		Assert.state(this.returnValueHandlers != null, "No return value handlers");
		try {
			this.returnValueHandlers.handleReturnValue(
					returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
		}
		catch (Exception ex) {
			if (logger.isTraceEnabled()) {
				logger.trace(formatErrorForReturnValue(returnValue), ex);
			}
			throw ex;
		}
	}

返回值处理器

DeferredResultMethodReturnValueHandler#handleReturnValue,如果返回值是ListenableFuture或者CompletionStage,进行转换成DeferredResult

	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue == null) {
			mavContainer.setRequestHandled(true);
			return;
		}

		DeferredResult<?> result;

		if (returnValue instanceof DeferredResult) {
			result = (DeferredResult<?>) returnValue;
		}
		else if (returnValue instanceof ListenableFuture) {
			result = adaptListenableFuture((ListenableFuture<?>) returnValue);
		}
		else if (returnValue instanceof CompletionStage) {
			result = adaptCompletionStage((CompletionStage<?>) returnValue);
		}
		else {
			// Should not happen...
			throw new IllegalStateException("Unexpected return value type: " + returnValue);
		}

		WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
	}

WebAsyncManager#startDeferredResultProcessing,设置异常处理器,超时时间,超时处理器

	public void startDeferredResultProcessing(
			final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

		Assert.notNull(deferredResult, "DeferredResult must not be null");
		Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

		Long timeout = deferredResult.getTimeoutValue();
		if (timeout != null) {
			this.asyncWebRequest.setTimeout(timeout);
		}

		List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
		interceptors.add(deferredResult.getInterceptor());
		interceptors.addAll(this.deferredResultInterceptors.values());
		interceptors.add(timeoutDeferredResultInterceptor);

		final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);

		this.asyncWebRequest.addTimeoutHandler(() -> {
			try {
				interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
			}
			catch (Throwable ex) {
				setConcurrentResultAndDispatch(ex);
			}
		});

		this.asyncWebRequest.addErrorHandler(ex -> {
			if (!this.errorHandlingInProgress) {
				try {
					if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
						return;
					}
					deferredResult.setErrorResult(ex);
				}
				catch (Throwable interceptorEx) {
					setConcurrentResultAndDispatch(interceptorEx);
				}
			}
		});

		this.asyncWebRequest.addCompletionHandler(()
				-> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));

		interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
		startAsyncProcessing(processingContext);

		try {
			interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
			deferredResult.setResultHandler(result -> {
				result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
				setConcurrentResultAndDispatch(result);
			});
		}
		catch (Throwable ex) {
			setConcurrentResultAndDispatch(ex);
		}
	}

	private void startAsyncProcessing(Object[] processingContext) {
		synchronized (WebAsyncManager.this) {
			this.concurrentResult = RESULT_NONE;
			this.concurrentResultContext = processingContext;
			this.errorHandlingInProgress = false;
		}
		this.asyncWebRequest.startAsync();

		if (logger.isDebugEnabled()) {
			logger.debug("Started async request");
		}
	}

StandardServletAsyncWebRequest#startAsync,调用的是tomcat的异步请求实现。

	@Override
	public void startAsync() {
		Assert.state(getRequest().isAsyncSupported(),
				"Async support must be enabled on a servlet and for all filters involved " +
				"in async request processing. This is done in Java code using the Servlet API " +
				"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
				"filter declarations in web.xml.");
		Assert.state(!isAsyncComplete(), "Async processing has already completed");

		if (isAsyncStarted()) {
			return;
		}
		this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
		this.asyncContext.addListener(this);
		if (this.timeout != null) {
			this.asyncContext.setTimeout(this.timeout);
		}
	}

异步设置结果

DeferredResult#setResult,设置结果集。

	public boolean setResult(T result) {
		return setResultInternal(result);
	}

	private boolean setResultInternal(Object result) {
		// Immediate expiration check outside of the result lock
		if (isSetOrExpired()) {
			return false;
		}
		DeferredResultHandler resultHandlerToUse;
		synchronized (this) {
			// Got the lock in the meantime: double-check expiration status
			if (isSetOrExpired()) {
				return false;
			}
			// At this point, we got a new result to process
			this.result = result;
			resultHandlerToUse = this.resultHandler;
			if (resultHandlerToUse == null) {
				// No result handler set yet -> let the setResultHandler implementation
				// pick up the result object and invoke the result handler for it.
				return true;
			}
			this.resultHandler = null;
		}
		resultHandlerToUse.handleResult(result);
		return true;
	}

WebAsyncManager#startDeferredResultProcessing有设置过结果处理器。

	public void startDeferredResultProcessing(
			final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

		// ...

		try {
			interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
			deferredResult.setResultHandler(result -> {
				result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
				setConcurrentResultAndDispatch(result);
			});
		}
		catch (Throwable ex) {
			setConcurrentResultAndDispatch(ex);
		}
	}

WebAsyncManager#setConcurrentResultAndDispatch,执行this.asyncWebRequest.dispatch()。重新设置结果,concurrentResult = result;

	private void setConcurrentResultAndDispatch(Object result) {
		synchronized (WebAsyncManager.this) {
			if (this.concurrentResult != RESULT_NONE) {
				return;
			}
			this.concurrentResult = result;
			this.errorHandlingInProgress = (result instanceof Throwable);
		}

		if (this.asyncWebRequest.isAsyncComplete()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Async result set but request already complete: " + formatRequestUri());
			}
			return;
		}

		if (logger.isDebugEnabled()) {
			boolean isError = result instanceof Throwable;
			logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
		}
		this.asyncWebRequest.dispatch();
	}

AsyncContextImpl#dispatch(String),进行转发请求。使用AsyncRunnable重新发起请求。

    @Override
    public void dispatch(String path) {
        check();
        dispatch(getRequest().getServletContext(), path);
    }

    @Override
    public void dispatch(ServletContext servletContext, String path) {
        synchronized (asyncContextLock) {
            if (log.isDebugEnabled()) {
                logDebug("dispatch   ");
            }
            check();
            if (dispatch != null) {
                throw new IllegalStateException(
                        sm.getString("asyncContextImpl.dispatchingStarted"));
            }
            if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
                request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI());
                request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
                request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
                request.setAttribute(ASYNC_PATH_INFO, request.getPathInfo());
                request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
            }
            final RequestDispatcher requestDispatcher = servletContext.getRequestDispatcher(path);
            if (!(requestDispatcher instanceof AsyncDispatcher)) {
                throw new UnsupportedOperationException(
                        sm.getString("asyncContextImpl.noAsyncDispatcher"));
            }
            final AsyncDispatcher applicationDispatcher =
                    (AsyncDispatcher) requestDispatcher;
            final ServletRequest servletRequest = getRequest();
            final ServletResponse servletResponse = getResponse();
            this.dispatch = new AsyncRunnable(
                    request, applicationDispatcher, servletRequest, servletResponse);
            this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null);
            clearServletRequestResponse();
        }
    }

二次处理

RequestMappingHandlerAdapter#invokeHandlerMethod,核心在于asyncManager.hasConcurrentResult()判断为true,invocableMethod = invocableMethod.wrapConcurrentResult(result);,处理方法被替换成新方法。

	protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
			HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

		ServletWebRequest webRequest = new ServletWebRequest(request, response);
		try {
			WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
			ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

			ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
			if (this.argumentResolvers != null) {
				invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
			}
			if (this.returnValueHandlers != null) {
				invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
			}
			invocableMethod.setDataBinderFactory(binderFactory);
			invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

			ModelAndViewContainer mavContainer = new ModelAndViewContainer();
			mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
			modelFactory.initModel(webRequest, mavContainer, invocableMethod);
			mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

			AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
			asyncWebRequest.setTimeout(this.asyncRequestTimeout);

			WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
			asyncManager.setTaskExecutor(this.taskExecutor);
			asyncManager.setAsyncWebRequest(asyncWebRequest);
			asyncManager.registerCallableInterceptors(this.callableInterceptors);
			asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

			if (asyncManager.hasConcurrentResult()) {
				Object result = asyncManager.getConcurrentResult();
				mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
				asyncManager.clearConcurrentResult();
				LogFormatUtils.traceDebug(logger, traceOn -> {
					String formatted = LogFormatUtils.formatValue(result, !traceOn);
					return "Resume with async result [" + formatted + "]";
				});
				invocableMethod = invocableMethod.wrapConcurrentResult(result);
			}

			invocableMethod.invokeAndHandle(webRequest, mavContainer);
			if (asyncManager.isConcurrentHandlingStarted()) {
				return null;
			}

			return getModelAndView(mavContainer, modelFactory, webRequest);
		}
		finally {
			webRequest.requestCompleted();
		}
	}

ServletInvocableHandlerMethod#wrapConcurrentResult,封装

	ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
		return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
	}

ServletInvocableHandlerMethod.ConcurrentResultHandlerMethod,封装了Callable的处理器。

		private static final Method CALLABLE_METHOD = ClassUtils.getMethod(Callable.class, "call");


		public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
			super((Callable<Object>) () -> {
				if (result instanceof Exception) {
					throw (Exception) result;
				}
				else if (result instanceof Throwable) {
					throw new NestedServletException("Async processing failed", (Throwable) result);
				}
				return result;
			}, CALLABLE_METHOD);

			if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
				setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
			}
			this.returnType = returnType;
		}

ServletInvocableHandlerMethod#invokeAndHandle,获取返回值。

	public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {

		Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
		setResponseStatus(webRequest);

		if (returnValue == null) {
			if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
				disableContentCachingIfNecessary(webRequest);
				mavContainer.setRequestHandled(true);
				return;
			}
		}
		else if (StringUtils.hasText(getResponseStatusReason())) {
			mavContainer.setRequestHandled(true);
			return;
		}

		mavContainer.setRequestHandled(false);
		Assert.state(this.returnValueHandlers != null, "No return value handlers");
		try {
			this.returnValueHandlers.handleReturnValue(
					returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
		}
		catch (Exception ex) {
			if (logger.isTraceEnabled()) {
				logger.trace(formatErrorForReturnValue(returnValue), ex);
			}
			throw ex;
		}
	}

InvocableHandlerMethod#invokeForRequest,就会调用Callable方法,获取结果。

	public Object invokeForRequest(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {

		Object[] args = getMethodArgumentValues(request, mavContainer, providedArgs);
		if (logger.isTraceEnabled()) {
			logger.trace("Arguments: " + Arrays.toString(args));
		}
		return doInvoke(args);
	}

	@Nullable
	protected Object doInvoke(Object... args) throws Exception {
		Method method = getBridgedMethod();
		try {
			if (KotlinDetector.isSuspendingFunction(method)) {
				return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
			}
			return method.invoke(getBean(), args);
		}
		catch (IllegalArgumentException ex) {
			assertTargetBean(method, getBean(), args);
			String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
			throw new IllegalStateException(formatInvokeError(text, args), ex);
		}
		catch (InvocationTargetException ex) {
			// Unwrap for HandlerExceptionResolvers ...
			Throwable targetException = ex.getTargetException();
			if (targetException instanceof RuntimeException) {
				throw (RuntimeException) targetException;
			}
			else if (targetException instanceof Error) {
				throw (Error) targetException;
			}
			else if (targetException instanceof Exception) {
				throw (Exception) targetException;
			}
			else {
				throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
			}
		}
	}

总结一下

  • 控制器返回DeferredResult,返回值处理器会使用DeferredResultMethodReturnValueHandler,开启异步处理。
  • 另一个线程处理结束后会执行setResult方法,并进行请求转发。
  • 创建Callable对象,反射调用获取返回值。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/582007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LAMP的运用

LAMP的运用 一、LAMP二、编译安装apache http服务三、编译安装mysqld服务四、编译安装PHP解析环境五、安装论坛 一、LAMP LAMP架构是目前成熟的企业网站应用模式之一&#xff0c;指的是协同工作的一整套系统和相关软件&#xff0c;能够提供动态Web站点服务及其应用开发环境。L…

《痞子衡嵌入式半月刊》 第 77 期

痞子衡嵌入式半月刊&#xff1a; 第 77 期 这里分享嵌入式领域有用有趣的项目/工具以及一些热点新闻&#xff0c;农历年分二十四节气&#xff0c;希望在每个交节之日准时发布一期。 本期刊是开源项目(GitHub: JayHeng/pzh-mcu-bi-weekly)&#xff0c;欢迎提交 issue&#xff0c…

【JavaSE】Java基础语法(二十八):HashSet集合

文章目录 1. HashSet集合概述和特点2. HashSet集合的基本应用3. 哈希值4. HashSet集合存储学生对象并遍历【应用】 1. HashSet集合概述和特点 底层数据结构是哈希表存取无序不可以存储重复元素没有索引,不能使用普通for循环遍历 2. HashSet集合的基本应用 存储字符串并遍历 …

Pytorch深度学习之神经网络入门详解

目录 Pytorch 入门 1.将每个图片的label作为txt文件写入另外一个文件夹&#xff08;txt文件名与图片文件名相同&#xff09; 2.tensorboard的summary writer 3.torchvision中的transforms 4.DataLoader 5.神经网络-卷积层Conv2d 6.最大池化层 7.非线性激活函数Relu 9.…

微信的大动作,很多人要颤抖了

4月25日&#xff0c;微信团队发布关于微信公众号营销内容合规规范通知&#xff0c;要求公众号在投放商业广告时需要标注广告字样。 刚开始觉得也没啥&#xff0c;无非就是加个广告的字样&#xff0c;让消费者可以及时识别出来&#xff0c; 但从效果来看&#xff0c;似乎效果并不…

华为OD机试真题B卷 Java 实现【猜密码】

一、题目描述 小杨申请了一个保密柜,但是他忘记了密码。只记得密码都是数字,而且所有数字都是不重复的。 请你根据他记住的数字范围和密码的最小数字数量,帮他算下有哪些可能的组合,规则如下: 输出的组合都是从可选的数字范围中选取的,且不能重复;输出的密码数字要按照…

意外的坚持,意外的收获!

前言&#xff1a; 转眼间&#xff0c;2023就快过了一半&#xff0c;回忆间感觉跟过完年没多久一样&#xff1b;时间是真的过的快...... 简单总结一下最近&#xff1a; 一、锻炼身体&#xff1a; 最近这段时间开始恢复锻炼身体&#xff0c;现在感觉一天下班回来&#xff0c;身体…

Java 21 新特性和改进

Java 21 是 Java 17 之后的下一个 LTS 版本。虚拟线程在 Java 21 中将成为正式功能。可以预期的是&#xff0c;Java 21 会成为一个很流行的 Java 版本。 Java 21 将在 2023 年 9 月 19 日发布。目前 Java 21 包含的内容已经基本确定了。下面来梳理一下 Java 21 中会包含的内容。…

【AI提示】ChatGPT提示工程课程(吴恩达OpenAI)转换文本(中文chatgpt版)

设置 翻译 通用翻译器 语调变换 格式转换 拼写检查/语法检查。 转换 在本笔记中&#xff0c;我们将探索如何使用大型语言模型进行文本转换任务&#xff0c;例如语言翻译、拼写和语法检查、语气调整和格式转换。 设置 import openai import osfrom dotenv import load_dotenv, f…

Maven初级

Maven初级 Maven简介 传统项目管理状态分析 jar包不统一&#xff0c;jar包不兼容工程升级维护过程操作繁琐 Maven是什么 Maven的本质是一个项目管理工具&#xff0c;将项目开发和管理过程抽象成一个项目对象模型&#xff08;POM&#xff09;POM&#xff1a;项目对象模型 Ma…

一图看懂 itsdangerous 模块:将受信任的数据传递到不受信任的环境的帮助工具,资料整理+笔记(大全)

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 一图看懂 itsdangerous 模块&#xff1a;将受信任的数据传递到不受信任的环境的帮助工具&#xff0c;资料整理笔记&#xff08;大全&#xff09; &#x1f9ca;摘要&#x1f9ca;模块…

Ae:跟踪摄像机

在时间轴面板上选择要跟踪的素材图层&#xff0c;在跟踪器面板中单击跟踪摄像机 Track Camera按钮之后&#xff0c;会向素材图层添加“3D 摄像机跟踪器” 3D Camera Tracker效果&#xff0c;并立即对视频画面逐帧分析以反求原始摄像机运动。 还有其它几种添加 3D 摄像机跟踪器效…

GeForce RTX 3060 Ti+cuda 11.6+Anaconda3搭建Pytorch深度学习环境

室友新购入一个笔记本&#xff0c;笔记本的显卡是GeForce RTX 3060 Ti&#xff0c;记录一下使用GeForce RTX 3060 Ticuda 11.6Anaconda3搭建Pytorch深度学习环境。 安装很简单&#xff0c;当你有了Python环境时记住一个核心命令即可&#xff0c;显卡驱动因该在之前已经安装过了…

ChatGPT 与我合力开发 xargin blog archive 插件:曹大博客的新奇探险

之前写的批量删除 chatGPT 对话的插件[1]&#xff0c;最近我收到了一个五星好评&#xff1a; 虽然不赚钱&#xff0c;交个朋友嘛&#xff0c;还是挺高兴的。而且借助 chatGPT&#xff0c;我是在与全世界的用户交流&#xff0c;想想就激动。 最近我发现自己让 chatGPT 帮忙写前端…

jQuery其他方法及插件使用

1. 拷贝对象 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width,…

跨域 + 四种解决办法

目录 同源策略 解决方案 1、jsonp 2、前端代理 3、后端解决 4、Nginx代理 同源策略 说到跨域就不得不提到同源策略&#xff0c;什么是同源策略&#xff1f; 请求的时候拥有相同的 协议 域名 端口 只要有一个不同就属于跨域 解决方案 解决跨域问题一共有四种方式&#xff…

chatgpt赋能python:Python中的Join方法详解:简化拼接字符串的操作

Python中的Join方法详解&#xff1a;简化拼接字符串的操作 在Python编程中&#xff0c;拼接字符串是一项常见的操作。在过去&#xff0c;我们通常使用“”符号来连接多个字符串。但是&#xff0c;这种方法不是很高效&#xff0c;特别是当需要拼接大量字符串时。这时候&#xf…

基于html+css的图展示98

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

python+django+vue学生在线选课系统93pul

随着信息化时代的到来&#xff0c;网络系统都趋向于智能化、系统化&#xff0c;选课系统也不例外&#xff0c;但目前国内的有些学校仍都使用人工管理&#xff0c;学校规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;人工管理显然已无法应对时代的变化&#xff0c;…

初识Spring -- Spring入门保姆级教程(一)

文章目录 前言一、Spring是什么&#xff1f;1.概述2.了解spring家族3.spring系统概述4.spring优点5.spring学习路线 二、入门spring1.核心概念2.IOC入门案例3.DI入门案例4.bean的配置5.spring 中 bean的实例化--构造方法6.bean的实例化 -- 静态工厂实例化7.bean实例化--实例工厂…