消息中间件之RocketMQ(三)

news2025/1/18 2:11:23

常见问题

1.重复消费

  • 产生的原因是发送消息时采用了多数分布式消息中间件产品提供的最少一次(at least once)的投递保障,对于这个问题最常见的解决方案,就是消息消费端实现业务幂等,只要保持幂等性,不管来多少条重复消息,最后处理的结果都是一样的
  • 保障策略有at most once 最多消费一次, at least once 最少消费一次, exactly once 刚好一次,RocketMQ不支持exactly once只有一次的模式,因为要在分布式系统下实现发送不重复并且消费不重复,将会产生非常大的开销,RocketMQ为了追求高性能并没有支持此特性
    其实该问题的本质时网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才会产生消息重复的问题

2.为什么RocketMQ不用ZooKeeper而要自己实现一个NameServer来注册?

为什么要RocketMQ自己实现注册中心,而不是用Zookeeper,Nacos?

3.Consumer分组有什么用? Producer分组的作用?

  • Producer Group
  • 生产者组(ProducerGroup)是一类生产者地集合,这类生产者通常发送一类消息并且发送逻辑一致,所以将这些生产者分组在一起,从部署结构上看,生产者通过生产者组的名字来标识自己是一个集群
  • 事务消息回查机制,可以使用到,同一个生产者组的生产消息逻辑是相同的,所以当事务消息向Broker提交本地事务不成功时,有可能是在执行完本地事务之后宕机的,那么Broker只需要向同一个Producer Group中的任意一个Producer调用事务回查就可以获取到本地事务的执行结果
  • Consumer Group
  • 消费者组(ConsumerGroup)是一类消费者的组合,这类消费者通常消费同一类消息并且消费逻辑一致,所以将这些消费者分组在一起,消费者组于生产者组类似,都是将相同角色的消费者分组在一起并命名的。分组是一个很精妙的概念设计,RocketMQ正是通过这种分组机制,实现了天然的消息负载均衡。
    消费消息时,通过消费者组实现了将消息分发到多个消费者服务器实例,比如某个主题由9条消息,其中,一个消费者组由3个实例(3个进程或者3台机器),那么每个实例将均摊3条消息,也就意味着我们可以很方便地通过增加机器来实现水平扩容

4.哪些环节会有丢消息的可能

在这里插入图片描述
这4个环节都有丢消息的可能

5.RocketMQ消息零丢失方案

1.生产者使用事务消息机制保证消息零丢失

在这里插入图片描述

1.1.1.为什么要发送这个half消息?有什么用

这个Half消息时在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。
那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ
服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备

1.1.2.half消息如果写入失败了怎么办?

如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ.
这时候写入消息到MQ如果失败就会非常尴尬了,而Half消息如果写入失败,我们就可以认为MQ的服务,是有问题的,这时就不能通知下游服务了,我们可以在下单时给订单一个状态标记,然后等待MQ服务,正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务

1.1.3.订单系统写数据库失败了怎么办?

这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断
下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了,当然也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。如果使用事务机制,就可以有一种更优雅的方案,如果下单时,写数据库失败了(可能是数据库崩了,需要等待
一段时间才能恢复),那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),
然后给RocketMQ返回一个UNKNOWN状态,这样RocketMQ就会过一段时间来回查事务状态,我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完成正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失

1.1.4.Half消息写入成功后RocketMQ挂了怎么办?

在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,也就是如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务,这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息
没有丢失,RocketMQ就会再次继续状态回查的流程

1.1.5.下单成功后如何优雅地等待支付成功?
  • 数据库方案.
    在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟内完成订单支付,支付完成
    后才会通知下游服务进行进一步地营销补偿?
    如果不适用事务消息,那通常会怎么办?
    最简单地方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超时的
    订单进行回收,这种方式显然有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小
    的压力。

  • 延迟消息方案.
    更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制,往MQ发一个延迟一分钟的
    消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知,而如果
    没有支付,就再发一个延迟1分钟的消息,最终在第是个消息时把订单回收,这个方案就不用对全部的订单
    表进行扫描,而只需要每次处理一个单独的订单消息

  • 事务消息方案.
    利用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的位置状态。
    而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。只需要配置RocketMQ的
    事务消息回查次数(默认15此)和事务回查间隔时间(messageDelayLevel)就可以更优雅的完成这个支付状态
    检查的需求

1.1.6.事务消息机制的作用

