Java线程池从入门到精通(线程池实战)

news2024/11/17 14:19:45

参考

java常用线程池及它们的适用场景(JDK1.8)
Java线程与线程池实战
线程池的拒绝策略_线程池 RejectedExecutionHandler 拒绝策略
ThreadPoolExecutor原理解析-关闭线程池
代码经验—java获取cpu个数-docker

一、概念

Java 中的线程池核心实现类是 ThreadPoolExecutor,本文基于 JDK 1.8 的源码来分析 Java 线程池的核心设计与实现。

1、核心类

线程池接口继承实现类图,最终使用ThreadPoolExecutor类来使用JDK提供的线程池功能
在这里插入图片描述

Executor

ThreadPoolExecutor 实现的顶层接口是 Executor。

Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。

ExecutorService

ExecutorService 接口增加了一些能力:

扩展了可异步跟踪执行任务生成返回值 Future 的方法,如 submit() 等方法。
提供了管控线程池生命周期的方法,如 shutDown(),shutDownNow() 等。

AbstractExecutorService

AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor

线程池使用核心类各个属性如下:

  • corePool:核心线程池的大小,启动就会创建,这些线程一直保活,不受keepAliveTime的影响,不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。
  • maximumPool:最大线程池的大小,弹性扩容的最大线程数量 = 核心线程+非核心线程。
  • BlockingQueue:用来暂时保存任务的工作队列。
  • keepAliveTime:弹性扩容的线程的最大保活时间,如果弹性扩容的线程(非核心线程)处于空闲状态的时间超过keepAliveTime则会被销毁,
  • threadFactory:the factory to use when the executor creates a new thread。
    (线程工程:用来创建线程工厂。比如这里面可以自定义线程名称,当进行虚拟机栈分析时,看着名字就知道这个线程是哪里来的,不会懵逼。)
  • RejectedExecutionHandler:拒绝策略。(拒绝策略:当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略。)

2、 RejectedExecutionHandler 拒绝策略

RejectedExecutionHandler是一个接口:

public interface RejectedExecutionHandler {    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,新的任务就会被拒绝,就会调用这个接口里的这个方法。

可以自己实现这个接口,实现对这些超出数量的任务的处理。

ThreadPoolExecutor自己已经提供了四个拒绝策略,分别是CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy

这四个拒绝策略其实一看实现方法就知道很简单。

AbortPolicy

ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。

CallerRunsPolicy

CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程(main线程)去执行被拒绝的任务。

下面说他的实现:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,        TimeUnit.SECONDS,        new LinkedBlockingDeque<Runnable>(2),        new ThreadPoolExecutor.AbortPolicy());

按上面的运行,输出

在这里插入图片描述

注意在添加第五个任务,任务5 的时候,同样被线程池拒绝了,因此执行了CallerRunsPolicy的rejectedExecution方法,这个方法直接执行任务的run方法。因此可以看到任务5是在main线程中执行的。

从中也可以看出,因为第五个任务在主线程中运行,所以主线程就被阻塞了,以至于当第五个任务执行完,添加第六个任务时,前面两个任务已经执行完了,有了空闲线程,因此线程6又可以添加到线程池中执行了。

这个策略的缺点就是可能会阻塞主线程。

DiscardPolicy

这个策略的处理就更简单了,看一下实现就明白了:

public static class DiscardPolicy implements RejectedExecutionHandler {    public DiscardPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {    }}

这个东西什么都没干。因此采用这个拒绝策略,会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。

DiscardOldestPolicy

DiscardOldestPolicy策略的作用是,当任务被拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {    public DiscardOldestPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            e.getQueue().poll();            e.execute(r);        }    }}

在rejectedExecution先从任务队列总弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列。

