服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

news2025/1/19 8:11:03

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue(); //无界队列
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  ...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        //提前启动核心线程
        prestartAllCoreThreads();
    }

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the  listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }
  ...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        //重载 java.util.concurrent.ThreadPoolExecutor#execute
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    //强制入队
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        //当前线程数达到最大,任务入队
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        //如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        // 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        //任务入队(当前线程数大于最大线程数)
        return super.offer(o);
    }

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueue
					if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    //强制入队
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else { //非 TaskQueue 直接触发拒绝策略
                submittedCount.decrementAndGet();
                throw rx;
            }

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) { //capacity是Integer最大值
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;

import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author dongguabai
 * @date 2023-11-18 22:04
 */
public class Demo {

    public static void main(String[] args) {
        //无界队列
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);
        taskqueue.setParent(executor);
        observe(executor);
        while (true) {
            executor.execute(new Runnable() {
                public void run() {
                    excuteForever();
                }
            });
        }

    }

    private static void observe(final ThreadPoolExecutor executor) {
        Runnable task = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());
                }
            }
        };
        new Thread(task).start();
    }

    public static void excuteForever() {
        while (true) {
        }
    }
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Override
    public void startInternal() throws Exception {

        if (!running) {
            ...
            if ( getExecutor() == null ) {
                createExecutor(); //如果没有自定义实现,就会使用默认实现
            }
        }
      ...
    }

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1224746.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十二、Docker的简介

目录 一、介绍 Docker 主要由以下三个部分组成&#xff1a; Docker 有许多优点&#xff0c;包括&#xff1a; 二、Docker和虚拟机的差异 三、镜像和容器 四、Docker Hub 五、Docker架构 六、总结 一、介绍 Docker 是一种开源的应用容器平台&#xff0c;可以在容器内部…

一键云端,AList 整合多网盘,轻松管理文件多元共享

hello&#xff0c;我是小索奇&#xff0c;本篇教大家如何使用AList实现网盘挂载 可能还是有小伙伴不懂&#xff0c;所以简单介绍一下哈 AList 是一款强大的文件管理工具&#xff0c;为用户提供了将多种云存储服务和文件共享协议集成在一个平台上的便利性。它的独特之处在于&am…

MSYS2介绍及工具安装

0 Preface/Foreword 1 MSYS2 官网&#xff1a;MSYS2

基于Adapter用CLIP进行Few-shot Image Classification

文章目录 【ECCV 2022】《Tip-Adapter: Training-free Adaption of CLIP for Few-shot Classification》【NeuIPS 2023】《Meta-Adapter: An Online Few-shot Learner for Vision-Language Model》 【ECCV 2022】《Tip-Adapter: Training-free Adaption of CLIP for Few-shot C…

什么是缓存雪崩、击穿、穿透?

背景 数据一般是存储于数据库中&#xff0c;数据库中的数据都是存在磁盘上的&#xff0c;磁盘读写的速度相较于内存或者CPU中的寄存器来说是非常慢的了。 如果用户的请求都直接访问数据库的话&#xff0c;请求数量一上来&#xff0c;数据库很容易就崩溃了&#xff0c;所以为了…

LeetCode704.二分查找及二分法

每日一题&#xff1a;LeetCode704.二分查找 LeetCode704.二分查找知识点&#xff1a;二分法解题代码 LeetCode704.二分查找 问题描述&#xff1a;给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中…

Vue3-shallowRef 和 shallowReactive函数(浅层次的响应式)

Vue3-shallowRef 和 shallowReactive函数&#xff08;浅层次的响应式&#xff09; shallowRef函数 功能&#xff1a;只给基本数据类型添加响应式。如果是对象&#xff0c;则不会支持响应式&#xff0c;层成也不会创建Proxy对象。ref和shallowRef在基本数据类型上是没有区别的…

CronExpression

CronTrigger配置格式: 格式: [秒] [分] [小时] [日] [月] [周] [年]序号 说明 是否必填 允许填写的值 允许的通配符 1 秒 是 0-59 , - * / 2 分 是 0-59 , - * / 3 小时 是 0-23 , - * / 4 日 是 1-31 , - * ? / L W 5 月 是 1-12 or JA…

解决终端自暂停的问题 关闭快速编辑模式 python代码暂停

问题描述&#xff1a; 在windows10终端中运行的python出现自暂停或者是点击鼠标后出现暂停&#xff0c;敲回车或空格它才继续运行。 解决方法 注意&#xff1a;windows不能针对所有命令行统一设置&#xff0c;只能单个程序去设置。 半永久 在终端右键点击属性 取消勾选快速…

