分布式事务之最终一致性
- 参考链接
- 分布式事务基础理论
- 概述案例
- 解决方案:RocketMQ可靠消息
- 注意事项:
- 代码实现
参考链接
原文链接:https://blog.csdn.net/jikeyeka/article/details/126296938
分布式事务基础理论
基于上述的CAP和BASE理论,一般情况下会保证P和A,舍弃C,保证最终一致性。最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付 成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。
概述案例
此方案的核心是将分布式事务拆分成多个本地事务,然后通过网络由消息队列协调完成所有事务,并实现最终一致性。以商城下单为例:
-
消息发送方,用户下单: 创建订单,然后通过网络发送消息到MQ
-
消息接收方,扣减库存: 通过网络从MQ中接收消息,然后扣减库存
该解决方案容易理解,实现成本低,但是面临以下几个问题:
1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功
begin transaction
1.数据库操作
2.发送消息
commit transation
这种情况下,貌似没有问题,如果发送消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但此时消息已经正常发送了,同样会导致不一致。
2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功
3.由于消息可能会重复发送,这就要求消息接收方必须实现幂等性
由于在生产环境中,消费方很有可能是个集群,若某一个消费节点超时但是消费成功,会导致集群同组 其他节点重复消费该消息。另外意外宕机后恢复,由于消费进度没有及时写入磁盘,会导致消费进度部 分丢失,从而导致消息重复消费。
解决方案:RocketMQ可靠消息
RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便 利性支持。因此,我们通过RocketMQ就可以解决前面的问题。
1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功
RocketMQ中的Broker 与 发送方 具备双向通信能力,使得 broker 天生可以作为一个事务协调者存在;并且RocketMQ 本身提供了存储机制,使得事务消息可以持久化保存;这些优秀的设计可以保证即使发生了异常,RocketMQ依然能够保证达成事务的最终一致性。
-
发送方发送一个事务消息给Broker,RocketMQ会将消息状态标记为“Prepared”,此时这条消息暂时不能被接收方消费。这样的消息称之为Half Message,即半消息。
-
Broker返回发送成功给发送方
-
发送方执行本地事务,例如操作数据库
-
若本地事务执行成功,发送commit消息给Broker,RocketMQ会将消息状态标记为“可消费”,此 时这条消息就可以被接收方消费;若本地事务执行失败,发送rollback消息给Broker,RocketMQ 将删除该消息。
-
如果发送方在本地事务过程中,出现服务挂掉,网络闪断或者超时,那Broker将无法收到确认结 果
-
此时RocketMQ将会不停的询问发送方来获取本地事务的执行状态(即事务回查)
-
根据事务回查的结果来决定Commit或Rollback,这样就保证了消息发送与本地事务同时成功或同时失败。
以上主干流程已由RocketMQ实现,对于我们来说只需要分别实现本地事务执行的方法以及本地事务回查的方法即可,具体来说就是实现下面这个接口:
public interface TransactionListener {
/**
- 发送prepare消息成功后回调该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,
这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState executeLocalTransaction(final Message msg, final
Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState checkLocalTransaction(Message msg);
}
2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功
- 如果是出现了异常,RocketMQ会通过重试机制,每隔一段时间消费消息,然后再执行本地事务;如果 是超时,RocketMQ就会无限制的消费消息,不断的去执行本地事务,直到成功为止。
- 实现逆向流程,比如下单流程:在订单服务执行本地事务成功,消息commit, 然后 商品服务 消费消息进行扣减库存,在商品服务扣减库存过程中发生异常,那么先将本地数据回滚,然后发送回滚消息,在订单服务监听回滚消息进行处理。
注意事项:
-
因为使用最终一致性解决分布式事务,可能出现的问题:
发送方本地事务执行完成,消息commit, 这时候返回客户是成功,但是消费者可能失败,会导致数据回滚,所以为了避免用户看到下单成功后,因为数据回滚导致订单数据消失,所以需要增加一个中间状态,比如:下单中,只有所有的消费者消费成功后,在改变订单状态为下单成功。 -
发送方监听事务消息 RocketMQLocalTransactionListener, 一个工程只可以有一个实现类,否则启动报错。
代码实现
- 发送方代码
public Long placeOrder(Long userId, SeckillOrderCommand seckillOrderCommand) {
SeckillGoodsDTO seckillGoods = seckillGoodsDubboService.getSeckillGoods(seckillOrderCommand.getGoodsId(), seckillOrderCommand.getVersion());
//检测商品
this.checkSeckillGoods(seckillOrderCommand, seckillGoods);
boolean exception = false;
long txNo = SnowFlakeFactory.getSnowFlakeFromCache().nextId();
String key = SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_STOCK_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId()));
try{
//获取商品限购信息
Object limitObj = distributedCacheService.getObject(SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_LIMIT_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId())));
//如果从Redis获取到的限购信息为null,则说明商品已经下线
if (limitObj == null){
throw new SeckillException(ErrorCode.GOODS_OFFLINE);
}
if (Integer.parseInt(String.valueOf(limitObj)) < seckillOrderCommand.getQuantity()){
throw new SeckillException(ErrorCode.BEYOND_LIMIT_NUM);
}
Long result = distributedCacheService.decrementByLua(key, seckillOrderCommand.getQuantity());
this.checkResult(result);
}catch (Exception e){
logger.error("SeckillPlaceOrderLuaService|下单异常|参数:{}|异常信息:{}", JSONObject.toJSONString(seckillOrderCommand), e.getMessage());
exception = true;
//将内存中的库存增加回去
distributedCacheService.incrementByLua(key, seckillOrderCommand.getQuantity());
}
//事务消息
Message<String> message = this.getTxMessage(txNo, userId, SeckillConstants.PLACE_ORDER_TYPE_LUA, exception, seckillOrderCommand, seckillGoods);
//发送事务消息
rocketMQTemplate.sendMessageInTransaction(SeckillConstants.TOPIC_TX_MSG, message, null);
return txNo;
}
- 发送方本地事务
/**
* @author binghe(微信 : hacker_binghe)
* @version 1.0.0
* @description 监听事务消息
* @github https://github.com/binghe001
* @copyright 公众号: 冰河技术
*/
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener{
private final Logger logger = LoggerFactory.getLogger(OrderTxMessageListener.class);
@Autowired
private SeckillPlaceOrderService seckillPlaceOrderService;
@Autowired
private DistributedCacheService distributedCacheService;
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
TxMessage txMessage = this.getTxMessage(message);
try{
//已经抛出了异常,则直接回滚
if (BooleanUtil.isTrue(txMessage.getException())){
return RocketMQLocalTransactionState.ROLLBACK;
}
seckillPlaceOrderService.saveOrderInTransaction(txMessage);
// int i = 1/0;
logger.info("executeLocalTransaction|秒杀订单微服务成功提交本地事务|{}", txMessage.getTxNo());
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
logger.error("executeLocalTransaction|秒杀订单微服务异常回滚事务|{}",txMessage.getTxNo());
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
TxMessage txMessage = this.getTxMessage(message);
logger.info("checkLocalTransaction|秒杀订单微服务查询本地事务|{}", txMessage.getTxNo());
Boolean submitTransaction = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.ORDER_TX_KEY, String.valueOf(txMessage.getTxNo())));
return BooleanUtil.isTrue(submitTransaction) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN ;
}
private TxMessage getTxMessage(Message msg){
String messageString = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);
return JSONObject.parseObject(txStr, TxMessage.class);
}
}
- 消费者监听事务消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_GOODS_CONSUMER_GROUP, topic = SeckillConstants.TOPIC_TX_MSG)
public class GoodsTxMessageListener implements RocketMQListener<String> {
private final Logger logger = LoggerFactory.getLogger(GoodsTxMessageListener.class);
@Autowired
private SeckillGoodsService seckillGoodsService;
@Override
public void onMessage(String message) {
if (StrUtil.isEmpty(message)){
return;
}
logger.info("秒杀商品微服务开始消费事务消息:{}", message);
TxMessage txMessage = this.getTxMessage(message);
//如果协调的异常信息字段为false,订单微服务没有抛出异常,则处理库存信息
if (BooleanUtil.isFalse(txMessage.getException())){
seckillGoodsService.updateAvailableStock(txMessage);
}
}
private TxMessage getTxMessage(String msg){
JSONObject jsonObject = JSONObject.parseObject(msg);
String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);
return JSONObject.parseObject(txStr, TxMessage.class);
}
}
- 消费者处理本地事务
@Override
@Transactional(rollbackFor = Exception.class)
public boolean updateAvailableStock(TxMessage txMessage) {
Boolean decrementStock = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())));
if (BooleanUtil.isTrue(decrementStock)){
logger.info("updateAvailableStock|秒杀商品微服务已经扣减过库存|{}", txMessage.getTxNo());
return true;
}
boolean isUpdate = false;
try{
isUpdate = seckillGoodsDomainService.updateAvailableStock(txMessage.getQuantity(), txMessage.getGoodsId());
//成功扣减库存成功
if (isUpdate){
distributedCacheService.put(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())), txMessage.getTxNo(), SeckillConstants.TX_LOG_EXPIRE_DAY, TimeUnit.DAYS);
}else{
//发送失败消息给订单微服务
rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));
}
// int i = 1/0;
}catch (Exception e){
isUpdate = false;
logger.error("updateAvailableStock|抛出异常|{}|{}",txMessage.getTxNo(), e.getMessage());
//发送失败消息给订单微服务
rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));
}
return isUpdate;
}
注意:在消费者执行本地事务时,如果发生异常,则执行逆向流程
- 发送者监听逆向流程消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_ORDER_CPNSUMER_GROUP, topic = SeckillConstants.TOPIC_ERROR_MSG)
public class OrderErrorMessageListener implements RocketMQListener<String> {
private final Logger logger = LoggerFactory.getLogger(OrderErrorMessageListener.class);
@Autowired
private SeckillOrderService seckillOrderService;
@Override
public void onMessage(String message) {
logger.info("onMessage|秒杀订单微服务开始消费消息:{}", message);
if (StrUtil.isEmpty(message)){
return;
}
//删除数据库中对应的订单
seckillOrderService.deleteOrder(this.getErrorMessage(message));
}
private ErrorMessage getErrorMessage(String msg){
JSONObject jsonObject = JSONObject.parseObject(msg);
String txStr = jsonObject.getString(SeckillConstants.ERROR_MSG_KEY);
return JSONObject.parseObject(txStr, ErrorMessage.class);
}
}
注意:该代码仅提供执行流程参考