参考
java常用线程池及它们的适用场景(JDK1.8)
Java线程与线程池实战
线程池的拒绝策略_线程池 RejectedExecutionHandler 拒绝策略
ThreadPoolExecutor原理解析-关闭线程池
代码经验—java获取cpu个数-docker
一、概念
Java 中的线程池核心实现类是 ThreadPoolExecutor,本文基于 JDK 1.8 的源码来分析 Java 线程池的核心设计与实现。
1、核心类
线程池接口继承实现类图,最终使用ThreadPoolExecutor类来使用JDK提供的线程池功能
Executor
ThreadPoolExecutor 实现的顶层接口是 Executor。
Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。
ExecutorService
ExecutorService 接口增加了一些能力:
扩展了可异步跟踪执行任务生成返回值 Future 的方法,如 submit() 等方法。
提供了管控线程池生命周期的方法,如 shutDown(),shutDownNow() 等。
AbstractExecutorService
AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ThreadPoolExecutor
线程池使用核心类各个属性如下:
- corePool:核心线程池的大小,启动就会创建,这些线程一直保活,不受keepAliveTime的影响,不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。
- maximumPool:最大线程池的大小,弹性扩容的最大线程数量 = 核心线程+非核心线程。
- BlockingQueue:用来暂时保存任务的工作队列。
- keepAliveTime:弹性扩容的线程的最大保活时间,如果弹性扩容的线程(非核心线程)处于空闲状态的时间超过keepAliveTime则会被销毁,
- threadFactory:the factory to use when the executor creates a new thread。
(线程工程:用来创建线程工厂。比如这里面可以自定义线程名称,当进行虚拟机栈分析时,看着名字就知道这个线程是哪里来的,不会懵逼。) - RejectedExecutionHandler:拒绝策略。(拒绝策略:当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略。)
2、 RejectedExecutionHandler 拒绝策略
RejectedExecutionHandler是一个接口:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}
里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,新的任务就会被拒绝,就会调用这个接口里的这个方法。
可以自己实现这个接口,实现对这些超出数量的任务的处理。
ThreadPoolExecutor自己已经提供了四个拒绝策略
,分别是CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy
这四个拒绝策略其实一看实现方法就知道很简单。
AbortPolicy
ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
CallerRunsPolicy
CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程(main线程)去执行被拒绝的任务。
下面说他的实现:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2), new ThreadPoolExecutor.AbortPolicy());
按上面的运行,输出
注意在添加第五个任务,任务5 的时候,同样被线程池拒绝了,因此执行了CallerRunsPolicy的rejectedExecution方法,这个方法直接执行任务的run方法。因此可以看到任务5是在main线程中执行的。
从中也可以看出,因为第五个任务在主线程中运行,所以主线程就被阻塞了,以至于当第五个任务执行完,添加第六个任务时,前面两个任务已经执行完了,有了空闲线程,因此线程6又可以添加到线程池中执行了。
这个策略的缺点就是可能会阻塞主线程。
DiscardPolicy
这个策略的处理就更简单了,看一下实现就明白了:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }}
这个东西什么都没干。因此采用这个拒绝策略,会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。
DiscardOldestPolicy
DiscardOldestPolicy策略的作用是,当任务被拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}
在rejectedExecution先从任务队列总弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列。
3、shutdown()和shutdownNow()关闭
- shutdown:将线程池状态置为SHUTDOWN并拒绝接受新任务,等到线程池Worker数量为0,任务阻塞队列为空时,关闭线程池。意味着已经加入队列的任务还是会被执行完毕
- shutdownNow:线程池中的所有Worker都会被中断(有可能任务执行到一半也被中断),包括正在执行任务的Worker,等到所有Worker都被删除之后,线程池即被终止,也就是说,不会保证当前时刻正在执行的任务会被安全的执行完,并且会放弃执行任务阻塞队列中的所有任务
4、线程池核心逻辑
二、常用线程池
1、FixedThreadPool
进入Executors类,第一个引入眼帘的就是FixedThreadPool,内部核心线程一直保活直至整个程序关闭
可以看到,它的核心线程数和最大线程数
是一样的。也就意味着它没有非核心线程,第三个参数表示非核心线程数无任务占用时的存活时间,因为没非核心线程,所以没有意义。第四个参数表示存活时间单位。第五个参数表示阻塞队列类型。通过该阻塞队列长度可知这是个无界队列。
无界的阻塞队列
,就意味着它会有内存溢出的风险。适用于任务量固定耗时长的任务。
2、singleThreadPool
核心线程和最大线程都为1,表示同一时刻最多只会有一个线程。适用于多个任务顺序使用的场景。
3、CachedThreadPool
没有核心线程,最大线程数无界
。意味着所有线程都是非核心线程,线程闲置60秒后会销毁线程。SynchronousQueue是个长度为0的阻塞队列。如果存在大量长时的任务,会导致cpu占用率百分百,所以它适合任务量大但耗时少的任务。
4、ScheduledThreadPool
可以看到,ScheduledThreadPool核心线程数为用户自定义,最大线程数无限制,非核心线程数一空闲立刻销毁。队列采用了一个可对任务按延时时间进行排序的队列,延时时间越短排在最前。这也是个无界队列。这种线程池适合固定周期的定时任务或重复任务。但无界的队列和无限制的最大线程数,意味着它会出现内存溢出和cpu占用率百分百的问题。
点入super()
可以看出,FixedThreadPool、singleThreadPool、CachedThreadPool和ScheduledThreadPool都是返回的threadpoolexecutor对象。
5、SingleThreadScheduledExecutor
是个核心线程数为1的ScheduledThreadPool,也就是ScheduledThreadPool的一个特例。用于定期或延时执行任务。
6、对比分析
FixedThreadPool适用于任务量固定耗时长的任务。singleThreadPool适用于多个任务顺序使用的场景。CachedThreadPool适合任务量大但耗时少的任务。ScheduledThreadPool适合固定周期的定时任务或重复任务。singleThreadScheduledExecutor用于定期或延时执行任务。
三、常用队列
SynchronousQueue
SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败
。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
LinkedBlockingQueue
LinkedBlockingQueue是无界的,是一个无界缓存的等待队列。
基于链表的阻塞队列,内部维持着一个数据缓冲队列(该队列由链表构成)。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayListBlockingQueue
ArrayListBlockingQueue是有界的,是一个有界缓存的等待队列。
基于数组的阻塞队列,同LinkedBlockingQueue类似,内部维持着一个定长数据缓冲队列(该队列由数组构成)。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通、最常用的阻塞队列,一般情况下,处理多线程间的生产者消费者问题,使用这两个类足以。
对比分析
SynchronousQueue(无缓存):相当于队列中不缓存任何数据,如果任务超过maximumPool,则会触发异常RejectedExecutionException,无法加入队列,可以保存顺序执行
LinkedBlockingQueue(无限缓存):如果任务超过maximumPool,则会放入队列,知道队列满;如果程序异常退出,队列中的数据则会丢失。
四、实战
前置准备-生产者
@SneakyThrows
public static void producer(){
new Thread(()->{
int i = 0;
while(i<TASK_NUM ){
i++;
Task task = new Task();
task.setValue(i);
try{
Future t = getPool().submit(task);
}catch (RejectedExecutionException e){
System.out.printf("队列已经满,无法消费:%d, 原因:%s, 引起:%s\r\n",i,e.getMessage(),e.getClass());
// throw e;
}
try {
Thread.sleep(PRODUCER_SLEEP);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
前置准备
private static ExecutorService EXECUTOR;
/**
* type=1用hutool的工具类,2用自定义
*/
private static int type;
/**
* 任务数量
*/
private static int TASK_NUM = 5;
/**
* 生产者间隔
*/
private static int PRODUCER_SLEEP = 1000;
/**
* 任务好使
*/
private static int TASK_CONSUMING = 3000;
/**
* 创建线程池
* @param corePoolSize 核心线程池的大小
* @param maxPoolSize 最大线程池的大小
* @param keepAliveTime 弹性扩容的线程的最大保活时间
* @param unit 单位
* @param workQueue 工作队列
* @param handler 异常处理
*/
public static void createPool(int corePoolSize,int maxPoolSize, int keepAliveTime, TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler){
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) ObjectUtil.defaultIfNull(handler, ThreadPoolExecutor.AbortPolicy::new);
EXECUTOR = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit,workQueue, threadFactory, rejectedExecutionHandler);
}
/**
* 监控
*/
@SneakyThrows
public static void minitor(){
while(true) {
Thread.sleep(1000);
int coreSize = getPool().getPoolSize();
int activeCount = getPool().getActiveCount();
int size = getPool().getQueue().size();
System.out.printf("%s 剩余:%s coreSize:%d 活跃线程:%s\r\n", DateUtil.getDateTimeLongStr(), size, coreSize, activeCount);
if (coreSize == 0) {
break;
}
}
}
/**
* 任务,模拟运行3秒
*/
@Data
public static class Task implements Runnable{
private int value;
@Override
public void run() {
String threadName = Thread.currentThread().getName();
long threadId = Thread.currentThread().getId();
int size = getPool().getQueue().size();
int activeCount = getPool().getActiveCount();
System.out.printf("%s 线程名:%s 线程id:%s 消费::%d ; 剩余:%s 活跃线程:%s\r\n", DateUtil.getDateTimeLongStr(),threadName, threadId,value, size, activeCount);
try {
Thread.sleep(TASK_CONSUMING );
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
/**
* 获取线程池
* @return ExecutorService
*/
public static ExecutorService getExecutor(){
if(type == 1){
return GlobalThreadPool.getExecutor();
}else{
return EXECUTOR;
}
}
/**
* 获取线程池
* @return ExecutorService
*/
public static ThreadPoolExecutor getPool(){
return (ThreadPoolExecutor)getExecutor();
}
使用LinkedBlockingQueue
该例子中,任务执行时间设置3秒
场景一 (核心线程:2,最大线程:2,队列大小:1,keepAliveTime:1)
public static void test2(){
type = 2;
TASK_NUM = 5;
PRODUCER_SLEEP = 10;
TASK_CONSUMING = 3000;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
createPool(2,2,1,TimeUnit.SECONDS,workQueue,null);
producer();
minitor();
}
结果如下,可以看出只有3个任务被消费,其他任务因队列满无法消费,并且核心线程一直存活。
2023-04-13 12:14:55:725 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 12:14:55:741 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@5dd27c4c rejected from java.util.concurrent.ThreadPoolExecutor@22b44f44[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@8a80be8 rejected from java.util.concurrent.ThreadPoolExecutor@22b44f44[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 12:14:56:739 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:14:57:743 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:14:58:738 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 12:14:58:753 剩余:0 coreSize:2 活跃线程:2
2023-04-13 12:14:59:759 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:15:00:769 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:15:01:776 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:02:785 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:03:794 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:04:795 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:05:800 剩余:0 coreSize:2 活跃线程:0
场景二 (核心线程:0,最大线程:2,队列大小:1,keepAliveTime:1)
可以发现,如果任务1到达时线程还没有创建,先触发创建1个线程,并执行任务1;接着任务2到达,发现没有可用线程则放入队列;接着任务3到达,发现队列满了,核心线程只有1,且没有到达最大线程,则创建新的线程执行3。所以效果就是先执行1,3。等任务1执行完,线程空闲之后执行任务2。最后线程都空闲之后到达keepAliveTime的时间,全部销毁
,
此例子中核心线程只要<2,都是先执行1,3。后执行2
2023-04-13 12:17:35:835 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 12:17:35:867 线程名:pool-2-thread-2 线程id:25 消费::3 ; 剩余:1 活跃线程:2
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@4569badd rejected from java.util.concurrent.ThreadPoolExecutor@5e13665e[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@565a128b rejected from java.util.concurrent.ThreadPoolExecutor@5e13665e[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 12:17:36:843 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:17:37:851 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:17:38:846 线程名:pool-2-thread-1 线程id:24 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 12:17:38:862 剩余:0 coreSize:2 活跃线程:2
2023-04-13 12:17:39:873 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:17:40:886 剩余:0 coreSize:1 活跃线程:1
2023-04-13 12:17:41:890 剩余:0 coreSize:1 活跃线程:0
2023-04-13 12:17:42:895 剩余:0 coreSize:0 活跃线程:0
场景三、 (核心线程:0,最大线程:2,队列大小:2,keepAliveTime:1)
2023-04-13 15:05:04:744 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:05:04:796 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:2 活跃线程:2
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@37c7d0ff rejected from java.util.concurrent.ThreadPoolExecutor@3c74ff6e[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:05:05:758 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:05:06:773 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:05:07:756 线程名:pool-2-thread-1 线程id:24 消费::2 ; 剩余:1 活跃线程:2
2023-04-13 15:05:07:787 剩余:1 coreSize:2 活跃线程:2
2023-04-13 15:05:07:802 线程名:pool-2-thread-2 线程id:25 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 15:05:08:799 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:05:09:809 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:05:10:819 剩余:0 coreSize:2 活跃线程:0
2023-04-13 15:05:11:829 剩余:0 coreSize:0 活跃线程:0
场景四、 (核心线程:2,最大线程:5,队列大小:3,keepAliveTime:1)
2023-04-13 15:19:35:107 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:19:35:620 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:19:36:120 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:19:37:129 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:19:37:675 线程名:pool-2-thread-3 线程id:26 消费::6 ; 剩余:3 活跃线程:3
2023-04-13 15:19:38:112 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:2 活跃线程:3
2023-04-13 15:19:38:143 剩余:2 coreSize:3 活跃线程:3
2023-04-13 15:19:38:625 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:2 活跃线程:3
2023-04-13 15:19:39:153 剩余:3 coreSize:3 活跃线程:3
2023-04-13 15:19:39:201 线程名:pool-2-thread-4 线程id:27 消费::9 ; 剩余:3 活跃线程:4
2023-04-13 15:19:39:716 线程名:pool-2-thread-5 线程id:28 消费::10 ; 剩余:3 活跃线程:5
2023-04-13 15:19:40:164 剩余:3 coreSize:5 活跃线程:5
队列已经满,无法消费:11, 原因:Task java.util.concurrent.FutureTask@168a604 rejected from java.util.concurrent.ThreadPoolExecutor@6db52332[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 2], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:19:40:678 线程名:pool-2-thread-3 线程id:26 消费::5 ; 剩余:2 活跃线程:5
2023-04-13 15:19:41:114 线程名:pool-2-thread-1 线程id:24 消费::7 ; 剩余:2 活跃线程:5
2023-04-13 15:19:41:176 剩余:2 coreSize:5 活跃线程:5
2023-04-13 15:19:41:631 线程名:pool-2-thread-2 线程id:25 消费::8 ; 剩余:2 活跃线程:5
2023-04-13 15:19:42:177 剩余:3 coreSize:5 活跃线程:5
2023-04-13 15:19:42:209 线程名:pool-2-thread-4 线程id:27 消费::12 ; 剩余:2 活跃线程:5
2023-04-13 15:19:42:728 线程名:pool-2-thread-5 线程id:28 消费::13 ; 剩余:2 活跃线程:5
总结
只有当队列中任务数量>corePoolSize时,才会触发创建非核心线程
,并且根据任务执行的长短,可能会导致无法顺序执行。并且使用队列如果系统异常关闭,队列中数据则会丢失
使用SynchronousQueue
场景一(核心线程:0,最大线程:2,keepAliveTime:1)
type = 2;
PRODUCER_SLEEP = 500;
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
createPool(0,2,1,TimeUnit.SECONDS,workQueue,null);
producer();
minitor();
执行效果,只有<= 最大线程数量的任务执行完,其他失败
2023-04-13 15:42:56:248 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:42:56:755 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:42:57:254 剩余:0 coreSize:2 活跃线程:2
队列已经满,无法消费:3, 原因:Task java.util.concurrent.FutureTask@512c2e56 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@23d10861 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:42:58:264 剩余:0 coreSize:2 活跃线程:2
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@13bfccd3 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:42:59:277 剩余:0 coreSize:2 活跃线程:1
2023-04-13 15:43:00:290 剩余:0 coreSize:1 活跃线程:0
2023-04-13 15:43:01:302 剩余:0 coreSize:0 活跃线程:0
场景二(核心线程:2,最大线程:2,keepAliveTime:1)
执行效果,只有<= 最大线程数量的任务执行完,其他失败
2023-04-13 15:45:16:637 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:45:16:689 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
队列已经满,无法消费:3, 原因:Task java.util.concurrent.FutureTask@3d4c4479 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@50b6e96 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@5abb8dc2 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
场景二(核心线程:2,最大线程:5,keepAliveTime:1,生产者500ms,消费者:1000ms)
type = 2;
PRODUCER_SLEEP = 500;
TASK_CONSUMING = 1000;
TASK_NUM = 10;
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
createPool(2,5,1,TimeUnit.SECONDS,workQueue,null);
producer();
minitor();
执行效果,任务全部被执行完,
2023-04-13 15:47:15:536 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:47:16:046 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:47:16:544 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:16:559 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 15:47:17:072 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:0 活跃线程:2
2023-04-13 15:47:17:558 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:17:574 线程名:pool-2-thread-3 线程id:26 消费::5 ; 剩余:0 活跃线程:2
2023-04-13 15:47:18:090 线程名:pool-2-thread-1 线程id:24 消费::6 ; 剩余:0 活跃线程:2
2023-04-13 15:47:18:575 剩余:0 coreSize:3 活跃线程:1
2023-04-13 15:47:18:590 线程名:pool-2-thread-3 线程id:26 消费::7 ; 剩余:0 活跃线程:2
2023-04-13 15:47:19:105 线程名:pool-2-thread-1 线程id:24 消费::8 ; 剩余:0 活跃线程:2
2023-04-13 15:47:19:586 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:19:616 线程名:pool-2-thread-3 线程id:26 消费::9 ; 剩余:0 活跃线程:2
2023-04-13 15:47:20:131 线程名:pool-2-thread-1 线程id:24 消费::10 ; 剩余:0 活跃线程:2
总结
SynchronousQueue相当于没有任何缓存的队列,也就是队列中无法放入任何元素,只要没有空余的线程执行任务,后续的任务都会被拒绝
,也就是这种队列最多只能跑满所有线程,无法缓存
五、动态修改线程池大小
参考:Java 线程池及参数动态调节详解
/**
* 动态设置线程池大小
* @param executor 线程池
* @param corePoolSize 核心线程数量
* @param maxPoolSize 最大线程数量
*/
private static void dynamicModifyExecutor(ExecutorService executor,int corePoolSize, int maxPoolSize){
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
threadPoolExecutor.setCorePoolSize(corePoolSize);
threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
threadPoolExecutor.prestartCoreThread();
}
测试
/**
* 动态线程池
*/
@SneakyThrows
public static void test4(){
type = 2;
TASK_NUM = 20;
PRODUCER_SLEEP = 500;
TASK_CONSUMING = 2000;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
createPool(0,2,1,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("测试"),null);
producer();
Thread.sleep(2000);
// 修改大小
dynamicModifyExecutor(getExecutor(),1,4);
minitor();
}
六、kafka消费实践
使用SynchronousQueue来处理业务,
message_task消费任务表
- messageId:消息id
- messageBody:消息内容
- status:0未消费,2消费中,3已经消费,4消费失败
kafka设置自动提交,
1)、消费者收到消息之后先判断线程池是否空闲,是否可以插入新的任务;如果可以则插入message_task表,status =1。消息自动提交。
2)、如果线程池已经满,则while循环等待线程池空闲,可以通过getPool().getActiveCount()判断休眠sleep(50),kafka消费阻塞
3)、线程执行任务完成,则修改status = 3,或者直接从该任务表中删掉
4)、线程如果执行失败,则修改status = 4,等待定时任务唤醒,或者重新丢回队列