Tomcat实现ThreadPoolExecutor和JDK线程池区别

news2024/11/23 11:12:05

1.1 tomcat线程池和juc线程池流程

jdk 线程池策略:

  • 当线程池中线程数量小于 corePoolSize,每来一个任务,就会创建一个线程执行这个任务
  • 当前线程池线程数量大于等于 corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列中;若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行
  • 当前线程池线程数量等于 maximumPoolSize,则会采取任务拒绝策略进行处理

tomcat 线程池策略:

  • 当前线程数小于 corePoolSize,则去创建工作线程
  • 当前线程数大于 corePoolSize,但小于 maximumPoolSize,则去创建工作线程
  • 当前线程数大于 maximumPoolSize,则将任务放入到阻塞队列中,当阻塞队列满了之后,则调用拒绝策略丢弃任务

1.2 tomcat线程池和juc线程池的区别

tomcat 线程池是在 juc线程池的基础上,修改少量代码来实现,tomcat 线程池执行流程和 juc的线程池执行流程有着很大的区别,那么tomcat为什么要这样设计?

使用线程池的任务有两种:

  • IO 密集型任务(如调用接口、查询数据库)

  • CPU 密集型任务

  • JDK 的线程池 ThreadPoolExecutor 主要目的解决的便是CPU密集型任务的并发处理

  • Tomcat 若使用原生的JDK线程池,一旦接收的请求数量大于线程池的核心线程数,这些请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat并没有使用JDK原生线程池的策略 (例如:10个核心线程,20个最大线程,15个请求过来的时候,有5个请求放入队列进行等待处理,这也是为什么 tomcat 要使 work 线程优先 queue的原因)

  • JDK线程池:当线程数达到 corePoolSize 后,任务首先被放到 queue,发挥CPU多核的并行优势,减少多个线程导致的上下文切换,适合的场景是:CPU密集型任务

  • Tomcat线程池:当大量请求达到时,接收的请求数量大于核心线程池的 corePoolSize 时,会继续创建 worker 线程去处理请求,而后续请求量变少时,只会销毁 maximumPoolSize 线程数,适合的场景是:IO密集型


1.3 IO密集型和CPU密集型任务核心参数的设置

  • 场景:JDK线程池执行IO密集型的任务,解决方案:可以提高corePoolSize的大小,引入问题:系统中线程数过多

  • 线程数过多问题的解决方案:指定 ThreadPoolExecutor 的 allowCoreThreadTimeout=true,那么核心线程若处于闲置状态的话,超过一定的时间(KeepAliveTime),就会销毁掉

  • 引入问题:当核心线程数被销毁时,而有大量请求到达时系统重新创建 worker 线程,会使得前期接口响应时间变长(这也是为什么系统上线需要预热)

  • 所以JDK线程池无法完美的去处理IO密集型的任务,这也就是为什么Tomcat需要重写JDK线程池的原因


Tomcat线程池执行流程

  • Tomcat 重写 JDK 线程池(即改动的是少量的源码)实现的功能的增强

  • 思想:主要流程还是JDK线程池流程,即先开启 corePoolSize 线程,然后在 queue,最后在开启 maximumPoolSize 线程,Tomcat 重点改造的是 queue 的 offer(),即在向 queue 放入任务时,若发现未达到最大线程数,那么 offer() 返回 false,即放入队列失败,此时,便继续开启 maximumPoolSize 线程

自定义队列

Tomcat 主要是通过实现自定义队列来完成逻辑的改造。

