深入浅出消息队列----【如何保证消息不重复?】
- 消息一定会重复
- 消息幂等消费
- 改造业务符合天然幂等写法
- 数据库唯一索引
- redis 唯一判断
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
消息一定会重复
发送消息的流程:
生产者发送消息给 Broker 后,需要等待 Broker 响应的 ack 才能确保消息已经被 Broker 存储了,以这种请求-应答的模式来确保消息一定被接收到,不会丢失。
那么假设 Broker 确实已经收到了这条消息并成功存储了,但返回给生产者 ack 的时候因为网络原因,生产者并没有收到这条消息的 ack,那么为了保证消息不会丢失,生产者只能再次发生这次消息。
在这种情况下,同一条消息被发送了两次,Broker 上就存在了两条一样的消息。
看到这有同学回想,既然为了保证消息不丢失,生产者没办法,只能多次发送同一条消息,那么就由 Broker 来过滤这些重复的消息吧!
理论上确实可以这样实现,但是实际应用上消息的体量都会比较大,Broker 本身的负载就不低,如果要加上去重功能,那么势必需要解析接收到的所有消息内容,然后进行对比,这会进一步加重 Broker 的负担,在高并发情况下还会大大降低性能。
从我们之前分析的 RocketMQ Broker 的存储流程来看,它并没有支持这个去重功能,包括是市面上别的消息队列中间件都不支持,因此想让 Broker 自动去重是没戏了。
所以,从发送流程来看,无法避免消息不重复发送,如果网络抖动 Broker 很可能会存储多条一模一样的消息。
再从消费的流程来看,即使 Broker 实现了消息去重功能,也无法保证同一条消息一定只被消费者消费一次。
之前已经提到消费者是通过提交消费点位来跟 Broker 同步已经消费到的位置。
在集群模式下,消费点位是存储在 Broker 上的,并且是消费者在拉取消息的时候顺带把此时的消费点提交给 Broker。
假设消费者消费完消息后,立马挂了,此时的消息点位还没有提交到 Broker,然后发送对垒重平衡后,另一个消费者顶上了这个队列的消费,就产生了消息的重复消费。
所以不论从生成、存储、还是消费三个方向来看,都无法保证消息的不重复。
虽然消息无法保证不重复,但是我们可以保证它仅被幂等消费来达到和仅消费一次的效果。
消息幂等消费
所谓的幂等其实是数学上的一个概念,f(f(x)) = f(x),对我们程序而言就是一个方法被同样的入参调用一次和多次产生的影响是一样的。
假设你的方法里面就是一条 update 语句:
update tableA set name = 'yes' where id = 1
像这样的逻辑,无论被执行多少次,和只执行一次达到的效果是一样的,这就是我们程序上的幂等。
因此,既然我们无法保证消息不重复,但是可以利用幂等来避免重复消费产生的影响。
那如何保证消息的幂等消费呢?
改造业务符合天然幂等写法
在项目初期或者新功能刚开始设计的时候,就可以考虑幂等的设计来满足业务需求,好比我上面提到的那个 update 语句,利用天然的幂等写法来满足消息的幂等消费,这是最简单和自然的一种方式。
然后可以给 update 设置一些前置条件。
比如现在有个消费消息业务逻辑:给用户加积分,并且将订单的状态从待完成变成已完成。
首先我们要调整下执行的顺序,不是先加积分,而是先改变订单的状态,执行:
update order set status = 1 where orderNO = 123 and status = 0;
添加一个 status = 0 的判断,如果消息已经被消费过,那么 orderNo 123 这个订单的 status 肯定已经是 1 了。
这样一来这个 update 执行之后影响的行数是 0,通过这个我们就能确定消息是否重复消费。
如果是重复消费,流程直接终止,这样后面给用户加积分这种不好利用 update 实现幂等操作的业务,利用前置其它业务处理也能实现幂等,重复消费也不会使得用户的积分被多加。
在业务上幂等改造就需要抓住这些点,将一些容易完成幂等改造的业务前置处理,并添加一些约束条件,提前终止重复消费,使得完整的大业务都实现了幂等。
数据库唯一索引
但是往往很多需求不是初期就有的,而是后面迭代的,此时整个业务流程基本上已经定型了,业务逻辑可能已经很复杂,不太好改造出上述 update 这样的语句来满足当下的业务需求。
此时可以利用数据库的唯一索引约束来保证消费的幂等消息。
例如可以给消息价格事务 ID,这个事务 ID 是全局唯一的,数据库表记录给这个事务 ID 添加唯一索引,当第一次处理完成这条消息的时候,同时在数据库中存储这条消息的处理记录,如果后面有重复消息过来,那么插入一定是会抛错的,这样一来就能避免消息的重复消费实现消费幂等。
具体操作起来大致两个方向:第一个方向时利用当前的已有的字段来作为唯一索引,比如订单的处理,那么订单的订单号肯定是唯一的,此时不需要再额外添加一个事务 ID 的字段。
如果没有合适的已有字段,那么就扩展一个事务 ID 字段来满足要求。
如果当下的业务表结构或者处理流程不方便扩展新的字段,那么消费端可以添加一张流水表来存储新的事务 ID 字段,将这个流水表的事务和业务处理放在一个事务中,这样就能保证业务事务执行成功后流水表一定添加上了,这样通过流水表就能实现消息的幂等。
redis 唯一判断
同样 redis 也能实现幂等的功能,相比数据库的唯一索引需要改表结构,或者新加一张流水表,redis 更加简单,利用 SETEX 这个命令就能实现幂等消费。
同样也是用全局唯一值来标记这条消息,例如订单号或者定义的事务 ID,每次在业务逻辑执行之前先利用 SETNX 来判断下,如果已经插入就直接返回,反之正常执行业务逻辑。
但是这里有个问题,如果唯一值插入 redis 后,消费者直接宕机了,业务逻辑并没有执行成功,那么即使由另一个消费者顶上消费到这条消息,由于 redis 还存储着这个唯一值,会使得这条消息被跳过,这样这条消息就跟丢了是一样的。
所以在异常情况下,这个方案会有这个问题,需要注意!