分布式事务中间件对⽐与选择
- tx-lcn
- EasyTransaction
- ByteTCC
- Seata
Seata实现分布式事务
我们主要以Seata的分布式事务框架进行介绍分析,相关的并且针对于其三种模式进行分别说明介绍。
搭建Seata Server
前往https://github.com/seata/seata/releases 下载Seata安装包,本书使⽤Seata 1.0.0。将⽬录切换⾄Seata根⽬录,根据操作系统,执⾏对应命令,即可启动Seata Server。
Linux/Unix/Mac
sh ./bin/seata-server.sh
Windows
bin\seata-server.bat
启动时,也可指定参数
$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file
⽀持的参数如下表所示
Seata AT模式
- ⼀阶段:业务数据和回滚⽇志记录在同⼀个本地事务中提交,释放本地锁和连接资源。
- ⼆阶段:提交异步化,⾮常快速地完成。回滚通过⼀阶段的回滚⽇志进⾏反向补偿。
官⽅⽂档
http://seata.io/zh-cn/docs/dev/mode/at-mode.html
代码演示
Maven依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
tx-service-group: content-center-seata-service-group
service:
vgroup-mapping:
content-center-seata-service-group: seata-cluster
grouplist:
seata-cluster: 127.0.0.1:8091
disable-global-transaction: false
配置说明
- tx-service-group:事务分组,默认是 ${spring.application.name}-seata-service-group ,唯⼀即可。
- vgroup-mapping:事务分组映射,表示 tx-service-group 对应到哪个Seata Server集群。
- key是tx-service-group的值,value是集群名称,唯⼀即可
- grouplist:集群中所包含的Seata Server的地址列表,key是vgroup-mapping中value的值,
- value是Seata Server的地址列表
- disable-global-transaction:是否开启全局事务开关,默认false。
在Seata1.0.0中,该配置⽆法正常读取,这是⼀个Bug,详⻅ https://github.com/seata/seata/issues/2114 ,好在,该配置的默认值就是false,所以不影响使⽤。
创建Seata事务记录表
-- auto-generated definition
create table undo_log (
id bigint auto_increment comment 'increment id' primary key,
branch_id bigint not null comment 'branch transaction id',
xid varchar(100) not null comment 'global transaction id',
context varchar(128) not null comment 'undo_log context,such as serialization',
rollback_info longblob not null comment 'rollback info',
log_status int not null comment '0:normal status,1:defense status',
log_created datetime not null comment 'create datetime',
log_modified datetime not null comment 'modify datetime',
constraint ux_undo_log
unique (xid, branch_id)
) comment 'AT transaction mode undo table' charset = utf8;
Controller代码:
private final ShareSeataService shareSeataService;
@PutMapping("/audit/seata1/{id}")
public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
return this.shareSeataService.auditById(id, auditDTO);
}
Service代码:
- 审核中心服务代码(含调用用户中心代码接口)
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareSeataService {
private final ShareMapper shareMapper;
private final UserCenterFeignClient userCenterFeignClient;
@GlobalTransactional(rollbackFor = Exception.class)
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
userCenterFeignClient.addBonus(id, 50);
// 故意抛异常,如果⽤户中⼼侧也能回滚,说明实现了分布式事务
// throw new IllegalArgumentException("发⽣异常...");
}
this.auditByIdInDB(id, auditDTO);
return this.shareMapper.selectByPrimaryKey(id);
}
- 审核中心服务代码(执行操作更新数据库机制)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
}
@GlobalTransactional 注解⽤来创建分布式事务。
被调⽤⽅代码
Maven依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
tx-service-group: user-center-seata-service-group
service:
vgroup-mapping:
user-center-seata-service-group: seata-cluster
grouplist:
seata-cluster: 127.0.0.1:8091
disable-global-transaction: false
可以看出来,差距主要体现在tx-service-group的值。
Controller代码:
@GetMapping("/add-bonus/{id}/{bonus}")
public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus) {
this.userService.addBonus(id, bonus);
return this.userService.findById(id);
}
Service代码:
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService {
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
public User findById(Integer id) {
// select * from user where id = #{id}
return this.userMapper.selectByPrimaryKey(id);
}
public void addBonus(Integer userId, Integer bonus) {
// 1. 为⽤户加积分
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);
// 2. 记录⽇志到bonus_event_log表⾥⾯
this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build());
log.info("积分添加完毕...");
}
}
Seata TCC模式
- ⼀阶段 prepare ⾏为
- ⼆阶段 commit 或 rollback ⾏为
需要实现的3个⽅法:
- ⼀阶段:
- ⽤于业务预处理的⽅法,即 Try 阶段、的⽅法,⽐如冻结⽤户的部分余额等等;
- ⼆阶段:
- ⽤于提交业务的⽅法,即 Commit ⽅法,⽐如扣除⽤户之前冻结的部分余额;
- ⽤于回滚业务的⽅法,即 Rollback ⽅法,⽐如返还之前冻结的⽤户余额;
官⽅⽂档
http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html
代码演示
行为操作接口
@LocalTCC
public interface TccActionOne {
@TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(BusinessActionContext actionContext, int a);
boolean commit(BusinessActionContext actionContext);
boolean rollback(BusinessActionContext actionContext);
}
接口实现类
- 实现类1
@Component
public class TccActionOneImpl implements TccActionOne {
@Override
public boolean prepare(BusinessActionContext actionContext, int a) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne prepare, xid:" + xid);
return true;
}
@Override
public boolean commit(BusinessActionContext actionContext) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne commit, xid:" + xid);
ResultHolder.setActionOneResult(xid, "T");
return true;
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne rollback, xid:" + xid);
ResultHolder.setActionOneResult(xid, "R");
return true;
}
}
@LocalTCC
public interface TccActionTwo {
@TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(BusinessActionContext actionContext, int a);
boolean commit(BusinessActionContext actionContext);
boolean rollback(BusinessActionContext actionContext);
}
- 实现类2
@Component
public class TccActionTwoImpl implements TccActionTwo {
@Override
public boolean prepare(BusinessActionContext actionContext, String b) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo prepare, xid:" + xid);
return true;
}
@Override
public boolean commit(BusinessActionContext actionContext) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo commit, xid:" + xid);
ResultHolder.setActionTwoResult(xid, "T");
return true;
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo rollback, xid:" + xid);
ResultHolder.setActionTwoResult(xid, "R");
return true;
}
}
- 聚合实现服务业务实现类执行
@Service
public class ShareSeataService{
@Autowired
TccActionOne tccActionOne;
@Autowired
TccActionTwo tccActionTwo;
@GlobalTransactional
public void tccTransactionCommit(Map<String, String> paramMap) {
//第一个TCC 事务参与者
boolean result = tccActionOne.prepare(null, "one");
if (!result) {
paramMap.put("xid",RootContext.getXID());
throw new RuntimeException("TccActionOne failed.");
}
List list = new ArrayList();
list.add("c1");
list.add("c2");
result = tccActionTwo.prepare(null, "two");
if (!result) {
paramMap.put("xid",RootContext.getXID());
throw new RuntimeException("TccActionTwo failed.");
}
paramMap.put("xid",RootContext.getXID());
return ;
}
}
// 回滚的代码相似,就不写了
- 执行调用点
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
private final ShareSeataService shareSeataService;
@GetMapping("tcc-commit")
public String tccTransactionCommit() {
Map<String, String> map = new HashMap<>();
this.shareSeataService.tccTransactionCommit(map);
String xid = map.get("xid");
// 结果T
return ResultHolder.getActionOneResult(xid);
}
@GetMapping("/tcc-rollback")
public String tccTransactionRollback() {
Map<String, String> map = new HashMap<>();
try {
this.shareSeataService.tccTransactionRollback(map);
} catch (Throwable t) {
log.warn("事务回滚..", t);
}
String xid = map.get("xid");
// 结果R
return ResultHolder.getActionOneResult(xid);
}
}
定义状态机⽂件:
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"分布式事务27
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
测试代码:
public class LocalSagaTransactionStarter {
public static void main(String[] args) {
AbstractApplicationContext applicationContext = new ClassPathXmlApplication
Context(new String[] {"spring/seata-saga.xml"});
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationCon
text.getBean("stateMachineEngine");
transactionCommittedDemo(stateMachineEngine);
transactionCompensatedDemo(stateMachineEngine);
new ApplicationKeeper(applicationContext).keep();
}
private static void transactionCommittedDemo(StateMachineEngine stateMachineEng
ine) {
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
InventoryAndBalance", null, businessKey, startParams);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
n execute failed. XID: " + inst.getId());
System.out.println("saga transaction commit succeed. XID: " + inst.getId())
;
inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getSta
teMachineInstanceByBusinessKey(businessKey, null);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
n execute failed. XID: " + inst.getId());
//async test
businessKey = String.valueOf(System.currentTimeMillis());
inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala
nce", null, businessKey, startParams, CALL_BACK);
waittingForFinish(inst);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
n execute failed. XID: " + inst.getId());
分布式事务
29
System.out.println("saga transaction commit succeed. XID: " + inst.getId())
;
}
private static void transactionCompensatedDemo(StateMachineEngine stateMachineE
ngine) {
Map<String, Object> startParams = new HashMap<>(4);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
startParams.put("mockReduceBalanceFail", "true");
//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
InventoryAndBalance", null, businessKey, startParams);
//async test
businessKey = String.valueOf(System.currentTimeMillis());
inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala
nce", null, businessKey, startParams, CALL_BACK);
waittingForFinish(inst);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "sag
a transaction compensate failed. XID: " + inst.getId());
System.out.println("saga transaction compensate succeed. XID: " + inst.getI
d());
}
private static volatile Object lock = new Object();
private static AsyncCallback CALL_BACK = new AsyncCallback() {
@Override
public void onFinished(ProcessContext context, StateMachineInstance stateMach
ineInstance) {
synchronized (lock){
lock.notifyAll();
}
}
@Override
public void onError(ProcessContext context, StateMachineInstance stateMachine
Instance, Exception exp) {
synchronized (lock){
lock.notifyAll();
}
}
};
private static void waittingForFinish(StateMachineInstance inst){
synchronized (lock){
if(ExecutionStatus.RU.equals(inst.getStatus())){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
Seata saga模式
Saga 事务:最终一致性
方案简介
Saga 事务核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
处理流程
Saga事务基本协议
- 每个 Saga 事务由一系列幂等的有序子事务(sub-transaction) Ti 组成。
- 每个 Ti 都有对应的幂等补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果。
- 可以看到,和 TCC 相比,Saga 没有“预留”动作,它的 Ti 就是直接提交到库。
代码实现
// 接⼝略
public class BalanceActionImpl implements BalanceAction {
private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class);
@Override
public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
if(params != null && "true".equals(params.get("throwException"))){
throw new RuntimeException("reduce balance failed");
}
LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
return true;
}
@Override
public boolean compensateReduce(String businessKey, Map<String, Object> params) {
if(params != null && "true".equals(params.get("throwException"))){
throw new RuntimeException("compensate reduce balance failed");
}
LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
return true;
}
}
// 接⼝略
public class InventoryActionImpl implements InventoryAction {
private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);
@Override
public boolean reduce(String businessKey, int count) {
LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
return true;
}
@Override
public boolean compensateReduce(String businessKey) {
LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
return true;
}
}
Seata XA模式
官⽅⽂档
http://seata.io/zh-cn/docs/dev/mode/xa-mode.html
代码演示
- 添加Seata的依赖 & 配置
- 你平时的JDBC代码怎么写,依然怎么写。
4种模式对⽐与选择
AT
优势:
- 使⽤简单:对业务侵⼊性⼩
缺点:
- 性能中等
- 有全局锁
适⽤场景:
- 适⽤于对性能没有特别⾼的要求的场景
- 适⽤于不希望对业务进⾏改造的场景
TCC
优势
- 性能会⽐ AT 模式⾼很多
缺点
相对于 AT 模式,TCC 模式对业务代码有⼀定的侵⼊性
- 适⽤场景:
适⽤于核⼼系统等对性能有很⾼要求的场景
Saga
优势:
- ⼀阶段提交本地数据库事务,⽆锁,⾼性能;
- 参与者可以采⽤事务驱动异步执⾏,⾼吞吐;
- 补偿服务即正向服务的“反向”,易于理解,易于实现;
缺点:
- Saga 模式由于⼀阶段已经提交本地数据库事务,且没有进⾏“预留”动作,所以不能保证隔离性。后续会讲到对于缺乏隔离性的应对措施。
适⽤场景:
- 业务流程⻓/多
- 参与者包含其他公司或遗留系统服务,⽆法提供 TCC 模式要求的三个接⼝
- 典型业务系统:如⾦融⽹络(与外部⾦融机构对接)、互联⽹微贷、渠道整合、分布式架构服务
- 集成等业务系统
- 银⾏业⾦融机构使⽤⼴泛
XA
优势:
- ⽆侵⼊
缺点:
- 性能较差
- 需要数据库⽀持XA
适⽤场景:
- 强⼀致性的解决⽅案,适⽤于对⼀致性要求⾮常⾼的场景(使⽤较少)