/**
 * 实现Tomcat特有逻辑的自定义队列
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;

    private transient volatile ThreadPoolExecutor parent = null;

    private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;

    /**
     * 强制遗留的容量
     */
    private int forcedRemainingCapacity = -1;

    /**
     * 队列的构建方法
     */
    public TaskQueue() {
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    /**
     * 设置核心变量
     */
    public void setParent(ThreadPoolExecutor parent) {
        this.parent = parent;
    }

    /**
     * put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。
     * offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。
     *
     * @param o 当任务被拒绝后,继续强制的放入到线程池中
     * @return 向阻塞队列塞任务,当阻塞队列满了之后,offer会返回false。
     */
    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException("taskQueue.notRunning");
        }
        return super.offer(o);
    }

    /**
     * 带有阻塞时间的塞任务
     */
    @Deprecated
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) {
            throw new RejectedExecutionException("taskQueue.notRunning");
        }
        return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected
    }

    /**
     * 当线程真正不够用时,优先是开启线程(直至最大线程),其次才是向队列填充任务。
     *
     * @param runnable 任务
     * @return false 表示向队列中添加任务失败,
     */
    @Override
    public boolean offer(Runnable runnable) {
        if (parent == null) {
            return super.offer(runnable);
        }
        //若是达到最大线程数,进队列。
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(runnable);
        }
        //当前活跃线程为10个,但是只有8个任务在执行,于是,直接进队列。
        if (parent.getSubmittedCount() < (parent.getPoolSize())) {
            return super.offer(runnable);
        }
        //当前线程数小于最大线程数,那么直接返回false,去创建最大线程
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }
        //否则的话,将任务放入到队列中
        return super.offer(runnable);
    }

    /**
     * 获取任务
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        //取任务超时,会停止当前线程,来避免内存泄露
        if (runnable == null && parent != null) {
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    /**
     * 阻塞式的获取任务,可能返回null。
     */
    @Override
    public Runnable take() throws InterruptedException {
        //当前线程应当被终止的情况下:
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            long keepAliveTime = parent.getKeepAliveTime(TimeUnit.MILLISECONDS);
            return poll(keepAliveTime, TimeUnit.MILLISECONDS);
        }
        return super.take();
    }

    /**
     * 返回队列的剩余容量
     */
    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
            return forcedRemainingCapacity;
        }
        return super.remainingCapacity();
    }


    /**
     * 强制设置剩余容量
     */
    public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }

    /**
     * 重置剩余容量
     */
    void resetForcedRemainingCapacity() {
        this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
    }
}

2.2 自定义线程池ThreadPoolExecutor

