在一些场景下,线程会被频繁创建和销毁,但他们却始终在完成相似的任务
这个场景下我们回去引入一个线程池的概念
可以简单总结为:
任务提交 → 核心线程执行 → 任务队列缓存 → 非核心线程执行 → 拒绝策略处理。
话不多说先看一个简单的线程池代码
通过 ThreadPoolExecutor
自定义(推荐)
先创建一个线程池
int corePoolSize = 5; // 核心线程数
int maxPoolSize = 10; // 最大线程数
long keepAliveTime = 60; // 线程空闲时间
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // 任务队列
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
提交线程池任务
// 提交Runnable任务
executor.execute(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
});
// 提交Callable任务(获取返回值)
Future<String> future = executor.submit(() -> {
return "Result from Callable";
});
try {
String result = future.get(); // 阻塞等待结果
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
但是在springboot的项目中我们一般不会像上面这样做,我们会像下面这样去做
这是一个项目结构
src/main/java
└── com.example.demo
├── AdminApplication.java (启动类)
├── config
│ └── ExecutorConfig.java (线程池配置)
├── controller
│ └── AsyncController.java
└── service
└── AsyncService.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication(scanBasePackages = {"com.example"})
@EnableAsync(proxyTargetClass = true) // 启用异步支持
public class AdminApplication {
public static void main(String[] args) {
SpringApplication.run(AdminApplication.class, args);
}
}
-
YAML 配置文件 (
application.yml
)
async:
executor:
thread:
core_pool_size: 5 # 核心线程数
max_pool_size: 5 # 最大线程数
queue_capacity: 99999 # 队列容量
name:
prefix: 异步执行线程池 # 线程名称前缀
-
线程池配置类
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync // 启用异步支持
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(namePrefix + "-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
-
在 Service 层方法使用
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
// 使用指定线程池执行异步方法
@Async("asyncServiceExecutor") // 必须指定配置的线程池名称
public void asyncMethod() {
System.out.println("异步执行开始 - 线程名:" + Thread.currentThread().getName());
try {
// 模拟耗时操作
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("异步执行结束");
}
}
-
在 Controller 中调用异步方法
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
private final AsyncService asyncService;
public AsyncController(AsyncService asyncService) {
this.asyncService = asyncService;
}
@GetMapping("/execute-async")
public String executeAsync() {
System.out.println("主线程开始处理请求 - 线程名:" + Thread.currentThread().getName());
asyncService.asyncMethod(); // 触发异步执行
System.out.println("主线程继续处理其他任务");
return "异步任务已提交";
}
}
这样我们就会去异步执行这个线程池的任务
但是线程池内部是怎么执行的呢
任务提交 → 核心线程执行 → 任务队列缓存 → 非核心线程执行 → 拒绝策略处理。
第一步,线程池通过 submit()
提交任务。
ExecutorService threadPool = Executors.newFixedThreadPool(5);threadPool.submit(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "办理业务");});
第二步,线程池会先创建核心线程来执行任务。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; }}
第三步,如果核心线程都在忙,任务会被放入任务队列中。
workQueue.offer(task);
第四步,如果任务队列已满,且当前线程数量小于最大线程数,线程池会创建新的线程来处理任务。
if (!addWorker(command, false))
第五步,如果线程池中的线程数量已经达到最大线程数,且任务队列已满,线程池会执行拒绝策略。
handler.rejectedExecution(command, this);
线程池的参数
线程池有 7 个参数,需要重点关注的有核心线程数、最大线程数、等待队列、拒绝策略。
一、核心线程数(corePoolSize)
作用
-
线程池常驻线程数量:即使空闲也不会被销毁(除非设置
allowCoreThreadTimeOut
) -
任务处理的基础保障
设置建议
场景类型 | 推荐值 | 原理说明 |
CPU密集型 | CPU核数 + 1 | 避免线程切换开销 |
IO密集型 | CPU核数 × 2 | 利用等待IO的时间处理其他任务 |
混合型 | 根据任务比例动态调整 | 需监控CPU和IO等待时间 |
代码示例(Spring Boot)
async: executor: core-pool-size: 10 # CPU核数为4时,IO密集型设为8
二、最大线程数(maxPoolSize)
作用
-
线程池扩容上限:应对突发流量高峰
-
与核心线程数的差值决定弹性能力
设置建议
场景 | 计算公式 | 典型值 |
秒杀场景 | maxPoolSize = corePoolSize × 2 | 50 → 100 |
普通Web服务 | maxPoolSize = corePoolSize + 50 | 20 → 70 |
批处理任务 | maxPoolSize = 总任务数 / 单线程耗时 | 按需计算 |
注意事项
-
内存限制:每个线程占用约 1MB 栈空间,100线程需 100MB
-
JVM限制:通过
-Xss
调整栈大小(如-Xss256k
)
三、等待队列(workQueue)
类型对比与选型
队列类型 | 特性 | 适用场景 |
无界队列 | 容量无限(LinkedBlockingQueue) | 任务量可控,防止OOM |
有界队列 | 容量固定(ArrayBlockingQueue) | 严格控制内存使用 |
同步移交队列 | 容量0(SynchronousQueue) | 实时处理,拒绝策略配合使用 |
①、ArrayBlockingQueue:一个有界的先进先出的阻塞队列,底层是一个数组,适合固定大小的线程池。
②、LinkedBlockingQueue:底层是链表,如果不指定大小,默认大小是 Integer.MAX_VALUE,几乎相当于一个无界队列。
③、PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。任务按照其自然顺序或 Comparator 来排序。
适用于需要按照给定优先级处理任务的场景,比如优先处理紧急任务。
④、DelayQueue:类似于 PriorityBlockingQueue,由二叉堆实现的无界优先级阻塞队列。
Executors 中的 newScheduledThreadPool()
就使用了 DelayQueue 来实现延迟执行。
⑤、SynchronousQueue:每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都必须等待另一个线程的插入操作。
Executors.newCachedThreadPool()
就使用了 SynchronousQueue,这个线程池会根据需要创建新线程,如果有空闲线程则会重复使用,线程空闲 60 秒后会被回收。
四、拒绝策略(RejectedExecutionHandler)
策略对比与选择
策略 | 行为 | 使用场景 |
AbortPolicy | 抛出异常(默认) | 需要快速失败场景 |
CallerRunsPolicy | 提交线程执行任务 | 控制提交速率,平滑削峰 |
DiscardPolicy | 直接丢弃任务 | 允许任务丢失(日志等场景) |
DiscardOldestPolicy | 丢弃最旧任务后重试 | 实时性要求高场景 |
配置实践
// 自定义拒绝策略(记录日志并丢弃)
executor.setRejectedExecutionHandler(runnable -> {
log.warn("Task rejected: {}", runnable);
});
// Spring Boot 配置示例
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
五、参数联动配置方案
场景驱动配置模板
场景 | corePoolSize | maxPoolSize | queueCapacity | 拒绝策略 |
电商秒杀 | 50 | 100 | 100 | CallerRunsPolicy |
微服务API网关 | 20 | 50 | 1000 | AbortPolicy |
数据清洗服务 | 10 | 20 | 无界队列 | DiscardOldestPolicy |
后台批处理 | 4 | 8 | 500 | DiscardPolicy |