RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)

news2025/1/10 10:38:27

问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
在这里插入图片描述

1. 先说解决方案

这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
在这里插入图片描述
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
在这里插入图片描述

到此为止,是不是惊奇的发现,问题解决了?

2. 注意事项:订阅关系一致性

看下Rocket MQ官方文档给出的说明:

定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。

和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

在这里插入图片描述
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:

//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer 
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);

3. 剖析源码实现,分析原因

从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
在这里插入图片描述
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl类:

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        // 这里初始化一个默认的push类型的Consumer实现类
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

然后继续进入到DefaultMQPushConsumerImpl类中, 可以看见有一个成员变量MQClientInstance mQClientFactory, 在DefaultMQPushConsumerImpl类的start()(启动消费者)方法中会通过MQClientManager初始化MQClientInstance类.
在这里插入图片描述
接着跳转到MQClientInstance构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
在这里插入图片描述
继续剖析RebalanceService类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.

呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl类中的doRebalance()方法, 搞了半天又绕回来了. … … … … … …

直接进入到这里面, 看看rebalance的逻辑:
在这里插入图片描述
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.

case CLUSTERING: {
   Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
   // 这里根据topic名称和Group进行获取到所有的consumer
   List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
   if (null == mqSet) {
       if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
           this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
           log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
       }
   }

但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
在这里插入图片描述
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
在这里插入图片描述
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
在这里插入图片描述
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
在这里插入图片描述
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET

