highlight: arduino-light
Rocket事务流程&源码分析
Rocket解决分布式事务流程
事务消息分 2 个阶段:
① 正常事务消息的发送与提交:
a.发送消息(half 消息)
b.服务响应消息写入结果
c.根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
d.根据本地事务状态执行 Commit 或者 Rollback ( Commit操作生成消息索引,消息对消费者可见)
② 事务消息的补偿流程(补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况)
a.对没有 Commit / Rolback 的事务消息( pending 状态的消息),从服务端发起-一次"回查"
b. Producer 收到回查消息,检查回查消息对应的本地事务的状态
c.根据本地事务状态,重新 Commit 或者 Rollback
3、事务的消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
① TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
② TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
③ TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
原文链接:https://blog.csdn.net/weixin_42367582/article/details/112639915
TransactionMQProducer发送事务消息
事务消息发送时,需要打上相应的标记PROPERTYTRANSACTIONPREPARED=true,与普通消息进行区分
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); 给broker发送消息后,根据返回状态,进行相应处理
给broker发送消息后,根据返回状态,进行相应处理
发送事务消息成功
LocalTransactionExecuter或者TransactionListener执行本地事务
并返回本地事务完成状态,包括UNKNOW、ROLLBACK、COMMIT
发送事务消息失败
FLUSHDISKTIMEOUT、FLUSHSLAVETIMEOUT、SLAVENOTAVAILABLE:
都是消息发送失败状态, 标记本地事务状态为ROLLBACK_MESSAGE
之后通过endTransaction将相应本地事务执行状态信息回传给broker.注意发送消息的方式为one way
broker端处理事务消息
TransactionMessageBridge,负责主要的事务消息存储逻辑。
half消息消费队列: prepare消息消费队列即预处理消息(prepare),事务消息首先进入此消息消费队列。
对应的TOPIC是:
RMQSYSTRANSHALFTOPIC
op消息消费队列:事务消息处理完成后,进入op消息消费队列,op消息消费队列主要用来记录事务消息完成状态。
RMQSYSTRANSOPHALF_TOPIC
EndTransactionProcessor的processRequest方法,处理producer端回传的事务状态
如果事务状态是commit,将消息还原成原来的topic和queueId,存储到commitLog中,并且删除预处理消息(prepare),然后将消息存储在主题为:RMQSYSTRANSOPHALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。
如果事务状态是rollback,删除掉prepare消息,同样也是将消息存储在主题为:RMQSYSTRANSOPHALF_TOPIC的主题中,代表这些消息已经被处理。
如果事务状态是unknown,broker定时执行回查。
定时任务回查
如果第一次producer返回的事务消息为UNKNOW,则需要进行事务回查
事务回查,broker端主要逻辑在TransactionalMessageService的check方法
prepare消息,会存储在RMQSYSTRANSHALFTOPIC消息队列中
prepare消息,被处理成功后(消息状态是回滚或者提交),会存储在RMQSYSTRANSOPHALF_TOPIC消息队列中。
所以通过回查prepare消息队列,可以对一些失败的事务消息,进行重试。
为了充分利用commitLog顺序写的特性,
回查时,只要发送了回查消息,pepare消息消费队列消费进度会往前推动,同时往prepare消息队列写入一条新的消息,如果回查失败,新增的消息可以再次发送回查消息。
如果回查成功,可以根据op消息队列中的消息,判断重复,避免重复发送回查消息。
producer端事务回查处理逻辑主要在TransactionListener的checkLocalTransaction方法,一般重写checkLocalTransaction方法,实现自定义的回查逻辑。
默认询问 15 次。
原文链接:https://blog.csdn.net/zycxnanwang/article/details/107431892
事务消息代码示例
```java public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); //添加事务监听器 producer.setTransactionListener(new TransactionListener() { /* * 在该方法中执行本地事务 * @param msg * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //接收到消息89757 try{ //1调用外部接口 //2判断外部接口成功更新数据库状态 //3判断外部接口失败不更新数据库状态 //如果走的是1和2 那么直接返回COMMIT_MESSAGE即可,这样不会触发回查。 return LocalTransactionState.COMMIT_MESSAGE; //如果走的是1和3 //这是正常情况 相当于2个事务都失败了 //执行了1没执行2(回查更新数据库) 或者 1和2都没执行(回查相当于重试) //目前存在的问题是如果调用外部接口成功 系统宕机,本地数据库没更新 那么返回的也是UNKNOW //因为Producer调用executeLocalTransaction方法时系统宕机 //触发回查,判断外部接口有没有调用成功和数据库到底有没有更新,如果没有调用或者没有更新 //那么根据情况调用接口和更新数据库即可。 return LocalTransactionState.UNKNOW; //还有一种情况是2个事务 A是本地事务 B是外部事务 C生成订单 //A执行成功 B执行失败 //比如A扣减库存B扣减余额,B的余额为0,扣减失败 //此时需要回滚A返回ROLLBACK_MESSAGE 此时消息不会发送给C return LocalTransactionState.ROLLBACK_MESSAGE; //总结 //本地事务执行成功,则返回COMMIT_MESSAGE,提交本地事务发送消息给外部事物系统 //本地事务执行失败,则返回ROLLBACK_MESSAGE //本地事务执行异常 返回UNKNOW 执行回查。 }catch(Exception e){ } } /* * 该方法是MQ进行消息事务状态回查 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("消息的Tag:" + msg.getTags()); return LocalTransactionState.COMMIT_MESSAGE; } }); //3.启动producer producer.start();
//4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("TransactionTopic", tags[i], ("89757").getBytes()); //5.发送消息 SendResult result = producer.sendMessageInTransaction(msg, null); //发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); //线程睡1秒 TimeUnit.SECONDS.sleep(2); //6.关闭生产者producer //producer.shutdown(); } } ```
事务消息源码解读
TransactionMQProducer#sendMessageInTransaction
1.发送Prepared消息
2.获取发送结果
3.发送成功
4.执行本地事务
5.发送确认消息
```java public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { //获取消息监听器 TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } //校验消息 Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//设置属性TRAN_MSG 为 true 即事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
//设置生产者组
MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
try {
//发送事务消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
//默认消息状态为UNKNOWN
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
//获取发送消息结果
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//根据sendResult重新设置消息的事务id
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__",
sendResult.getTransactionId());
}
String transactionId = msg
.getProperty
(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
//localTransactionExecuter是null
if (null != localTransactionExecuter) {
localTransactionState =
localTransactionExecuter
.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
//transactionListener不为空
log.debug("Used new transaction API");
//调用了Producer中自定义的事务监听器的executeLocalTransaction方法
//执行本地事务
localTransactionState = transactionListener
.executeLocalTransaction(msg, arg);
}
// 如果事务监听器执行完以后localTransactionState变成了null
// 即executeLocalTransaction方法的返回值是null
// 设置消息状态为UNKNOWN
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
//如果不是COMMIT_MESSAGE 打印日志
if (localTransactionState !=
LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}",
localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
//发送消息失败
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//如果从节点为空 设置消息状态为回滚
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//向服务器Broker发送事务消息的状态
//服务器Broker会根据消息状态判断是发送消息(Commit)还是删除消息(Rollback)
this.endTransaction
(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//所以一定要判断事务消息的发送结果 不要以为没有异常就是发送成功了
//也有可能是发送失败
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
} ```
应用程序开启一个数据库事务,进行数据库操作,并且在事务中发送一条 PREPARE 消息,PREPARE 消息发送成功后通知应用程序记录本地事务状态,然后提交本地事务。
RocketMQ 在收到类型为 PREPARE 的消息时,首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为 RMQSYSTRANSHALFTOPIC 的消息队列中,故 PREPARE 的消息是不会被客户端消费的。
Broker 消息服务器开启一个定时任务处理 RMQSYSTRANSHALFTOPIC 中的消息,会每隔指定时间向消息发送者发起事务状态查询请求 ,询问消息发送者客户端本地事务是否成功,然后根据回查状态决定是提交还是回滚,即对处于 PREPARE 状态进行提交或回滚操作。
发送者如果明确得知事务成功,则可以返回 COMMIT,服务端会提交该条消息,具体操作是恢复原消息的主题与队列,重新发送到 Broker,消费端感知后消费。
发送者如果无法明确得知事务状态,则返回 UNOWN,此时服务端会等待一定时间后再次向发送者询问,默认询问 15 次。
发送者如果非常明确得知事务失败,则可以返回 ROLLBACK。
在具体实践中,消息发送者在无法获取事务状态时不要武断的返回 ROLLBACK,而是要返回 UNOWN,让服务端定时重试回查,说明如下:
在将 PREPARE 消息发送到 Broker 后,服务端发起事务查询时本地事务可能还未提交,为了避免无效的事务回查机制,RocketMQ 通常至少在收到 PREPARE 消息 6s 后才会发起第一次事务回查,可通过 transactionTimeOut 配置。故客户端在实现事务回查时无法证明事务状态时不应该返回 ROLLBACK,而是返回 UNOWN。
6.应用场景:绑定管家卡
方案1:
1.执行开户,本地事务持久化
2.调用工行绑定管家卡接口
本地事务执行成功,此时宕机,调用第三方接口失败。
方案2:
1.调用工行绑定管家卡接口
2.执行开户,本地事务持久化
调用第三方接口成功,此时宕机,本地事务执行失败。
由于工行绑定管家卡是第三方接口,所以不受本地事务限制。
方案3:
1.发送事务消息
2.判断事务消息发送状态
3.如果发送成功,执行本地事务:开户持久化+调用绑定管家卡接口
4.判断本地事务:开户持久化+调用绑定管家卡接口返回的结果
5.如果结果是成功:什么也不做
6.如果结果是失败:那么等待回查
参考事务消息源码:https://blog.csdn.net/zycxnanwang/article/details/107431892