线程池可以干什么:
- 帮助我们减少线程的创建和销毁
- 提高系统资源的利用率,同时控制并发执行的线程数量
- 便于管理且提高响应速度
线程池的工作流程:
1.创建线程池
线程池的创建是通过 Executors 工厂方法或直接使用 ThreadPoolExecutor
构造函数来完成
使用 Executors 创建线程池的⼏种⽅式
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数⽬动态增⻓的线程池.
- newSingleThreadExecutor: 创建只包含单个线程的线程池
- newScheduledThreadPool: 设定 延迟时间后执⾏命令,或者定期执⾏命令. 是进阶版的 Timer. Executors 本质上是 ThreadPoolExecutor 类的封装.
直接使用ThreadPoolExecutor
构造函数来完成,代码的大致效果就是下面这段代码:
//使用 ThreadPoolExecutor 构造函数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,//核心线程数
maximumPoolSize,//最大线程数
keepAliveTime,//非核心线程数在空闲时存在的最长时间
unit,//设置时间单位
workQueue,//任务队列
threadFactory,//线程工厂,可以自定义线程的创建过程,包括设置线程名称、优先级等(不是必须的)
RejectedExecutionHandler///拒绝策略
);
2.提交任务
一旦线程池创建完成,就可以向线程池提交任务。提交任务通常通过 ExecutorService
接口的 execute(Runnable command)
方法或使用 Lambda 表达式来完成。提交的任务会被封装成 Runnable
或 Callable
对象。
使用 Lambda 表达式提交任务:
ExecutorService service = Executors.newFixedThreadPool(4);//创建一个固定大小为4的线程池
service.execute(()->{
System.out.println("你需要提交的任务");
});
使用 Runnable 来提交任务:
ExecutorService service = Executors.newFixedThreadPool(4);//创建一个固定大小为4的线程池
service.execute(new Runnable() {
@Override
public void run() {
System.out.println("你需要提交的任务");
}
});
3.检查当前线程数
- 如果正在运行的线程数小于corePoolSize (核心线程数)则创建新的线程来执行任务,即使当前线程池中有空闲线程
- 如果正在运行的线程数等于corePoolSize 且存在空闲线程,则使用空闲线程来执行任务
- 如果正在运行的线程数等于corePoolSize 但是没有空闲线程,则将任务加入任务队列中等待执行
4.检查任务队列
- 如果任务队列没有满,则将新提交的任务加入到队列中等待执行
- 如果队列已满,则根据当前线程数与 maximumPoolSize (最大线程数) 来判断:
- 如果当前线程数小于maximumPoolSize,则创建新的线程来执行任务
- 如果当前线程数等于maximumPoolSize 且任务队列已满,则触发拒绝策略
- 如果当前线程数等于maximumPoolSize 但是任务队列没满,则将新的任务放入任务队列等待执行
5.执行拒绝策略
如果线程池无法接受更多的任务(即线程数量达到最大值且任务队列已满),则会根据RejectedExecutionHandler
实现来处理无法执行的任务
AbortPolicy
: 抛出RejectedExecutionException
异常。CallerRunsPolicy
: 由调用者线程执行新任务。(当前任务比较重要又不能被丢弃的时候就可以使用这个,但是可能会增加响应时间)DiscardOldestPolicy
: 丢弃队列中最老的任务。DiscardPolicy
: 丢弃新来的任务。
6.空闲线程的管理
我们可以理解成核心线程(corePoolSize)是公司里面的正式员工,非核心线程数(超过corePoolSize的部分)是公司里面的实习生,假设最大空闲时间(KeepAliveTime)为一个月,如果这一个月内正式员工没有任何工作(处于空闲状态),那么这种情况下是不会轻易被裁掉的(销毁),但是如果是实习生连续一个月没有工作,那么这种情况下是会面临被裁掉的风险(销毁)用于节约成本(资源)
如果非核心线程数大于 corePoolSize 且线程空闲时间超过 keepAliveTime ,则线程会被销毁,直到线程数等于 corePoolSize
7.关闭线程池
可以调用 ExecutorService
的 shutdown()
方法来关闭线程池。这将阻止新的任务提交,但允许正在执行的任务完成。如果需要立即终止所有任务并关闭线程池,可以使用 shutdownNow()
方法。
代码实现线程池
使用 Executors 类工厂方法
newSingleThreadExecutor()
作用:
- 创建一个单线程化的
ExecutorService
。 - 保证所有任务按照提交顺序执行,一次只执行一个任务。
默认配置:
corePoolSize
: 1maximumPoolSize
: 1keepAliveTime
: 无意义(因为只有一个线程,且总是活跃的)workQueue
:LinkedBlockingQueue
(无界队列)threadFactory
:Executors.defaultThreadFactory()
RejectedExecutionHandler
:AbortPolicy()
代码示例:
public static void main(String[] args) {
// 创建一个单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();
//提交任务到线程池
for(int i = 0;i<5;i++){
final int id = i;
service.submit(()->{
System.out.println(Thread.currentThread().getName()+"正在执行任务"+id);
});
}
}
运行结果:
newFixedThreadPool(int nThreads)
作用:
- 创建一个固定大小的线程池。
- 线程池中的线程数量是固定的,一旦创建,线程池中的线程数量不会改变。
- 适用于处理大量短期任务,且希望限制线程数量的情况。
默认配置:
corePoolSize
:nThreads
maximumPoolSize
:nThreads
keepAliveTime
: 无意义(因为线程池大小固定)workQueue
:LinkedBlockingQueue
(无界队列)threadFactory
:Executors.defaultThreadFactory()
RejectedExecutionHandler
:AbortPolicy()
代码示例:
public static void main(String[] args) {
//创建一个固定大小为6的线程池
ExecutorService service = Executors.newFixedThreadPool(6);
//提交任务到线程池
for(int i = 0;i<10;i++){
final int id = i;
service.submit(()->{
System.out.println(Thread.currentThread().getName()+"正在执行任务"+id);
});
}
}
运行结果:
newCachedThreadPool()
作用:
- 创建一个可以根据需要创建新线程的线程池。
- 线程池会根据需要创建新线程,但会在任务完成后销毁空闲线程。
- 适用于执行很多短期异步任务的场合。
默认配置:
corePoolSize
: 0maximumPoolSize
:Integer.MAX_VALUE
keepAliveTime
: 60秒workQueue
:SynchronousQueue
(一个特殊的无界队列,队列本身不存储元素,每个插入操作必须等待另一个线程的移除操作)threadFactory
:Executors.defaultThreadFactory()
RejectedExecutionHandler
:AbortPolicy()
代码示例:
// 创建一个可缓存的线程池
ExecutorService service = Executors.newCachedThreadPool();
for(int i = 0;i<10;i++){
final int id = i;
service.submit(()->{
System.out.println(Thread.currentThread().getName()+"正在执行任务"+id);
});
}
运行结果:
newScheduledThreadPool(int corePoolSize)
作用:
- 创建一个可以安排定期或延迟任务执行的线程池。
- 线程池中的线程数量是固定的,可以根据需要扩展到
corePoolSize
的数量。 - 适用于需要定期执行任务的场景。
默认配置:
corePoolSize
:corePoolSize
maximumPoolSize
:Integer.MAX_VALUE
keepAliveTime
: 无意义(因为线程池大小固定)workQueue
:DelayedWorkQueue
(一个特殊类型的队列,用于存放延迟任务)threadFactory
:Executors.defaultThreadFactory()
RejectedExecutionHandler
:AbortPolicy()
代码示例:
public static void main(String[] args) {
//核心线程数为3
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
//给定的任务在指定延迟后执行一次
service.schedule(()->{
System.out.println("任务1执行");
},5, TimeUnit.SECONDS);
//每隔2秒执行一次,初始延迟0秒
service.scheduleAtFixedRate(()->{
System.out.println("周期性任务开始执行");
},0,2,TimeUnit.SECONDS);
}
运行结果:
使用 ThreadPoolExecutor
构造函数
演示一个简单的
public static void main(String[] args) {
// 创建线程池
ExecutorService executor = new ThreadPoolExecutor(
2, // corePoolSize: 2 个正式员工
5, // maximumPoolSize: 最多5个线程(2个正式员工 + 3个临时工)
60, // keepAliveTime: 临时工空闲60秒后被销毁
TimeUnit.SECONDS, // 时间单位为秒
new ArrayBlockingQueue<>(10), // workQueue: 阻塞队列,最多容纳10个任务
Executors.defaultThreadFactory(), // threadFactory: 使用默认的线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // RejectedExecutionHandler: 由调用者线程执行新任务
);
}
自己实现一个简单的线程池
这么这段代码没有任何过多的考虑,只是单纯的实现了一个线程池
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class MyThreadPool{
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
public void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
public MyThreadPool(int n){
for(int i = 0;i<n;i++){
Thread t = new Thread(()->{
while(true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
}
public class demo1 {
public static void main(String[] args) throws InterruptedException {
MyThreadPool pool = new MyThreadPool(4);
for(int i = 0;i<1000;i++){
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" hello");
}
});
}
}
}