【RocketMQ系列十四】RocketMQ中消息堆积如何处理

news2025/1/21 11:28:13

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 消息堆积
    • 2. 消息堆积出现的原因
    • 3. 如何解决消息堆积

1. 消息堆积

消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。

在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。

消息堆积

这里有个延迟就表示目前堆积的消息数。

2. 消息堆积出现的原因

消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:

  1. 第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。

  2. 第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。

  3. 第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。

3. 如何解决消息堆积

  1. 解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。

  2. 解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。

  3. 解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:

    1. 同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。

    2. 提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息,可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。

    3. 批量消费消息:

      某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。

    下面就让我们来看个例子:

    生产者:使用的是DefaultMQProducer;

    	//4.创建消息
    		StopWatch stopWatch = new StopWatch();
    		stopWatch.start();
    		for (int i = 0; i < 20000; i++) {
    			// 创建消息,指定topic,以及消息体
    			Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());
    			//5.发送消息
    			SendResult send = defaultMQProducer.send(message);
    			System.out.println(send);
    		}
    		stopWatch.stop();
    		System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
    

    消费者:使用的是DefaultMQPushConsumer;

    	// 4.创建一个回调函数
    		consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    			System.out.println("本批次收到的消息数="+msgs.size());
    			// 5.处理消息
    			for (MessageExt msg : msgs) {
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    					throw new RuntimeException(e);
    				}
    				System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));
    			}
    			// 返回消费成功的对象
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		});
    

    生产者329秒内发送了2万条消息,平均60条,

    image-20231014152350841

    而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。

    image-20231014144541572

image-20231014152042570

这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。

image-20231014153721132

当然,官方推荐使用SimpleConsumer进行批量消费消息。

	//每批次拉取16条消息
		int maxMessageNum = 16;
		// Set message invisible duration after it is received.
		Duration invisibleDuration = Duration.ofSeconds(15);
		// Receive message, multi-threading is more recommended.
		do {
			final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
			log.info("Received {} message(s)", messages.size());
			for (MessageView message : messages) {
				final MessageId messageId = message.getMessageId();
				try {
					consumer.ack(message);
					log.info("Message is acknowledged successfully, messageId={}", messageId);
				} catch (Throwable t) {
					log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
				}
			}
		} while (true);

官方的代码示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130446.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch分词器-中文分词器ik

文章目录 使用standard analysis对英文进行分词使用standard analysis对中文进行分词安装插件对中文进行友好分词-ik中文分词器下载安装和配置IK分词器使用ik_smart分词器使用ik_max_word分词器 text analysis 使用standard analysis对英文进行分词 ES默认使用standard analys…

【Java】智慧医院绩效考核系统源码

医院绩效考核系统使用JAVA语言开发&#xff0c;采用B/S架构模式设计&#xff0c;后台使用MySql数据库进行管理的一整套计算机应用软件。系统和his系统进行对接&#xff0c;按照设定周期&#xff0c;从his系统获取医院科室和医生、护士、其他人员工作量&#xff0c;对没有录入信…

【MySQL架构篇】逻辑架构

逻辑架构 文章目录 逻辑架构1. 服务器处理客户端请求2. Connectors3. 第一层&#xff1a;连接层4. 第二层&#xff1a;服务层5. 第三层&#xff1a;存储引擎6. 存储层7. 小结 1. 服务器处理客户端请求 首先 MySQL 是典型的 C/S 架构&#xff0c;即 Client/Server 架构&#xf…

idea 基础设置

1、设置 IDEA 主题 2、自动导包和优化多余的包 3、同一个包下的类&#xff0c;超过指定个数的时候&#xff0c;导包合并为* 4、显示行号 &#xff0c; 方法和方法间的分隔符&#xff1a; 5、忽略大小写&#xff0c;进行提示 6、多个类不隐藏&#xff0c;多行显示 7、设置默认的…

2023高频前端面试题-CSS

1. CSS 选择器的优先级是怎么样的&#xff1f; CSS 选择器的优先级顺序&#xff1a; 内联样式 > ID选择器 > 类选择器 > 标签选择器 优先级的计算&#xff1a; 优先级是由 A、B、C、D 四个值来决定的&#xff0c;具体计算规则如下 A{如果存在内联样式则为 1&…

计算机网络-计算机网络体系结构-应用层

目录 一、网络应用模型 客户/服务器模型(Client/Server) P2P模型(Peer-to-peer) 二、域名解析系统(DNS) 域名 域名服务器 解析过程 三、文件传输协议(FTP) FTP控制原理 四、电子邮件 组成结构 协议 SMTP MIME POP3 IMAP 五、万维网和HTTP协议 概述 HTTP 报…

MySQL数据库---入门篇

文章目录 数据库介绍什么是数据库&#xff1f;数据库分类 MySQL的结构MySQL客户端和服务器MySQL服务器是如何组织数据的&#xff1f; 数据库操作显示当前数据库创建数据库使用数据库删除数据库 数据库中常用数据类型数值类型字符串类型日期类型 表的操作创建表查看表结构查看当…

Linux系统编程:线程

