前言
在分布式系统中,数据一致性是一个核心问题。随着微服务架构的普及,跨服务、跨数据库的操作变得越来越普遍,如何保证这些操作的原子性、一致性、隔离性和持久性(ACID)成为了一个极具挑战性的任务。本文将全面介绍 2PC、TCC、最大努力通知、本地消息表、Saga、XA 协议、可靠消息传输、Seata AT 模式 等主流分布式事务解决方案,并通过 Java 示例代码和 流程图帮助您更直观地理解每种方案。
一、2PC(两阶段提交)
简介
两阶段提交(2PC)是一种经典的分布式事务协议,分为准备阶段和提交阶段两个步骤。它通过协调者(Coordinator)来控制多个参与者的事务执行。
工作原理
- 准备阶段:协调者询问所有参与者是否可以提交事务。
- 提交阶段:如果所有参与者都同意,则协调者发送提交命令;否则回滚事务。
UML图
Java 示例代码
interface Participant {
boolean prepare();
void commit();
void rollback();
}
class ParticipantA implements Participant {
@Override
public boolean prepare() {
System.out.println("参与者A:准备完成");
return true;
}
@Override
public void commit() {
System.out.println("参与者A:提交完成");
}
@Override
public void rollback() {
System.out.println("参与者A:回滚完成");
}
}
class TwoPhaseCommitCoordinator {
private List<Participant> participants;
public TwoPhaseCommitCoordinator(List<Participant> participants) {
this.participants = participants;
}
public void executeTransaction() {
boolean allPrepared = true;
for (Participant participant : participants) {
if (!participant.prepare()) {
allPrepared = false;
break;
}
}
if (allPrepared) {
for (Participant participant : participants) {
participant.commit();
}
} else {
for (Participant participant : participants) {
participant.rollback();
}
}
}
}
二、TCC(Try-Confirm-Cancel)
简介
TCC 是一种补偿型的分布式事务解决方案,将业务逻辑分为三个阶段:
- 尝试(Try):预留资源。
- 确认(Confirm):真正执行业务操作。
- 取消(Cancel):撤销预留资源。
UML图
Java 示例代码
interface TccTransaction {
boolean tryOperation();
void confirm();
void cancel();
}
class ServiceA implements TccTransaction {
@Override
public boolean tryOperation() {
System.out.println("服务A:尝试操作");
return true;
}
@Override
public void confirm() {
System.out.println("服务A:确认操作");
}
@Override
public void cancel() {
System.out.println("服务A:取消操作");
}
}
class TccTransactionManager {
private List<TccTransaction> services;
public TccTransactionManager(List<TccTransaction> services) {
this.services = services;
}
public void executeTransaction() {
boolean allTrySuccess = true;
for (TccTransaction service : services) {
if (!service.tryOperation()) {
allTrySuccess = false;
break;
}
}
if (allTrySuccess) {
for (TccTransaction service : services) {
service.confirm();
}
} else {
for (TccTransaction service : services) {
service.cancel();
}
}
}
}
三、最大努力通知
简介
最大努力通知是一种基于最终一致性的分布式事务解决方案,适用于对实时性要求不高但可靠性要求较高的场景。
工作原理
- 发送通知:支付服务向订单服务发送支付成功的通知。
- 重试机制:如果通知失败,则记录失败日志并通过定时任务重试,直到成功或达到最大重试次数。
UML图
Java 示例代码
public class PaymentService {
private OrderService orderService;
public PaymentService(OrderService orderService) {
this.orderService = orderService;
}
public void notifyPaymentSuccess(String orderId) {
try {
boolean success = orderService.notifyPayment(orderId);
if (!success) {
saveToRetryTable(orderId);
}
} catch (Exception e) {
saveToRetryTable(orderId);
}
}
private void saveToRetryTable(String orderId) {
System.out.println("保存到重试表:orderId=" + orderId);
}
}
public class RetryTask {
private PaymentService paymentService;
public RetryTask(PaymentService paymentService) {
this.paymentService = paymentService;
}
public void executeRetryTask() {
List<String> retryOrders = fetchFromRetryTable();
for (String orderId : retryOrders) {
paymentService.notifyPaymentSuccess(orderId);
}
}
private List<String> fetchFromRetryTable() {
return List.of("order1", "order2");
}
}
四、本地消息表
简介
本地消息表通过在每个服务中维护一个消息表来记录需要发送的消息,并通过异步方式将消息发送出去。
工作原理
- 写入本地消息表:业务操作完成后,将消息插入消息表。
- 异步发送消息:通过后台任务发送消息。
UML图
Java 示例代码
public class LocalMessageService {
public void sendMessage(String message) {
// 执行业务操作
businessOperation();
// 插入本地消息表
insertIntoLocalMessageTable(message);
// 异步发送消息
asyncSendMessage();
}
private void businessOperation() {
System.out.println("执行业务操作");
}
private void insertIntoLocalMessageTable(String message) {
System.out.println("插入本地消息表:" + message);
}
private void asyncSendMessage() {
System.out.println("异步发送消息");
}
}
五、Saga
简介
Saga 是一种基于事件驱动的长事务管理方法,使用正向操作和补偿操作来实现最终一致性。
工作原理
- 正向操作:依次执行事务中的每个步骤。
- 补偿操作:如果某个步骤失败,则依次回滚前面的步骤。
UML图
Java 示例代码
interface SagaStep {
void execute();
void compensate();
}
class StepA implements SagaStep {
@Override
public void execute() {
System.out.println("执行步骤A");
}
@Override
public void compensate() {
System.out.println("补偿步骤A");
}
}
class StepB implements SagaStep {
@Override
public void execute() {
System.out.println("执行步骤B");
}
@Override
public void compensate() {
System.out.println("补偿步骤B");
}
}
class SagaManager {
private List<SagaStep> steps;
public SagaManager(List<SagaStep> steps) {
this.steps = steps;
}
public void executeSaga() {
int stepIndex = 0;
try {
for (SagaStep step : steps) {
step.execute();
stepIndex++;
}
} catch (Exception e) {
for (int i = stepIndex - 1; i >= 0; i--) {
steps.get(i).compensate();
}
}
}
}
六、XA 协议
简介
XA 协议是 X/Open 组织提出的分布式事务处理标准,定义了事务管理器(TM)和资源管理器(RM)之间的接口。XA 协议的核心思想是通过两阶段提交来实现分布式事务。
UML图
Java 示例代码
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
public class XADemo {
public static void main(String[] args) throws Exception {
XAResource resource1 = new MyXAResource();
XAResource resource2 = new MyXAResource();
Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02});
// 开始事务
resource1.start(xid, XAResource.TMNOFLAGS);
resource2.start(xid, XAResource.TMNOFLAGS);
// 执行操作
resource1.end(xid, XAResource.TMSUCCESS);
resource2.end(xid, XAResource.TMSUCCESS);
// 准备提交
int result1 = resource1.prepare(xid);
int result2 = resource2.prepare(xid);
if (result1 == XAResource.XA_OK && result2 == XAResource.XA_OK) {
resource1.commit(xid, false);
resource2.commit(xid, false);
} else {
resource1.rollback(xid);
resource2.rollback(xid);
}
}
}
七、可靠消息传输
简介
可靠消息传输依赖于消息队列(如 Kafka、RabbitMQ)来保证消息的可靠传递。通常结合本地事务和消息队列来实现最终一致性。
UML图
Java 示例代码
public class ReliableMessageProducer {
public void sendMessage(String message) {
// 执行业务操作
businessOperation();
// 发送消息到消息队列
sendToMessageQueue(message);
}
private void businessOperation() {
System.out.println("执行业务操作");
}
private void sendToMessageQueue(String message) {
System.out.println("发送消息到消息队列:" + message);
}
}
public class ReliableMessageConsumer {
public void consumeMessage(String message) {
// 消费消息并执行业务逻辑
processMessage(message);
// 确认消费
acknowledgeMessage();
}
private void processMessage(String message) {
System.out.println("处理消息:" + message);
}
private void acknowledgeMessage() {
System.out.println("确认消费");
}
}
八、Seata AT 模式
简介
Seata 是一种开源的分布式事务解决方案,支持多种模式来解决分布式事务问题,其中 AT(Auto Transaction)模式是一种无侵入式的解决方案,适用于微服务架构下的分布式事务管理。
Seata AT 模式的工作流程
- 一阶段提交:业务数据和回滚日志记录在同一个本地事务中提交到数据库。
- 二阶段提交:如果所有分支事务都成功,则全局事务提交;如果有任何一个分支失败,则根据回滚日志进行补偿操作。
UML图
Java 示例代码
import io.seata.rm.datasource.DataSourceProxy;
import javax.sql.DataSource;
public class SeataATDemo {
private DataSource dataSource;
public SeataATDemo(DataSource originalDataSource) {
// 使用 Seata 的 DataSourceProxy 包装原始的数据源
this.dataSource = new DataSourceProxy(originalDataSource);
}
public void executeBusinessLogic() throws Exception {
// 假设这里执行一些涉及多个服务或数据库的操作
String sql = "UPDATE account SET balance = balance - 100 WHERE user_id = 'user1';";
try (java.sql.Connection connection = dataSource.getConnection();
java.sql.PreparedStatement statement = connection.prepareStatement(sql)) {
statement.executeUpdate();
}
}
}
总结
分布式事务的解决方案多种多样,每种方法都有其适用场景:
- 2PC 和 XA 适合强一致性场景,但性能开销较大。
- TCC 和 Saga 提供了更高的灵活性,但需要手动设计补偿逻辑。
- 最大努力通知 和 本地消息表 更适合最终一致性场景,实现简单且高效。
- 可靠消息传输 依赖消息队列实现高可靠性和最终一致性。
- Seata AT 模式 提供了一种无侵入式的解决方案,特别适用于微服务架构,简化了分布式事务的处理。
希望本文能为您提供全面的理解,并帮助您在实际项目中选择合适的分布式事务解决方案!