如何确定RocketMQ中消费者的线程大小

news2025/1/24 7:20:55

背景

        随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。

        在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ

如何在配置文件端配置消费者线程大小

        当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。

        如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。

@RocketMQMessageListener(
    // 指定消费线程大小
        consumeThreadNumber = 16,
        topic = TOPIC_DEMO,
        consumerGroup = "consumer_demo_group"

)
@Component
public class Consumer implements RocketMQListener<MessageExt> {

    private final String CHARSET = Charset.defaultCharset().name();

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String str = new String(body, Charset.forName(this.CHARSET));
        System.out.println("消费者消费的消息为: " + str);
    }
}

        其中的topicconsumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。

        那应该如何实现呢?

        其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法。

/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;

@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}

        那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。

biz: RocketMQThreadSize: 200

如何使得自定义的线程大小生效

        如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。

        在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示:

@RocketMQMessageListener(
        topic = TOPIC_DEMO,
        consumerGroup = "consumer_demo_group"

)
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

    private final String CHARSET = Charset.defaultCharset().name();

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String str = new String(body, Charset.forName(this.CHARSET));
        System.out.println("消费者消费的消息为: " + str);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        // 指定消费线程大小
        defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
        defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
    }
}

        在prepareStart方法中,指定一些必要的线程参数

  • 最大线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
  • 最小线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);

并且通过实验和查看源码,其中最大、最小设置一样才会生效。

如何确定合适的线程大小

        以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?

        线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。

        当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。

        在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小:

  • 消费者的TPS,表明消费者的能力;
  • 机器负载,分为CPU负载和IO负载,和自身的核心数量有关。

        RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。

在这里插入图片描述

        在服务器输入top命令就可以看大,当前机器的负载:

在这里插入图片描述

分别为1、5、15分钟负载。

        在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。

        在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。

结尾

 以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/370798.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

KALOS.art AI 作品每周精选 006

KALOS.art —— AI 和 数字艺术作品展示及销售平台。创作者们可以在这创建自己的主页和画廊&#xff0c;收取充电打赏、以图库模式出售作品。爱好者们可以在这里探索发现&#xff0c;购买作品图片&#xff08;带商用授权&#xff09;&#xff0c;跟艺术家们开启私信通道交流。具…

在外包公司熬了 3 年终于进了字节,竭尽全力....

其实两年前校招的时候就往字节投了一次简历&#xff0c;结果很明显凉了&#xff0c;随后这个理想就被暂时放下了&#xff0c;但是这个种子一直埋在心里这两年除了工作以外&#xff0c;也会坚持写博客&#xff0c;也因此结识了很多优秀的小伙伴&#xff0c;从他们身上学到了特别…

云镜CVE-2021-44983复现

CVE-2021-44983复现漏洞信息漏洞复现读取flag&#x1f349; shell来源&#xff1a;https://yunjing.ichunqiu.com/cve/detail/967?type1&pay2漏洞信息 漏洞名称taocms 3.0.1 登陆后台后文件管理处存在任意文件下载漏洞漏洞编号CVE-2021-44983危害等级中危漏洞类型任意文…

大学毕业后,送了2个月外卖,哭了一整晚

先简单介绍一下自己&#xff0c;我来自湛江&#xff0c;大学学的的物流管理专业&#xff0c;现在就职于一家互联网公司&#xff0c;从事软件测试工作。 我来自湛江的一个偏远农村&#xff0c;家里兄弟姐妹多&#xff0c;父母无力负担我的学费&#xff0c;很多时候学费都是靠姐…

戴尔Latitude 3410电脑 Hackintosh 黑苹果efi引导文件

原文来源于黑果魏叔官网&#xff0c;转载需注明出处。硬件型号驱动情况主板戴尔Latitude 3410处理器英特尔酷睿i7-10510U已驱动内存8GB已驱动硬盘SK hynix BC511 NVMe SSD已驱动显卡Intel UHD 620Nvidia GeForce MX230(屏蔽)无法驱动声卡Realtek ALC236已驱动网卡Realtek RTL81…

哈希表以及哈希冲突

目录 哈希表 哈希冲突 1. 冲突发生 2. 比较常见的哈希函数 3. 负载因子调节(重点) 散列表的载荷因子概念 负载因子和冲突率的关系 冲突-解决-闭散列 线性探测 二次探测 冲突-解决-开散列 结尾 我们在前面讲解了TerrMap&#xff08;Set&#xff09;的底层是一个搜索…

雅思经验(十四)

剑10 test3 阅读p3这篇阅读比较难做下来&#xff0c;主要是这个题材我们不太熟悉&#xff0c;介绍了一种成为拉皮塔人&#xff0c;他们在太平洋上航行&#xff0c;很多岛屿上都有他们足迹&#xff0c;后来人们发掘、探索他们的历史的故事。1.derelict 与 abandoned 主要是前面的…

Mysql 语句优化 (Explain)

