RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 8 RabbitMQ 应用问题
- 8.1 消息可靠性保障
- 8.1.1 消息补偿
- 8.2 消息幂等性保障
- 8.2.1 乐观锁机制
第二天 高级
8 RabbitMQ 应用问题
8.1 消息可靠性保障
在RabbitMQ的使用过程中我们如何保证消息百分百从生产端发送到服务端呢,在淘宝,银行等系统中是不允许任何错误的,任何消息的不可达都可能会造成巨大的损失。
我们通过发送端消息确认机制confirm机制和return机制确保消息能发送出去,通过接收端的Acknowledge (ACK)确保队列中的消息被处理了,但是如果保证发送的完整链上的消息被处理了,比如服务端发送成功后,接收端尚未接收,队列重启了,消息丢失。
8.1.1 消息补偿
【一个需求】100%确保消息发送成功
示意图
这个图看起来好像很复杂,其实只做到了三件事情:
发消息
收到消息确认
检查比对是不是收到消息了
-
当发生业务操作的时候,业务数据写入数据库
-
生产者将消息发送给MQ的队列Q1
-
发送了一条与step2中一摸一样的延迟消息给Q3
-
Consumer监听Q1,获取到了step2中发送的业务消息
-
消费者在收到生产者的业务消息后,发送了一条确认消息(记录收到的消息信息)到Q2
-
回调检查服务监听了Q2,获取到了消费者发送的确认消息
-
回调检查服务将这条确认消息写入数据库等待之后的比对
-
Q3中的延迟消息延迟时间已到,被回调检查服务接收到,之后就拿着这条延迟消息在数据库中比对,如果比对成功,证明消费者接收到了生产者的业务消息并处理成功(如果不处理成功谁会傻了吧唧发送确认消息呢);如果比对失败,证明消费者没有接收到生产者的业务消息,或者说消费者接收到了业务消息之后始终没有处理成功相关的业务并发送确认消息。这时回调检查服务就会调用生产者的相关业务接口,让生产者再次发送这条失败的消息
-
有一种最极端的情况,step2和step3的消息都发送失败了或者说在消息传递过程中发生意外丢失了!定时检查服务会一直轮询保存确认消息的数据库中的消息数据,并于生产者的业务数据库中的业务数据进行比对,如果两者比对数量一致,则代表业务执行没有问题;如果比对不一致,确认消息数据库的数据量小于生产者业务数据量的话,就证明消费者没有接收到生产者发送的消息。这时定时检查服务会通知生产者再次发送消息到MQ的队列Q1
核心思想是定时任务进行核对
- 业务数据入库
- 在发送消息前,先将消息入库
- 发送消息
- 发送消息m到消息队列Q1,Consumer监听到Q1的消息m后,消费消息m,并将消息m入库。然后发送确认消息给Q2,回调检查服务监听到Consumer反馈的确认消息后,也将消息写入数据库MDB。这是消息发送成功的正常情况。
倘若:发送消息失败,那么Consumer消费不到消息,Consumer的消息入库操作也失败了。这时也不用担心,因为在我们发送消息m后,过了几分钟后也会进行延迟发送消息m的副本给Q3,回调检查服务监听到延迟消息后,与MDB中的进行比对,MDB中没有消息m的记录,这说明Consumer消费消息m失败,那么回调检查服务就远程调用Producer跟它说一下再重发消息m,又重新进行上面消息发送的步骤。
几率很小的发送消息失败情况:
发送消息失败,延迟发送消息也失败。。。
这时定时检查服务会将消息数据库MDB中与业务数据库DB进行比对,然后让producer重发缺失的消息。
当消息从生产者发出的时候,首先在要发送的业务数据写到生产者的数据库,然后将消息发送到消息队列,如果消费者接受到了消息就进行消费并且写入数据库,同时返回一个确认消息,回调检查服务将确认消息写入数据库,如果此时生产者发送了一个延迟消息与上一次发送给消费者的消息是一样的,并且消费者成功消费了,那么回调检查服务就会通过监听延迟消息的id与自己的数据库中的消息进行对比,如果存在一样的id,就会知道生产者的这条消息已经消费过了,如果回调检查服务发现这个消息没有被消费者成功消费,那么就会通过远程调用告诉生成者需要重新发送这条消息给消费者。
还有一种情况是生产者发送消息失败,延迟发送的消息也失败了,这个时候rabbitmq就会通过定时检查服务,通过一定的时间检查消息数据库与生产者的数据库进行对比,发现与生成者的数据不一致,匹配不成功,就会通过远程调用告诉生产者需要重新发送数据不一致的消息。
真淦啊 【大概了解下这个方案 吧】
8.2 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。
也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
首先,无论是RabbitMQ、RocketMQ还是kafka,都有可能出现消息的重复发送,这个是MQ无法保障的,而幂等性是开发或者运维人员需要保证的。
所谓消息的幂等性是指即使收到多次消息,也不会重复消费,这点很重要,例如用户付钱,点的太快了,付了多次,但是你也只能扣一次钱。
比如说转账案例:我第一次通过手机银行发送的转账500申请由于网络或者说银行系统负载高等原因,导致没有执行成功,我个人又点了很多次转账的按钮,但是银行的系统必须保证我只转出去了500。
8.2.1 乐观锁机制
通过每次消息中带有的version信息,来判断该业务操作是否能执行成功:
针对每个操作,会生成一个唯一的id,并且携带version=1,那么通过数据库的乐观锁操作,如果重复入库的话,会无效,这样就实现了幂等性,避免重复处理相同的数据。
【比如】
用户付款500后,发送一条扣款500的消息到mq,但是由于网络什么各种原因,Consumer没有消费到消息便让Producer再发一条,此时Q1中还存在着未被消费的和重发的那一条消息,这时Consumer在来正常消费,就消费了两次同样的消息,扣了用户1000,用户表示【很无语】
可以使用数据库的乐观锁来解决这个问题。第一次发送的消息中version字段作为条件,在Consumer消费掉第一条消息完毕后将库中的version+1,这样重复的第二条消息的version字段值就不匹配,sql也就不会执行,也就不会重复消费了。