dubbo线程池为什么耗尽

news2025/1/20 5:52:17

文章概述

大家可能都遇到过DUBBO线程池打满这个问题,报错如下,本文我们就一起分析DUBBO线程池打满这个问题。
cause: org.apache.dubbo.remoting.RemotingException: Server side(10.0.0.100,20881) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-10.0.0.100:20881, Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://10.0.0.100:20881!

1 DUBBO线程模型

先看一张图大概了解
未命名文件 (5).png

** IO线程**

IO线程的工作实际上就是处理字节流的输入输出,对消息的读取,序列化,不涉及业务操作
NettyServer中启动netty服务端,初始化boss和work线程信息

  protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

这里分别看线程数量

    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);

boss线程设置为1
主要看work线程(IO线程)
从url中获取线程数,如果没设置的话,设置当前机器的线程数,最少设置为32个
这个配置是iothreads,如果配置的这样配置。但是线程池耗尽并不是io线程数量不够的原因

provider:
    iothreads: 100
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }
  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
            }
            return;
        }
        handler.received(channel, message);
    }

消息的不同类型有不同的处理方式如果是心跳直接就发送回去了,
如果是业务请求那么交给业务线程池处理

  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

业务线程池

初始化
不同线程池策略会创建不同特性的线程池:
dubbo提供了不同的线程池类型

fixed
包含固定个数线程

cached
线程空闲一分钟会被回收,当新请求到来时会创建新线程

limited
线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收

eager
当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列

一般实际使用的就是fixed

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

这里主要看两个参数,分别是线程数,和队列长度。默认的线程数是200,queue默认使用SynchronousQueue
SynchronousQueue由于其独有的线程一一配对通信机制,由于内部没有使用AQS,而是直接使用CAS,其并没有存储任务的队列就是将任务与线程进行匹配,如果任务进来,没用可用线程,那么将直接拒绝,这也是我们碰到拒绝策略的原因
如果需要配置

dubbo:
  protocol:
    threads: 800
    queues: 10000

业务线程线程池拒绝

这里就可以看到线程池拒绝AbortPolicyWithReport

 @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

也就是开头的那个报错,这里在发生问题会自动dump stack信息

线程池中的 getTaskCount 和 getCompletedTaskCount 是两个重要的方法,它们用于获取线程池的任务和已完成任务的统计信息。

  1. getTaskCount: 这个方法返回线程池中的当前任务数。它包括正在执行的任务和等待执行的任务。换句话说,它返回的是线程池中所有任务的总数,包括那些尚未开始执行的任务。
  2. getCompletedTaskCount: 这个方法返回线程池已完成的任务数量。它只计算那些已经完成执行的任务,而不包括正在执行或等待执行的任务。

再回头我们的那个报错。
Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801)

2、估算合适的线程数,寻找慢业务

我们知道DUBBO会选择线程池策略进行业务处理,那么如何估算可能产生的线程数呢?我们首先分析一个问题:一个公司有7200名员工,每天上班打卡时间是早上8点到8点30分,每次打卡时间系统耗时5秒。请问RT、QPS、并发量分别是多少?
RT表示响应时间,问题已经告诉了我们答案:
RT = 5

QPS表示每秒查询量,假设签到行为平均分布:
QPS = 7200 / (30 * 60) = 4

并发量表示系统同时处理的请求数量:
并发量 = QPS x RT = 4 x 5 = 20

根据上述实例引出如下公式:
并发量 = QPS x RT

如果系统为每一个请求分配一个处理线程,那么并发量可以近似等于线程数。基于上述公式不难看出并发量受QPS和RT影响,这两个指标任意一个上升就会导致并发量上升。
但是这只是理想情况,因为并发量受限于系统能力而不可能持续上升,例如DUBBO线程池就对线程数做了限制,超出最大线程数限制则会执行拒绝策略,而拒绝策略会提示线程池已满,这就是DUBBO线程池打满问题的根源。下面我们分别分析RT上升和QPS上升这两个原因。
注意上面仅仅是一个例子,实际上一个服务远比例子复杂,实践往往需要不断的调参数。才能找到合理的值
线程池耗尽,往往是因为某个业务慢导致,我们应该寻找执行缓慢的堆栈,例如使用arthas来监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1330859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA的facets和artifacts

在软件开发领域&#xff0c;IDEA 是指 JetBrains 公司的 IntelliJ IDEA&#xff0c;是一款流行的集成开发环境&#xff08;Integrated Development Environment&#xff09;。在 IntelliJ IDEA 中&#xff0c;"facets" 和 "artifacts" 是两个概念&#xff…

【学习代码】PyTorch代码 练习

文章目录 AttentionSEAttention【显式地建模特征通道之间的相互依赖关系】【easy】CBAM Attention【SEAttention通道注意力 空间注意力】【middle】SelfAttention【重点&#xff1a;理解意义 and 实践&#xff01;】【hard】 Attention SEAttention【显式地建模特征通道之间的…

找到字符串中所有字母异位词--滑动窗口

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 本体题目链接https://leetcode.cn/problems/VabMRr/description/ 算法原理 滑动窗口其实就是种双指针&#xff0c;只是这种双指针只向后移动&#xff0c;不会回退&#xff0c;具有单调性&#xff0c;也就是说&#xff0c;…

C语言的分支和循环语句

各位少年&#xff0c;今天和大家分享的是分支语句循环体语句&#xff0c;C语言是结构体的程序设计语言&#xff0c;这里的结构指的是&#xff08;顺序结构&#xff09;&#xff08;选择结构&#xff09;&#xff08;循环结构&#xff09;C语言是能够实现这三种结构的&#xff0…

了解树和学习二叉树

1.树 1.1 概念 树是一种 非线性 的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09;个有限结点组成一个具有层次关系的集合。 把它叫做树是因为它看 起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的 。 注意&#xff1a;树形结构中…