3、shutdown()和shutdownNow()关闭

  • shutdown:将线程池状态置为SHUTDOWN并拒绝接受新任务,等到线程池Worker数量为0,任务阻塞队列为空时,关闭线程池。意味着已经加入队列的任务还是会被执行完毕
  • shutdownNow:线程池中的所有Worker都会被中断(有可能任务执行到一半也被中断),包括正在执行任务的Worker,等到所有Worker都被删除之后,线程池即被终止,也就是说,不会保证当前时刻正在执行的任务会被安全的执行完,并且会放弃执行任务阻塞队列中的所有任务

4、线程池核心逻辑

在这里插入图片描述

二、常用线程池

1、FixedThreadPool

进入Executors类,第一个引入眼帘的就是FixedThreadPool,内部核心线程一直保活直至整个程序关闭
在这里插入图片描述
可以看到,它的核心线程数和最大线程数是一样的。也就意味着它没有非核心线程,第三个参数表示非核心线程数无任务占用时的存活时间,因为没非核心线程,所以没有意义。第四个参数表示存活时间单位。第五个参数表示阻塞队列类型。通过该阻塞队列长度可知这是个无界队列。

在这里插入图片描述

无界的阻塞队列,就意味着它会有内存溢出的风险。适用于任务量固定耗时长的任务。

2、singleThreadPool

在这里插入图片描述

核心线程和最大线程都为1,表示同一时刻最多只会有一个线程。适用于多个任务顺序使用的场景。

3、CachedThreadPool

在这里插入图片描述

没有核心线程,最大线程数无界。意味着所有线程都是非核心线程,线程闲置60秒后会销毁线程。SynchronousQueue是个长度为0的阻塞队列。如果存在大量长时的任务,会导致cpu占用率百分百,所以它适合任务量大但耗时少的任务。

4、ScheduledThreadPool

在这里插入图片描述

在这里插入图片描述
可以看到,ScheduledThreadPool核心线程数为用户自定义,最大线程数无限制,非核心线程数一空闲立刻销毁。队列采用了一个可对任务按延时时间进行排序的队列,延时时间越短排在最前。这也是个无界队列。这种线程池适合固定周期的定时任务或重复任务。但无界的队列和无限制的最大线程数,意味着它会出现内存溢出和cpu占用率百分百的问题。
点入super()
在这里插入图片描述

可以看出,FixedThreadPool、singleThreadPool、CachedThreadPool和ScheduledThreadPool都是返回的threadpoolexecutor对象。

5、SingleThreadScheduledExecutor

在这里插入图片描述

是个核心线程数为1的ScheduledThreadPool,也就是ScheduledThreadPool的一个特例。用于定期或延时执行任务。

6、对比分析

FixedThreadPool适用于任务量固定耗时长的任务。singleThreadPool适用于多个任务顺序使用的场景。CachedThreadPool适合任务量大但耗时少的任务。ScheduledThreadPool适合固定周期的定时任务或重复任务。singleThreadScheduledExecutor用于定期或延时执行任务。

三、常用队列

SynchronousQueue

SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败 。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

LinkedBlockingQueue

LinkedBlockingQueue是无界的,是一个无界缓存的等待队列。

基于链表的阻塞队列,内部维持着一个数据缓冲队列(该队列由链表构成)。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

ArrayListBlockingQueue

ArrayListBlockingQueue是有界的,是一个有界缓存的等待队列。
基于数组的阻塞队列,同LinkedBlockingQueue类似,内部维持着一个定长数据缓冲队列(该队列由数组构成)。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通、最常用的阻塞队列,一般情况下,处理多线程间的生产者消费者问题,使用这两个类足以。

对比分析

SynchronousQueue(无缓存):相当于队列中不缓存任何数据,如果任务超过maximumPool,则会触发异常RejectedExecutionException,无法加入队列,可以保存顺序执行
LinkedBlockingQueue(无限缓存):如果任务超过maximumPool,则会放入队列,知道队列满;如果程序异常退出,队列中的数据则会丢失。

四、实战

前置准备-生产者

