Java线程池使用与原理解析2(自定义线程池、合适的线程数量、线程池阻塞队列、线程拒绝策略)

news2025/1/9 14:37:47

在上篇我们学习了线程池各个参数的含义,线程池任务处理流程,使用线程池的好处等内容,本篇我们学习如何创建一个适合我们业务的线程池。为此,我们有必要先学习一下如何大概确定我们线程池核心线程数、怎么设置阻塞队列的类型与大小、当线程池没有能力处理任务了该如何使用拒绝策略等内容。

合适的线程数量

对于线程池来说,不同的任务类型可能采取不同的线程数量会取得更好的效果。这是因为有的线程任务对CPU消耗大,任务时间却短,有的任务需要访问网络,需要较长时间才能完成任务。如果只是笼统的定义一个固定大小的线程池,往往会出现CPU不繁忙,但线程池的线程已满,不能再接受任务了,此时却有可能有一些不耗时但耗费CPU的任务没机会执行,从而导致CPU资源浪费等情况。因此,我们需要根据业务特点来定义线程池。

对于CPU 密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。假设设置过多线程,首先会耗费更多资源,大量任务占用线程,但CPU资源有限,这就导致了很多线程上下文切换的成本,此时性能可能不升反降。

对耗时IO任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。这种情况应该设置更多的线程来完成任务,当在等待IO的线程挂起后,此时这些线程并不占用CPU资源,但占用了线程资源,如果没有更多的线程,后续的任务没机会执行,CPU资源也白白浪费。此时如果有更多线程,则这些线程可以去执行其它任务,把CPU资源给利用起来,提高系统的总体性能。

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

可见,线程数和平均等待时间成正比,和平均工作时间成反比。总体按这个思路来设置线程池线程大小是可行的,实际还可以根据特定的服务器资源进行压测,根据压测结果来调整以达到更高的性能。

线程池阻塞队列

我们知道使用Executors工具类可以方便的创建线程池(不推荐),其内部实际就是使用ThreadPoolExecutor进行创建,只是提供了一些默认参数,不需要开发者去关注而已。这里不展开Executors给我们提供的那几种线程池,这里关注一下这些线程池里面的阻塞队列类型,为我们自定义线程池作参考。

FixedThreadPool
LinkedBlockingQueue
SingleThreadExecutor
LinkedBlockingQueue
CachedThreadPool
SynchronousQueue
SingleThreadScheduledExecutor
DelayedWorkQueue
ScheduledThreadPoolExecutor
DelayedWorkQueue

首先是FixedThreadPool和SingleThreadExecutor,它们用的阻塞队列都是LinkedBlockingQueue,容量是Integer.MAX_VALUE。因为它们的线程数都是固定的,不能创建非核心线程,因此,队列几乎无限大,核心线程都忙的情况只能把任务放队列,这也是固定线程数的关键。

然后是CachedThreadPool,看名字是一个可缓存的线程池。它的阻塞队列使用的是SynchronousQueue,这个阻塞队列本身不存储数据,它只起到任务中转的作用,当有任务被线程put进来后,必须需要另一线程take,前面的线程才会返回。CachedThreadPool利用这个队列特性来无限创建线程。我们看一下CachedThreadPool创建时传入的参数就可窥一二。

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

注意到核心线程数传了0,即没有核心线程数,最大线程数为Integer.MAX_VALUE,通常系统都不会有这么多线程,可以认为是可以无限创建线程了。线程空闲60秒后会被回收,因此这是可缓存线程,缓存时间60秒。队列则使用SynchronousQueue作中转,因为核心线程数为0,因此根据线程池内部的运转流程,任务一来,如果当前没可用线程,必定直接放队列,然后队列的任务被线程池take出来创建新线程以执行任务。

最后是SingleThreadScheduledExecutor和ScheduledThreadPoolExecutor,这两个线程池可以定时执行任务,其阻塞队列用了DelayedWorkQueue,看这名称我们即隐约知道为啥它能定时执行任务了。DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。延时队列执行内部的原理大致是按时间排序好任务,通过锁的await方法和signal方法来控制延时执行,有兴趣可以翻阅ScheduledThreadPoolExecutor源码

private final ReentrantLock lock = new ReentrantLock();

 /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         */
private final Condition available = lock.newCondition();

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

线程拒绝策略

这个相对简单,正所谓水满则溢。线程池也有无能为力的情况,当队列、最大线程数都满后,再有任务过来,又无空闲线程可执行任务,此时线程池必须拒绝任务,拒绝任务的做法可以有很多种,直接丟了任务不处理(通常不会用),丢弃最旧的任务,由提交任务的线程自行执行等。下面我们看看JUC包默认的拒绝策略都有哪些。