html网页编写语言

html是一门语言&#xff0c;所有的网页都是用html这门语言编写出来的。 HTML&#xff08;HyperText Markup Language&#xff09;&#xff1a;超文本标记语言。 超文本&#xff1a;超越了文本的限制&#xff0c;比普通文本更强大。除了文字信息&#xff0c;还可以定义图片&…

产品原型设计软件 Axure RP 9 mac支持多人写作设计

axure rp 9 mac是一款产品原型设计软件&#xff0c;它可以让你在上面任意构建草图、框线图、流程图以及产品模型&#xff0c;还能够注释一些重要地方&#xff0c;axure rp汉化版可支持同时多人写作设计和版本管理控制&#xff0c;这款交互式原型设计工具可以帮助设计者制作出高…

【YOLOV8预测篇】使用Ultralytics YOLO进行检测、分割、姿态估计和分类实践

目录 一 安装Ultralytics 二 使用预训练的YOLOv8n检测模型 三 使用预训练的YOLOv8n-seg分割模型 四 使用预训练的YOLOv8n-pose姿态模型 五 使用预训练的YOLOv8n-cls分类模型 <

linux运维的面试题一

1.linux启动过程 1加电 2加载主板bios设置 3加载多重操作系统启动管理器grub 4加载内核系统到内存当中 5加载配置文件 6加载内核模块 7完成相应的初始化工作和启动相应的服务 8启动系统进程 9出现登录界面 10开机启动完成 2.安装过操作系统吗&#xff1f;怎么安装? 1.小批量设…

波奇学Linux:进程替换

单进程替换 excel使得能够在文件中运行系统指令 int excel (系统文件地址&#xff0c;系统指令&#xff0c;指令参数&#xff0c;NULL)&#xff1b; 成功时无返回值&#xff0c;失败时返回-1 如图进程成功运行第一个printf后再运行指令&#xff0c;但没有输出第二个printf的内容…

Jenkins 插件管理指南

目录 常用插件 插件安装 已安装插件 installed plugins 常用插件 Docker Plugin&#xff1a; 这个插件让Jenkins能够与Docker容器平台进行集成。它允许在Jenkins构建过程中创建、管理和销毁Docker容器&#xff0c;为需要Docker化的项目提供了极大的便利性。对于需要在容器中…

Leetcode—1491.去掉最低工资和最高工资后的工资平均值【简单】

2023每日刷题&#xff08;六十八&#xff09; Leetcode—1491.去掉最低工资和最高工资后的工资平均值 实现代码 class Solution { public:double average(vector<int>& salary) {double sum 0;int n salary.size();sort(salary.begin(), salary.end());for(int i…

全部没有问题(三)

C语言 吞空格 一共三种办法&#xff1a; fflush(stdin), while(getchar()!\n), scanf("%*c") p.s visual studio已经移除gets函数了&#xff0c;要换编译器/函数&#xff0c;visual C可以的 #include <stdio.h>int main() {char x[100], y[100], z[100];i…

贪吃蛇(十)贪吃蛇吃食物

上节讲到限制蛇身回头&#xff0c;本节要实现吃食物功能 实现思路 在存储上食物方面可以复用蛇的结构体。初始化食物的时候&#xff0c;我们设置食物的坐标&#xff0c;每次调用这个函数的时候&#xff0c;坐标发生一些规律的变化。另外我们需要扫描食物的函数&#xff0c;这…

数据大模型与低代码开发:赋能技术创新的黄金组合

在当今技术领域&#xff0c;数据大模型和低代码开发已经成为两个重要的趋势。数据大模型借助庞大的数据集和强大的计算能力&#xff0c;助力我们从海量数据中挖掘出有价值的洞见和预测能力。与此同时&#xff0c;低代码开发通过简化开发流程和降低编码需求&#xff0c;使得更多…

淘宝商品评论API:电商行业的战略资源与制胜之道

在电商行业&#xff0c;数据是金。而其中&#xff0c;用户评论数据更是无价之宝。它不仅仅反映了商品的质量和卖家的服务态度&#xff0c;更是消费者在决策时的关键参考。正因如此&#xff0c;获得淘宝商品评论API的重要性不言而喻。 一、数据背后的无尽宝藏 淘宝&#xff0c;…

java的XWPFDocument3.17版本学习

maven依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>3.17</version> </dependency> 测试类&#xff1a; import org.apache.poi.openxml4j.exceptions.InvalidFormatExcep…

人工智能_机器学习072_SVM支持向量机_人脸识别模型训练_训练时间过长解决办法_数据降维_LFW人脸数据建模与C参数选择---人工智能工作笔记0112

我们先来看一下之前的代码: import numpy as np 导入数学计算库 from sklearn. svm import SVC 导入支持向量机 线性分类器 import matplotlib.pyplot as plt 加载人脸图片以后,我们用pyplot把人脸图片数据展示一下 from sklearn.model_selection import train_test_split 人脸…

3. 结构型模式 - 组合模式

亦称&#xff1a; 对象树、Object Tree、Composite 意图 组合模式是一种结构型设计模式&#xff0c; 你可以使用它将对象组合成树状结构&#xff0c; 并且能像使用独立对象一样使用它们 问题 如果应用的核心模型能用树状结构表示&#xff0c; 在应用中使用组合模式才有价值。 …

优化模型:MATLAB整数规划

一、整数规划介绍 1.1 整数规划的定义 若规划模型的所有决策变量只能取整数时&#xff0c;称为整数规划。若在线性规划模型中&#xff0c;变量限制为整数&#xff0c;则称为整数线性规划。 1.2 整数规划的分类 整数规划模型大致可分为两类&#xff1a; &#xff08;1&…