目录
1. RocketMQ 消息丢失的原因有哪些
2. 如何保证 RocketMQ 全链路消息不丢失
2.1 保证生产者发送消息到 MQ,消息不丢失
2.2 保证消息写入 Broker 后不丢失
2.3 保证 Broker 集群时,消息不丢失
2.4 保证消费者消费消息不丢失
3. 如果整个 MQ 服务都挂了呢,怎么保证消息零丢失
1. RocketMQ 消息丢失的原因有哪些
所有 MQ 产品消息丢失的元凶无非就两个:网络+缓存
这两个原因落地到具体的场景:
1.生产者发送消息到 MQ,消息丢失
2.消息写入 MQ,消息丢失
① 消息写入内存后非正常关机;
② 消息写入磁盘后,磁盘坏了;
③ master 数据备份到 slave 由于网络原因导致备份失败;
3.消息者消费 MQ ,消息丢失
2. 如何保证 RocketMQ 全链路消息不丢失
2.1 保证生产者发送消息到 MQ,消息不丢失
【方案一】同步发送 + 消息重试机制
Producer 发送消息的三种方式:
1.单向发送:消息发送出去就不管了.
2.同步发送: 同步等待 Broker 响应.
3.异步发送: 异步处理 Broker 通知.
同步发送+消息重试:生产者向 mq 发送消息,mq 收到消息成功就会回复 ack,如果生产者没收到 ack,就会重试再次发送消息到 mq,通过这种机制来保证消息不丢失。(重试的消息会被加入到重试队列中,重试一定次数还未成功发送就会被加入到死信队列中)
PS:同步发送有个缺点,生产者在等待服务端回复ack的过程,他干不了别的事,所以比较综合的方式就是采用异步发送的方式,异步发送它会注册一个 sendCallback 回调监听器,相当于找了个小弟给你干活,可以在小弟的这个回调方法里面写逻辑,如果消息发送失败了,让小弟去重试就行。
【方案二】RocketMQ 的事务消息机制
⭐ RocketMQ 事务消息机制的思想:
首先,事务消息的本质就是,生产者这边处理一个本地事务,消费者这边处理一个本地事务,这两个事务要保证一个原子性,但这是一个分布式事务,直接保证这两个事务的一致性很难做到; 而 RocketMQ 的事务机制是怎么保证这两个本地事务的一致性的呢,它是通过保证生产者处理本地事务和往 RocketMQ 发消息这两个操作是原子性的,然后在消费者这一端,只要生产者消息成功发送到了 MQ,MQ 就可以通过重试的机制将消息发送给消费者,哪怕当时消费者的事务处理失败了,那么经过多次的重试,最终消费的事务也会执行成功。所以他的本质是通过保证事务消息的一半,从而来保证整体业务逻辑的事务性。
⭐ RocketMQ 事务消息机制的执行流程:
【参照上图】
1.生产者向 MQ 发送半消息(half),MQ 回复半消息.
这个消息不会被下游消费者消费,用于判断 MQ 服务是否正常.
2.生产者处理自己的本地事务.
3.生产者返回本地事务状态给 MQ.
总共有三种状态:
① 本地事务执行成功 commit 状态,MQ 可以将消息向下游消费者推送.
② 本地事务执行失败 rollback 状态,MQ 直接将消息丢弃.
③ 本地事务执行时间比较长,unknow 状态,MQ 拿到这个消息后,等一段时间,就会向生产者发起一个响应,来确认生产者的本地事务有没有完成,生产者就会检查本地事务的状态,成功就返回 commit,失败就返回 rollback。这个发起确认生产者状态的操作默认可以重试 15 次,如果超过 15 次, 生产者的本地事务还没有执行成功,MQ 就会 rollback,丢弃消息.
⭐ Rocket 事务消息机制的应用场景:
如上图,在电商项目中,当用户下单后,等待支付,15 分钟内要完成支付这么一个场景,就可以使用 RocketMQ 事务消息机制来做!!
【定时任务做法】
1. 这个场景最简单的做法就是,用户下完单之后,起一个 15 分钟的定时任务,然后等待 15 分钟结束后,再去检查一下用户有没有完成支付,完成支付了就 commit,没有完成支付就 rollback。
2. 但是这样存在一个问题,如果用户下完订单 1 分钟就完成支付了呢?所以这样就没办法立即感知用户的支付状态,就办法立即给用户发货。(例如火车票,机票抢票)
3. 所以就不能等到 15 分钟后再去检查订单状态,就得每隔一分钟就做一个定时任务,而且还要维护任务的状态,处理任务失败的情况,这就显得很麻烦。
【RocketMQ 事务消息机制的做法】
使用 RocketMQ 事务消息的机制来做就显得更优雅。执行流程如下:
- 订单创建后,生产者发送一条事务消息给 RocketMQ,表示创建订单.
- 生产者执行本地事务(在 MySQL 中记录订单信息)。
- 向支付系统申请预支付订单.
- 主动向 MQ 返回 Unknow 状态. (即使下单成功)
- RocketMQ 在接收到
Unknow
状态后,会定期回查生产者的事务状态.- 生产者在回查时检查本地事务状态,如果订单已支付,则提交事务消息;如果订单未支付且超过15分钟,则回滚事务消息并取消订单.
所以定时任务中复杂的检查过程就得以优化了,但是我们要在 RocketMQ 中配置对应的参数,例如:回查的次数、回查的间隔、事务超时的时间
TransactionMQProducer producer
= new TransactionMQProducer("transaction_producer_group");
// 设置事务回查参数
producer.setTransactionCheckMax(10); // 每条消息最大回查 10 次
producer.setTransactionCheckInterval(30000); // 回查间隔 30 秒
producer.setTransactionTimeout(60000); // 事务超时 60 秒
PS:事务超时时间:消息从发送到生产者开始,在指定时间内没有提交或回滚事务,则认为事务重试,就进行回查。
当然,有的朋友可能会问了,支付系统那边订单支付成功后,会有一个回调事件来通知你事务执行成功了,万一支付系统他这个消息不发呢,发失败了怎么办? 万一呢是不是,所以主动去查 + 被动去推,这样才是最稳妥的做法。
2.2 保证消息写入 Broker 后不丢失
当我们把消息发送到 Broker 后,它会将消息存到缓存里面,缓存会将这些数据过段时间写入磁盘,那缓存也会存在断电丢失的可能,那如何保证消息不丢失呢 ?
PS:有些朋友就会说了,我不用这个缓存,直接写磁盘,不就没问题了??
应用能控制的只有用户态的缓存,而内核态还涉及了一个缓存(PageCache),这个缓存只有操作系统内部可以调用。所以这个缓存没办法不用,这是操作系统强制使用的。所以此处的缓存丢失消息的可能针对的是内核态的缓存。
【同步刷盘机制】
虽然应用程序控制不了内核态的缓存,但是操作系统提供了一个刷盘的接口,而应用程序就可以调这个刷盘的操作去申请一次刷盘,将这个 PageCache 刷到磁盘里面去。
同步刷盘:同步刷盘也不能绝对的保证数据不丢失!所谓的同步,字面意思是来一个消息,就调用一次刷盘,但是当消息非常多的时候,那么这个调用刷盘的频率就会非常高,那么操作系统就扛不住了,而操作系统就是因为写磁盘太慢才设计了一个 PageCache 缓存。所以所谓的同步刷盘并不是来一个消息就调用一次刷盘操作,RocketMQ 是每隔 10 ms 定时调用一次刷盘,只不过这个定时间隔比较短,它可以减少消息丢失的可能性。本质上来说,它还是存在消息丢失的可能。
【异步刷盘】
RocketMQ 还提供了异步刷盘的机制,所谓的异步刷盘不只是消息积累一批,然后再一次性写入磁盘。RocketMQ 对异步刷盘也是做了处理的
异步刷盘:异步刷盘可以配置是使用堆内存,还是使用堆外内存(直接内存)。
1. 使用堆内存,这些消息就会在 JVM 的堆中进行分配,当需要消息刷盘的时候, RocketMQ 的后台线程会主动调用刷盘操作将消息写入磁盘;但是这个过程有 GC 的介入,就免不了 STW,这就会影响消息从 RocketMQ 写入堆内存的速度(STW 直接暂停)以及消息从堆内存刷到磁盘的速度(STW 影响后台线程,导致刷盘速度下降)。
2. 使用直接内存,消息写入内存以及消息的刷盘就不会受 JVM 垃圾回收的影响,它可以更高效的利用操作系统的内存和 I/O 调度机制。使用直接内存,就不再是 RocketMQ 主动调用刷盘了,而是操作系统接管了,它会通过脏页的机制先给内存中修改了但尚未刷盘的消息打上一个脏页的标记,一旦这个脏页的内存超过整个内存的阈值了,它内部就会调用一次刷盘操作。当然使用直接内存虽然性能更优,但是它会导致内存泄漏,因为没有 GC 的介入,长时间的内存泄露,可能会导致 OOM。
PS:异步刷盘使用直接内存,RocketMQ 不再主动调用刷盘,但并不代表着它无法控制直接内存写入磁盘的时机了,它可以利用 mmap 内存映射机制 (使用 FileChannel.map 方法将磁盘文件的一部分映射到直接内存,即直接缓冲区),当需要保证数据一致性的时候,主动去同步脏页(调用 MappedByteBuffer.force() 强制将缓冲区的内容写入磁盘)。
当然有些朋友可能会问,应用程序直接决定消息写入磁盘的位置,如果我将消息写入到磁盘的启动 Cache 上,那操作系统不就崩了吗 ??
所谓的内存映射,它映射的区域仅限于它所映射的文件部分,不能越界访问其他内存区域系统关键区域。即使应用程序尝试访问或修改不属于自己的内存区域,操作系统的权限控制机制也会阻止这种操作。
2.3 保证 Broker 集群时,消息不丢失
【方案一】普通集群,指定角色,各司其职
普通集群可以使用同步同步的主节点来 "保证" 消息不丢失。
ASYNC_MASTER:异步同步的主节点
SYNC_MASTER:同步同步的主节点
SLAVE:从节点 (master 节点挂了后,slave 节点不会切换)
所谓同步同步的主节点,就是当生产者发送消息到 Broker 的时候,Broker 集群的主节点会先将数据同步到 slave,同步完之后才会给生产返回响应。
而异步同步的主节点,当生产者发送消息到 Broker 后,主节点会先返回响应给生产者,然后异步的发起一个线程往 slave 同步数据。
【方案二】Dledger 高可用集群,自行选举,多数同意
1.生产者将消息写到 Leader
2 Leader 将消息写到自己的 commitLog 日志里面,
3. 然后给生产者返回一个响应,
4. 随着心跳,给其他的节点发起消息同步,
5. 如果有多数的节点完成了消息同步,那么最终这个消息就会写到磁盘里,并记为 commit 状态。
但是这种集群下,它仍然是会存在消息丢失的可能性的,只不过这个可能性非常的小(在工程上忽略不计)。
当 Leader 节点将消息记录到 commitLog 里面后,还没来得及同步消息,它就挂了,这个时候,Dledger 集群就会选出一个日志最新的节点作为 Leader,而挂掉的 Leader 节点中还未提交的消息,当服务重启的时候,就会被主动丢弃,然后以新的 Leader 中的消息为准。
2.4 保证消费者消费消息不丢失
此处消费者端由于有消息重试机制,所以通常是不会丢失的,更多的是要考虑消息幂等性问题(由于网络抖动,消费可能被消费多次)。
虽然说有消息重试机制,但并不代表着消息的绝对不丢失,在某些情况下,还是会存在消息丢失的,当 MQ 发送消息给消费者,如果消费者在消息还未实际处理完成之前就返回了消费成功的响应,就存在消息丢失的可能。所以要保证消费者消费消息不丢失,最好还是实际处理完消息后,再返回消费成功的响应。
3. 如果整个 MQ 服务都挂了呢,怎么保证消息零丢失
当整个 MQ 服务都挂了,那么就需要有一个降级机制来临时存储未成功发送的消息,确保系统在恢复后能够尽快地将这些消息重新写入 MQ。可以选择 Redis 、本地文件,或者内中的集合作为降级缓存。然后新起一个线程不断的尝试将降级缓存里的消息写入到 MQ,这样至少在 MQ 服务重启时,消息可以尽快的写入到 MQ 里去。
【总结】
RocketMQ 如何保证全链路消息不丢失:
1. 生产者发送消息到 MQ,消息不丢失
- 同步发送 + 多次尝试 -- 降低吞吐
- 事务消息机制 -- 多次网络请求
2. Broker 收到消息后消息不丢失
- 设置同步刷盘 -- I/O 负担
- 搭建 Dledger -- 不断地 RPC 心跳,网络负担
3. 消费者消费消息不丢失
- 同步处理消息,再提交 offset -- 无法通过异步提高吞吐
4. 整个 MQ 集群挂了,如何保证消息零丢失
- 增加临时的降级存储