我们看ThreadPoolExecutor的构造函数可知,需要执行拒绝策略,实现RejectedExecutionHandler接口即可。 

可以看出,RejectedExecutionHandler默认有四个实现

  • DiscardOldestPolicy
  • AbortPolicy
  • CallerRunsPolicy
  • DiscardPolicy

DiscardOldestPolicy策略会将队列中最旧的一个任务丢弃,即直接将队头的任务移除即可

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

注意Java阻塞队列的实现都是入队添加到队尾,从队头出队,所以上面的代码就是直接poll队头任务。

AbortPolicy实际就是直接拒绝执行任务,丢了个异常出来就不管了。

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy这个策略在线程池没能力执行任务时,要求提交任务的线程自行执行任务,这个策略也是相对安全的一种策略,建议使用。

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

DiscardPolicy策略将直接把任务丢弃,连异常都不抛,这种策略就比较粗暴,不建议使用。

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

事实上上面四种策略,只有CallerRunsPolicy是推荐使用的,其它三种都有可能造成业务异常,数据丢失等问题,不建议使用。不管是丢弃任务还是抛异常,或是直接不管,都不是我们希望看到的。我们也可以自行实现这个接口,根据实际情况处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/392415.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Malware Dev 04 - 隐匿之 ETW(Event Tracing for Windows)Bypass

写在最前 如果你是信息安全爱好者&#xff0c;如果你想考一些证书来提升自己的能力&#xff0c;那么欢迎大家来我的 Discord 频道 Northern Bay。邀请链接在这里&#xff1a; https://discord.gg/9XvvuFq9Wb我拥有 OSCP&#xff0c;OSEP&#xff0c;OSWE&#xff0c;OSED&…

使用leangoo领歌单团队敏捷开发项目管理

概述单团队敏捷开发主要是针对10人以下、只有一个Scrum团队的小型产品或项目的敏捷开发。对于小型团队来说&#xff0c;在Leangoo中创建一个单团队敏捷开发项目就可以很好地支持团队产品或项目的开发。适用场景适用于单个团队进行Scrum敏捷开发协作&#xff0c;Leangoo项目内也…

Linux - 磁盘存储管理 磁盘引入

# 我们要介绍下 磁盘管理&#xff0c; 那不妨先来看一张图来简单 引入 &#xff1a;这张图呢&#xff0c;是我们 Windows 上的磁盘管理的显示 。根据这幅图呢&#xff0c;提出一个问题 &#xff1a;>>> 这幅图磁盘管理所显示的内容&#xff0c;你能判断出 该电脑 有几…

【FMCW 04】测角-Angle FFT

在之前的文章中&#xff0c;我们已经详尽讨论过FMCW雷达测距和测速的原理&#xff0c;现在来讲最后一块内容&#xff0c;测角。测角对于硬件设备具有要求&#xff0c;即要求雷达具有多发多收结构&#xff0c;从而形成多个空间信道&#xff08;channel&#xff09;&#xff0c;我…

css选择器详解

简单选择器&#xff08;根据名称、id、类来选取元素&#xff09;组合器选择器&#xff08;根据它们之间的特定关系来选取元素&#xff09;伪类选择器&#xff08;根据特定状态选取元素&#xff09;伪元素选择器&#xff08;选取元素的一部分并设置其样式&#xff09;属性选择器…

第六讲:ambari-web 模块二次开发

上述图片为 Ambari 部署及操作 hdp 集群相关的部分界面截图。这些页面如果想调整的话,比如汉化,二次开发等,则可以修改 ambari-web 模块的源码来实现。 一、介绍 ambari-web 模块涉及到的界面有: HDP 集群部署向导已安装服务的仪表板、配置界面等主机列表及详细信息告警列…

【Opencv项目实战】图像的像素值反转

文章目录一、项目思路二、算法详解2.1、获取图像信息2.2、新建模板2.3、图像通道顺序三、项目实战&#xff1a;彩图的像素值反转&#xff08;方法一&#xff09;四、项目实战&#xff1a;彩图的像素值反转&#xff08;方法二&#xff09;五、项目实战&#xff1a;彩图转换为灰图…

Java中class文件的格式

常见的class文件格式如下图所示&#xff0c;下面我将对一下格式一一作出解释。 一、magic 该部分主要是对语言类型的规范&#xff0c;只有magic这个部分是CAFEBABE时才能被检测为Java语言&#xff0c;否则则不是。 二、minor version和major version minor version主要表示了…

【微信小程序-原生开发】实用教程16 - 查看详情(含页面跳转的传参方法--简单传参 vs 复杂传参)

