rabbitMQ 消息顺序性、消息幂等性、消息不丢失、最终一致性、补偿机制、消息队列设计

news2025/1/21 10:13:05

一、消息顺序性

消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。

举例:
  比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。

RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。

发送消息的顺序

消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序

RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。

提示:不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序

我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。

例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。

生产者 通过 channel 把消息 通过 exchange 路由到对一个的quque 的 过程中,MQ 本身保证消息的有序性,quque 是有序的,在业务上只要保证生产者发送到mq上的消息是有序的,那么MQ ,quque 就能保证生产者发送到消息的有序性;但是生产者保证了消息的有序性并不能保证消费者消费到的消息就是有序的这主要体现在以下两点:

  • 1.一个quque 上有多个consumer,由于每个消费者处理消息的快慢不一样,因此并不能保证每个consumer都顺序消费消息,保证消息被消费者顺序消费入库;
  • 2.一个quque上只有一个consumer,但是这个consumer 是多线程异步处理,因此并不能保证这个consumer消费消息的处理是顺序处理;

出现顺序错乱的场景

错乱场景一

①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
在这里插入图片描述

错乱场景二

一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
在这里插入图片描述

保证消息的消费顺序

解决方案一(解决消费顺序的问题,通常就是一个队列只有一个消费者)

拆分成多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。保证一个quque 只有一个consumer 这样便保证了消费者消费MQ 消息的有序性;这样就可以一个个消息按顺序处理,缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
在这里插入图片描述

提示:如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

解决方案二(解决一个quque一个consumer异步处理的顺序问题)

或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理

在这里插入图片描述

二 、消息消费的幂等性(保证消息不被重复消息)

消息幂等的场景

业务场景1:假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

场景2:如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用API的时候出了问题,会不会出现消息的重复处理?例如:存款100元,ATM重发了5次,核心系统一共处理了6次,余额增加了600元。
场景3:生产者、消费者手动确认消息;
生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送;
消费者,在消费消息后,还未给mq发送ack确认标志,消费者挂掉了,导致mq会将消息重复推送给消费者

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。从业务上来说重复调用多次产生的业务结果与调用一次产生的业务结果相同;

为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),它不知道你是不是就要把一条消息发送两次,只能在消费端控制。

产生消费重复的原因

如何避免消息的重复消费?消息出现重复可能会有两个原因:

  • 生产者问题,环节①重复发送消息,比如在开启了Confirm模式但未收到确认,生产者重新发送消息;或者生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送
  • 消费者问题,环节④出了问题,由于消费者未发送ACK或者其他原因,消息重复投递
    对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。

消息幂等的解决方案

对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。或者利用redis、mysql等中间工具的特性解决幂等性问题

1、保证生产者者发送到mq的消息幂等性

2、保证消费者消费mq消息的幂等性

常用解决方案

  1. 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  2. 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  3. 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  4. 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据;

三、消息不丢失

在这里插入图片描述

消息丢失原因场景以及解决方案

1 、生产者发送消息至MQ的数据丢失

生产者发送消息,自动ack,或者发生发生网络异常导致,未做重发处理,导致推送mq的消息丢失;
然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了;

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,如果发生异常的情况下,做好消息的重发机制

2、MQ挂掉导致消息丢失

MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失;

如exchange、quque 未设置消息的持久化,再消费者消息未消费或者
未确认的情况下导致消息丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者自动确认ack下的消息丢失

消费者刚拿到消息,先给mq 发送 ack确认标志 ,再处理业务,解过还未处理业务挂掉了或发生异常

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

四、消息的最终一致性

消息的最终一致性需要结合消息的生产者、消费者的、mq的消息不丢失一块考虑

消息一致性的场景

如果确实是消费者宕机了,或者代码出现了BUG导致无法正常消费,在我们尝试多次重发以后,消息最终也没有得到处理,怎么办?

例如存款的场景,客户的钱已经被吞了,但是余额没有增加,这个时候银行出现了长款,应该怎么处理?如果客户没有主动通知银行,这个问题是怎么发现的?银行最终怎么把这个账务做平?

