用户请求流程
问题点
- tomcat 线程资源占满,由于tomcat线程资源有限,每个请求都会经由tomcat线程处理,阻塞至web层处理完才能回收再利用。
- web层分发至后端服务可能会扩大几倍甚至数百倍的,譬如用户发起请求1w/s,到后端时可能就会10w~100w/s,这时后端压力剧增。
使用Servlet3.0异步请求优化tomcat线程使用
用户发起请求打到web层,tomcat从线程池拿出一个线程处理,这线程会调用web应用,web应用在处理请求的过程中,该线程会一直阻塞,web应用处理完毕才能再输出响应,最后才回收该线程。
Servlet 3.0中引入异步Servlet。原理是web应用启用一个子线程,而Tomcat线程立即返回,不再等待Web应用将请求处理完,这样Tomcat线程可以立即被回收到线程池,用来响应其他请求。
- 直接上代码
使用springboot做例子,pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.cy</groupId>
<artifactId>thread-t</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>thread-t</name>
<description>thread-t</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-tomcat</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-jetty</artifactId>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
线程池配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {
@Bean
public AsyncTaskExecutor asyncTaskExecutor() {
log.info("init asyncTaskExecutor");
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
int maximumPoolSize = coreSize * 2;
int keepAliveTime = 10;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(coreSize);
executor.setMaxPoolSize(maximumPoolSize);
executor.setQueueCapacity(100000);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
log.info("asyncServiceExecutor params----corePoolSize:{},maxPoolSize:{},keepAliveSeconds:{}" ,
coreSize,maximumPoolSize,keepAliveTime);
return executor;
}
}
异步处理配置
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {
@Resource
private AsyncTaskExecutor executor;
/**
* An Executor is required to handle java.util.concurrent.Callable return values.
* Please, configure a TaskExecutor in the MVC config under "async support".
* The SimpleAsyncTaskExecutor currently in use is not suitable under load.
* <p>
* 配置SpringMVC的支持
*/
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(executor);
}
}
在web层的异步写法,开启带返回结果的子线程来处理,tomcat线程可以立马回收
import com.cy.threadt.entity.MyResponse;
import com.cy.threadt.service.MyService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.Callable;
@RestController
@RequestMapping("/cy/")
public class MyController {
@Resource
private MyService myService;
@GetMapping(value = "/get/{orderId}")
public Callable<MyResponse> query(@PathVariable String orderId){
Callable<MyResponse> callable = new Callable() {
@Override
public Object call() throws Exception {
return myService.query(orderId);
}
};
return callable;
}
}
至此,压力给到后端服务,后端服务可能是各种第三方、远程调用、内部服务调用,那么后端服务该做什么处理?譬如根据id或编号查询,其实可以合并多个查询给到批查询。
使用批量调用方式
请求包装对象,根据orderId查询、将结果赋给CompletableFuture,通过CompletableFuture.get()获取到结果
import lombok.Data;
import java.util.concurrent.CompletableFuture;
@Data
public class MyRequest {
private String id;
private String orderId;
private CompletableFuture<MyResponse> future;
}
import lombok.Data;
import java.math.BigDecimal;
@Data
public class MyResponse {
private String id;
private String orderId;
private BigDecimal money;
}
import com.cy.threadt.entity.MyRequest;
import com.cy.threadt.entity.MyResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MyService {
private final LinkedBlockingDeque<MyRequest> linkedBlockingDeque = new LinkedBlockingDeque<>(100000);
@PostConstruct
public void doBiz () {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(() -> {
if (linkedBlockingDeque.size() == 0) {
return;
}
List<MyRequest> requests = new ArrayList<>();
for (int i = 0; i < linkedBlockingDeque.size(); i++) {
requests.add(linkedBlockingDeque.poll());
}
batchQuery(requests);
log.info("批查询处理数量{}", requests.size());
}, 100, 50, TimeUnit.MILLISECONDS);
}
public MyResponse query(String orderId) throws ExecutionException, InterruptedException {
MyRequest request = new MyRequest();
request.setOrderId(orderId);
request.setId(UUID.randomUUID().toString());
CompletableFuture<MyResponse> objectCompletableFuture = new CompletableFuture<>();
request.setFuture(objectCompletableFuture);
linkedBlockingDeque.add(request);
return objectCompletableFuture.get();
}
public void batchQuery(List<MyRequest> list){
Map<String, List<MyRequest>> mapRequest = list.stream().collect(Collectors.groupingBy(MyRequest::getOrderId));
List<String> orderIds = list.stream().map(MyRequest::getOrderId).distinct().collect(Collectors.toList());
for (String orderId : orderIds) {
List<MyRequest> myRequests = mapRequest.get(orderId);
BigDecimal money = BigDecimal.valueOf(Math.random());
for (MyRequest myRequest : myRequests) {
MyResponse response = new MyResponse();
response.setOrderId(orderId);
response.setMoney(money);
myRequest.getFuture().complete(response);
}
}
}
}
单元测试,模拟10000并发查询
import com.cy.threadt.entity.MyResponse;
import com.cy.threadt.service.MyService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest
class ThreadTApplicationTests {
@Resource
private MyService myService;
private final int count = 10000;
private final CountDownLatch countDownLatch = new CountDownLatch(count);
@Test
void contextLoads() throws InterruptedException {
for (int i = 1; i <= count; i++) {
int finalI = i;
new Thread( () -> {
try {
countDownLatch.countDown();
countDownLatch.await();
MyResponse query = myService.query(String.valueOf(finalI));
// log.info("查询{},结果{}", finalI , query);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).start();
}
TimeUnit.SECONDS.sleep(10);
}
}
10000次查询最终可以缩减至数十次查询
代码地址
代码地址