目录
线程池是什么
标准库中的线程池
描述线程池工作原理
为什么不推荐使用系统自带的线程池
实现线程池
线程池是什么
线程池就是在池子里放的线程本身,当程序启动时就创建出若干个线程,如果有任务就处理,没有任务就阻塞等待。
想象这么一个场景:在学校附近新开了一家快递店,老板很精明,想到一个与众不同的办法来经营。店里没有雇人,而是每次有业务来了,就现场找一名同学过来把快递送了,然后解雇同学。这个类比我们平时来一个任务,起一个线程进行处理的模式。很快老板发现问题来了,每次招聘 + 解雇同学的成本还是非常高的。老板还是很善于变通的,知道了为什么大家都要雇人了,所以指定了一个指标,公司业务人员会扩张到 3 个人,但还是随着业务逐步雇人。于是再有业务来了,老板就看,如果现在公司还没 3 个人,就雇一个人去送快递,否则只是把业务放到一个本本上,等着 3 个快递人员空闲的时候去处理。这个就是我们要带出的线程池的模式。
线程池最大的好处就是减少每次启动、销毁线程的损耗。
标准库中的线程池
Executors
创建线程池的几种方式
// 1. 用来处理大量短时间工作任务的线程池,如果池中没有可用的线程将创建新的线程,如果线程空闲60秒将收回并移出缓存
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 创建一个操作无界队列且固定大小线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 3. 创建一个操作无界队列且只有一个工作线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 4. 创建一个单线程执行器,可以在给定时间后执行或定期执行。
ScheduledExecutorService singleThreadScheduledExecutor =Executors.newSingleThreadScheduledExecutor();
// 5. 创建一个指定大小的线程池,可以在给定时间后执行或定期执行。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
// 6. 创建一个指定大小(不传入参数,为当前机器CPU核心数)的线程池,并行地处理任务,不保证处理顺序
Executors.newWorkStealingPool();
Executors
本质上是
ThreadPoolExecutor
类的封装
.
ThreadPoolExecutor
提供了更多的可选参数
,
可以进一步细化线程池行为的设定
.
- int corePoolSize:核心线程数,创建线程池包含的最小线程数量。
- int maximumPoolSize:最大线程数,也可以叫做临时线程数。
- long keepAliveTime:临时线程空闲时长。
- TimeUnit unit:空闲时间单位,和keepAliveTime一起使用。
- BlockingQueue<Runnable> workQueue:用来保存任务的阻塞队列。
- RejectedExecutionHandler handler:拒绝策略,触发的时机:当线程池处理不了过多的任务时。
描述线程池工作原理
- 用核心线程去执行任务;
- 多于任务会被保存到阻塞队列中;
- 当阻塞队列满了之后,就会创建临时线程;
- 当阻塞队列满了而且临时线程也创建完成,之后再提交的线程就会执行拒绝策略;
- 当任务被成功处理之后,临时线程经过一段空闲时间将会被系统收回。
拒绝策略:
为什么不推荐使用系统自带的线程池
实现线程池
核心操作为
submit,
将任务加入线程池中
public class MyThreadPool {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3);
public MyThreadPool(int n){
if (n <= 0){
throw new RuntimeException("线程数量不能小于0.");
}
for (int i = 0; i < n; i++) {
Thread thread = new Thread(() -> {
while(true){
try {
Runnable test = queue.take();
test.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
public void submit(Runnable run) throws InterruptedException {
queue.put(run);
}
}
测试线程池
public class MyThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 创建自定义的线程池
MyThreadPool threadPool = new MyThreadPool(3);
// 往线程池中提交任务
for (int i = 0; i < 10; i++) {
int taskId = i;
threadPool.submit(() -> {
System.out.println("我是任务 " + taskId + ", " + Thread.currentThread().getName());
});
}
// 模拟等待任务
TimeUnit.SECONDS.sleep(3);
System.out.println("第二阶段开始");
// 提交任务到线程池
for (int i = 10; i < 20; i++) {
int taskId = i;
threadPool.submit(() -> {
System.out.println("我是任务 " + taskId + ", " + Thread.currentThread().getName());
});
}
}
}