在我们的金融系统中,都会有双方对账或者多方对账的操作,通常是在一天的业务结束之后,第二天营业之前。我们会约定一个标准,比如ATM跟核心系统对账,肯定是以核心系统为准。ATMC获取到核心的对账文件,然后解析,登记成数据,然后跟自己记录的流水比较,找出核心有ATM没有,或者ATM有核心没有,或者两边都有但是金额不一致的数据。

对账之后,我们再手工平账。比如取款记了账但是没吐钞的,做一笔冲正。存款吞了钞没记账的,要么把钱退给客户,要么补一笔账

解决方案: 消息补偿机制

由于生产者与消费者完全隔离,即使消费者没有接收到消息,或者消费时出现异常,生产者也是完全不知情的。所以生产者最终确定消费者有没有消费成功有两种通信方式:
1. 消费者收到消息,处理完毕后,调用生产者的API
例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息后调用提单系统提供的API,来修改提单系统中数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。
2. 消费者收到消息,处理完毕后,发送一条响应消息给生产者
例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。
在这里插入图片描述

重试机制的实现方案

无论采用哪种回调方式,如果生产者的API就是没有被调用,也没有收到消费者的响应消息,怎么办?可能是消费者处理时间太长或者网络超时。

  1. 生产者与消费者之间应该约定一个超时时间,比如5分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的机制
  2. 重发可以通过消息落库+定时任务来实现。要控制发送间隔和次数,比如每隔2分钟发送一次,最多重发3次,否则会造成消息堆积

重发的消息,随具体场景的变化而变化,不能在Producer写死
参考:ATM机上运行的系统叫C端(ATMC)。前置系统叫P端(ATMC),它接收ATMC的消息,再转发给卡系统或者核心系统。

  • 如果客户存款,没有收到核心系统的应答,不知道有没有记账成功,最多发送5次存款确认报文,因为已经吞钞了,所以要保证成功;
  • 如果客户取款,ATMC未得到应答时,最多发送5次存款冲正报文。因为没有吐钞,所以要保证失败。

五、消息积压处理

消息不积压需要总体上保持消费者的消息消费速率rate 大于生产者的生产速率rate,这样在设计上不会出现消息积压;消息积压处理需要考虑消息的幂等性,保证消息不被重复消费

消息堆积的判定

1.在消费者未消费到消息、或者消费收到的消息延迟比较大的情况下需要消息是否积压;
在rmq中可以通过rmq的管理界面查看消费者的消息消费情况

消息积压的原因场景

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百分消息持续积压几个小时,说说怎么解决。

这些问法,本质上都是针对场景,都是说可能你的消息端出来问题,不消费了。或者消费的及其满。接着就坑爹了。可能你的消息队列集群的磁盘都快满了。都没人消费,这个时候怎么办?或者是积压了几个小时,怎么办?或者是积压时间太长了,导致比如RabbitMQ设置了过期时间后就没了。其实这事,线上挺常见的,一般不出,一出就是大case。举个例子,消费端每次消费之后要写mysql,结果mysql挂了,消费端不动了,或者是消费端出了什么叉子,导致消费速度灰常慢。

1、快速处理积压大量积压的数据方案设计

几千万条数据在MQ里,积压了七八个小时。这个时候就是恢复consumer的问题。让它恢复消费速度,然后傻傻地等待几个小时消费完毕。这个肯定不能再面试的时候说。1个消费者1秒时1000条,1秒3个消费者是3000条。1分钟是18万条。1个小时是1000多万条。如果积压了上万条数据,即使消费者恢复了,也大概需要1个多小时才能恢复过来。

在这里插入图片描述

原来3个消费者1个小时。现在30个消费者,需要10分钟搞定。

一般情况下,这个时候只能做临时扩容了。具体操作步骤和思路如下:
① 先修改consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
② 新建1个topic,partition是原来的10倍,临时建立好原来10倍或者20倍的Queue。
③ 然后写一个临时的分发数据的consumer程序,这个程序部署上去,消费积压的数据。消费之后,不做耗时的处理。直接均匀轮训写入临时建立好的10倍数量的Queue。
④ 接着征用10倍的机器来部署consume。每一批consumer消费1个临时的queue。
⑤ 这种做法,相当于将queue资源和consume资源扩大10倍,以10倍的速度来消费数据。
⑥ 等快速消费完积压数据之后,恢复原来的部署架构,重新用原先的consumer来消费消息。