os.path.join函数用法

os.path.join()是Python中用于拼接文件路径的函数&#xff0c;它可以将多个字符串拼接成一个路径&#xff0c;并且会根据操作系统的规则自动使用合适的路径分隔符。 注&#xff1a;Linux用的是/分隔符&#xff0c;而Windows才用的是\。 该函数属于os.path模块&#xff0c;因此在…

新增文章分类

pojo.Category package com.lin.springboot01.pojo;import jakarta.validation.constraints.NotEmpty; import lombok.Data;import java.time.LocalDateTime;Data public class Category {private Integer id;//主键NotEmptyprivate String categoryName;//分类名称NotEmptypr…

【Java 进阶篇】唤醒好运:JQuery 抽奖案例详解

在现代社交网络和电商平台中&#xff0c;抽奖活动成为吸引用户、提升用户参与度的一种常见手段。通过精心设计的抽奖页面&#xff0c;不仅可以增加用户的互动体验&#xff0c;还能在一定程度上提高品牌的知名度。本篇博客将通过详细解析 JQuery 抽奖案例&#xff0c;带领你走进…

Visio是什么软件,有哪些好用的Visio平替软件推荐?

1. 什么是Visio&#xff1f; Visio是一款由微软开发的流程图和矢量绘图软件&#xff0c;它可以帮助用户创建各种类型的图表、图示和流程图&#xff0c;从而更好地呈现和传达信息。Visio的功能强大&#xff0c;适用于各种领域&#xff0c;如项目管理、系统设计、流程优化等。…

【Java 进阶篇】插上翅膀:JQuery 插件机制详解

在前端开发中&#xff0c;JQuery 作为一个广泛应用的 JavaScript 库&#xff0c;为开发者提供了丰富的工具和方法&#xff0c;简化了 DOM 操作、事件处理等繁琐的任务。而在这个庞大的生态系统中&#xff0c;插件机制是 JQuery 的一项重要特性&#xff0c;使得开发者能够轻松地…

OpenAI的多函数调用(Multiple Function Calling)简介

我在六月份写了一篇关于GPT 函数调用&#xff08;Function calling) 的博客https://blog.csdn.net/xindoo/article/details/131262670&#xff0c;其中介绍了函数调用的方法&#xff0c;但之前的函数调用&#xff0c;在一轮对话中只能调用一个函数。就在上周&#xff0c;OpenAI…

【Java 进阶篇】深入理解 JQuery 事件绑定:标准方式

在前端开发中&#xff0c;处理用户与页面的交互是至关重要的一部分。JQuery作为一个广泛应用的JavaScript库&#xff0c;为我们提供了简便而强大的事件绑定机制&#xff0c;使得我们能够更加灵活地响应用户的行为。本篇博客将深入解析 JQuery 的标准事件绑定方式&#xff0c;为…

MySQL数据库下的Explain命令深度解析

Explain是一个非常有的命令&#xff0c;可以用来获取关于查询执行计划的信息&#xff0c;以及如何解释输出。Explain命令是查看查询优化器如何决定执行查询的主要方法。这个功能有一定的局限性&#xff0c;并不总是会说出真相&#xff0c;但是它的输出是可以获取的最好信息&…

无需公网IP、简单3步,直连远程NAS实现高速访问

面对NAS远程访问难题 蒲公英旁路组网盒子X1 一招搞定&#xff01; 无需公网IP、无需设置原有路由 简单3步&#xff0c;即可实现异地组网 更有点对点直连&#xff08;P2P&#xff09;模式 不限流量、不限速 传输速率取决于实际网络带宽 贝锐蒲公英X1&#xff0c;无需改变原…

2023年【危险化学品经营单位安全管理人员】考试题及危险化学品经营单位安全管理人员模拟试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 危险化学品经营单位安全管理人员考试题是安全生产模拟考试一点通总题库中生成的一套危险化学品经营单位安全管理人员模拟试题&#xff0c;安全生产模拟考试一点通上危险化学品经营单位安全管理人员作业手机同步练习。…

JVM虚拟机:CMS垃圾回收器的日志分析

本文重点 本文我们将学习CMS垃圾回收器的日志 使用CMS java -Xms20M -Xmx20M -XX:PrintGCDetails -XX:UseConcMarkSweepGC 类名 日志格式 分析 上面的日志我们分为了两部分&#xff0c;上面表示新生代&#xff0c;下面表示老年代。 ParNew表示年轻代收集器&#xff0c;6144…