Tomcat 线程池 ThreadPoolExecutor 是继承的 AbstractExecutorService 类,但是很多代码依旧使用的是 JDK 的 ThreadPoolExecutor,只是稍微改造了一部分

    public void execute(Runnable command, long timeout, TimeUnit unit) {

        /**
         * 提交任务的数量+1
         */
        submittedCount.incrementAndGet();
        try {
            /**
             * 线程池内部方法,真正执行的方法。就是JDK线程池原生的方法。
             *
             * 因为重写了阻塞队列,才完成Tomcat特有逻辑的实现。
             *
             * 1. 重写队列方法;达到核心线程数,然后向阻塞队列中放置,阻塞队列直接返回false
             * 2. 返回false,则开启maximum pool size;
             * 3. maximum pool size到达极限时,会抛出RejectedExecutionException方法。
             *
             */
            executeInternal(command);
            /**
             * 任务被拒绝
             */
        } catch (RejectedExecutionException rx) {
            /**
             * 在将被拒绝的任务放入到队列中。
             */
            if (getQueue() instanceof TaskQueue) {

                //如果Executor接近最大线程数,应该将任务添加到队列中,而不是拒绝。
                final TaskQueue queue = (TaskQueue) getQueue();
                try {
                    //强制的将任务放入到阻塞队列中
                    if (!queue.force(command, timeout, unit)) {
                        //放入失败,则继续抛出异常
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("threadPoolExecutor.queueFull");
                    }
                } catch (InterruptedException x) {
                    //被中断也抛出异常
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                //不是这种队列,那么当任务满了之后,直接抛出去。
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }
    
    /**
     * JDK线程池的任务执行的逻辑
     */
    private void executeInternal(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        //未达到corePoolSize数量,则去开启线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        //开启到corePoolSize数量的工作线程,则将任务放入队列。
        //但是Tomcat重写了阻塞队列。
        //当放入workQueue.offer(command)返回false,则继续开启线程数
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {
            //开启最大线程失败,则任务被拒绝。
            reject(command);
        }
    }

注意事项

在这里插入图片描述


tomcat 的优化

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) {
            decrementWorkerCount();
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                //获取剩余的最小线程数,若配置了允许最小线程数关闭的参数,min=0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //当配置了允许最小线程关闭参数,且队列不为空的情况下,允许保留的最小线程数为1
                /**
                 * 这是关闭空闲核心线程数时的判断。
                 */
                if (min == 0 && !workQueue.isEmpty()) {
                    min = 1;
                }
                // https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
                // If the work queue is not empty, it is likely that a task was
                // added to the work queue between this thread timing out and
                // the worker count being decremented a few lines above this
                // comment. In this case, create a replacement worker so that
                // the task isn't held in the queue waiting for one of the other
                // workers to finish.
                /**
                 * 若当前工作数量>允许的最小线程数,那么关闭改线程。
                 * Tomcat对此处进行了优化。
                 */
                if (workerCountOf(c) >= min && workQueue.isEmpty()) {
                    return; // replacement not needed
                }
            }
            /**
             * 没有终止线程,又重新开启线程
             */
            addWorker(null, false);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/630990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解深度学习——注意力机制(Attention Mechanism):位置编码(Positional Encoding)

分类目录&#xff1a;《深入理解深度学习》总目录 相关文章&#xff1a; 注意力机制&#xff08;AttentionMechanism&#xff09;&#xff1a;基础知识 注意力机制&#xff08;AttentionMechanism&#xff09;&#xff1a;注意力汇聚与Nadaraya-Watson核回归 注意力机制&#…

【CSS---定位基础篇】

CSS---定位基础篇 CSS-----基础定位:一 、 学习定位原因&#xff1a;&#xff08;定位的作用&#xff09;二 、定位组成&#xff1a;2.1 四种定位模式&#xff1a;&#xff08;1&#xff09;静态定位&#xff08;了解&#xff09;&#xff1a;&#xff08;2&#xff09;相对定位…

8.1 正弦波振荡电路(1)

正弦波振荡电路是在没有外加输入信号的情况下&#xff0c;依靠电路自激振荡而产生正弦波输出电压的电路。它广泛地应用于测量、遥控、通讯、自动控制、热处理和超声波电焊等加工设备之中&#xff0c;也作为模拟电子电路的测试信号。 一、概述、 1、产生正弦波振荡的条件 在负…

“不擅长开发,所以选择测试”,软件测试面试时这么说就凉了...

见过不少软件测试岗位&#xff0c;在面试时&#xff0c;问到选择软件测试作为职业的原因时 有不少测试工程师会回答&#xff1a;因为不擅长或不喜欢开发的工作。 这个想法&#xff0c;这个回答&#xff0c;就已经在面试官眼里打低分了。 现在网上也有不少类似观点 “不喜欢开…

TiDB亿级数据亚秒响应查询将MySql数据同步到TiDB

目录 1 MySQL环境准备1.1 MySQL测试数据库1.1.1 导入数据1.1.2 验证是否成功 2 DM数据迁移2.1 TiUP DM 组件简介2.1.1 基本功能2.1.2 使用限制 2.2 TiUP安装 DM组件2.2.1 安装TiUP DM 组件2.2.2 更新 TiUP DM 组件 2.3 TiUP部署 DM组件2.3.1 编辑初始化配置2.3.2 部署命令格式2…

PID算法:增量式PID 位置式PID

前面的文章已经介绍过什么是pid了&#xff0c;现在再回顾一下&#xff1a; PID&#xff1a;是过程控制中常用的一种针对某个对象或者参数进行自动控制的一种算法。 这一篇分享不打算再深究pid的理论知识&#xff0c;如果有不懂或者对pid感兴趣的朋友&#xff0c;可以自行查阅资…

【docker桌面版】windows使用docker搭建nginx

1.拉取nginx镜像 docker pull nginx 2.运行容器 docker run -d -p 80:8081 --name nginx nginx 3.本地磁盘创建nginx目录 D:\Docker\project\nginx 4.复制docker中的nginx配置文件 查看运行的容器docker ps -a docker cp 8f18d58bc77b:/etc/nginx/nginx.conf D:\Docker…

【Leetcode】DP | 打家劫舍,当一个机灵的小偷

198 打家劫舍 令 D [ i ] D[i] D[i]表示前 i i i间房子的最大收益&#xff1a; D [ i ] max ⁡ ( D [ i − 1 ] , D [ i − 2 ] n u m s [ i ] ) D [ 0 ] n u m s [ 0 ] D [ 1 ] max ⁡ ( n u m s [ 0 ] , n u m s [ 1 ] ) D[i] \max(D[i -1], D[i-2]nums[i]) \\ D[0] …

软件测试工程师如何从功能测试转成自动化测试?

功能测试转成自动化测试&#xff0c;答案就三个字&#xff1a;“靠学习”。 学习自动化的方法无非是三种&#xff1a; 一、靠培训&#xff08;下方有如何选择培训机构&#xff09; 在相对有氛围的学习环境中来学习自动化测试&#xff0c;这是一个较快学习的方法。二、靠自学自…

js文件引入vue $notify组件,自定义添加按钮功能!

import Vue from vue; //新创建一个vue实例 let v new Vue(); const h v.$createElement console.log(h) v.$notify({ dangerouslyUseHTMLString: true, message: h( div, {}, [ h(p, {}, [ h(p, {style:"font-size: 15px;"}, 车架号&#xff1a;${res.rows[0].…

C# Winform 多个程序之间的通信(非Scoket)

效果 功能&#xff1a;打开窗体自动连接主程序&#xff0c;并自动添加到列表&#xff0c;可以向子程序群发消息 可以向单个程序单独发送消息 在退出程序后&#xff0c;添加的程序列表会自动移除 一、概述 参考&#xff1a;C# Winfrom程序之间通讯_c# sendmessege copydatastr…

x宝评论抓取

#某宝评论接口sign参数逆向 1.接口速览 多次请求发现&#xff0c;t为时间戳&#xff0c;sign为加密参数&#xff0c;盲猜和data、t有关&#xff0c;sign为32位&#xff0c;盲猜是字符串的32位的MD5 2.搜索js代码 这里为搜索的是appKey&#xff0c;就找到了sign&#xff0c;然…

如何实现APP自动化测试?

APP测试&#xff0c;尤其是APP的自动化测试&#xff0c;在软件测试工程师的面试中越来越会被问到了。为了更好的回答这个问题&#xff0c;我今天就给大家分享一下&#xff0c;如何进行APP的自动化测试。 一、为了实现JavaAppiumJunit技术用于APP自动化测试&#xff0c;所以需要…

使用AIGC工具提升论文阅读效率

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

接口测试实战工具如何选择?这6个工具首选(建议收藏)

常见接口类型 • HTTP/HTTPS 类型接口 基于HTTP协议开发的接口现在应用是最为广泛的&#xff0c;这类API使用起来简单明了&#xff0c;因为它是轻量级的、跨平台、跨语言的&#xff0c; 但凡是第三方提供的API都会有HTTP版本的接口。 RESTful API也是基于HTTP协议的&#xff0c…

Android滴滴路由框架DRouter原理解析

作者&#xff1a;linversion 前言 最近的一个新项目使用了Clean Architecture模块化MVVM架构&#xff0c;将首页每个tab对应的功能都放到单独的模块且不相互依赖&#xff0c;这时就有了模块间页面跳转的问题&#xff0c;经过一番研究选择了滴滴的DRouter&#xff0c;因为其出色…

Image captioning中自定义文本数据整理为类似Flickr8k.token.txt的格式--->助力后期生成JSON格式用于训练

手把手实现Image captioning,将自定义文本数据整理为类似Flickr8k.token.txt的格式,助力后期生成JSON格式用于训练。如果感觉有用,不妨给博主来个一键三连,白天科研,晚上肝文,实属不易~ ~ ](https://imgse.com/i/p9FmMDK) 这里写目录标题 1. 任务需求2. 程序实现2.1 读取…

vistual studio 2017中导入pthread.h的配置方法

1.下载pthread.h的相关库文件 下载路径 https://www.mirrorservice.org/sites/sourceware.org/pub/pthreads-win32/pthreads-w32-2-9-1-release.zip 加压后得到两种系统版本的三个文件夹 pthreads.2 : 包含了pthread的源文件 Pre-built2 :包含了pthreads for win32的头文件…

软件测试04:软件测试流程和软件测试过程

软件测试04&#xff1a;软件测试流程和软件测试过程 软件测试流程 软件测试流程&#xff1a;获取测试需求->编写测试计划->制造测试方案->开发与设计测试用例->执行测试->提交缺陷报告->测试分析与评审->提交测试总结->准备下一版本测试 软件测试过…

网页JS自动化脚本(九)创建一键导出数据库到桌面的功能按钮

我们获取到了数据库,当然我们希望能把这个数据库给保存到本地电脑上进行一些数据的处理,我们这一节就添加一个按钮把数据一次性导出 保存到桌面为json格式 我们直接用TXT打开它如下图所示 然后再使用json转EXCEL的小工具规整之后如下图 好了我们下面上代码 // UserScript // n…