Mysql 语句优化 &#xff08;Explain&#xff09; 1. 概述 ​ 在 select 语句之前增加 explain 关键字&#xff0c; mysql 会在查询上设置一个标记&#xff0c;返回查询执行计划信息&#xff0c;而不是执行这条sql 字段formatjson时的名称含义idselect_id该语句的唯一标识sel…

图形编辑器:拖拽阻塞优化

大家好&#xff0c;我是前端西瓜哥。在图形编辑器中&#xff0c;想象这么一个场景&#xff0c;我们撤销了一些重要的操作&#xff0c;然后想选中一个图形&#xff0c;看看它的属性。你点了上去&#xff0c;然后你发现你再也无法重做了。 你以为你点了一下&#xff0c;但其实你…

Java知识复习(七)常见的设计模式(装饰、代理、观察、策略、建造)

前言 参考书籍&#xff1a;《秒懂设计模式》 1、装饰器模式&#xff08;Decorator&#xff09; 1、装饰器模式&#xff1a;对原始对象动态地进行“包装”&#xff0c;是对类实例“装饰”的结果&#xff1b;类似于继承的效果&#xff0c;但这个过程是动态的&#xff0c;是可设…

Java基础常见面试题-异常-泛型-反射-注解-SPI-序列化-IO流

Java基础常见面试题-异常-泛型 1 Exception 和 Error 有什么区别&#xff1f; 1**Exception** :程序本身可以处理的异常&#xff0c;可以通过 catch 来进行捕获。Exception 又可以分为 Checked Exception (受检查异常&#xff0c;必须处理) 和 Unchecked Exception (不受检查异…

构建系统发育树简述

1. 要点 系统发育树代表了关于一组生物之间的进化关系的假设。可以使用物种或其他群体的形态学&#xff08;体型&#xff09;、生化、行为或分子特征来构建系统发育树。在构建树时&#xff0c;我们根据共享的派生特征&#xff08;不同于该组祖先的特征&#xff09;将物种组织成…

Spring AOP之基于注解的使用

1、技术说明 AOP是思想&#xff0c;AspectJ是AOP思想的实现。 动态代理&#xff08;InvocationHandler&#xff09;&#xff1a;JDK原生的实现方式&#xff0c;需要被代理的目标类必须实现接口。因为这个技术要求代理对象和目标对象实现同样的接口&#xff08;兄弟两个拜把子模…

【SPSS】单样本T检验分析详细操作教程(附案例实战)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

服务端开发之Java备战秋招面试3

今天继续学习&#xff0c;先做两题算法题练练手&#xff0c;在继续整理八股文&#xff0c;深入理解&#xff0c;才能在面试的时候有更好地表现&#xff0c;一起加油吧&#xff0c;希望秋招多拿几个令人心动的offer&#xff0c;冲吧。 目录 1、算法题&#xff1a;判断链表中是…

带你了解IP报警柱的特点

IP可视报警柱是一款室外防水紧急求助可视对讲终端。安装在学校、广场、道路人流密集和案件高发区域&#xff0c;当发生紧急情况或需要咨询求助时按下呼叫按钮立即可与监控中心值班人员通话&#xff0c;值班人员也可通过前置摄像头了解现场情况并广播喊话。IP可视报警柱的使用特…

【双重注意机制:肺癌:超分】

Dual attention mechanism network for lung cancer images super-resolution &#xff08;肺癌图像超分辨率的双重注意机制网络&#xff09; 目前&#xff0c;肺癌的发病率和死亡率均居世界恶性肿瘤之首。提高肺部薄层CT的分辨率对于肺癌筛查的早期诊断尤为重要。针对超分辨…

收割不易,五面Alibaba终拿Java岗offer

前言 前段时间有幸被阿里的一位同学内推&#xff0c;参加了阿里巴巴Java岗位的面试&#xff0c;本人19年双非本科软件工程专业&#xff0c;目前有一年半的工作经验&#xff0c;面试前就职于一家外包公司。如果在自己本人拿到offer之前&#xff0c;如果有人告诉我一年工作经验可…

会声会影2023专业版视频处理制作软件功能详细介绍

会声会影是一款专业的视频处理和制作软件&#xff0c;也是目前影楼制作结婚和一般视频特效制作的必备软件&#xff0c;他是一款专为个人及家庭所设计的数码影片编辑软件&#xff0c;可将数 字或模拟摄像机所拍下来的如成长写真、国外旅游、个人MTV、生日派对、毕业典礼等精彩生…

C++ 修改程序进程的优先级(Linux,Windows)

文章目录1、Linux1.1 常用命令1.1.1 不占用终端运行和后台运行方式1.1.2 查询进程1.1.3 结束进程1.1.4 优先级命令1.2 C 代码示例1.2.1 代码一1.2.2 代码二2、Windows2.1 简介2.2 函数声明2.3 C 代码示例2.3.1 代码一2.3.2 代码二结语1、Linux 1.1 常用命令 1.1.1 不占用终端…