整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务
的业务的分布式事务一致性问题,而事务一致性问题一直依赖都是一个非常复杂的问题。而RocketMQ
的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件
的事务一致性,而对下游服务的事务并没有保证,但是即便如此,也是分布式事务的一个很好的降级方案,
目前来看,也是业内最好的降级方案

2.RocketMQ配置同步刷盘+(Dledger)Broker主从架构保证MQ主从同步时不会丢消息
2.1.1.同步刷盘

可以简单的把RocketMQ的刷盘方式flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了

2.1.2.Dledger的文件同步

在这里插入图片描述

  • 在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式抱着呢个文件在主从之间成功同步
  • 简单来说,数据同步会通过两个阶段,一个是uncommited阶段,一个是commiitted阶段Leader Broker上的Dledger收到一条消息后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发送给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger,如果Leader Broker收到超过半数的Follower Broker返回的ack之后,
    就会把消息标记为committed状态,再接下来,Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让它们把消息也标记为committed状态,这样,就基于Raft协议完成了两阶段的数据同步
2.1.3.消费者端不要使用异步消费机制
  • 正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK相应,这时MQ
    就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的
    这种情况会造成服务端消息丢失,这种异步消费的方式,就有可能造成消息状态返回后消费者
    本地业务逻辑处理失败造成消息丢失的可能
2.1.4.RocketMQ特有的问题,NameServer挂了如何保证消息不丢失
  • NameServer在RocketMQ中,扮演一个路由中心的角色,提供到Broker的路由功能。
    但是其实这样的路由中心这样的功能,在所有的MQ中都是需要的,Kafka使用ZooKeeper
    和一个作为Controller的Broker一起来提供路由服务的,整个功能是相当复杂纠结的。而
    RabbitMQ是由每一个Broker来提供路由服务,只有RocketMQ把这个路由中心单独抽取了
    出来,并独立部署,每一个NameServer都是独立的,集群中任意多的节点挂掉,都不会影响
    它提供的路由功能,如果集群中所有的NameServer节点都挂了呢?
  • 有很多人就会认为生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作
    一段时间,当NameServer全部挂了后,胜场这和消费者是立即就无法工作了的
  • 回到消息不丢失的问题。在这种情况下,RocketMQ相当于整个服务都不可用了,那它本身肯定无法给我们
    保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如再订单系统中,如果多次尝试
    发送RocketMQ不成功,那就只能另找地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程
    定时地扫描这些失败地订单消息,尝试往RocketMQ发送,这样等RocketMQ的服务恢复过来后,就能第一
    时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的
2.1.5.RocketMQ消息零丢失方案总结
  • 1.生产者使用事务消息机制
  • 2.Broker配置同步刷盘+Dledger主从架构
  • 3.消费者不要使用异步消费
  • 4.整个MQ挂了之后准备降级方案
  • 这套方案在各个环节都大量地降低了系统地处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失
    的代价可能远大于部分消息丢失的代价。所以在使用这套方案时,要根据实际的业务情况来考虑,
    例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。
    而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,
    而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,
    那使用异步消费的机制带来的性能提升也是非常显著的。

3.使用RocketMQ如何保证消息顺序

3.1.为什么要保证消息有序?

比如,下单完之后,需要支付成功,才会进行物流快递,不能先让物流服务执行,再支付成功

3.2.如何保证消息有序?
  • 全局有序。整个MQ系统的所有消息岩哥按照队列先入先出顺序进行消费
  • 局部有序。只保证一部分关键消息的消费顺序
  • 首先我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?其实在大部分的MQ业务场景,我们只需要保证局部有序就可以了,对于电商订单场景,只要保证一个订单的所有消息是有序的就可以了,全局消息的顺序并不会太关心
  • 落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式
    保证消息尽量均匀地分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue
    上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,它们之间的消息都是互相隔离的,
    在这种情况下,是无法保证消息全局有序的,而对于局部有序的要求,只需要将有序的一组消息都存入
    同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。
    RocketMQ中,剋在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发到
    哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
  • 另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序,这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了

4.使用RocketMQ如何快速处理积压消息

4.1.1.如何确定RocketMQ有大量的消息积压?
  • 在正常情况下,使用MQ都会要尽量保证它的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库得问题,等好不容易把数据库恢复过来了,这时基于这个数据库服务得消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的,所以消息积压是个需要时时关注的问题

  • 对于消息积压,如果是RocketMQ或者Kafka还好,它们的消息积压不会对性能造成
    很大的影响,而如果是RabbitMQ的话,那就不太好了,大量的消息积压可以瞬间造成
    性能直线下滑。对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况,另外也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况,还可以在它的${sotrePathRootDir}/config目录下落地一系列
    的json文件,也可以用来跟踪消息积压情况