(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1155677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

把Qt6.2.4内置的标签打印了一遍

2023年10月31日&#xff0c;周二晚上 #include <QGridLayout> #include <QPushButton> #include <QLabel> #include <QApplication> #include <QStyle>int main(int argc, char *argv[]) {QApplication a(argc, argv);QWidget widget;widget.set…

SpringBoot -- 请求数据多态映射(jackson)

有些情况下&#xff0c;服务端提供了一个抽象类及其多个实现类&#xff0c;当前端传递 json 数据到后端时&#xff0c;我们希望映射得到的对象数据是根据某个特征区分开的具体的实现类对象。 文章目录 实现方式示例抽象类对象若干实现类测试接口及前端传递请求体接参结果 JsonT…

JavaScript的高级概述

还记得我们刚刚开始的时候给JavaScript的定义吗&#xff1f; JavaScript是一种高级的&#xff0c;面向对象的&#xff0c;多范式变成语言&#xff01; 这种定义JavaScript只是冰山一角&#xff01; JavaScript的高级定义 JavaScript是一种高级的、基于原型的、面向对象、多范…

node使用fs模块(二)—— 读取文件的基本使用(普通读取、同步读取等、流式读取)

文章目录 一、读取文件1. 参数说明2. 基本使用3.读取文件的同步和异步 二、流式文件写入&#xff08;fs.appendFile&#xff09;1. 参数说明2.基本使用 一、读取文件 1. 参数说明 参数1&#xff1a; path——读取的文件路径&#xff08;必填&#xff09; 参数2&#xff1a; op…

怎么在电脑桌面上添加待办事项?

在电脑桌面上选择一款待办事项工具&#xff0c;可以高效率地督促各项任务的按时完成&#xff0c;大大地提高工作的效率&#xff0c;支持在电脑上安装待办事项的工具类型是比较多的&#xff0c;为更好的辅助日常办公&#xff0c;建议大家可以选择高效率辅助办公的电脑便签工具&a…

leetcode:374. 猜数字大小(二分查找)

一、题目 函数原型&#xff1a;int guessNumber(int n) 二、思路 本题其实就是从 1 - n 中找出所要的答案。利用guess函数来判断数字是否符合答案。 答案小于当前数字&#xff0c;guess函数返回-1 答案等于当前数字&#xff0c;guess函数返回0 答案大于当前数字&#xff0c;gue…

会自动写代码的AI大模型来了!阿里云推出智能编码助手通义灵码

用大模型写代码是什么样的体验&#xff1f;10月31日&#xff0c;杭州云栖大会上&#xff0c;阿里云对外展示了一款可自动编写代码的 AI 助手&#xff0c;在编码软件的对话窗口输入“帮我用 python 写一个飞机游戏”&#xff0c;短短几秒&#xff0c;这款名为“通义灵码”的 AI …

10月31日星期二今日早报简报微语报早读

10月31日星期二&#xff0c;农历九月十七&#xff0c;早报微语早读分享。 1、广西官宣&#xff1a;做试管婴儿费用可报销&#xff1b; 2、港媒&#xff1a;4名港大学生承认“煽惑他人蓄意伤人罪”&#xff0c;被判监禁2年&#xff1b; 3、331名中国维和官兵全部获联合国勋章…

广告电商——边看广告边赚钱,实现三方共赢的局面

购物能赚钱&#xff1f;你相信嘛&#xff1f;广告电商模式边购物边赚钱&#xff0c;每天看看广告就能赚钱&#xff0c;白嫖 零撸 我之前有个客户就用了这种模式&#xff0c;购物的积分&#xff0c;然后用户看广告释放积分来赚钱&#xff0c;因为广告都是我们对接好了的一些头部…

【SOC基础】单片机学习案例汇总 Part2:蜂鸣器、数码管显示

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

FreeRTOS深入教程(队列内部机制和源码分析)

文章目录 前言一、队列结构体分析二、创建队列三、读写队列源码分析1.读队列源码分析2.写队列源码分析 总结 前言 本篇文章主要来为大家分析队列的内部机制和源码实现。 一、队列结构体分析 在FreeRTOS中队列会使用一个结构体来表示&#xff1a; 1.int8_t * pcHead 和 int…

MySQL数据库的存储引擎,底层存储结构,事物隔离级别,索引,日志等

存储引擎 存储引擎就是存储数据、建立索引、更新/查询数据等技术的实现方式。存储引擎是基于表而不是基于库的&#xff0c;所以存储引擎也可以被称为表引擎。 默认存储引擎是InnoDB。 InnoDB 在 MySQL 5.5 之后&#xff0c;InnoDB 是默认的 MySQL 引擎。 1.支持事务 2.行级锁…

YOLOv5/YOLOv7改进: AIFI (尺度内特征交互)助力YOLO | YOLO终结者?RT-DETR一探究竟

💡💡💡本文全网首发独家改进: AIFI (尺度内特征交互)助力YOLO ,提升尺度内和尺度间特征交互能力,同时降低多个尺度的特征之间进行注意力运算,计算消耗较大等问题 推荐指数:五星 AIFI | 亲测在多个数据集能够实现涨点 💡💡💡Yolov5/Yolov7魔术师,独家首…

Windows11无法打开Photoshop CC 2017问题解决

情况描述&#xff1a; Windows11上&#xff0c;双击Photoshop CC 2017没反应 解决办法&#xff1a; 此时需要启动Windows的“事件查看器”来确认问题出在哪里。可以直接通过开始菜单搜索启动&#xff0c;也可以通过右键点击“此电脑”->“管理”&#xff0c;然后找到事件查…

解密杭州亚运背后科技:核心系统100%上云,20多项全球首创智能应用

10月31日&#xff0c;2023杭州云栖大会&#xff0c;杭州亚运会信息技术中心执行指挥长、杭州亚组委广播电视和信息技术部副部长张鸽以《科技创新在亚运舞台精彩绽放》为主题&#xff0c;分享了杭州亚运会的智能亚运实践。 杭州亚运会打造了史上首个全覆盖的数字化服务体系&…

港府Web3宣言周年思考:合规困境中的“隐患”

出品&#xff5c;欧科云链研究院 作者&#xff5c;毕良寰 距离《有关虚拟资产在港发展的政策宣言》已过去一年&#xff0c;我们欧科云链研究院在分析全球几个主要国家和地区对Web3的监管政策及态度后&#xff0c;对港府的雄心壮志充满期待。然而&#xff0c;由于近期一些庞氏骗…

学习redis之前的泛泛而谈(特性介绍,应用场景,Ubuntu安装与通用命令介绍)

文章目录 前言关于分布式系统Redis特性Redis应用场景Redis5安装redis命令最核心的两个命令&#xff1a;get和setkeysexitsdelexpirettlredis中key的过期策略type redis数据类型的内部实现方式redis的单线程 前言 redis最重要的概念&#xff1a;在内存中存储数据 为什么要设计一…

MySQL(4):运算符

算术运算符 算术运算符主要用于数学运算&#xff0c;其可以连接运算符前后的两个数值或表达式&#xff0c;对数值或表达式进行加&#xff08;&#xff09;、减&#xff08;-&#xff09;、乘&#xff08;*&#xff09;、除&#xff08;/&#xff09;和取模&#xff08;%&#…

在uni-app中使用ECharts - 配置四种不同的图表

&#x1f468;‍&#x1f9b0;博主&#xff1a;小猫娃来啦 &#x1f468;‍&#x1f9b0;文章核心&#xff1a;在uni-app中使用ECharts - 配置四种不同的图表 文章目录 前言安装ECharts插件引入ECharts库创建Charts实例和图表容器配置和渲染图表配置柱状图配置折线图配置饼图配…

详解final, abstract, interface关键字

一.final关键字 1.final关键字介绍 ——final关键字可以去修饰类、方法、属性和局部变量 2.final关键字的作用 1&#xff09;final修饰类&#xff0c;这个类不能被其他类继承 2&#xff09;final修饰方法&#xff0c;方法不能被重写 3&#xff09;final修饰属性&#xff0c;属…