MQ最终一致性理论与实践
原理
分布式事务无论是2PC&3PC还是TCC,基本都遵守XA协议的思想,但全局事务方案并发性较差;
最终一致性方案指的是将最有可能出错的业务以本地事务的方式完成后,采用不断重试的方式(不限于消息系统)来促使同一个分布式事务中的其他关联业务全部完成,不遵从XA协议。相关理论可以参考并学习 分布式事务 | 凤凰架构 | 可靠事件队列
需求
创建订单(order-service), 同时扣减库存(repo-service)
非事务型消息队列
order-service 本地事务
- 在t_order表添加订单记录
- 在transaction_log 添加对应的扣减库存消息
repo-service 本地事务
- 检查本次扣减库存操作是否已经执行过 && 是否可以扣减库存
- 执行扣减库存
- 写判重表
- 向MQ 发送消费完成 ACK
repo-service重复收到消息的原因,一是生产者重复生产,二是中间件重传。为了实现业务的幂等性,repo-service 中维护了一张判重表
- order-service后台任务会把消息表中的消息发送给MQ,成功后则删除消息表中的消息。如网络超时则会重新发送消息直到MQ响应成功ACK。这样可能会导致消息的重复,需要repo-service做去重操作。
- MQ向repo-service推送消息时,repo-service处理消费完成后会向MQ进行ACK响应,但如果ACK响应发送网络超时则也会出现消费重复消费的情况,需要repo-service做去重操作。
RocketMQ事务型消息队列
存在的问题
- producer发送失败
- producer.send()返回消息异常
- 本地事务执行,如果异常,此时如何解决
解决方案
- 如果发送失败,那么调用端直接抛出异常,后续不会执行。✅
- 如果producer.send没有返回send_ok,则不会执行executeLocal方法,后续会执行check回查,一直查不到信息,最后会回滚消息。✅
- rocketMQ的client在事务消息中的bug 下文分析 ❌
如实现方案类似@Transactional add(serviceA→producer.sendTransactionMessage()→saveTransaction(中置half)这一流程,如果在本地事务执行过程中,saveTransaction出现异常,当前操作不会成功,但是由于exception会被RocketMQ捕获,且不会继续抛出,因此异常不会被事务方法add感知,导致serivceA执行成功,但是本地事务不成功。然后执行回查时,会回滚消息,后续的serviceB不会消费消息。
RocketMQ 源码解析
try {
sendResult = this.send(msg); //同步发送消息
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
//判断sendResult类型
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
//在executeLocalTransaction执行中,如果抛出异常,会被catch掉,但是没有重新throw,因此不会被调用方感知
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
RocketMQ最终一致性如何正确开发
producer端
发送half后置
当前half后置发送的开发,订单服务+事务消息落库+producer.sendHalf都是在一个事务中,注意去判断发送消息返回是否为send_ok。
- 订单数据或者事务消息有异常,由于在同一个事务中,因此事务rollback ✅
- 如果half消息发送异常,外层事务方法可以感知,因此事务rollback ✅
- 对sendResult的发送结果判断事务是否发送成功,如果发送结果不是send_ok,那么需要抛出异常,此时执行事务rollback ✅
发送half中置
订单和producer.sendHalf在同一个事务方法中,事务消息持久化在executeLocalTransaction方法中。
- 如果下单异常,那么事务rollback,则send不会执行 ✅
- 如果half发送失败,事务rollback ✅
- 如果send_ok,那么下单操作完成,但是executeLocalTransaction执行失败
- 如果不抛出异常,localTransactionState=RollBack时,订单的事务方法感知不到异常,导致订单落库,事务消息存储失败 ❌
- 如果抛出异常,会被rocketmq捕获,也感知不到异常 ❌
发送half前置
把业务方法和事务持久化的操作,统一放在executeLocalTransaction方法中
- producer.sendHalf() 异常或者状态不为send_ok,那么抛出异常,本地事务不执行 ✅
- 如果本地事务抛出异常,事务Rollback。抛出的异常被rocketmq捕获,broker不会得到事务状态和启动本地回查。✅
结论
对于half前后置都可以保证事务的最终一致性,但是对于把所有的事务执行放在executeLocalTransaction中执行,略微有些问题
- 如果业务方法耗时,执行executeLocalTransaction的执行时间过长,可以会增加不必要的回查;而half消息后置,把业务方法先执行,那么会减少不必要的事务回查。
- 会添加将object转化为java-bean的代码。
consumer端
- consumer端消费失败而去执行回滚的话,需要付出更多的代价,而且还会引发其他系统回退导致的新问题。
- consumer端会返回reconsume_later,并重发消息,且默认重试16次,直到消费成功。如果失败,则人工介入处理。
解决方案
- 设置消息重试次数,如果达到指定次数,就发邮件或者短信通知人工介入;
- 等待消息重试次数超过默认的16次,进入死信队列,然后程序监听对应的私信队列主题,通知人工介入或者在rocketMQ控制台查看处理。
实战(代码样例)
欢迎star https://github.com/WeiXiao-Hyy/mq-eventual-consistency
参考资料
- MQ最终一致性事务 - 文章分类 - LBJboy - 博客园
- SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务
- 基于RocketMQ分布式事务 - 完整示例 - 掘金