4.1.2.如何处理大量积压的消息
  • 如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配
    多个MessageQueue来进行消费。这个时候,就可以简单地通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了
  • 如果Topic下的MessageQueue配置不够多的话,那就不能用上面这种增加Consumer节点个数的方法了
    这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够的多MessageQueue.然后把所有
    消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,
    并转储到新的Topic中,这个速度是可以很快的,然后在新的Topic上,就可以通过增加消费者个数来提高
    消费速度了.之后再根据情况恢复成正常的情况

5.RocketMQ的消息轨迹

RocketMQ默认提供了消息轨迹的功能,这个功能在排查问题时是非常有用的

5.1.RocketMQ消息轨迹数据的关键属性
5.1.1 Producer
  • 生产实例信息
  • 发送消息时间
  • 消息是否发送成功
  • 发送耗时
5.1.2 Consumer
  • 消费实例信息
  • 投递时间,投递轮次
  • 消息是否消费成功
  • 消费耗时
5.1.3 Broker
  • 消息的Topic
  • 消息存储位置
  • 消息的Key值
  • 消息的Tag值
5.2.消息轨迹配置

broker.conf打开一个关键配置
traceTopicEnable=true,默认是false,关闭的

5.3 消息轨迹数据存储

默认情况下,消息轨迹数据是存于一个系统级别的Topic
RMQ_SYS_TRACE_TOPIC,这个Topic在Broker节点启动时
会自动创建出来,当然也可以自定义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1410761.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

码多多ChatAI智能聊天系统-一款好用的代码编程助手

码多多ChatAI智能聊天系统可以作为一款智能编程助手,帮助程序员提高编程效率,降低开发成本。 产品介绍 码多多ChatAI智能聊天系统是一款基于人工智能技术的编程辅助工具,它通过深度学习算法和大数据分析,为程序员提供智能代码提…

权威的健康养生与医学基础知识科普学习信息汇总