从进程到线程 为什么需要线程&#xff1f;这是因为进程本身存在一定问题&#xff1a; 首先是进程切换时&#xff0c;各类进程资源如寄存器CPU、包括虚拟地址和物理地址要进行映射等等进行上下文切换&#xff0c;这是非常消耗资源和时间的事情&#xff0c;并且实现进程间通信非…

死锁的发生原因和怎么避免

死锁 死锁&#xff0c;简单来说就是两个或者两个以上的线程在执行的过程中&#xff0c;争夺同一个共享资源造成的相互等待的现象。如果没有外部干预&#xff0c;线程会一直阻塞无法往下执行&#xff0c;这些一直处于相互等待资源的线程就称为死锁线程。 死锁产生原因 导致死…

使用强化学习训练 AI 去玩神奇宝贝

使用强化学习训练 AI 去玩神奇宝贝 这两天在逛 Youtube 的时候意外发现了一个非常有趣的视频&#xff0c;十天的时间已经获得了两百多万的点击&#xff1a; 现在已经 360w 点击了 视频的名称就和题目的名称一样&#xff1a;Training AI to Play Pokemon with Reinforcement Le…

Kaggle - LLM Science Exam(四):Platypus2-70B with Wikipedia RAG

文章目录 一、赛事概述1.1 OpenBookQA Dataset1.2 比赛背景1.3 评估方法和代码要求1.4 比赛数据集1.5 优秀notebook1.6 RAG 二、Platypus2-70B with Wikipedia RAG&#xff08;Version8&#xff09;2.1 离线安装依赖2.2 导入库并设置常量2.3设置辅助功能2.4 SentenceTransforme…

phpstorm+phpstudy+xdebug快速搭建php调试环境

1、安装phpstudy 让你的项目能正常跑起来&#xff0c;再来进行下一步 2、安装拓展 勾选需要用到的插件&#xff0c;配置好端口 再php.ini最下面复制如下配置&#xff0c;插件的地址按实际路径配置 [Xdebug] zend_extensionD:/phpstudy_pro/Extensions/php/php5.6.9nts/ext/p…

UG\NX二次开发 实现“适合窗口”的功能

文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 感谢粉丝订阅 感谢 shsjdj 订阅本专栏,非常感谢。 简介 实现“适合窗口”的功能 效果 代码1 #include "me.hpp"extern DllExport void ufusr(char* param, int* re…

java将list转为逗号隔开字符串,将逗号连接的字符串转成字符数组,​将逗号分隔的字符串转换为List​(Java逗号分隔-字符串与数组相互转换)

一、通过testList.stream().collect(Collectors.joining(",")) &#xff0c;通过流转换&#xff0c;将list转为逗号隔开字符串 List<String> testList new ArrayList<>(); testList.add("test1"); testList.add("test2"); testList…

Jenkins部署失败:JDK ‘jdk1.8.0_381‘ not supported to run Maven projects

Jenkins部署报错&#xff1a;JDK ‘jdk1.8.0_381’ not supported to run Maven projects提示使用的jdk有问题&#xff0c;启动的jdk版本不能满足项目启动。 登录Jenkins管理页面&#xff0c;系统管理——全局工具配置——JDK安装配置满足条件的JDK版本&#xff0c;保存配置&…

Stable-diffusion-webui

AI 画图&#xff0c;之前整理的 AI换脸 CSDN不给通过&#xff0c;说是换脸之类的不给通过&#xff0c;只能自己看了。 GitHub&#xff1a;https://github.com/AUTOMATIC1111/stable-diffusion-webuihttps://github.com/AUTOMATIC1111/stable-diffusion-webui 安装完毕跑起来大概…

k8s-----18、Ingress(对外服务)

Ingress 1、Ingress概念2、 pod和ingress的关系3、 Ingress的工作流程4、 使用步骤5、对外暴露应用实战5.1 创建nginx应用&#xff0c;对外暴露端口使用NodePort5.2 部署ingress controller5.3 创建ingress规则5.4 访问 1、Ingress概念 k8s 对外暴露服务&#xff08;service&am…

uniapp解决iOS切换语言——原生导航栏buttons文字不生效

uniapp 切换语言原生导航栏buttons文字不生效&#xff1f; 文章目录 uniapp 切换语言原生导航栏buttons文字不生效&#xff1f;效果图page.json配置解决方式 效果图 场景&#xff1a;在 tabbar 页面中&#xff0c;配置 原生导航栏 buttons &#xff0c;切换语言时&#xff0c;不…

单片机中的 _nop_() 函数及 us 延时

使用 _nop_() 函数做延时遇到的一些问题 ...... by 矜辰所致前言 最近还是继续做着项目&#xff0c;因为在某 8051 内核芯片上使用到了 I2C 通讯&#xff0c;又需要 _nop_() 函数来实现 us 延时&#xff0c;那么正好来写一篇与_nop_() 函数有关的文章 。 我是矜辰所致&…

给运行中的docker容器挂载目录——筑梦之路

使用场景 对于一个已经运行的容器&#xff0c;如果后续需要新挂载一个目录怎么办&#xff1f;为什么不能重新创建一个容器&#xff1f; 容器内可能安装过很多东西&#xff0c;很费时&#xff0c;如果重新创建一个容器再挂载&#xff0c;还得重新安装很多东西&#xff0c;非常费…