一、消息顺序性
消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。
举例:
比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。
提示:不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
生产者 通过 channel 把消息 通过 exchange 路由到对一个的quque 的 过程中,MQ 本身保证消息的有序性,quque 是有序的,在业务上只要保证生产者发送到mq上的消息是有序的,那么MQ ,quque 就能保证生产者发送到消息的有序性;但是生产者保证了消息的有序性并不能保证消费者消费到的消息就是有序的这主要体现在以下两点:
- 1.一个quque 上有多个consumer,由于每个消费者处理消息的快慢不一样,因此并不能保证每个consumer都顺序消费消息,保证消息被消费者顺序消费入库;
- 2.一个quque上只有一个consumer,但是这个consumer 是多线程异步处理,因此并不能保证这个consumer消费消息的处理是顺序处理;
出现顺序错乱的场景
错乱场景一
①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
错乱场景二
一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
保证消息的消费顺序
解决方案一(解决消费顺序的问题,通常就是一个队列只有一个消费者)
拆分成多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。保证一个quque 只有一个consumer 这样便保证了消费者消费MQ 消息的有序性;这样就可以一个个消息按顺序处理,缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
提示:如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
解决方案二(解决一个quque一个consumer异步处理的顺序问题)
或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
二 、消息消费的幂等性(保证消息不被重复消息)
消息幂等的场景
业务场景1:假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。
场景2:如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用API的时候出了问题,会不会出现消息的重复处理?例如:存款100元,ATM重发了5次,核心系统一共处理了6次,余额增加了600元。
场景3:生产者、消费者手动确认消息;
生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送;
消费者,在消费消息后,还未给mq发送ack确认标志,消费者挂掉了,导致mq会将消息重复推送给消费者
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。从业务上来说重复调用多次产生的业务结果与调用一次产生的业务结果相同;
为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),它不知道你是不是就要把一条消息发送两次,只能在消费端控制。
产生消费重复的原因
如何避免消息的重复消费?消息出现重复可能会有两个原因:
- 生产者问题,环节①重复发送消息,比如在开启了Confirm模式但未收到确认,生产者重新发送消息;或者生产者在消息发送消息后。再收到mq的确认后,还未更改数据发送状态结果挂掉了,导致消息的重复发送
- 消费者问题,环节④出了问题,由于消费者未发送ACK或者其他原因,消息重复投递
对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。
消息幂等的解决方案
对于重复发送的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消息落库来做重复控制。或者利用redis、mysql等中间工具的特性解决幂等性问题
1、保证生产者者发送到mq的消息幂等性
2、保证消费者消费mq消息的幂等性
常用解决方案
- 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
- 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
- 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
- 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据;
三、消息不丢失
消息丢失原因场景以及解决方案
1 、生产者发送消息至MQ的数据丢失
生产者发送消息,自动ack,或者发生发生网络异常导致,未做重发处理,导致推送mq的消息丢失;
然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了;
解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,如果发生异常的情况下,做好消息的重发机制
2、MQ挂掉导致消息丢失
MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失;
如exchange、quque 未设置消息的持久化,再消费者消息未消费或者
未确认的情况下导致消息丢失
解决方式:MQ设置为持久化。将内存数据持久化到磁盘中
3,消费者自动确认ack下的消息丢失
消费者刚拿到消息,先给mq 发送 ack确认标志 ,再处理业务,解过还未处理业务挂掉了或发生异常
解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
四、消息的最终一致性
消息的最终一致性需要结合消息的生产者、消费者的、mq的消息不丢失一块考虑
消息一致性的场景
如果确实是消费者宕机了,或者代码出现了BUG导致无法正常消费,在我们尝试多次重发以后,消息最终也没有得到处理,怎么办?
例如存款的场景,客户的钱已经被吞了,但是余额没有增加,这个时候银行出现了长款,应该怎么处理?如果客户没有主动通知银行,这个问题是怎么发现的?银行最终怎么把这个账务做平?
在我们的金融系统中,都会有双方对账或者多方对账的操作,通常是在一天的业务结束之后,第二天营业之前。我们会约定一个标准,比如ATM跟核心系统对账,肯定是以核心系统为准。ATMC获取到核心的对账文件,然后解析,登记成数据,然后跟自己记录的流水比较,找出核心有ATM没有,或者ATM有核心没有,或者两边都有但是金额不一致的数据。
对账之后,我们再手工平账。比如取款记了账但是没吐钞的,做一笔冲正。存款吞了钞没记账的,要么把钱退给客户,要么补一笔账
解决方案: 消息补偿机制
由于生产者与消费者完全隔离,即使消费者没有接收到消息,或者消费时出现异常,生产者也是完全不知情的。所以生产者最终确定消费者有没有消费成功有两种通信方式:
1. 消费者收到消息,处理完毕后,调用生产者的API
例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息后调用提单系统提供的API,来修改提单系统中数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。
2. 消费者收到消息,处理完毕后,发送一条响应消息给生产者
例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。
重试机制的实现方案
无论采用哪种回调方式,如果生产者的API就是没有被调用,也没有收到消费者的响应消息,怎么办?可能是消费者处理时间太长或者网络超时。
- 生产者与消费者之间应该约定一个超时时间,比如5分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的机制
- 重发可以通过消息落库+定时任务来实现。要控制发送间隔和次数,比如每隔2分钟发送一次,最多重发3次,否则会造成消息堆积
重发的消息,随具体场景的变化而变化,不能在Producer写死
参考:ATM机上运行的系统叫C端(ATMC)。前置系统叫P端(ATMC),它接收ATMC的消息,再转发给卡系统或者核心系统。
- 如果客户存款,没有收到核心系统的应答,不知道有没有记账成功,最多发送5次存款确认报文,因为已经吞钞了,所以要保证成功;
- 如果客户取款,ATMC未得到应答时,最多发送5次存款冲正报文。因为没有吐钞,所以要保证失败。
五、消息积压处理
消息不积压需要总体上保持消费者的消息消费速率rate 大于生产者的生产速率rate,这样在设计上不会出现消息积压;消息积压处理需要考虑消息的幂等性,保证消息不被重复消费
消息堆积的判定
1.在消费者未消费到消息、或者消费收到的消息延迟比较大的情况下需要消息是否积压;
在rmq中可以通过rmq的管理界面查看消费者的消息消费情况
消息积压的原因场景
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百分消息持续积压几个小时,说说怎么解决。
这些问法,本质上都是针对场景,都是说可能你的消息端出来问题,不消费了。或者消费的及其满。接着就坑爹了。可能你的消息队列集群的磁盘都快满了。都没人消费,这个时候怎么办?或者是积压了几个小时,怎么办?或者是积压时间太长了,导致比如RabbitMQ设置了过期时间后就没了。其实这事,线上挺常见的,一般不出,一出就是大case。举个例子,消费端每次消费之后要写mysql,结果mysql挂了,消费端不动了,或者是消费端出了什么叉子,导致消费速度灰常慢。
1、快速处理积压大量积压的数据方案设计
几千万条数据在MQ里,积压了七八个小时。这个时候就是恢复consumer的问题。让它恢复消费速度,然后傻傻地等待几个小时消费完毕。这个肯定不能再面试的时候说。1个消费者1秒时1000条,1秒3个消费者是3000条。1分钟是18万条。1个小时是1000多万条。如果积压了上万条数据,即使消费者恢复了,也大概需要1个多小时才能恢复过来。
原来3个消费者1个小时。现在30个消费者,需要10分钟搞定。
一般情况下,这个时候只能做临时扩容了。具体操作步骤和思路如下:
① 先修改consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
② 新建1个topic,partition是原来的10倍,临时建立好原来10倍或者20倍的Queue。
③ 然后写一个临时的分发数据的consumer程序,这个程序部署上去,消费积压的数据。消费之后,不做耗时的处理。直接均匀轮训写入临时建立好的10倍数量的Queue。
④ 接着征用10倍的机器来部署consume。每一批consumer消费1个临时的queue。
⑤ 这种做法,相当于将queue资源和consume资源扩大10倍,以10倍的速度来消费数据。
⑥ 等快速消费完积压数据之后,恢复原来的部署架构,重新用原先的consumer来消费消息。
(2)过期失效了怎么办
过期失效就是TTL。如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉。这个数据就没了。这就不是数据积压MQ中了,而是大量的数据会直接搞丢。
在这种情况下,增加consume消费积压就不起作用了。此时,只能将丢失的那批数据,写个临时的程序,一点一点查出来,然后再灌入MQ中,把白天丢失的数据补回来。
六、设计消息队列中间件
如何让你来设计消息队列中间件,如何设计?
主要考察两块。
- ① 是有没有对某个消息队列做过较为深入的原理的了解。或者从整体把握一个mq的架构原理。
- ②是看看你的设计能力,给你一个常见的系统,就是消息队列系统,看能够从全局把握一下整体架构设计的关键点。
比如说,这个消息队列,我们从以下几个方面来了解下: 1、首先MQ得支持可伸缩性吧。就是需要的时候增加吞吐量和容量? 2、其次,需要考虑一下MQ的数据是不是要持久化到磁盘 3、再次,考虑一下MQ的可用性。 4、最后,考虑一下能不能支持数据零丢失