RabbitMQ顺序性、可靠性(消息丢失)、重复消费、消息堆积解决方案
顺序性
RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了 。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
先看一下是什么原因造成了发送消息时候的顺序错乱
- 消息生产者启用了发送确认(ack)机制,在发生中断时,需要 RabbitMQ 补偿发送时,那么此时消息在源头就已经出现顺序混乱了,导致消息被消费时也是乱序的
- 另一种情况,如果消息发送时,设置了超时时间,并且采用了死信队列,模拟了延时队列的效果,那么此时消息的顺序也时不能保证的
- 还有一种情况,如果消息设置了优先级,那么在高并发的情况下,消息的顺序也是得不到保证的,消息的消费顺序也就不能保证了
发送消息的顺序性,一般来说不做要求,但是如果一定要求顺序,可以使用锁机制配合 ack机制 来保证消息的顺序到达
队列中消息的顺序
消息队列中的消息是遵循FIFO(先进先出)原则,天然有序
消费消息的顺序
有这样一个订单操作,insert 、update、delete连续操作,并且消息已经顺序存在queue中,那么如何保证消费顺序是insert 、update、delete,而不是delete、insert 、update呢?
方案一:拆分多个queue,每个queue一个consumer,该条订单的相关操作全部放到这个queue中,由这一个consumer消费,这样做多了一些queue。
方案二:就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,该条订单相关的消息全部放到一个队列中,然后分发给底层不同的worker线程来处理
可靠性
实际上消息队列是没法百分百保证不丢失的,我们只能尽量降低概率,然后在消息丢失后记录日志,再处理
有这样一个典型的订单场景
- MQ 挂了,消息没发出去。创建订单后面几个优惠券、积分的下游系统全都没有执行业务结算怎么办?
- MQ 是高可用的,消息发出去了,但是优惠券结算业务报错了怎么办?因为这个时候是异步的,也不好去回滚
- 消息正常发出去,消费者也接收到了,订单系统、优惠券系统都正常执行完了,积分业务报错了导致积分没结算,那这个订单的数据就不一致了
要解决上述问题,就是要保证消息一定要可靠地被消费,那么我们可以来分析下消息有哪些步骤会出问题
RabbitMQ 发送的消息是这样的 , 消息被生产者发到指定的交换机根据路由规则路由到绑定的队列,然后推送给消费者 。
在这个过程中,可能会出一下问题
- 生产者消息没到交换机,相当于生产者弄丢消息
- 交换机没有把消息路由到队列,相当于生产者弄丢消息
- RabbitMQ 宕机导致队列、队列中的消息丢失,相当于 RabbitMQ 弄丢消息
- 消费者消费出现异常,业务没执行,相当于消费者弄丢消息
下面是单消费实例的解决方案
多实例的先留个坑,以后再填
生产者弄丢消息
RabbitMQ 提供了确认和回退机制,有一个异步监听机制,每次发送消息,如果成功/未成功发送到交换机都可以触发一个监听ConfirmCallback(),从交换机路由到队列失败也会有一个监听ReturnsCallback()。只需要开启这两个监听机制,使用记录日志、发送邮件通知、落库定时任务扫描重发这些应对策略
生产者弄丢数据其实及其罕见,落库定时任务扫描重发工作量大,一般记录日志后,发邮件给对应人员,补充数据库数据即可
RabbitMQ弄丢消息
宕机重启不开启持久化的情况下 RabbitMQ 重启之后所有队列和消息都会消失,所以我们创建队列时设置持久化
消费者弄丢消息
RabbitMQ 给我们提供了消费者应答(ack)机制,默认情况下这个机制是自动应答,只要消息推送到消费者就会自动 ack ,然后 RabbitMQ 删除队列中的消息。启用手动应答之后我们在消费端调用 API 手动 ack 确认之后,RabbitMQ 才会从队列删除这条消息 。
开启手动ack,在业务处理完成之后手动ack即可,如果在业务处理过程中出异常了,队列会给消费者重推,也要注意重推导致的循环异常,可以配置重试次数策略。
消息重复消费(幂等性)
这个也是生产环境业务中经常出现的场景,重复消费也要从两方面分析,为什么会出现重复消费
生产时消息重复
在网络波动的情况下,生产者给MQ服务器发送消息,由于网络原因导致生产者没有收到ACK确认消息,但是MQ服务器实际上已经接收到了消息,在这种情况下生产者就会重新发送一遍刚才的消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免broker落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
- 全局唯一
- MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽
有了这个inner-msg-id,就能保证即使重发,也只有1条消息落到MQ-server的DB中
消费时消息重复
在消费者方面如果出现网络问题,比如消费者对消息已经成功消费了,在向MQ服务器进行确认的时候网络异常了,这时候MQ服务器就没有接收到确认,MQ为了保证消息被消费,就会继续向消费者发送之前已经被消费了的消息,这种情况下消费者就会接收到两条一样的消息。
我们解决消息重复消费主要是保证消费的幂等性,有两种角度,第一种就是不让消费端执行两次,第二种是让它重复消费了,但是不会对我的业务数据造成影响就行。通常可以在发消息的时候携带业务唯一id,消费成功后保存到redis/db中,消费前再检查下有没有这个ID,有的话就表示已经消费过了,或者使用数据库唯一性主键约束,再或者使用cas,最后遇到重复消息丢弃消息即可
消息堆积
- 对生产者发消息接口进行适当限流(不太推荐,影响用户体验)
- 多部署几台消费者实例(推荐)
- 适当增加 prefetch 的数量,让消费端一次多接受一些消息(推荐,可以和第二种方案一起用)