生产者
生产者通过RocketMQ提供的事务消息(两阶段提交)能保证消息的一致性。
第一阶段给Broker发送一个半事务消息,半事务消息是不能消费的消息,broker已经收到生产者发送的消息,但是并未收到生产者的二次确认,所以该消息被标记为【暂不能投递】状态(即消费者不能消费这条消息);
如果Producer发送成功,则执行本地事务,如果本地事务执行成功则二次确认消息,向Broker发送commit消息,如果本地事务执行失败则发送Rollback消息给Broker。如果Producer发送半事务消息后没有收到Broker的成功响应,可能是因为网络抖动,Broker宕机(则触发重试机制,在重试后仍然失败则将该消息持久化,通过线程扫描定时发送到Broker,直到Broker恢复)。
如果由于网络抖动,生产者重启导致而消息确认失败,Broker会通过扫描发现某条消息长期处于【半事务消息】状态,就会主动向生产者发起询问查询该条消息的最终状态,默认每隔60s回查一次,回查15次还是不行,则不投递半事务消息。
事务消息发送步骤如下:
- 生产者将半事务消息发送至 RocketMQ Broker。
- RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置
消费者
消费者保证不丢失和幂等性
消息幂等判断去重
消费者消费到消息后,如何保证消息的幂等,很容易想到的是通过if判断;比如用户下单,在用户支付订单后,更新订单状态同时发布一条消息到Broker,通知下游服务新增物流、更新用户积分等信息,我们就那新增物流信息为例,物流服务在消费该消息时,通过if判断物流信息表中是否存在相同的物流id,不存在则正常消费,存在则直接return,不重复消费。
if(null != xxxMapper.selectById(id)) {
return "success";
}
// todo 消费逻辑
问题分析
该方案非常简单,但是在大量并发和复杂的网络环境下,如果细节没有处理好会有很多漏洞。
- 并发情况下如果网络抖动导致消息重复消费,在第一条消息没有消费完成,重复的消息又执行这个if判断会导致主键冲突异常
- 在业务层面重复消费可能导致库存扣减两倍,用户积分也会新增两倍。
锁机制+事务去重
- 在新增信息的表中,做好主键冲突的异常捕获处理
- 将xxxMapper.selectById(id)的select语句使用for update和消费逻辑使用事务处理。
这样虽然能保证消息不会重复消费,但是牺牲了很大的性能开销,因为消费消息逻辑被整个事务包裹而导致处理时间被延长。并且还增加了整个系统的复杂度,因为还有其他业务需要消费消息,其实我们还可以继续思考,引入所有业务更加通用的方案。
本地事务消息表
引入一张非业务的消息表,记录消息消费的状态,状态包括两种:消费中/已消费。将它与其他业务绑定在一起作为一个本地事务,大致逻辑如下:
- 开启事务
- 插入消息表
- 处理其他业务(比如:新增物流,更新积分,更新库存信息)
- 提交/回滚事务
虽然他没有对数据库记录加锁,但是他还是使用事务,有事务就会延长消费时间
去事务的本地消息表解决方案
利用消息表的消息状态作为基础条件,
- 消费者消费首先插入消息去重表,
- 插入成功,则继续执行消费逻辑:
- 消费失败,则删除消息表,延迟继续消费
- 消费成功,则更新消息表消息状态为已消费
- 插入失败分为两种情况:
- 消息如果在消费中,则延迟继续消费;
- 消息如果已经消费,返回消费成功
特别注意:延迟继续消费的消息需要判断消费时长,或者消费次数,达到消费上限则告警,或者投递到死信Topic中。
代码和方案持续优化更新中。。。