编程开发里,使用java异步执行方法可以让程序同时处理多个请求业务,提升吞吐量来缩短业务的执行时间,在springboot的程序应用中,提供了@Async注解来实现异步执行方法。
在业务开发中,有些时候是不需要立即返回业务的处理结果,比如物联网设备上报的数据,当设备把数据上报物联网IOT平台后,订阅端通过MQ或者最简单也最常用的HTTP(S)方式订阅物联网数据,订阅端拿到数据后,一般是需要告知数据推送端(发布者)已经收到数据,在这种应用场景下,设备的数据有多种业务,报警策略的业务执行报警通知,入库时序数据库,转发到客户平台,把设备数据传入人工智能AI等。
如果这些操作使用同步的方式,那么处理完业务要花很长一段时间,如果使用的HTPPS方式订阅的,那么极有可能HTTP会话已经超时了,还没有得到返回确认,那数据推送端会认为你订阅端没收到,如果数据推送端有重发策略,那么订阅端可能会多次收到相同的数据消息,这样的话会影响具体的业务。
在springboot里使用@Async只需要两步
1.在主函数上加@EnableAsync 开启异步配置
2.在需要用到异步的地方使用@Async 申明该方法是一个异步任务
package boot.example.async;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 蚂蚁舞
*/
@SpringBootApplication
@EnableAsync
public class ApplicationAsync {
public static void main( String[] args ) {
SpringApplication.run(ApplicationAsync.class, args);
System.out.println( "Hello Async!" );
}
}
package boot.example.async.controller;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 蚂蚁舞
*/
@Service
public class BootAsyncService {
@Async
void doAsyncWork(int count){
System.out.println("当前异步方法的计数值:"+count);
}
}
使用http接口来测试异步
package boot.example.async.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping(value="/")
public class BootAsyncController {
@Resource
private BootAsyncService bootAsyncService;
public int count = 0;
@RequestMapping(value="/async")
public int async() {
count++;
bootAsyncService.doAsyncWork(count);
return count;
}
}
启动springboot后在浏览器访问对应的路径便可调用异步Service方法,
在这个方法里可以处理业务逻辑,可以直接插入数据库,可以通过HTTP RestTemplate调用其他的微服务接口,在demo或者一些简单的应用中,这样做,使用起来特别方便,而且还特别流畅,提高了程序处理消息的能力,那如果说并发量不断地上升,异步的线程将会达到很多很多。
实际上使用@Async注解是有一个隐藏的坑,我用的是SpringBoot2.x,开启注解后默认使用的线程池是SimpleAsyncTaskExecutor, 只要方法上有@Async注解(@Async在有些方法上不会执行异步子线程的),当主线程调用这个方法时,会开启一个新的异步子线程去处理业务逻辑,与调用它的主线程便没有了多少关联了,那就是说不断的调用此方法,不断地去创建异步子线程,无休无止的。
如果这个方法的业务出现问题,数据库断了连接,调用其他微服务也是超时的,子线程又在不断地创建,这种情况持续下去,是会造成OOM(Out Of Memory, 内存用完了)问题的。
如此,最终的结果可能是,该服务在线啊,能访问到呢,啥原因呢?为啥我的程序停了?云商更新给我重启了?崩了!
为了尽可能的避免内存用完,使用@Async注解需要用到自定义线程池
package boot.example.async.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncTaskExecutorBeanConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 设置核心线程数
executor.setMaxPoolSize(20); // 设置最大线程数
executor.setQueueCapacity(20); // 设置队列容量
executor.setKeepAliveSeconds(12); // 设置线程活跃时间(秒)
executor.setThreadNamePrefix("async-thread-"); // 设置默认线程名称
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 设置拒绝策略
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待所有任务结束后再关闭线程池
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
主要设置说明
executor.setCorePoolSize(10) 线程池创建的核心线程数,线程池维护线程的最少线程量,如果程序没有调用加@Async异步,线程池中的核心线程会一直存活,如果不设置默认是1
另外,这里有一个设置executor.setAllowCoreThreadTimeOut(true) 如果这样设置了,核心线程会超时关闭的,它的默认值是false
executor.setMaxPoolSize(20) 线程池内最大线程的数量,当线程数大于等于核心线程数(corePoolSize),并且任务队列(queueCapacity)满了,线程池会创建线程来处理新的业务,但是当队列满了,而且线程数已经达到了了最大线程数量(maxPoolSize),线程池只得拒绝处理业务了,根据设置的策略来丢弃子线程或者抛出异常,避免了不断创建线程造成内存用完的情况
executor.setQueueCapacity(20) 队列容量,只有当核心线程数(corePoolSize)达到最大时,新业务会放在队列中排队等待执行
executor.setKeepAliveSeconds(12) 线程活跃时间(秒)当线程空闲后,时间达到设置的时间时,线程会自动退出,直到线程数量等于核心线程数(corePoolSize) 如果设置了executor.setAllowCoreThreadTimeOut(true),那么核心线程数也要按照这个设置,超时后退出
executor.setThreadNamePrefix("async-thread-") 设置默认线程名称,这个名称会在日志内打印的,配置了日志的话,可以大概通过日志在某个时间区间内确定线程的数量,线程有没有使用到线程池的最大线程数量,还是说一直在核心线程数内,这样对于程序线上的运行有维护参考价值
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()) 设置拒绝策略,根据具体的策略执行拒绝,有效的避免了线程的不断创建
executor.setWaitForTasksToCompleteOnShutdown(true) 等待所有任务结束后再关闭线程池
executor.setAwaitTerminationSeconds(60) 等待时间,和executor.setWaitForTasksToCompleteOnShutdown(true)一起使用,在这个时间之后强制关闭所有任务了
这里备注下常用策略
.CallerRunsPolicy():交由调用方法的主线程运行,比如从controller接口层进入后的线程,如此这个主线程自己去执行该任务,那么变成了同步执行了。
.AbortPolicy():默认策略,如果线程池队列满了,丢掉后进来的线程任务,不让他们进入队列,同时抛出异常。
.DiscardPolicy():如果线程池队列满了,直接丢掉后进来的任务,不让他们进入队列,不会有任何异常抛出。
.DiscardOldestPolicy():线程队列有多大,那么队列里的线程都是最新的,老的全部丢掉,可以这样理解,两种情况,第一种情况是未进入队列的线程数小于队列中已存在的线程数,那么会丢掉队列中最前进入的的线程任务,使得未进入队列的线程数全部进入线程,第二种情况,未进入队列的线程数很多很多,队列里的线程数一直被丢掉,那么最终只留下了未进入队列线程最后的那些线程。
修改一下bean的配置,将核心线程数和队列还有异步的最大线程数降下来,方便测试
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 设置核心线程数
//executor.setAllowCoreThreadTimeOut(true);
executor.setMaxPoolSize(4); // 设置最大线程数
executor.setQueueCapacity(2); // 设置队列容量
executor.setKeepAliveSeconds(6); // 设置线程活跃时间(秒)
executor.setThreadNamePrefix("async-thread-"); // 设置默认线程名称
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 设置拒绝策略
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待所有任务结束后再关闭线程池
executor.setAwaitTerminationSeconds(60);
return executor;
}
修改service,假设work1异步方法执行要120s,work2异步方法执行要10s ,work3不用异步方法
@Service
public class BootAsyncService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Async
void doAsyncWork1(int count) {
log.info("1-开始-当前方法的计数值:"+count);
try {
// 假设需要120s处理业务
Thread.sleep(120*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("1-结束-当前方法的计数值:"+count);
}
@Async
void doAsyncWork2(int count) {
log.info("2-开始-当前方法的计数值:"+count);
try {
// 假设需要10s处理业务
Thread.sleep(10*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("2-结束-当前方法的计数值:"+count);
}
//@Async
void doAsyncWork3(int count) {
log.info("3-开始-当前方法的计数值:"+count);
try {
Thread.sleep(120*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("3-结束-当前方法的计数值:"+count);
}
}
controller
@RestController
@RequestMapping(value="/")
public class BootAsyncController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private BootAsyncService bootAsyncService;
public int count = 0;
@PostMapping(value="/async1")
public int async1() {
count++;
log.info("1数据消息进入:"+count);
bootAsyncService.doAsyncWork1(count);
return count;
}
@PostMapping(value="/async2")
public int async2() {
count++;
log.info("2数据消息进入:"+count);
bootAsyncService.doAsyncWork2(count);
return count;
}
@PostMapping(value="/async3")
public int async3() {
count++;
log.info("3数据消息进入:"+count);
bootAsyncService.doAsyncWork3(count);
return count;
}
}
为了测试方便,使用了swagger2 + swagger-bootstrap-ui
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.2</version>
</dependency>
package boot.example.async.config;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* 蚂蚁舞
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
.apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
.paths(Predicates.not(PathSelectors.regex("/error.*")))
.paths(PathSelectors.regex("/.*"))
.build().apiInfo(apiInfo());
}
private ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("SpringBoot Async 异步Demo")
.description("SpringBoot Async 异步Demo")
.version("0.01")
.build();
}
/**
* http://localhost:xxxx/doc.html 地址和端口根据实际项目查看
*/
}
核心线程数 2 队列容量 2 最大线程数4
那么最大能够处理的业务是异步线程最大线程数和队列容量 4+2
如此测试9条线程 1, 2, 3, 4,5, 6, 7, 8, 9
.AbortPolicy()模式
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
访问浏览器进行测试,当该接口调用到第七次,第八次,第九次的时候,就抛出了异常
Executor [java.util.concurrent.ThreadPoolExecutor@2e57ce8f[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$352/2047819523@4e39f9bb
从这个段异常信息得知,队列tasks(配置的2)用完了,最大线程数(配置的4)也用完了,那么异步线程不管是队列容量还是已经正在执行业务的异步子线程,都还没有空闲出来,此时,按照AbortPolicy()模式,丢掉后面进来的子线程,并且抛出异常,告诉你子线程用完了,别挤进来了。
再看看控制台
可以看到 外部通过浏览器访问了9次接口,那就是说,产生了9条接口接收执行的主线程,把业务交给@Async申明的异步任务,4个异步线程正在执行 = 自定义线程池最大4条异步线程(图中计数值1,2,5,6),同时队列里有2条线程正在等待执行(图中的3,4),另外还有三条线程只是在接口主线程调用了异步任务,但是抛出异常并且返回异常信息了,没有机会进入异步线程(图中的7,8,9)
分析可知,1,2,5,6,3,4是执行了业务的,7,8,9被抛弃了,抛出异常业务丢失
这里为什么线程5和6先执行业务,3和4反而等待,是因为2条核心线程数正在执行1和2,那么3和4就进入队列里,后面的5和6就线程池增加线程了,增加到最大值,4,那就是只能增加2条线程,刚好5和6去执行
.DiscardPolicy()模式
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
DiscardPolicy()模式和AbortPolicy()模式的区别是DiscardPolicy()模式不抛出异常,其他是一样的
.DiscardOldestPolicy()模式
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 设置拒绝策略
访问浏览器进行测试,执行九次,可以看到返回的9
再看看控制台
可以看到 外部通过浏览器访问了9次接口,产生了9条接口接收执行的主线程,把业务交给@Async申明的异步任务,4个异步线程正在执行(图中计数值1,2,5,6),按理来说队列里有2条线程正在等待执行(图中的3,4),但是,7 8 9三条异步线程来了,如此,最老最前的3和4会被丢掉,7和8会进入异步线程队列里,但是队列的容量只有2条,那么7也要被丢弃,如此只有8和9进入队列等待,直到之前的异步线程空闲后,再来执行8和9的线程任务了。
分析可知,执行了业务的线程只有1,2,5,6,8,9 其他的3,4,7被抛弃了,没有执行业务
.CallerRunsPolicy()模式
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略
访问浏览器进行测试,执行第七次,http请求接口就卡住保持会话等待返回了,这是我再访问另一个接口(这个接口异步任务的时间稍短,很快就返回8这个数据),再访问一次 async2 这样访问总共9次
控制台输出
可以看到 外部通过浏览器访问了访问2个接口,总共9次访问,产生了9条接口接收执行的主线程,把业务交给@Async申明的异步任务,4个异步线程正在执行(图中计数值1,2,5,6),队列里有2条线程正在等待执行(图中的3,4),这是第七次访问产生了的主线程没有拿到异步的子线程,因此直接使用主线程去执行业务了(demo默认阻塞了120s),这时这个访问放一边,在通过接口2访问,那就是第 8和第 9条,可以从图里看到8和9执行的也是主线程的同步方法(demo默认阻塞时间10s,因此可以看到先结束),然后7通过主线程同步执行完业务返回,异步的1,2, 5, 6执行业务完成后空出的线程给到队列等待的线程 3和4去执行业务。
分析,这种方式,1,2,3,4,5,6,7,8,9共9条线程都进入了业务,其中 7,8,9是异步的,如果说推送端要你订阅端确认返回的话,会话时间太长,可能推送端会以为你订阅端没收到,重新发送数据,那这样业务可能就会重复执行,所以,能够满足异步返回,可以用这种方式,有时候程序就是那么2-4s的事,能接受的
在使用@Async时,有时候和Springboot事件监听一起使用
@Resource
private ApplicationContext applicationContext;
@PostMapping(value="/async4")
public int async4() {
count++;
log.info("4数据消息进入:"+count);
AsyncEvent asyncEvent = new AsyncEvent();
asyncEvent.setCount(count);
applicationContext.publishEvent(asyncEvent);
return count;
}
AsyncEvent
package boot.example.async.controller;
/**
* 蚂蚁舞
*/
public class AsyncEvent {
private int count;
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
AsyncListenerService
package boot.example.async.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class AsyncListenerService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Async
@EventListener(AsyncEvent.class)
public void doAsyncWork4(AsyncEvent asyncEvent) {
log.info("4-开始-当前方法的计数值:"+asyncEvent.getCount());
try {
// 假设需要120s处理业务
Thread.sleep(120*1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("4-结束-当前方法的计数值:"+asyncEvent.getCount());
}
}