(2)过期失效了怎么办

过期失效就是TTL。如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉。这个数据就没了。这就不是数据积压MQ中了,而是大量的数据会直接搞丢。

在这种情况下,增加consume消费积压就不起作用了。此时,只能将丢失的那批数据,写个临时的程序,一点一点查出来,然后再灌入MQ中,把白天丢失的数据补回来。

六、设计消息队列中间件

如何让你来设计消息队列中间件,如何设计?

主要考察两块。

  • ① 是有没有对某个消息队列做过较为深入的原理的了解。或者从整体把握一个mq的架构原理。
  • ②是看看你的设计能力,给你一个常见的系统,就是消息队列系统,看能够从全局把握一下整体架构设计的关键点。
比如说,这个消息队列,我们从以下几个方面来了解下:

   1、首先MQ得支持可伸缩性吧。就是需要的时候增加吞吐量和容量?

   2、其次,需要考虑一下MQ的数据是不是要持久化到磁盘

   3、再次,考虑一下MQ的可用性。

   4、最后,考虑一下能不能支持数据零丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/40743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内存和函数

程序的内存布局 Linux默认情况下将高地址的1GB空间分配给内核,用户进程使用剩下2GB或者3GB的内存空间。在用户空间里,也有很多地址区间有特殊的地位,一般来讲,应用程序使用的内存空间里有如下"默认"的区域 1、栈&#…

疫情可视化part2

前言 这是疫情可视化最开始的文章,有需要了解的可前往查看:https://blog.csdn.net/xi1213/article/details/126824752。 本来说有时间就把这个项目完结了的,结果后面一直有事拖着,直到现在十一月份了才搞完。老样子,先…

F1. 生活在树上(easy version)树,dfs

题目链接 F1. 生活在树上(easy version) 题目背景 本题是 B 组的最后一题,是 F2 题的简单版本,两道题目的解法略有不同。本题和 F2 题在题意上的区别在于本题给定树上的边权,而不是点权。 小智生活在「传智国」&am…

汽车 Automotive > SOME/IP VS DDS调研和未来方向

参考:JASPAR, General incorporated association:What is the conqueror in the SOA platform for the future in-vehicle networks? 目录 SOME/IP介绍参考 DDS介绍 SOME/IP VS DDS 研究方向 SOME/IP介绍参考 汽车Automotive > SOME/…

MAC安全(防MAC泛洪攻击)

一、MAC地址表项分类: 1.1 动态表项:通过对帧内的源MAC进行学习而来,有老化时间 1.2 静态表项:由管理员手工配置,不会老化 1.3 黑洞表项:丢弃特定源MAC或目的MAC,不会老化 静态和黑洞表项不会被动态表项…

类与对象(下篇)

类与对象(下)再谈构造函数回顾构造函数初始化列表explicit 关键字拷贝构造函数也具有初始化列表友元 friend友元函数输入输出流的重载友元类static 成员内部类再谈构造函数 回顾构造函数 在上一篇博客中提到了构造函数,构造函数其主要目的是…

类与对象(中篇)

类中六个默认成员函数构造函数基本概念构造函数特性析构函数基本概念析构函数特性拷贝构造函数基本概念拷贝构造函数特性赋值运算符重载概念引入运算符重载函数的特性部分运算符的重载函数判等赋值前置 、前置--后置、后置--const 成员函数取地址只要生成一个类 ,那…

iOS_Custom Transition Animation 自定义转场动画

文章目录1、push-pop 动画协议2、present-dismiss 动画协议3、实现转场动画协议3.1 动画时长3.2 push or present animation (显示动画)3.3 动画结束3.4 pop or dismiss animation (消失动画)4、UIPresentationController4.1 设置presentVC的frame4.2 present 动画4.3 dismiss …

Docker快速安装Oracle 12c

