背景:
在java中异步线程很重要,比如在业务流处理时,需要通知硬件设备,发短信通知用户,或者需要上传一些图片资源到其他服务器这种耗时的操作,在主线程里处理会阻塞整理流程,而且我们也不需要等待处理结果之后再进行下一步操作,这时候就可以使用异步线程进行处理,这样主线程不会因为这些耗时的操作而阻塞,保证主线程的流程可以正常进行。
异步编程在对响应时间近乎严苛的今天,受到了越来越多的关注,尤其是在IO密集型业务中。对比传统的同步模式,异步编程可以提高服务器的响应时间和处理业务的能力,从而达到快速给用户响应的效果。
注:博主只是拿三方的接口举例,并不是说只能和三方接口交互才能做异步,具体看你的业务。
正常操作我们需要web发起请求调用 ,等到三方接口返回后然后将结果返给前端应用,但是在某些操作中,如果某一个业务非常耗时,如果一直等其他业务响应后再给前端,那不仅给用户的体验极差,而且可能会出现服务卡死的情况,因此在这里做一下相关线程操作的记录,以供后续参考!
线程的操作,是java中最重要的部分之一,实现线程操作也有很多种方法,这里仅介绍几种常用的。在springboot框架中,可以使用注解简单实现线程的操作,还有AsyncManager的方式,如果需要复杂的线程操作,可以使用线程池实现。下面根据具体方法进行介绍。
一、注解@Async
springboot框架的注解,使用时也有一些限制,这个在网上也有很多介绍,@Async注解不能在类本身直接调用,在springboot框架中,可以使用单独的Service实现异步方法,然后在其他的类中调用该Service中的异步方法即可,具体如下:
1. 添加注解
在springboot的config中添加 @EnableAsync注解,开启异步线程功能
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* MyConfig
*/
@Configuration
@EnableAsync
public class MyConfig {
// 自己配置的Config
}
2. 创建异步方法Service和实现类
使用service实现耗时的方法
Service类:
import com.thcb.execute.service.IExecuteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* ExecuteService业务层处理
*/
@Service
public class ExecuteServiceImpl implements IExecuteService {
private static final Logger log = LoggerFactory.getLogger(ExecuteServiceImpl.class);
@Override
public void sleepingTest() {
log.info("SleepingTest start");
try {
Thread.sleep(5000);
} catch (Exception e) {
log.error("SleepingTest:" + e.toString());
}
log.info("SleepingTest end");
}
}
3. 调用异步方法
这里根据Springboot的框架,在controller层调用,并使用log查看是否时异步结果。
controller:
import com.thcb.execute.service.IExecuteService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* TestController
*/
@RestController
public class TestController {
private static final Logger log = LoggerFactory.getLogger(TestController.class);
@Autowired
private IExecuteService executeService;
@RequestMapping("/test")
public String test() {
return "spring boot";
}
@RequestMapping("/executeTask")
public String executeTask() {
log.info("executeTask Start!");
executeService.sleepingTest();
log.info("executeTask End!");
return "executeTask";
}
}
接口直接返回了executeTask,并log出executeTask End!在5秒之后,log打出SleepingTest end,说明使用了异步线程处理了executeService.sleepingTest的方法。
二、线程池
使用线程池可以设定更多的参数,线程池在网上也有很多详细的介绍,在这我只介绍一种,带拒绝策略的线程池。
1. 创建线程池
创建带有拒绝策略的线程池,并设定核心线程数,最大线程数,队列数和超出核心线程数量的线程存活时间:
/**
* 线程池信息: 核心线程数量5,最大数量10,队列大小20,超出核心线程数量的线程存活时间:30秒, 指定拒绝策略的
*/
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(20), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("有任务被拒绝执行了");
}
});
2. 创建一个耗时的操作类
由于线程池需要传入一个Runnable,所以此类继承Runnable,还是用sleep模拟耗时操作。
/**
* 耗时操作
*/
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task " + taskNum);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task " + taskNum + "执行完毕");
}
}
3. 执行线程池
开启线程池,这里通过一个for循环模拟一下,可以看一下log输出,有兴趣的可以修改一下for循环和sleep的数值,看看线程池具体的操作和拒绝流程。
for (int i = 0; i < 20; i++) {
MyTask myTask = new MyTask(i);
threadPoolExecutor.execute(myTask);
System.out.println("线程池中线程数目:" + threadPoolExecutor.getPoolSize() + ",队列中等待执行的任务数目:" +
threadPoolExecutor.getQueue().size() + ",已执行完别的任务数目:" + threadPoolExecutor.getCompletedTaskCount());
}
threadPoolExecutor.shutdown();
在此写一些线程操作需要注意的地方:
- 线程数量和cpu有关,使用线程时一定要注意线程的释放,否则会导致cpu线程数量耗尽;
- 使用注解完成的线程操作,不可以在自己的类中实现调用,因为注解最后也是通过代理的方式完成异步线程的,最好时在单独的一个service中写;
- 线程池最好单独写,使用static和final修饰,保证所有使用该线程池的地方使用的是一个线程池,而不能每次都new一个线程池出来,每次都new一个就没有意义了。
三、线程池 ExecutorService
1.创建使用的service
@Service
@Slf4j
public class ExecuteServiceImpl implements IExecuteService {
@Override
public String testExecute(JSONObject jsonObject) {
log.info("接口调用开始");
//JSONObject rtnMap = new JSONObject();
//异步执行其他业务
runningSync(rtnMap);
log.info("接口调用结束");
return "成功";
}
public void runningSync(JSONObject rtnMap){
Runnable runnable = new Thread(new Thread() {
@Override
public void run() {
try {
//你的复杂业务
Thread.sleep(5000);
} catch (Exception exception) {
} finally {
}
}
});
ServicesExecutor.getServicesExecutorServiceService().execute(runnable);
}
}
2.创建定义ServicesExecutor
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
public class ServicesExecutor {
/* Define an executor of the execution thread for the service */
private static ExecutorService servicesExecutorService;
/* Define an executor of the execution thread for the log */
private static ExecutorService logExecutorService;
private static ScheduledExecutorService scheduledThreadPool;
/* Initialize the thread pool */
static {
initServicesExecutorService();
//initLogExecutorService();
//initScheduledThreadPool();
}
private synchronized static void initServicesExecutorService() {
/**
* Create a thread pool with a fixed number of fixed threads. Each time a task is submitted, a thread is created until the thread reaches the maximum size of the thread pool. The size of the thread pool will remain the same once it reaches its maximum value. If a thread ends because of an exception, the thread pool will be replenished with a new thread.
* Create a fixed-length thread pool that controls the maximum number of concurrent threads, and the excess threads wait in the queue. The size of the fixed-length thread pool is best set according to system resources. Such as "Runtime.getRuntime().availableProcessors()"
*/
servicesExecutorService = null;
servicesExecutorService = Executors.newFixedThreadPool(2);
log.info("Asynchronous synchronization thread pool is initialized...");
}
private synchronized static void initLogExecutorService() {
/**
* Create a thread pool with a fixed number of fixed threads. Each time a task is submitted, a thread is created until the thread reaches the maximum size of the thread pool. The size of the thread pool will remain the same once it reaches its maximum value. If a thread ends because of an exception, the thread pool will be replenished with a new thread.
*/
logExecutorService = null;
logExecutorService = Executors.newFixedThreadPool(4);
log.info("Asynchronous log processing thread pool initialization completed...");
// Create a cacheable thread pool. If the thread pool length exceeds the processing requirements, you can flexibly reclaim idle threads. If there is no reclaimable, create a new thread.
// ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
}
private synchronized static void initScheduledThreadPool() {
/**
* Create a fixed-length thread pool that supports scheduled and periodic task execution
*/
scheduledThreadPool = null;
scheduledThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 1);
}
/**
* Get an executor instance that is called when the executor is closed or called elsewhere
*
* @return ExecutorService
*/
public static ExecutorService getServicesExecutorServiceService() {
if (null == servicesExecutorService || servicesExecutorService.isShutdown()) {
initServicesExecutorService();
}
return servicesExecutorService;
}
public static ExecutorService getLogExecutorService() {
if (null == logExecutorService || logExecutorService.isShutdown()) {
initLogExecutorService();
}
return logExecutorService;
}
public static ScheduledExecutorService getScheduledServicesExecutorServiceService() {
if (null == scheduledThreadPool || scheduledThreadPool.isShutdown()) {
initScheduledThreadPool();
}
return scheduledThreadPool;
}
}
我使用的是第三种方式,仅参考供!
https://blog.csdn.net/weixin_45565886/article/details/129838403