目录 1 关于健康与食物营养的权威网址1.1 世界卫生组织(World Health Organization: WHO)1.2 美国国家卫生研究院 (National Institutes of Health: NIH)1.3 澳大利亚政府健康门户 (Healthdirect)1.4 国际食品信息委员会 (International Food Informatio…

如何使用宝塔面板配置Nginx反向代理WebSocket(wss)

本章教程,主要介绍一下在宝塔面板中如何配置websocket wss的具体过程。 目录 一、添加站点 二、申请证书 三、配置代理 1、增加配置内容 2、代理配置内容 三、注意事项 一、添加站点 二、申请证书 三、配置代理 1、增加配置内容 map $http_upgrade $connection_…

【GitHub项目推荐--不错的 Electron开源项目】【转载】

eDEX-UI:超炫酷终端工具 eDEX-UI 是一款跨平台基于 Electron 的炫酷终端工具。好莱坞级别的终端使用体验,拥有漂亮的启动动画、浮夸的音效,还能够直观地展示文件目录、系统资源、网络等信息。支持实时系统和网络监控、触摸式显示器&#xff…

AI Toolkit软件安装教程(附软件下载地址)

软件简介: 软件【下载地址】获取方式见文末。注:推荐使用,更贴合此安装方法! AI Toolkit是一款卓越的人工智能软件,专为企业和个人提供一体化的解决方案,助力其工作流程高效运转。该软件套件融合了多种顶…

【公务员】图形推理技巧

位置规律 图形特征: 图形组成相同,优先考虑位置规律。 详细技巧: 样式规律 图形特征: 图形组成相似,优先考虑样式规律。 详细技巧: 形式技巧加法1、直接叠加2、☆规则运算:外轮廓与分割区域…

OpenCV笔记之图像处理中遮罩和掩模的关系

OpenCV笔记之图像处理中遮罩和掩模的关系 code review 文章目录 OpenCV笔记之图像处理中遮罩和掩模的关系1.遮罩详解遮罩的创建遮罩的应用遮罩的主要应用遮罩的类型如何创建遮罩遮罩在图像处理中的应用方式 2.遮罩和掩模的关系 1.遮罩详解 在图像处理中,遮罩&#…

Linux/Doctor

Enumeration nmap 已知目标开放了22,80,8089端口,扫描详细情况如下 可以看到对外开放了22,80,8089三个端口 TCP/80 SSTI 访问80端口,有一个infodoctors.htb的电子邮件,点击其他的也没有什么反应,猜测有可能需要域名访问 在/et…

进程(三)进程间的切换、环境变量

文章目录 进程间的切换Linux2.6内核进程调度队列一个CPU拥有一个runqueue优先级活跃进程过期队列active指针和expired指针 环境变量基本概念常见环境变量查看环境变量的方法测试PATH测试HOME和环境变量相关的命令通过代码如何获取环境变量通过系统调用获取环境变量 进程间的切换…

错误票据-蓝桥杯

思路&#xff1a; 其实只是排序一下&#xff0c;然后遍历&#xff0c;如果两个值差2&#xff0c;则输出两个值的平均数&#xff0c;如果两个数差值为0 &#xff0c;那么则这个值就是重复的值 代码&#xff1a; #include <iostream> #include<vector> #include&l…

第二篇【传奇开心果短博文系列】鸿蒙开发技术点案例示例:添加组件和事件处理

传奇开心果短博文系列 系列短博文目录鸿蒙开发技术点案例示例短博文系列 短博文目录一、前言二、添加组件和事件处理示例代码三、补全其余组件事件处理示例代码 系列短博文目录 鸿蒙开发技术点案例示例短博文系列 短博文目录 一、前言 有一必然会有二&#xff0c;有了第一个…

Java项目:基于SSM框架实现同城蔬菜配送管理系统(SSM+B/S架构+源码+数据库+毕业论文)

一、项目简介 本项目是一套ssm825基于SSM框架实现同城蔬菜配送管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&…

小程序系列--12使用 npm 包

一、Vant Weapp 1. 什么是 Vant WeappVant Weapp 是有赞前端团队开源的一套小程序 UI 组件库&#xff0c;助力开发者快速搭建小程序应用。它所使用的是 MIT 开源许可协议&#xff0c;对商业使用比较友好。 官方文档地址 https://youzan.github.io/vant-weapp 2. 安装 Vant 组…

《WebKit 技术内幕》学习之六(1): CSS解释器和样式布局

《WebKit 技术内幕》之六&#xff08;1&#xff09;&#xff1a;CSS解释器和样式布局 CSS解释器和规则匹配处于DOM树建立之后&#xff0c;RenderObject树之前&#xff0c;CSS解释器解释后的结果会保存起来&#xff0c;然后RenderObject树基于该结果来进行规范匹配和布局计算。当…

Unity 组合模式(实例详解)

文章目录 示例1&#xff1a;Unity中的图形界面元素组合示例2&#xff1a;Unity中的游戏对象层级组合示例3&#xff1a;Unity中的场景图节点组合示例4&#xff1a;Unity中的场景管理组合示例5&#xff1a;Unity中的角色技能树组合 在Unity中&#xff0c;组合模式&#xff08;Com…

哈希的基本概念(开散列和闭散列)(附代码)

哈希 哈希概念哈希冲突哈希函数常见的哈希函数 哈希冲突的解决闭散列开散列 哈希概念 传统的查找函数&#xff0c;搜索的效率取决于比较的次数。而hash算法&#xff1a;在理想情况下&#xff0c;可以不经过任何比较&#xff0c;一次就能得到要搜索的结果。 存储结构&#xff1…

四、MyBatis 动态语句

本章概要 动态语句需求和简介if 和 where 标签set 标签trim 标签(了解)choose/when/otherwise 标签foreach 标签sql 片段 4.1 动态语句需求和简介 经常遇到很多按照很多查询条件进行查询的情况&#xff0c;比如智联招聘的职位搜索等。其中经常出现很多条件不取值的情况&#…

电脑监控系统:企业网络安全解决方案

在当今数字化的世界里&#xff0c;企业的网络安全已经成为一项至关重要的任务。电脑监控系统作为一种有效的解决方案&#xff0c;正在被越来越多的企业所采用。 电脑监控系统是一种集成了多种安全功能的综合性解决方案&#xff0c;旨在为企业提供全面的网络安全防护。该系统能够…

【操作系统】实验七 显示进程列表

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

Java 报错java.Net.UnknownHostException:raw.githubusercontent.com

1.问题 今天在vscode 学习如何使用 plantUML生成图片的时候&#xff0c;发生错误 java.util.concurrent.ExecutionException: java.net.UnknownHostException: raw.githubusercontent.com issue raw.githubusercontent.com java.util.concurrent.ExecutionException: java.n…