前言
分布式事务?
分布式事务是指涉及多个参与者的事务,这些参与者可能分布在不同的计算机、进程或者网络中。分布式事务需要保证ACID属性,即原子性、一致性、隔离性和持久性
解释
现在我们接触的系统基本上都是分布式系统,并且每个系统都会有自己的业务领域,当涉及到系统交互的时候,必然会涉及相关数据库数据的变动,一般的来说,每个业务系统都会有一个自己单独的数据源(不管是物理的,还是逻辑的),在目前来说,在spring体系下数据库的事务不支持跨应用,这样子就会导致数据一致性问题,即分布式事务问题。
实现方案
事务补偿
建表
CREATE TABLE `tx_compensation_task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`out_key` varchar(32) DEFAULT NULL COMMENT '外部主键key',
`biz_type` int(10) NOT NULL COMMENT '业务类型',
`times` int(10) NOT NULL DEFAULT '0' COMMENT '已经执行次数\n',
`created` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`last_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态0-待执行,1-执行中,3-成功,4-失败',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
执行流程
解释
- 进行本地业务,添加一条补偿任务,这个任务是实际需要调用的二方接口,然后再继续执行自己的业务
- 通过外部的任务驱动拉起相关的任务,获取到相应的out_key,然后根据相应out_key执行对应的逻辑
事务型消息
在rocketmq中提供了分布式事务的解决方案–事务消息
整体执行流程
解释
-
生产者将半事务消息发送至
RocketMQ Broker
。 -
RocketMQ Broker
将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。 -
生产者开始执行本地事务逻辑。
-
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
-
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
-
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
-
-
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
-
:::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::
事务消息回查步骤如下: 7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
源码分析
发送半消息
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
TransactionListener transactionListener = this.getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", (Throwable)null);
} else {
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
try {
//发送一次半消息
sendResult = this.send(msg);
} catch (Exception var11) {
throw new MQClientException("send message Exception", var11);
}
发送消息流程
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
//这里发送消息
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
Broker接受消息
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
//执行发送消息,其实这里就是往store里面加数据
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
执行本地事务1-1
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK:
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty("UNIQ_KEY");
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
this.log.debug("Used new transaction API");
//执行本地事务事务,并返回一个状态
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
//如果是这种情况的话,其实是不知道本地事务执行的结果,成功或者失败
//这里本地事务状态如果没有的话,就给一个UNKNOW,让事务回查线程去做check
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
this.log.info(msg.toString());
}
} catch (Throwable var10) {
this.log.info("executeLocalTransactionBranch exception", var10);
this.log.info(msg.toString());
localException = var10;
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
}
执行本地事务1-2
public class MyTransactionListener implements RocketMQLocalTransactionListener {
//本地事务方法
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行本地事务
try {
System.out.println("执行本地事务");
System.out.println(message.getPayload());
} catch (Exception e) {
//如果发生了异常,返回ROLLBACK状态
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
//没有发生错误的话,返回COMMIT状态
return RocketMQLocalTransactionState.COMMIT;
}
发送真实消息
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
//发送最终的消息,前提是 -- 半事务消息的状态是明确的
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
事务状态回查线程
在broker启动的时候启动
- org.apache.rocketmq.broker.BrokerStartup#main
- org.apache.rocketmq.broker.BrokerStartup#start
- org.apache.rocketmq.broker.BrokerStartup#createBrokerController
- org.apache.rocketmq.broker.BrokerController#initialize
- org.apache.rocketmq.broker.BrokerController#initialTransaction
- org.apache.rocketmq.common.ServiceThread#waitForRunning
- org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
- org.apache.rocketmq.broker.transaction.TransactionalMessageService#check