一、分布式事务
事务(Transaction),一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。事务通常由高级数据库操纵语言或编程语言(如SQL,C++或Java)书写的用户程序的执行所引起,并用形如begin transaction和end transaction语句(或函数调用)来界定。事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成。
摘自百度百科
提到事务我们很容易会想到事务的四大特性ACID,但是在分布式的情况下想实现和单机下一样的事务并不是一件容易的事情,目前一些常见的分布式事务解决方案有如下几种,**消息队列+本地事件表、2PC、3PC、TCC、基于RocketMQ的半消息机制等等。**这些方案都有各自的使用场景,个人理解在高并发的情况下基于RocketMQ的半消息机制来实现分布式事务是一种不错的解决方案,其他的方案例如2PC、3PC或多或少存在锁定资源的情况,所以今天的内容就是介绍以RocketMQ的半消息为基础来实现分布式事务。
二、原理分析
这里借用一下RocketMQ官方的图,从图中可以看出当订单支付后有对应4个分支的操作分别是:更新订单状态、更新物流、更新用户积分、清空购物车。这4个步骤应该在同一个事务中,但是由于分布式的情况通常我们很难做到一致性,所以我们会采用一种折中的手段:最终一致性。接下来我们学习一下如何使用RocketMQ来解决这一问题。
首先在上一篇文章中我们知道了RocketMQ有一种独特的机制,半消息,当消息处于“半事务消息”的状态时,消费者是无法获取到消息的,利用这一特性我们可以轻松的实现分布式事务。
流程如下:1、首先生产者向RocketMQ服务端投递一条半事务消息并等待服务端的响应
2、投递成功则开始执行本地事务(如果失败可以尝试重新投递,如果多次投递失败则需要人工介入)
3、根据本地事务执行的结果来告知RocketMQ服务端是否提交消息,如果执行成功则投递否则回滚
4、RocketMQ服务端将成功的消息投递给消费者,消费者进行消费。
这里需要注意的几点:
1、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
2、消费者端最好对消费做好幂等性处理,防止重复消费。
三、代码演示
1、业务描述
支付成功修改订单流水状态为支付成功,订单修改为已支付。其中订单和支付处于两个微服务中使用不同的数据库。
2、生产者
2.1、依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
2、配置文件
rocketmq:
name-server: 192.168.111.152:9876
producer:
group: test-group
server:
port: 8080
3、核心代码1
package com.cmxy.producerdemo.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.cmxy.producerdemo.entity.LocalTransaction;
import com.cmxy.producerdemo.entity.PayFlow;
import com.cmxy.producerdemo.mapper.LocalTransactionMapper;
import com.cmxy.producerdemo.mapper.PayFlowMapper;
import com.cmxy.producerdemo.service.PayService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Service
@Slf4j
public class PayServiceImpl implements PayService {
@Autowired
private PayFlowMapper payFlowMapper;
@Autowired
private LocalTransactionMapper localTransactionMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public String paySuccess(String orderNo) {
//判断支付流水是否存在
PayFlow payFlow = payFlowMapper.selectOne(new LambdaQueryWrapper<PayFlow>().eq(PayFlow::getOrderNo, orderNo));
if (payFlow == null) {
log.error("支付流水不存在");
return "fail";
}
//校验完成,发送半事务消息;
String transactionId = UUID.randomUUID().toString().replace("-", "");
Message<String> message = MessageBuilder.withPayload("rocketMQTemplate transactional message ").
setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();
rocketMQTemplate.sendMessageInTransaction("TestTransaction", message, orderNo);
return "success";
}
/**
* 修改支付流水状态
*
* @param orderNo
*/
@Override
@Transactional
public boolean updatePayFlow(String orderNo, String transactionId) {
//将支付流水更新为已支付,并且插入一条本地事务数据表示当前事务执行完成
PayFlow payFlow = new PayFlow();
payFlow.setStatus(1);
int update = payFlowMapper.update(payFlow, new LambdaQueryWrapper<PayFlow>().eq(PayFlow::getOrderNo, orderNo));
int insert = localTransactionMapper.insert(new LocalTransaction(transactionId));
return update > 0 && insert > 0;
}
}
核心代码2
/**
* 由于在Springboot rocket mq start 2.1.0版本之后 @RocketMQTransactionListener移出了txProducerGroup属性
* 所以如果存在多个不同事务,需要从代码层面来区分。(之前是一个Listener对应一个)
*/
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private PayService payService;
@Autowired
private LocalTransactionMapper localTransactionMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("开始执行本地事务");
String orderNo = (String) o;
String transactionId = message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);
log.info("本地事务执行完成");
return payService.updatePayFlow(orderNo, transactionId) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
/**
* 检查本地事务表 注意这里不要去查具体业务,只需要查本地事务表是否插入成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);
LocalTransaction localTransaction = localTransactionMapper.selectOne(new LambdaQueryWrapper<LocalTransaction>().eq(LocalTransaction::getTransactionId, transactionId));
return localTransaction == null ? RocketMQLocalTransactionState.UNKNOWN : RocketMQLocalTransactionState.COMMIT;
}
}
3、消费者
消费者相对来说要简单的多,和普通消息一样。需要注意的是一定要处理好幂等性问题!
@Slf4j
@Component
@RocketMQMessageListener(topic = "TestTransaction", consumerGroup = "test-group") // topic、tag保持一致
public class OrderListener implements RocketMQListener<String> {
@Autowired
private OrderMapper orderMapper;
/**
* 重点:对于消费来说一定要做好幂等性!!!(当前demo偷个懒就不做了),重复消费可能会导致很严重的问题
* 笔者之前就因为没有注意幂等性,导致给客户多发了3张优惠券!!!
* @param message
*/
@Override
public void onMessage(String message) {
log.info("收到信息:{}",message);
Order order = orderMapper.selectOne(new LambdaQueryWrapper<Order>().eq(Order::getOrderNo, message));
if(order == null){
//这里如果有业务上的异常,需要记录日志,通知开发人员等等
throw new RuntimeException("订单不存在");
}
order.setStatus(1);
orderMapper.updateById(order);
}
}
四、总结
总的来说RocketMQ来实现分布式事务相对来说不难,个人感觉基于消息队列来实现分布式事务适用于业务上对于数据一致性要求没有那么高的,允许中间有一段时间数据不一致的场景;相较于2PC、3PC使用RocketMQ来实现分布式事务效率更高,且代码编写也不复杂。本案例中的代码笔者后续会放到github、gitee上。希望对你有所帮助