需在实现列表的基础上开发 【微信小程序-原生开发】实用教程15 - 列表的排序、搜索&#xff08;含云数据库常用查询条件的使用方法&#xff0c;t-search 组件的使用&#xff09;_朝阳39的博客-CSDN博客 https://sunshinehu.blog.csdn.net/article/details/129356909 效果预览 …

【计算机网络】数据链路层可靠传输机制的三大协议:停止等待协议SW、后退N帧协议GBN、选择重传协议SR

一、可靠传输实现机制 1.停止等待协议SW case1、确认与否认 在发送端发送数据出现误码时&#xff0c;接收端回复一个NAK否认码&#xff0c;并要求发送端再发送一次。 case2、超时重传 接收端接收不到数据分组时&#xff0c;发送端就会一直处于等待接受端回复ACK或NAK的状态…

32 文件操作

目录 一、文件的概念 二、文件的分类&#xff08;分类依据&#xff1a;能否使用文本编辑器打开文件&#xff09; 1、文本文件 2、二进制文件 三、文件操作的步骤 1、打开文件&#xff1a;open()函数 2、打开文件的另一种写法&#xff08;推荐&#xff09;&#xff1a;with open…

21- PyTorch通过CNN实现手写数字识别 (PyTorch系列) (项目二十一)

项目要点 torch 版本: torch.__version__ # 1.13.1cpu 设置GPU: device torch.device(cuda:0 if torch.cuda.is_available() else cpu) train_ds datasets.MNIST(./, train True, transformtransformation, download True) # 数据导入 transformation transforms.…

Spring高频面试问题汇总

1 什么是spring?Spring是一个轻量级Java开发框架&#xff0c;最早有Rod Johnson创建&#xff0c;目的是为了解决企业级应用开发的业务逻辑层和其他各层的耦合问题。它是一个分层的JavaSE/JavaEE full-stack&#xff08;一站式&#xff09;轻量级开源框架&#xff0c;为开发Jav…

Day909.MySQL 不同的自增 id 达到上限以后的行为 -MySQL实战

MySQL 不同的自增 id 达到上限以后的行为 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于MySQL 不同的自增 id 达到上限以后的行为的内容。 MySQL 里有很多自增的 id&#xff0c;每个自增 id 都是定义了初始值&#xff0c;然后不停地往上加步长。 虽然自然数是没有…

TSP 问题求解的最好方法 LKH

目前可以查到的最好的方法求解TSP问题是 LKH&#xff0c;所以本篇文章介绍如何使用Matlab 调用LKH 参考文档&#xff1a;用matlab调用迄今为止最强悍的求解旅行商&#xff08;TSP&#xff09;的算法-LKH算法_wx6333e948c3602的技术博客_51CTO博客 【LKH算法体验】用matlab调用…

游戏玩的多,陪玩你了解的多吗?用Python来采集陪玩数据,看看行情和美照

前言 (&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨 大家好 现在应该每个人都玩过游戏吧&#xff0c;有些的上瘾&#xff0c;天天玩停不下来&#xff0c;有些的倒是没啥感觉 有游戏就肯定有陪玩啊&#xff0c;毕竟当朋友忙的时候&#xff0c;自己一个…

天琊超级进程监视器的应用试验(19)

实验目的 1、了解进程概念及其基本原理&#xff1b; 2、掌握天琊超级进程监视器的安装与使用。预备知识 本实验要求实验者具备如下的相关知识。 操作系统的安全配置是整个系统安全审计策略核心&#xff0c;其目的就是从系统根源构筑安全防护体系&#xff0c;通过用户的一…

Linux系统安装Tomcat

&#xff08;1&#xff09;登录Tomcat网址 https://tomcat.apache.org/&#xff0c;下载Tomcat安装包&#xff08;2&#xff09;登录我们的虚拟机&#xff0c;使用Linux系统中的“rz”命令上传压缩包&#xff08;注意&#xff1a;如果使用“rz”上传压缩包出现错误时&#xff0…

STM32模拟SPI时序控制双路16位数模转换(16bit DAC)芯片DAC8552电压输出

STM32模拟SPI时序控制双路16位数模转换&#xff08;16bit DAC&#xff09;芯片DAC8552电压输出 STM32部分芯片具有12位DAC输出能力&#xff0c;要实现16位及以上DAC输出需要外挂DAC转换ASIC。 DAC8552是双路16位DAC输出芯片&#xff0c;通过SPI三线总线进行配置控制输出。这里…

苹果新卫星专利公布,苹果Find My功能知多少

根据美国商标和专利局&#xff08;USPTO&#xff09;公示的清单&#xff0c;苹果公司获得了一项新的卫星专利&#xff0c;可在非地面网络&#xff08;Non-Terrestrial Networks&#xff0c;NTN&#xff09;中定位用户设备&#xff08;iDevice、MacBook 等&#xff09;。 在专利…