前言
1、JUC是指有关 java.util.concurrent包以及其子包,这些包都是有关线程操作的包
2、HTTPS服务请求中,WEB服务只负责创建主线程来接收外部的HTTPS请求,如果不做任何处理,默认业务逻辑是通过主线程来做的,如果业务执行时间较长且用户访问量较大的情况,WEB服务在单位时间内所能处理的用户请求就会有限,JUC并发编程的核心就是如何来释放主线成以及通过子线程来批量执行任务
3、JUC并发编程并不能提高执行任务所耗费的时间,但是可以极大的提高WEB容器的吞吐量
案例(模拟下单)
- 订单实体类
@Data
public class CreateOrderDto {
/** 订单id */
private String orderId;
/** 库存id */
private String stockId;
/** 积分id */
private String itegralId;
/** 耗时,单位毫秒 */
private String time;
}
- Service代码
@Service
public class OrderTaskService {
public CreateOrderDto create(CreateOrderDto createOrderDto) {
createOrderDto.setOrderId("OrderTaskService:" + Thread.currentThread().getName());
System.out.println("OrderTaskService:"+ JSONObject.toJSONString(createOrderDto));
return createOrderDto;
}
}
@Service
public class StockTaskService {
public CreateOrderDto operate(CreateOrderDto createOrderDto) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
createOrderDto.setStockId("StockFutureTaskService:" + Thread.currentThread().getName());
System.out.println("StockFutureTaskService:"+JSONObject.toJSONString(createOrderDto));
return createOrderDto;
}
}
@Service
public class ItegralTaskService {
public CreateOrderDto operate(CreateOrderDto createOrderDto) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
createOrderDto.setItegralId("ItegralFutureTaskService:" + Thread.currentThread().getName());
System.out.println("ItegralFutureTaskService:"+JSONObject.toJSONString(createOrderDto));
return createOrderDto;
}
}
同步处理
- 创建订单
@GetMapping("createOrderCommon")
public CreateOrderDto createOrder() {
long start = System.currentTimeMillis();
// 本地服务 - 创建订单
CreateOrderDto createOrderDto = new CreateOrderDto();
createOrderDto = orderTaskService.create(createOrderDto);
// 三方服务 - 处理库存
createOrderDto = stockTaskService.operate(createOrderDto);
// 三方服务 - 处理积分
createOrderDto = itegralTaskService.operate(createOrderDto);
long end = System.currentTimeMillis();
createOrderDto.setTime("createOrderCommon任务耗时:" + String.valueOf(end - start));
System.out.println("createOrderCommon主线程处理耗时:" + Thread.currentThread().getName() + " " + createOrderDto.getTime() + "毫秒");
return createOrderDto;
}
- 日志
OrderTaskService:{"orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-1"}
StockFutureTaskService:{"orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-1","stockId":"StockFutureTaskService:http-nio-0.0.0.0-8801-exec-1"}
ItegralFutureTaskService:{"itegralId":"ItegralFutureTaskService:http-nio-0.0.0.0-8801-exec-1","orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-1","stockId":"StockFutureTaskService:http-nio-0.0.0.0-8801-exec-1"}
createOrderCommon主线程处理耗时:http-nio-0.0.0.0-8801-exec-1 createOrderCommon任务耗时:1074毫秒
异步处理(FutureTask)
- 创建订单
@GetMapping("createOrderFutureTask")
public CreateOrderDto createOrderFutureTask() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CreateOrderDto createOrderDto = new CreateOrderDto();
// 本地服务 - 创建订单
createOrderDto = orderTaskService.create(createOrderDto);
// 三方服务 - 处理库存 TODO【异步】
CreateOrderDto finalCreateOrderDto = createOrderDto;
Callable<CreateOrderDto> callableForStock = new Callable<CreateOrderDto>() {
@Override
public CreateOrderDto call() throws Exception {
return stockTaskService.operate(finalCreateOrderDto);
}
};
// 三方服务 - 处理积分 TODO【异步】
CreateOrderDto finalCreateOrderDto1 = createOrderDto;
Callable<CreateOrderDto> callableForItegral = new Callable<CreateOrderDto>() {
@Override
public CreateOrderDto call() throws Exception {
return itegralTaskService.operate(finalCreateOrderDto1);
}
};
FutureTask<CreateOrderDto> futureForStock = new FutureTask<>(callableForStock);
new Thread(futureForStock).start();
FutureTask<CreateOrderDto> futureForItegral = new FutureTask<>(callableForItegral);
new Thread(futureForItegral).start();
createOrderDto.setStockId(futureForStock.get().getStockId()); //TODO 阻塞式获取执行结果,所以会占用web容器的主线程
createOrderDto.setItegralId(futureForStock.get().getItegralId()); //TODO 阻塞式获取执行结果,所以会占用web容器的主线程
long end = System.currentTimeMillis();
createOrderDto.setTime("createOrderFutureTask任务耗时:" + String.valueOf(end - start));
System.out.println("createOrderFutureTask主线程处理耗时:" + Thread.currentThread().getName() + " " + createOrderDto.getTime() + "毫秒");
return createOrderDto;
}
- 日志
OrderTaskService:{"orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-2"}
StockFutureTaskService:{"itegralId":"ItegralFutureTaskService:Thread-16","orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-2","stockId":"StockFutureTaskService:Thread-15"}
ItegralFutureTaskService:{"itegralId":"ItegralFutureTaskService:Thread-16","orderId":"OrderTaskService:http-nio-0.0.0.0-8801-exec-2","stockId":"StockFutureTaskService:Thread-15"}
createOrderFutureTask主线程处理耗时:http-nio-0.0.0.0-8801-exec-2 createOrderFutureTask任务耗时:517毫秒
分析1:通过FutureTask实现多线程并发处理时,如果WEB容器主线程fork出来的子线程没有返回执行结果,那么主线程是会一直阻塞,直到子线程返回结果
使用Callable对FutureTask优化
Asynchronous Requests
DeferredResult和Callable都是为了异步生成返回值提供基本支持。一个请求进来,在没有得到返回数据之前,DispatcherServlet和所有Filter就会退出Servlet容器线程,但响应保持打开状态,一旦返回数据有了,DispatcherServlet就会被再次调用并且处理,以异步产生的方式向请求端响应值。这么做的好处就是请求不会长时间占用服务连接池,提高服务器的吞吐量
- 创建订单
@GetMapping("createOrderCallable")
public Callable<CreateOrderDto> createOrderCallable() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
Callable<CreateOrderDto> callable = new Callable<CreateOrderDto>() {
@Override
public CreateOrderDto call() throws Exception {
return createOrderFutureTask();
}
};
long end = System.currentTimeMillis();
System.out.println("createOrderCallable主线程处理耗时:" + Thread.currentThread().getName() + " " +(end - start) + "毫秒");
return callable;
}
- 日志
createOrderCallable主线程处理耗时:http-nio-0.0.0.0-8801-exec-7 0毫秒
OrderTaskService:{"orderId":"OrderTaskService:MvcAsync2"}
StockFutureTaskService:{"itegralId":"ItegralFutureTaskService:Thread-20","orderId":"OrderTaskService:MvcAsync2","stockId":"StockFutureTaskService:Thread-19"}
ItegralFutureTaskService:{"itegralId":"ItegralFutureTaskService:Thread-20","orderId":"OrderTaskService:MvcAsync2","stockId":"StockFutureTaskService:Thread-19"}
createOrderFutureTask主线程处理耗时:MvcAsync2 createOrderFutureTask任务耗时:501毫秒
异步处理(CompletableFuture)
- 自定义线程池对象并纳入容器管理
/**
* @描述: TODO 自定义线程池来处理异步任务。在方法上添加:@Async("api-asyn-taskExecutor")
* @作者: lixing lixing_java@163.com
* @日期 2020/2/19 21:18
*/
@Component
@Configuration
public class ThreadPoolTaskExecutorConfig {
@Bean("api-asyn-taskExecutor")
public Executor taskExecutorPool() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("自定义异步线程池"); // 线程名字前缀
threadPoolTaskExecutor.setCorePoolSize(5); // 核心线程数
threadPoolTaskExecutor.setQueueCapacity(10); // 任务队列容量(缓存队列)
threadPoolTaskExecutor.setMaxPoolSize(10); //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
threadPoolTaskExecutor.setKeepAliveSeconds(120); // 允许空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
threadPoolTaskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); // 任务拒绝策略
// 关闭线程池时是否等待当前调度任务完成后才开始关闭
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown( true ); // 等待
threadPoolTaskExecutor.setAwaitTerminationSeconds( 60 ); // 等待时长
threadPoolTaskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 异常统一处理......发送邮件、发送短信
}
});
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
- 创建订单
/** 注入自定义线程池 */
@Resource
private ThreadPoolTaskExecutorConfig threadPoolTaskExecutorConfig;
@GetMapping("createOrderCompletableFuture")
public CreateOrderDto createOrderCompletableFuture() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CreateOrderDto createOrderDto = new CreateOrderDto();
/**
* 异步创建订单
*/
CompletableFuture<CreateOrderDto> supplyAsyncForOrder = CompletableFuture.supplyAsync(() -> {
return orderTaskService.create(createOrderDto);
}, threadPoolTaskExecutorConfig.taskExecutorPool());
/**
* 订单创建成功后,异步处理库存和积分
*/
CompletableFuture<CreateOrderDto> thenApplyForOrder = supplyAsyncForOrder.thenApply(createOrderDtoTemp -> {
CompletableFuture<CreateOrderDto> itegral = CompletableFuture.supplyAsync(() -> {
return itegralTaskService.operate(createOrderDtoTemp);
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<CreateOrderDto> stock = CompletableFuture.supplyAsync(() -> {
return stockTaskService.operate(createOrderDtoTemp);
}, threadPoolTaskExecutorConfig.taskExecutorPool());
return createOrderDtoTemp;
});
long end = System.currentTimeMillis();
System.out.println("createOrderCompletableFuture主线程处理耗时:" + Thread.currentThread().getName() + " " +(end - start) + "毫秒");
return thenApplyForOrder.get();
}
- 日志
createOrderCompletableFuture主线程处理耗时:http-nio-0.0.0.0-8801-exec-2 0毫秒
OrderTaskService:{"orderId":"OrderTaskService:自定义异步线程池4"}
StockFutureTaskService:{"itegralId":"ItegralFutureTaskService:自定义异步线程池5","orderId":"OrderTaskService:自定义异步线程池4","stockId":"StockFutureTaskService:自定义异步线程池4"}
ItegralFutureTaskService:{"itegralId":"ItegralFutureTaskService:自定义异步线程池5","orderId":"OrderTaskService:自定义异步线程池4","stockId":"StockFutureTaskService:自定义异步线程池4"}
异步非阻塞编程(CompletableFuture)
前言
CompletableFuture 是 java.util.concurrent 库在java 8中新增的主要工具
同传统的Future相比:
1、支持流式计算、函数式编程、完成通知、自定义异常处理等新特性。能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待
2、而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回
创建 CompletableFuture 对象
构造函数
/**
* 通过构造函数创建
*/
@GetMapping("createWayForConstructor")
public String createForConstructor() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture();
completableFuture.complete("通过构造函数创建(内部没有包含计算结果),所以需要通过complete来终止计算,否则会一直阻塞在这里");
return completableFuture.get();
}
supplyAsync(有返回值)和 runAsync(无返回值)
- 通过supplyAsync创建
@Resource
private ThreadPoolTaskExecutorConfig threadPoolTaskExecutorConfig;
@GetMapping("createWayForSupplyAsync")
public String createWayForSupplyAsync() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
try {
long startForSalve = System.currentTimeMillis();
Thread.sleep(2000L);
long endForSalve = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + " 耗时 " + (endForSalve - startForSalve));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "supplyAsync创建CompletableFuture";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
return completableFuture.get();
}
主线程 http-nio-0.0.0.0-8801-exec-1 耗时 4
自定义异步线程池1 耗时 2013
- 通过runAsync创建
@GetMapping("createWayForRunAsync")
public void createWayForRunAsync() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
long startForSalve = System.currentTimeMillis();
try {
Thread.sleep(2000L);
long endForSalve = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + " 耗时 " + (endForSalve - startForSalve));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, threadPoolTaskExecutorConfig.taskExecutorPool());
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
}
主线程 http-nio-0.0.0.0-8801-exec-3 耗时 0
自定义异步线程池2 耗时 2003
thenApply / thenAccept / thenRun
thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果
thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果
thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值
thenApplyAsync、thenAcceptAsync、thenRunAsync:
带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的
- thenApply
@GetMapping("then")
public void then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = completableFuture1.thenApply((threadName) -> {
System.out.println(MessageFormat.format("completableFuture2 = {0}", Thread.currentThread().getName()));
return "completableFuture2";
});
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
}
主线程 http-nio-0.0.0.0-8801-exec-3 耗时 0
completableFuture1 = 自定义异步线程池2
completableFuture2 = 自定义异步线程池2
thenApply / thenAccept / thenRun 连接的2个任务是有前后依赖的,当且仅当前置任务计算完成时才会开始后置任务的计算
- thenApplyAsync
@GetMapping("then")
public void then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = completableFuture1.thenApplyAsync((threadName) -> {
System.out.println(MessageFormat.format("completableFuture2 = {0}", Thread.currentThread().getName()));
return "completableFuture2";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
}
主线程 http-nio-0.0.0.0-8801-exec-4 耗时 0
completableFuture1 = 自定义异步线程池3
completableFuture2 = 自定义异步线程池4
thenCompose
thenCompose适用于两个任务之间有前后依赖关系,但是连接任务又是独立的CompletableFuture
thenComposeAsync:带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的
- thenCompose
@GetMapping("then")
public String then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = completableFuture1.thenCompose((completableFuture1_r) ->
CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture2 = {0}", Thread.currentThread().getName()));
return completableFuture1_r + ",completableFuture2";
}, threadPoolTaskExecutorConfig.taskExecutorPool())
);
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
return completableFuture2.get();
}
主线程 http-nio-0.0.0.0-8801-exec-1 耗时 0
completableFuture1 = 自定义异步线程池3
completableFuture2 = 自定义异步线程池4
thenCombine
thenCombineAsync:带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的
- thenCombine
@GetMapping("then")
public String then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture2 = {0}", Thread.currentThread().getName()));
return "completableFuture2";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture3 = completableFuture1.thenCombine(completableFuture2, (r1, r2) -> r1 + " " + r2);
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
return completableFuture3.get();
}
主线程 http-nio-0.0.0.0-8801-exec-2 耗时 0
completableFuture1 = 自定义异步线程池3
completableFuture2 = 自定义异步线程池4
thenCombine连接的两个任务没有依赖关系,前后是并行执行的,只有当两个任务均完成时才会将其结果同时传递给下游处理任务
whenComplete 和 handle
1、whenComplete用于任务完成时的回调通知,解决了传统future在任务完成时无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数
2、handle与whenComplete的作用类似,handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果
whenCompleteAsync:带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的
handleAsync:带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的
- whenComplete
@GetMapping("then")
public String then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
System.out.println(0/0);
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = completableFuture1.whenComplete((r, e) -> {
if(e != null){
System.out.println("操作失败:" + e);
} else {
System.out.println("操作成功:" + r);
}
});
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
return completableFuture2.get();
}
主线程 http-nio-0.0.0.0-8801-exec-2 耗时 1
completableFuture1 = 自定义异步线程池2
操作失败:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
- handle
@GetMapping("then")
public String then() throws ExecutionException, InterruptedException {
long startForMaster = System.currentTimeMillis();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(MessageFormat.format("completableFuture1 = {0}", Thread.currentThread().getName()));
System.out.println(0/0);
return "completableFuture1";
}, threadPoolTaskExecutorConfig.taskExecutorPool());
CompletableFuture<String> completableFuture2 = completableFuture1.handle((r, e) -> {
if(e != null){
return e.getMessage();
} else {
return "completableFuture1";
}
});
long endForMaster = System.currentTimeMillis();
System.out.println("主线程 " + Thread.currentThread().getName() + " 耗时 " + (endForMaster - startForMaster));
return completableFuture2.get();
}
主线程 http-nio-0.0.0.0-8801-exec-2 耗时 0
completableFuture1 = 自定义异步线程池2