rocketmq中ConsumeThreadMax不生效原因探究

news2025/1/22 18:09:53

在项目中设置了ConsumeThreadMax但是消息消费时仍是单线程消费,故而进行追踪排查。

其根本原因是rocketmq中,消费者线程池队列使用的是LinkedBlockingQueue。

rocket版本:rocketmq-client-4.9.3

代码追踪:从DefaultMQPushConsumer入手,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start方法启动消费者

    /**
     * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
     *
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

在defaultMQPushConsumerImpl.start方法中继续跟踪

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

因为大部分情况下使用的是ConsumeMessageConcurrentlyService并发服务为例,查看new ConsumeMessageConcurrentlyService构造方法。

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        String consumeThreadPrefix = null;
        if (consumerGroup.length() > 100) {
            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();
        } else {
            consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
        }
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

从构造方法中,很明显看出创建了一个线程池,ThreadPoolExecutor构造方法详细如下图。

 所以很清晰知道默认队列就是this.consumeRequestQueue,也就是LinkedBlockingQueue,而LinkedBlockingQueue默认大小Integer.MAX_VALUE,所以队列一直没满,也就不会创建新线程。

参考spring-rocket里的解决方案,就是把ConsumeThreadMax和ConsumeThreadMin都设置为同一个数,默认20.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/553864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++设计模式之抽象工厂模式(Abstract Factory)

文章目录 定义前言问题解决方案 结构适用场景实现方式优点缺点与其他模式的关系实例 [C]22种设计模式的C实现大纲 定义 抽象工厂是一种创建型设计模式&#xff0c;它能创建一系列相关的对象&#xff0c;而无需指定其具体类。 前言 问题 假设你正在开发一款家具商店模拟器。…

Multi-Head Attention和Transformer Decoder(GPT)详解

文章目录 一、Transformer的Attention1. Self-Attention2. Masked Self-Attention3. Multi-Head Attention 二、Transformer Decoder&#xff08;GPT&#xff09;1. GPT的网络结构2. GPT的计算原理 一、Transformer的Attention 1. Self-Attention 如前篇文章所述&#xff08;ht…

如何开发一个人人爱的组件?

组件&#xff0c;是前端最常打交道的东西&#xff0c;对于 React、Vue 等应用来说&#xff0c;万物皆组件毫不为过。 有些工作经验的同学都知道&#xff0c;组件其实也分等级的&#xff0c;有的组件可以被上万开发者复用&#xff0c;有些组件就只能在项目中运行&#xff0c;甚…

Springboot +spring security,配置多个数据源:验证不同用户表

一.简介 上篇文章写到&#xff0c;我们在配置jdbc和mybatis 来源&#xff0c;进行登录后&#xff0c;出现了如下错误! 后面解决方案是&#xff1a;屏蔽了其中一个来源&#xff0c;登陆成功&#xff0c;也分析了其原因。 但是&#xff0c;但是如果需要配置多个数据来源&#…

2023年认证杯SPSSPRO杯数学建模B题(第一阶段)考订文本全过程文档及程序

2023年认证杯SPSSPRO杯数学建模 B题 考订文本 原题再现&#xff1a; 古代文本在传抄过程中&#xff0c;往往会出现种种错误&#xff0c;以至于一部书可能流传下来多种版本。在文献学中&#xff0c;错误往往被总结成“讹”、“脱”、“衍”、“倒”等形式&#xff0c;也可能同…

cda 1级模拟题错题知识点总结

Sql truncate函数 格式&#xff1a;TRUNCATE(number, decimals) number: the number to be truncated decimals:the number of decimal places to truncate to 截断到的小数位数&#xff0c;如果为0则表示不保留小数 例如: select truncate(2.83,0) 结果为2 select truncate(…

解读kubernetes部署:配置docker私服密钥与SSL证书创建

为k8s配置docker私服密钥 为了kubernetes有权访问您的docker私服&#xff0c;需要在kubernetes的凭证中建立docker私服的密钥&#xff1a; kubectlcreatesecretdocker-registryaliyun-secret--docker-server--docker-username--docker-password--docker-email--namespacens-jav…

2.golang的变量、常量、数据类型、循环和条件判断

一、变量 变量&#xff08;Variable&#xff09;的功能是存储数据。Go语言中的每一个变量都有自己的类型&#xff0c;并且变量必须经过声明才能开始使用。 Go语言的变量声明格式为&#xff1a; var 变量名 变量类型 例如&#xff1a; var name string var age int var isOk b…

线上问题处理案例:出乎意料的数据库连接池 | 京东云技术团队

导读 本文是线上问题处理案例系列之一&#xff0c;旨在通过真实案例向读者介绍发现问题、定位问题、解决问题的方法。本文讲述了从垃圾回收耗时过长的表象&#xff0c;逐步定位到数据库连接池保活问题的全过程&#xff0c;并对其中用到的一些知识点进行了总结。 一、问题描述…

LabVIEWCompactRIO 开发指南29 数据通信

LabVIEWCompactRIO 开发指南29 数据通信 LabVIEW FPGA中的数据通信分为两类&#xff1a;进程间和目标间。进程间通信通常对应于FPGA目标上的两个或多个环路之间的数据共享。目标间数据通信是在FPGA目标和主机处理器之间共享数据。对于这两种情况&#xff0c;在决定使用哪种机…

扩散能垒计算在电池材料领域的革新应用

扩散能垒计算在电池材料领域的革新应用 随着能源需求的增长和环境意识的提高&#xff0c;电池技术成为解决可再生能源存储和移动电子设备需求的关键。电池材料的研究和开发变得日益重要&#xff0c;而扩散能垒计算作为一种先进的计算方法&#xff0c;为电池材料领域带来了革新的…

设计模式之【观察者模式】,MQ的单机实现雏形

文章目录 一、什么是观察者模式1、观察者模式应用场景2、观察者模式的四大角色3、观察者模式优缺点 二、实例1、观察者模式的一般写法2、微信公众号案例3、鼠标响应事件API案例 三、实现一个异步非阻塞框架1、EventBus2、使用MQ 四、源码中的观察者模式1、Observable/Observer2…

pygam第3课——画图小程序

前言&#xff1a;我们前两节课已经学习了&#xff0c;界面的设计、图片的加载、那么今天我们将继续学习pygame的基础知识&#xff0c;我们的今天学习的内容是&#xff1a;鼠标滑动时坐标的实时获取、鼠标的移动事件、鼠标的点击事件、图形绘制等。希望大家能 搭建界面&#xf…

firewalld防火墙(又到了可以看日落和晚霞的日子了)

文章目录 一、firewalld概述二、firewalld和iptables的关系三、firewalld区域的概念四、firewalld数据处理流程五、firewalld检查数据包源地址的规则六、firewalld防火墙的配置种类1.运行时配置2.永久配置 七、firewalld防火墙的配置方法八、使用命令配置firewalld防火墙1.获取…

Ventoy 多合一启动盘制作工具神器 - 将多个系统 Win/PE/Linux 镜像装在1个U盘里

最近很多操作系统都纷纷发布了新版本&#xff0c;比如 Windows 11、Ubuntu、Deepin、优麒麟、CentOS、Debian 等等&#xff0c;对喜欢玩系统的人来说绝对是盛宴。 不过一般用 Rufus 等工具&#xff0c;一个 U 盘往往只能制作成一个系统的启动盘/安装盘&#xff0c;想要增加另一…

零入门kubernetes网络实战-33->基于nat+brigde+veth pair形成的跨主机的内网通信方案

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本文主要使用的技术是 nat技术Linux虚拟网桥虚拟网络设备veth pair来实现跨主机网桥的通信 1、测试环境介绍 两台centos虚拟机 # 查看操作系统版本 cat …

VIBRO-METER VM600 IRC4 智能继电器卡

额外的继电器&#xff0c;由来自MPC4和/或AMC8卡的多达86个输入的方程驱动&#xff0c;用于需要2oo3表决等更复杂的逻辑时8个继电器&#xff0c;可配置为8个SPDT或4个DPDT使用IRC4配置器软件进行完全软件配置继电器可配置为正常通电(NE)或正常断电(NDE)&#xff0c;具有可配置的…

小航助学GESP_C++一级模拟测试试卷(含题库答题软件账号)

GESP在线模拟训练系统请点击 电子学会-全国青少年编程等级考试真题Scratch一级&#xff08;2019年3月&#xff09;在线答题_程序猿下山的博客-CSDN博客_小航答题助手 答案:A 第1题人们在使用计算机时所提到的 Windows 通常指的是&#xff08;&#xff09;。 A、操作系统B、多…

Science Bulletin:张占军教授团队提出“额叶保持,颞叶损伤” 假说解析成功认知老化

步入老年后&#xff0c;各项认知能力会逐渐衰退&#xff0c;我们把这一过程称之为认知老化。认知老化的过程与速度因人而异&#xff0c;走向阿尔茨海默病&#xff08;AD&#xff09;等认知障碍疾病为结局的属于病理认知老化&#xff0c;也就是经历轻度认知障碍阶段&#xff0c;…

【分享】PowerPoint如何设置保护和加密?

想保护自己做好的PPT&#xff0c;通常用的方法就是给PPT加密。下面我们来看看PPT加密保护方式有几种&#xff0c;具体如何操作。 打开PPT&#xff0c;点击菜单【文件】&#xff0c;再依次点击【信息】-【保护演示文稿】&#xff0c;就可以看到设置密码保护的5个选项。 选项1&a…