【Oracle系列3】Docker快速安装Oracle 12c 背景 现在还很多企业用12c,以这个版本为例,介绍docker快速启动Oracle并做实验 步骤 1、docker环境的安装(略) 2、查询镜像,挑选镜像 docker search oracle结果 StoneM…

阿里P8架构师都在学习参考的SpringCloud微服务实战文档

我一直在使用Spring Boot、Spring Data等框架来进行开发工作。 作为一名Spring系列的忠实粉丝,我自然希望能够有更多的开发者参与进来,于是自己坚持写Spring Cloud相关的文章,并且将文章涉及的代码整理后放在GitHub上分享。 这使我得到了很…

【Hack The Box】Linux练习-- Luanne

HTB 学习笔记 【Hack The Box】Linux练习-- Luanne 🔥系列专栏:Hack The Box 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 📆首发时间:🌴2022年11月24日🌴 &#x1f3…

零基础搭建基于知识图谱的电影问答系统

零基础搭建基于知识图谱的电影问答系统一、项目准备二、项目数据三、训练问题分类器四、准备问答模板五、搭建webapp六、问题预处理一、项目准备 首先需要一款python编译器,本人选用的是PyCharm,搭建好Python环境;安装第三方依赖库&#xff…

【Hack The Box】linux练习-- Delivery

HTB 学习笔记 【Hack The Box】linux练习-- Delivery 🔥系列专栏:Hack The Box 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 📆首发时间:🌴2022年11月17日🌴 &#x1…

黄佳《零基础学机器学习》chap1笔记

黄佳 《零基础学机器学习》 chap1笔记 这本书实在是让我眼前一亮!!! 感觉写的真的太棒了! 文章目录黄佳 《零基础学机器学习》 chap1笔记第1课 机器学习快速上手路径—— 唯有实战1.1 机器学习族谱1.2 云环境入门实践:…

ERD Online 4.0.4 元数据在线建模(免费、私有部署)

❝ fix(erd): 修改表名、模块名自定义提示fix(erd): 支持自定义表名显示格式fix(erd): 升级ant到5.0.1版本fix(erd): 修复PDMan导入类型列为空fix(erd): 增加类型列宽度,避免类型显示不全fix(erd): 修复表设计报undefine异常fix(erd): 修复版本比对,出现…

二分搜索算法框架解析

文章目录 一、寻找一个数(基本的二分搜索)二、寻找左侧边界的二分搜索三、寻找右侧边界的二分查找总结 一、寻找一个数(基本的二分搜索) 这个场景是最简单的,可能也是大家最熟悉的,即搜索一个数&#xf…

2023年天津财经大学珠江学院专升本经济学专业课考试大纲

天津财经大学珠江学院2023年高职升本科专业课考试《经济学》考试大纲一、本大纲系天津财经大学珠江学院2023年高职升本科《经济学》课程考试大纲。所列考试范围出自郑健壮、王培才主编的教材《经济学基础(第二版)》,清华大学出版社&#xff0…

win10通过Docker搭建LNMP环境全流程

win10通过Docker搭建LNMP环境全流程 下载安装docker desktop 由于博主环境已经安装好了,一些异常设置,暂且略过 根据官方教程下载docker desktop执行文件.exe 注意尽量不要把docker安装到C盘,除非你的C盘很大,具体可以参考文章 …

初识 Spring 框架

文章目录一、Spring 介绍二、Spring 下载安装三、编写入门程序1.项目文件构架2.引入相关依赖3.创建实体类4.Spring 配置文件5.编写测试类四、控制反转与依赖注入1.控制反转概念2.依赖注入概念3.依赖注入的类型4.依赖注入的应用一、Spring 介绍 Spring 是由 Rod Johnson 组织和…

计算机毕设题目设计与实现(论文+源码)_kaic

毕业设计(论文)题目 高校图书馆座位预约选座微信小程序设计与实现 基于防火墙的访问控制系统的设计与实现 基于区块链的农产品追溯系统设计与实现 学生公寓楼改造布线系统规划与设计 智能家居网络设计与实现“互联网”农村精准扶贫共享平台的设计与实现“智慧健康少儿成长平台”…