@SneakyThrows
    public static void producer(){
        new Thread(()->{
            int i = 0;
            while(i<TASK_NUM ){
                i++;
                Task task = new Task();
                task.setValue(i);
                try{
                   Future t = getPool().submit(task);
                }catch (RejectedExecutionException e){

                    System.out.printf("队列已经满,无法消费:%d, 原因:%s, 引起:%s\r\n",i,e.getMessage(),e.getClass());
//                    throw e;
                }

                try {
                    Thread.sleep(PRODUCER_SLEEP);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

    }

前置准备


    private static ExecutorService EXECUTOR;
/**
     * type=1用hutool的工具类,2用自定义
     */
    private static int type;
/**
     * 任务数量
     */
    private static int TASK_NUM = 5;

    /**
     * 生产者间隔
     */
    private static int PRODUCER_SLEEP = 1000;

    /**
     * 任务好使
     */
    private static int TASK_CONSUMING = 3000;
    
/**
     * 创建线程池
     * @param corePoolSize 核心线程池的大小
     * @param maxPoolSize 最大线程池的大小
     * @param keepAliveTime 弹性扩容的线程的最大保活时间
     * @param unit 单位
     * @param workQueue 工作队列
     * @param handler 异常处理
     */
public static void createPool(int corePoolSize,int maxPoolSize, int keepAliveTime, TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler){
        ThreadFactory threadFactory =  Executors.defaultThreadFactory();
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) ObjectUtil.defaultIfNull(handler, ThreadPoolExecutor.AbortPolicy::new);
        EXECUTOR = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit,workQueue, threadFactory, rejectedExecutionHandler);
    }
    
/**
     * 监控
     */
@SneakyThrows
    public static void minitor(){
        while(true) {
            Thread.sleep(1000);
            int coreSize = getPool().getPoolSize();
            int activeCount = getPool().getActiveCount();
            int size = getPool().getQueue().size();
            System.out.printf("%s 剩余:%s coreSize:%d 活跃线程:%s\r\n", DateUtil.getDateTimeLongStr(), size, coreSize, activeCount);
            if (coreSize == 0) {
                break;
            }
        }
    }
/**
     * 任务,模拟运行3秒
     */
    @Data
    public static class Task implements Runnable{

        private int value;

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            long threadId = Thread.currentThread().getId();
            int size = getPool().getQueue().size();
            int activeCount = getPool().getActiveCount();
            System.out.printf("%s 线程名:%s 线程id:%s 消费::%d ; 剩余:%s 活跃线程:%s\r\n", DateUtil.getDateTimeLongStr(),threadName, threadId,value, size, activeCount);
            try {
                Thread.sleep(TASK_CONSUMING );
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        }
    }

    /**
     * 获取线程池
     * @return ExecutorService
     */
    public static ExecutorService getExecutor(){
        if(type == 1){
            return GlobalThreadPool.getExecutor();
        }else{
            return EXECUTOR;
        }
    }
    /**
     * 获取线程池
     * @return ExecutorService
     */
    public static ThreadPoolExecutor getPool(){
        return (ThreadPoolExecutor)getExecutor();
    }

使用LinkedBlockingQueue

该例子中,任务执行时间设置3秒

场景一 (核心线程:2,最大线程:2,队列大小:1,keepAliveTime:1)

public static void test2(){
        type = 2;
        TASK_NUM = 5;
        PRODUCER_SLEEP = 10;
        TASK_CONSUMING = 3000;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
        createPool(2,2,1,TimeUnit.SECONDS,workQueue,null);
        producer();
        minitor();
    }

结果如下,可以看出只有3个任务被消费,其他任务因队列满无法消费,并且核心线程一直存活。

2023-04-13 12:14:55:725 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 12:14:55:741 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@5dd27c4c rejected from java.util.concurrent.ThreadPoolExecutor@22b44f44[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@8a80be8 rejected from java.util.concurrent.ThreadPoolExecutor@22b44f44[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 12:14:56:739 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:14:57:743 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:14:58:738 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 12:14:58:753 剩余:0 coreSize:2 活跃线程:2
2023-04-13 12:14:59:759 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:15:00:769 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:15:01:776 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:02:785 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:03:794 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:04:795 剩余:0 coreSize:2 活跃线程:0
2023-04-13 12:15:05:800 剩余:0 coreSize:2 活跃线程:0

场景二 (核心线程:0,最大线程:2,队列大小:1,keepAliveTime:1)

可以发现,如果任务1到达时线程还没有创建,先触发创建1个线程,并执行任务1;接着任务2到达,发现没有可用线程则放入队列;接着任务3到达,发现队列满了,核心线程只有1,且没有到达最大线程,则创建新的线程执行3。所以效果就是先执行1,3。等任务1执行完,线程空闲之后执行任务2。最后线程都空闲之后到达keepAliveTime的时间,全部销毁
此例子中核心线程只要<2,都是先执行1,3。后执行2

2023-04-13 12:17:35:835 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 12:17:35:867 线程名:pool-2-thread-2 线程id:25 消费::3 ; 剩余:1 活跃线程:2
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@4569badd rejected from java.util.concurrent.ThreadPoolExecutor@5e13665e[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@565a128b rejected from java.util.concurrent.ThreadPoolExecutor@5e13665e[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 12:17:36:843 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:17:37:851 剩余:1 coreSize:2 活跃线程:2
2023-04-13 12:17:38:846 线程名:pool-2-thread-1 线程id:24 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 12:17:38:862 剩余:0 coreSize:2 活跃线程:2
2023-04-13 12:17:39:873 剩余:0 coreSize:2 活跃线程:1
2023-04-13 12:17:40:886 剩余:0 coreSize:1 活跃线程:1
2023-04-13 12:17:41:890 剩余:0 coreSize:1 活跃线程:0
2023-04-13 12:17:42:895 剩余:0 coreSize:0 活跃线程:0

场景三、 (核心线程:0,最大线程:2,队列大小:2,keepAliveTime:1)

2023-04-13 15:05:04:744 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:05:04:796 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:2 活跃线程:2
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@37c7d0ff rejected from java.util.concurrent.ThreadPoolExecutor@3c74ff6e[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:05:05:758 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:05:06:773 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:05:07:756 线程名:pool-2-thread-1 线程id:24 消费::2 ; 剩余:1 活跃线程:2
2023-04-13 15:05:07:787 剩余:1 coreSize:2 活跃线程:2
2023-04-13 15:05:07:802 线程名:pool-2-thread-2 线程id:25 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 15:05:08:799 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:05:09:809 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:05:10:819 剩余:0 coreSize:2 活跃线程:0
2023-04-13 15:05:11:829 剩余:0 coreSize:0 活跃线程:0

场景四、 (核心线程:2,最大线程:5,队列大小:3,keepAliveTime:1)

2023-04-13 15:19:35:107 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:19:35:620 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:19:36:120 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:19:37:129 剩余:2 coreSize:2 活跃线程:2
2023-04-13 15:19:37:675 线程名:pool-2-thread-3 线程id:26 消费::6 ; 剩余:3 活跃线程:3
2023-04-13 15:19:38:112 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:2 活跃线程:3
2023-04-13 15:19:38:143 剩余:2 coreSize:3 活跃线程:3
2023-04-13 15:19:38:625 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:2 活跃线程:3
2023-04-13 15:19:39:153 剩余:3 coreSize:3 活跃线程:3
2023-04-13 15:19:39:201 线程名:pool-2-thread-4 线程id:27 消费::9 ; 剩余:3 活跃线程:4
2023-04-13 15:19:39:716 线程名:pool-2-thread-5 线程id:28 消费::10 ; 剩余:3 活跃线程:5
2023-04-13 15:19:40:164 剩余:3 coreSize:5 活跃线程:5
队列已经满,无法消费:11, 原因:Task java.util.concurrent.FutureTask@168a604 rejected from java.util.concurrent.ThreadPoolExecutor@6db52332[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 2], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:19:40:678 线程名:pool-2-thread-3 线程id:26 消费::5 ; 剩余:2 活跃线程:5
2023-04-13 15:19:41:114 线程名:pool-2-thread-1 线程id:24 消费::7 ; 剩余:2 活跃线程:5
2023-04-13 15:19:41:176 剩余:2 coreSize:5 活跃线程:5
2023-04-13 15:19:41:631 线程名:pool-2-thread-2 线程id:25 消费::8 ; 剩余:2 活跃线程:5
2023-04-13 15:19:42:177 剩余:3 coreSize:5 活跃线程:5
2023-04-13 15:19:42:209 线程名:pool-2-thread-4 线程id:27 消费::12 ; 剩余:2 活跃线程:5
2023-04-13 15:19:42:728 线程名:pool-2-thread-5 线程id:28 消费::13 ; 剩余:2 活跃线程:5

总结

只有当队列中任务数量>corePoolSize时,才会触发创建非核心线程,并且根据任务执行的长短,可能会导致无法顺序执行。并且使用队列如果系统异常关闭,队列中数据则会丢失

使用SynchronousQueue

场景一(核心线程:0,最大线程:2,keepAliveTime:1)

type = 2;
PRODUCER_SLEEP = 500;
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
createPool(0,2,1,TimeUnit.SECONDS,workQueue,null);
producer();
minitor();

执行效果,只有<= 最大线程数量的任务执行完,其他失败

2023-04-13 15:42:56:248 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:42:56:755 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:42:57:254 剩余:0 coreSize:2 活跃线程:2
队列已经满,无法消费:3, 原因:Task java.util.concurrent.FutureTask@512c2e56 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@23d10861 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:42:58:264 剩余:0 coreSize:2 活跃线程:2
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@13bfccd3 rejected from java.util.concurrent.ThreadPoolExecutor@a9d0ad[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
2023-04-13 15:42:59:277 剩余:0 coreSize:2 活跃线程:1
2023-04-13 15:43:00:290 剩余:0 coreSize:1 活跃线程:0
2023-04-13 15:43:01:302 剩余:0 coreSize:0 活跃线程:0

场景二(核心线程:2,最大线程:2,keepAliveTime:1)

执行效果,只有<= 最大线程数量的任务执行完,其他失败

2023-04-13 15:45:16:637 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:45:16:689 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
队列已经满,无法消费:3, 原因:Task java.util.concurrent.FutureTask@3d4c4479 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:4, 原因:Task java.util.concurrent.FutureTask@50b6e96 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException
队列已经满,无法消费:5, 原因:Task java.util.concurrent.FutureTask@5abb8dc2 rejected from java.util.concurrent.ThreadPoolExecutor@6c25d219[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0], 引起:class java.util.concurrent.RejectedExecutionException

场景二(核心线程:2,最大线程:5,keepAliveTime:1,生产者500ms,消费者:1000ms)

type = 2;
PRODUCER_SLEEP = 500;
TASK_CONSUMING = 1000;
TASK_NUM = 10;
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
createPool(2,5,1,TimeUnit.SECONDS,workQueue,null);
producer();
minitor();

执行效果,任务全部被执行完,

2023-04-13 15:47:15:536 线程名:pool-2-thread-1 线程id:24 消费::1 ; 剩余:0 活跃线程:1
2023-04-13 15:47:16:046 线程名:pool-2-thread-2 线程id:25 消费::2 ; 剩余:0 活跃线程:2
2023-04-13 15:47:16:544 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:16:559 线程名:pool-2-thread-1 线程id:24 消费::3 ; 剩余:0 活跃线程:2
2023-04-13 15:47:17:072 线程名:pool-2-thread-2 线程id:25 消费::4 ; 剩余:0 活跃线程:2
2023-04-13 15:47:17:558 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:17:574 线程名:pool-2-thread-3 线程id:26 消费::5 ; 剩余:0 活跃线程:2
2023-04-13 15:47:18:090 线程名:pool-2-thread-1 线程id:24 消费::6 ; 剩余:0 活跃线程:2
2023-04-13 15:47:18:575 剩余:0 coreSize:3 活跃线程:1
2023-04-13 15:47:18:590 线程名:pool-2-thread-3 线程id:26 消费::7 ; 剩余:0 活跃线程:2
2023-04-13 15:47:19:105 线程名:pool-2-thread-1 线程id:24 消费::8 ; 剩余:0 活跃线程:2
2023-04-13 15:47:19:586 剩余:0 coreSize:2 活跃线程:2
2023-04-13 15:47:19:616 线程名:pool-2-thread-3 线程id:26 消费::9 ; 剩余:0 活跃线程:2
2023-04-13 15:47:20:131 线程名:pool-2-thread-1 线程id:24 消费::10 ; 剩余:0 活跃线程:2

总结

SynchronousQueue相当于没有任何缓存的队列,也就是队列中无法放入任何元素,只要没有空余的线程执行任务,后续的任务都会被拒绝,也就是这种队列最多只能跑满所有线程,无法缓存

五、动态修改线程池大小

参考:Java 线程池及参数动态调节详解

/**
 * 动态设置线程池大小
 * @param executor 线程池
 * @param corePoolSize 核心线程数量
 * @param maxPoolSize 最大线程数量
 */
private static void dynamicModifyExecutor(ExecutorService executor,int corePoolSize, int maxPoolSize){
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
    threadPoolExecutor.setCorePoolSize(corePoolSize);
    threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
    threadPoolExecutor.prestartCoreThread();
}

测试

/**
 * 动态线程池
 */
@SneakyThrows
public static void test4(){
    type = 2;
    TASK_NUM = 20;
    PRODUCER_SLEEP = 500;
    TASK_CONSUMING = 2000;
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
    createPool(0,2,1,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("测试"),null);
    producer();
    Thread.sleep(2000);
    // 修改大小
    dynamicModifyExecutor(getExecutor(),1,4);
    minitor();
}

六、kafka消费实践

使用SynchronousQueue来处理业务,
message_task消费任务表

  • messageId:消息id
  • messageBody:消息内容
  • status:0未消费,2消费中,3已经消费,4消费失败
    kafka设置自动提交,
    1)、消费者收到消息之后先判断线程池是否空闲,是否可以插入新的任务;如果可以则插入message_task表,status =1。消息自动提交。
    2)、如果线程池已经满,则while循环等待线程池空闲,可以通过getPool().getActiveCount()判断休眠sleep(50),kafka消费阻塞
    3)、线程执行任务完成,则修改status = 3,或者直接从该任务表中删掉
    4)、线程如果执行失败,则修改status = 4,等待定时任务唤醒,或者重新丢回队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/455679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鉴源论坛 · 观模丨面向界面的图形化测试技术

作者 | 熊一衡 华东师范大学软件工程学院博士 苏亭 华东师范大学软件工程学院教授 版块 | 鉴源论坛 观模 01 什么是面向界面的图形化测试&#xff08;GUI Testing&#xff09; 图形用户界面(GUI) 是一种通过图形化方式呈现信息、数据、功能和操作的用户界面&#xff0c;旨在…

一起学 WebGL:三角形加上渐变色

大家好&#xff0c;我是前端西瓜哥。之前教大家绘制一个红色的三角形&#xff0c;这次我们来画个有渐变的三角形。 本文为系列文章&#xff0c;请先阅读如何绘制红色三角形的文章&#xff1a; 《一起学 WebGL&#xff1a;绘制三角形》 原来的写法&#xff0c;颜色是在片元着色器…

移动边缘计算意味着真正的5G时代已经来临

5G的承诺尚未实现&#xff0c;但现在宣布其失败还为时过早。DataBank首席执行官劳尔k马丁尼克(Raul K. Martynek)表示 &#xff0c;真正的5G正在通过移动边缘计算实现&#xff0c;而数据中心将成为其中的核心。 在美国所有主要的移动运营商都在大力宣传他们在全美范围内提供无…

STM32-移植RTT

目录 Cubemx引入RTT资源新建工程生成工程 时钟选择选单片机引脚引脚搜索快速选中取消引脚选中引脚命名IO普通模式设置 串口串口基本配置串口DMA ADC采集ADC基本应用ADC_DMA RTT-shell指令定义RTTCOM调试串口J-Link RTT调试 教程shell指令RTT外设驱动使用1--串口添加 STM32_pwm …

玩元宇宙血亏后 蓝色光标梭哈AI也挺悬

蓝色光标2022年年度报告出炉&#xff0c;巨亏21.75 亿元&#xff0c;其中20.38亿亏损因商誉、无形资产及其他资产减值造成&#xff0c;而在实际亏损业务中&#xff0c;元宇宙占比不小。 蓝色光标在元宇宙领域的布局&#xff0c;主要通过三家子公司实施&#xff0c;分别为蓝色宇…

分布式文件系统HDFS的多问多答

分布式文件系统HDFS 简述HDFS的优缺点简述HDFS的体系结构请论述HDFS中SecondaryNameNode的作用和工作原理请论述HDFS写数据原理 简述HDFS的优缺点 HDFS的优良特性&#xff1a; ①兼容廉价的硬件设备。在成百上千台廉价服务器中存储数据&#xff0c;常会出现节点失效的情况&…

从浏览器输入url到页面加载(四)协议栈和套接字以及三次握手确认对于通信的作用

前言 上一节我们说到了域名对用户记忆的优点&#xff0c;但是IP对于路由器的优点&#xff0c;所以需要有DNS服务器提供域名与IP地址的转换&#xff0c;还说到了在前端开发中dns-prefetch域名预解析的好处。 本小节呢&#xff0c;我们会说一些不常用的知识点&#xff0c;如协议…

【社区图书馆】读《悲惨世界》有感

文章目录 故事简介经典重现价值取向我的思想 故事简介 《悲惨世界》是一部充满了悲剧的小说&#xff0c;故事首先由教堂展开&#xff0c;然后主要围绕着主人公冉阿让进行一系列的生动形象的描写&#xff0c;讲述了冉阿让悲惨的一生。 主人公冉阿让是一个诚实、善良的工人&…

100天涨薪4k,从功能测试到自动化测试,我整理的3000字超全学习指南

去年6月份&#xff0c;由于经济压力让我下定决心进阶自动化测试&#xff0c;已经24的我做了3年功能测试&#xff0c;坐标广州薪资定格在8k&#xff0c;可能是生活过的太安逸&#xff0c;觉得8000的工资也够了&#xff0c;但是生活总是多变的&#xff0c;女朋友的突然怀孕&#…

SpringBoot 整合WebService详解

1. 概述 WebService服务端是以远程接口为主的&#xff0c;在Java实现的WebService技术里主要依靠CXF开发框架&#xff0c;而这个CXF开发框架可以直接将接口发布成WebService。 CXF又分为JAX-WS和JAX-RS&#xff0c;JAX-WS是基于xml协议&#xff0c;而JAX-RS是基于Restful风格&…

OCR卡证识别

文章目录 前言一、DBNet多分类二、步骤1.训练、训练模型推理、模型转换2.通过推理模型进行推理 三、解决思路1、查看模型2、tools/infer/predict_det.py修改3、utility.py修改 总结 前言 最近涉及到了身份证识别&#xff0c;为了便于匹配识别结果的属性&#xff0c;如姓名、身…

(二) AIGC—Stable Difussion (1)

1. 前置知识 目前通用的图像生成模型一般包含三个组件&#xff1a; Text Encoder 根据文字生成向量生成模型 根据向量和Noise 生成 缩小版本的图像Image Decoder 根据小分辨率图像生成大分辨率图像 2. Text Encoder 文字的Encoder对于结果的影响很大&#xff0c;增大Diffusio…

华为p60系列超级快充 Turbo技术,轻松搞定充电困扰!

随着手机的功能越来越丰富&#xff0c;电量消耗也越来越快&#xff0c;当手机电量剩余20%时&#xff0c;是否有电量焦虑。为了满足大家快速充电的需求&#xff0c;华为P60系列配备了超级快充Turbo充电技术&#xff0c;让我们手机充电更快&#xff0c;用的更久&#xff0c;从此告…

Python爬虫解读

爬虫&#xff1a; Python爬虫是指利用计算机程序或者脚本自动抓取网站数据的一种行为&#xff0c;通常是为了提取网站数据或者进行数据分析等目的。 Python 爬虫可以分为手动爬虫和自动爬虫两种。手动爬虫是指完全由人工编写代码来实现的爬虫&#xff0c;这种方式需要编写大量的…

ES使用小结

ES使用总结 1.查询es全部索2.根据es索引查询文档3.查看指定索引mapping文件4.默认查询总数10000条5.删除指定索引文档6.删除所有数据包括索引7.設置窗口值8. logstash简单配置Logstash配置&#xff1a;logstash 控制台输出 9. filebenat配置 1.查询es全部索 localhost:9200/_c…

为什么说网络安全行业是IT行业最后的红利?

前言 2023年网络安全行业的前景看起来非常乐观。根据当前的趋势和发展&#xff0c;一些趋势和发展可能对2023年网络安全行业产生影响&#xff1a; 5G技术的广泛应用&#xff1a;5G技术的普及将会使互联网的速度更快&#xff0c;同时也将带来更多的网络威胁和安全挑战。网络安全…

DHCP 给内网客户端分配ip地址

~ 为 InsideCli 客户端网络分配地址&#xff0c;地址池范围&#xff1a; 192.168.0.110-192.168.0.190/24&#xff1b; ~ 域名解析服务器&#xff1a;按照实际需求配置 DNS 服务器地址选项&#xff1b; ~ 网关&#xff1a;按照实际需求配置网关地址选项&#xff1b; ~ 为…

JAVAWeb08-手动实现 Tomcat 底层机制+ 自己设计 Servlet

1. 前言 先看一个小案例&#xff0c; 引出对 Tomcat 底层实现思考 1.1 完成小案例 ● 快速给小伙伴完成这个小案例 0. 我们准备使用 Maven 来创建一个 WEB 项目, 老师先简单给小伙伴介绍一下 Maven 是什么, 更加详细的使用&#xff0c;我们还会细讲, 现在先使用一把 先创建…

【MySQL】带你了解MySQL 如何学习MySQL以及MySQL的用途以及意义

目录 1 MySQL的起源和发展 1.0.1 数据库管理系统 1.1 MySQL的起源 命名由来&#xff1a; 1.2 MySQL的发展历程 2 什么是MySQL&#xff1f; 2.1 数据库 2.1.1 我们之前存储数据的格式&#xff1a; 2.1.2 使用数据库的目的&#xff1a; 2.1.3 数据库分类 2.2 SQL语句 2…

STM32-HAL-串口的printf重定向

一、C语言的格式化输出 C语言的printf是一个标准库函数&#xff0c;用于将格式化的数据输出到标准的输出设备&#xff08;通常是终端&#xff09; 基本语法&#xff1a; int printf(const char *format, ...);其中的第一个参数const char *format表示输出格式&#xff0c;后面…