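RocketMQ Source Code Reading (10): Transactional Messages
- Interaction Flow
- Sending Transactional Messages
- Producer Sends the Transactional Message
- Broker Handles the End-Transaction Request
- Broker Builds the ConsumeQueue
- Transactional Message Check
- Broker Initiates the Check
- Producer Receives the Check
- Summary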
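Interaction Flow
The interaction flow of a transactional message is as follows.
Sending a transactional message involves these steps:
- The producer sends a half message to the RocketMQ server.
- Once the RocketMQ server has persisted the message, it returns an Ack to the producer confirming that the send succeeded. At this point the message is a half message.
- The producer starts executing its local transaction logic.
- Based on the result of the local transaction, the producer submits a second confirmation (Commit or Rollback) to the server. On receiving it, the server proceeds as follows:
- If the second confirmation is Commit: the server marks the half message as deliverable and delivers it to consumers.
- If the second confirmation is Rollback: the server rolls back the transaction and does not deliver the half message to consumers.
- In special cases such as a network outage or a producer restart, the server may never receive the second confirmation, or may receive one whose result is Unknown. After a fixed interval, the server then initiates a message check against the producer, that is, against any one producer instance in the producer cluster.
The message check involves these steps:
- On receiving the check, the producer looks up the final result of the local transaction that corresponds to the message.
- The producer submits a second confirmation again based on that final state, and the server handles the half message exactly as in the fourth send step above.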
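The send and check steps above can be read as a small state machine on the server side. The following is only a sketch of that idea, not RocketMQ code: the class `HalfMessageLifecycle`, its enums, and its methods are hypothetical names introduced for illustration.

```java
// Hypothetical model of the half-message lifecycle described above.
// None of these names exist in RocketMQ; they only illustrate the flow.
class HalfMessageLifecycle {

    /** Second confirmation submitted by the producer. */
    enum Confirmation { COMMIT, ROLLBACK, UNKNOWN }

    /** State of the message as seen by the server. */
    enum State { HALF, DELIVERABLE, ROLLED_BACK }

    /** Applies a second confirmation to a message in the given state. */
    static State onConfirmation(State current, Confirmation confirmation) {
        if (current != State.HALF) {
            return current; // already resolved, nothing to do
        }
        switch (confirmation) {
            case COMMIT:
                return State.DELIVERABLE;  // may now be delivered to consumers
            case ROLLBACK:
                return State.ROLLED_BACK;  // never delivered
            default:
                return State.HALF;         // still pending, server will check back
        }
    }

    /** A message that is still a half message is a candidate for a check. */
    static boolean needsCheck(State state) {
        return state == State.HALF;
    }

    public static void main(String[] args) {
        State s = onConfirmation(State.HALF, Confirmation.UNKNOWN);
        System.out.println("after UNKNOWN: " + s + ", needs check: " + needsCheck(s));
        s = onConfirmation(s, Confirmation.COMMIT);
        System.out.println("after COMMIT: " + s + ", needs check: " + needsCheck(s));
    }
}
```

The point of the model is that Unknown does not resolve anything: the message stays a half message until a later Commit or Rollback arrives, typically as the answer to a check.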
Sending Transactional Messages
Producer Sends the Transactional Message
The Producer sends transactional messages in the method DefaultMQProducerImpl#sendMessageInTransaction. The source is:
/**
 * Sends a transactional message.
 *
 * @param msg          the message
 * @param tranExecuter executor of the local transaction
 * @param arg          argument passed to the local transaction executor
 * @return the transactional send result
 * @throws MQClientException if the client encounters an error
 */
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
    throws MQClientException {
    if (null == tranExecuter) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    // Send the half message
    SendResult sendResult;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    // Handle the result of sending the half message
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        // Half message sent successfully: execute the local transaction
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) { // Transaction ID. Not used by the open-source version yet; presumably used by ONS.
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // Execute the local transaction
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        // Half message send failed: mark the local transaction state as rollback
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    // End the transaction: submit COMMIT / ROLLBACK for the message
    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    // Return the transactional send result
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
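The branching in sendMessageInTransaction is easy to lose among the logging, so here is a condensed sketch of just the decision it makes. It uses stand-in types: the enums below mirror the names in the source, but `HalfSendOutcome`, `decide`, and the use of `Supplier` in place of `LocalTransactionExecuter` are assumptions made for illustration.

```java
import java.util.function.Supplier;

// Stand-in types that mirror the names used in the source above.
// HalfSendOutcome and decide(...) are hypothetical, not RocketMQ APIs.
class HalfSendOutcome {

    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /**
     * Decides which state is reported to the broker, given the half-message
     * send status and the local transaction (modelled here as a Supplier).
     */
    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTransaction) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    state = localTransaction.get();
                    if (state == null) {
                        state = LocalTransactionState.UNKNOW; // null is treated as unknown
                    }
                } catch (Throwable e) {
                    // An exception leaves the state as UNKNOW, so the broker checks back later.
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message was not safely stored: skip the local transaction and roll back.
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalTransactionState.COMMIT_MESSAGE));
    }
}
```

Two details are worth noticing: the local transaction only runs when the half message was sent with SEND_OK, and an exception inside the local transaction does not mean rollback. It leaves the state as UNKNOW and defers the decision to the check.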
The transaction is finally ended with a Commit or Rollback by calling this method:
/**
 * Ends the transaction: submits COMMIT / ROLLBACK for the message.
 *
 * @param sendResult            result of sending the half message
 * @param localTransactionState state of the local transaction
 * @param localException        exception thrown while executing the local transaction
 * @throws RemotingException    if the remote call fails
 * @throws MQBrokerException    if the broker reports an error
 * @throws InterruptedException if the thread is interrupted
 * @throws UnknownHostException if decoding the message ID fails
 */
public void endTransaction(//
    final SendResult sendResult, //
    final LocalTransactionState localTransactionState, //
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    // Decode the message ID
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    // Build the request
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Submit COMMIT / ROLLBACK. Note: the request is sent Oneway!
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
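At this point the Producer's part in handling the transactional message is done. Because the end-transaction request is sent Oneway, the Producer does not wait for a response; the rest of the logic is up to the Broker.
Broker Handles the End-Transaction Request
The Broker handles the end-transaction request in the class EndTransactionProcessor: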
/**
 * Ends the transaction: commits or rolls back the message.
 *
 * @param ctx     channel handler context
 * @param request the request
 * @return the response
 * @throws RemotingCommandException if the request cannot be parsed
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    // Log the request (only COMMIT / ROLLBACK are processed further)
    if (requestHeader.getFromTransactionCheck()) {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    // Look up the half message being committed or rolled back
    final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
    if (msgExt != null) {
        // Validate the producerGroup
        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the producer group wrong");
            return response;
        }
        // Validate the queue offset
        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the transaction state table offset wrong");
            return response;
        }
        // Validate the physical CommitLog offset
        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the commit log offset wrong");
            return response;
        }
        // Build the new message
        MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            msgInner.setBody(null);
        }
        // Store the new message
        final MessageStore messageStore = this.brokerController.getMessageStore();
        final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
        // Handle the store result
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                // Success
                case PUT_OK:
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    break;
                // Failed
                case CREATE_MAPEDFILE_FAILED:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("create maped file failed.");
                    break;
                case MESSAGE_ILLEGAL:
                case PROPERTIES_SIZE_EXCEEDED:
                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                    response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                    break;
                case SERVICE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                    response.setRemark("service not available now.");
                    break;
                case OS_PAGECACHE_BUSY:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("OS page cache busy, please try another machine");
                    break;
                case UNKNOWN_ERROR:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR");
                    break;
                default:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR DEFAULT");
                    break;
            }
            return response;
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("find prepared transaction message failed");
        return response;
    }
    return response;
}
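The call to MessageSysFlag.resetTransactionValue above swaps the transaction type stored in the message's sysFlag while leaving the other flag bits alone. The sketch below reimplements that bit manipulation in isolation. The constant values and method bodies are assumptions based on my reading of MessageSysFlag (two bits at positions 2 and 3 holding the transaction type); verify them against the RocketMQ version you are reading. The class name `TransactionFlagSketch` is hypothetical.

```java
// Assumed layout: bits 2-3 of sysFlag hold the transaction type.
// Values are an assumption about MessageSysFlag; check your RocketMQ version.
class TransactionFlagSketch {

    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    /** Extracts the transaction type; ROLLBACK (both bits set) doubles as the mask. */
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Clears the two transaction bits, then sets the new type. */
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    public static void main(String[] args) {
        int unrelatedBit = 0x1; // some other flag bit that must survive the reset
        int halfFlag = unrelatedBit | TRANSACTION_PREPARED_TYPE;
        int committedFlag = resetTransactionValue(halfFlag, TRANSACTION_COMMIT_TYPE);
        System.out.println("type is COMMIT: " + (getTransactionValue(committedFlag) == TRANSACTION_COMMIT_TYPE));
        System.out.println("unrelated bit kept: " + ((committedFlag & unrelatedBit) != 0));
    }
}
```

Under this layout the half message and the commit/rollback message differ only in those two bits, which is what lets the dispatch step later tell them apart by sysFlag alone.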
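Broker Builds the ConsumeQueue
For a transactional message, the ConsumeQueue entry is created only after the message has been committed (COMMIT). Half messages and rollback messages are written to the CommitLog but never reach the ConsumeQueue, so consumers cannot see them.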
/**
 * Executes a dispatch request:
 * 1. For non-transactional messages and transaction COMMIT messages, writes the message position to the ConsumeQueue.
 * 2. Writes the index information to the IndexFile.
 *
 * @param req the dispatch request
 */
public void doDispatch(DispatchRequest req) {
    // Non-transactional or transaction COMMIT message: write the message position to the ConsumeQueue
    final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE: // non-transactional message
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // transactional message, COMMIT
            DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
                req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // transactional message, PREPARED
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // transactional message, ROLLBACK
            break;
    }
    // Write the index information to the IndexFile
    if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(req);
    }
}
/**
 * Writes the message position to the ConsumeQueue.
 *
 * @param topic          the topic
 * @param queueId        the queue ID
 * @param offset         physical offset in the CommitLog
 * @param size           message length
 * @param tagsCode       tags code of the message
 * @param storeTimestamp store timestamp
 * @param logicOffset    logical offset in the queue
 */
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
    long logicOffset) {
    ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
    cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}
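To make the effect of doDispatch concrete, the sketch below replays a tiny CommitLog and collects the offsets that would end up in the ConsumeQueue. It is a simplified model: `DispatchSketch`, `TranType`, and `Entry` are hypothetical names, and a real ConsumeQueue entry holds more than an offset.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the dispatch rule in doDispatch.
class DispatchSketch {

    enum TranType { NOT, PREPARED, COMMIT, ROLLBACK }

    /** One message in the (modelled) CommitLog. */
    static final class Entry {
        final long commitLogOffset;
        final TranType type;

        Entry(long commitLogOffset, TranType type) {
            this.commitLogOffset = commitLogOffset;
            this.type = type;
        }
    }

    /** Returns the CommitLog offsets that become visible through the ConsumeQueue. */
    static List<Long> buildConsumeQueue(List<Entry> commitLog) {
        List<Long> consumeQueue = new ArrayList<>();
        for (Entry entry : commitLog) {
            switch (entry.type) {
                case NOT:     // non-transactional message
                case COMMIT:  // committed transactional message
                    consumeQueue.add(entry.commitLogOffset);
                    break;
                case PREPARED: // half message: stored, but invisible to consumers
                case ROLLBACK: // rollback marker: never delivered
                    break;
            }
        }
        return consumeQueue;
    }

    public static void main(String[] args) {
        List<Entry> commitLog = new ArrayList<>();
        commitLog.add(new Entry(0L, TranType.NOT));
        commitLog.add(new Entry(100L, TranType.PREPARED));
        commitLog.add(new Entry(200L, TranType.COMMIT));
        System.out.println(buildConsumeQueue(commitLog));
    }
}
```

Note that what consumers read is the commit message written by the end-transaction handler, not the original half message, which stays in the CommitLog without a ConsumeQueue entry.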
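Transactional Message Check
In special cases such as a network outage or a producer restart, the server may never receive the second confirmation, or may receive one whose result is Unknown. After a fixed interval, the server then initiates a message check against the producer, that is, against any one producer instance in the producer cluster.
Broker Initiates the Check
This part is skipped for now.
Producer Receives the Check
The Producer handles a transactional message check in DefaultMQProducerImpl#checkTransactionState: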
/**
 * Checks the transaction state.
 *
 * @param addr   broker address
 * @param msg    the message
 * @param header the request header
 */
@Override
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            if (transactionCheckListener != null) {
                // Query the state of the local transaction
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                } catch (Throwable e) {
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }
                // Handle the transaction result: submit COMMIT / ROLLBACK for the message
                this.processTransactionState(//
                    localTransactionState, //
                    group, //
                    exception);
            } else {
                log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }
        /**
         * Handles the transaction result: submits COMMIT / ROLLBACK for the message.
         *
         * @param localTransactionState state of the local transaction
         * @param producerGroup         the producer group
         * @param exception             exception thrown while checking the local transaction state
         */
        private void processTransactionState(//
            final LocalTransactionState localTransactionState, //
            final String producerGroup, //
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);
            // Set the message ID
            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }
            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }
            try {
                // Submit COMMIT / ROLLBACK for the message
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };
    // Submit the task for execution
    this.checkExecutor.submit(request);
}
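This method invokes the transaction check listener that was created when the transactional message was sent, in order to query the state of the local transaction: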
/**
 * Listener for transactional message checks.
 */
public interface TransactionCheckListener {
    /**
     * Gets (checks) the state of the local transaction.
     *
     * @param msg the message
     * @return the transaction state
     */
    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
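Depending on the local transaction state, the Producer then sends a Commit or Rollback to the Broker. If the state is still UNKNOW, the request carries TRANSACTION_NOT_TYPE, which the Broker's processRequest shown earlier only logs and then ignores.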
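What a check listener typically needs is some record of how each local transaction ended. The sketch below shows one way to structure that with an in-memory map. It uses stand-in types so that it runs on its own: the real interface takes a MessageExt, whereas this hypothetical version takes a business key string, and `CheckListenerSketch`, `InMemoryCheckListener`, and `record` are names invented for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in types; the real TransactionCheckListener receives a MessageExt.
class CheckListenerSketch {

    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Simplified stand-in for the listener interface, keyed by a business key. */
    interface TransactionCheckListener {
        LocalTransactionState checkLocalTransactionState(String businessKey);
    }

    /** Remembers the outcome of each local transaction so a later check can answer. */
    static final class InMemoryCheckListener implements TransactionCheckListener {
        private final Map<String, LocalTransactionState> outcomes = new ConcurrentHashMap<>();

        /** Called by the business code once the local transaction has finished. */
        void record(String businessKey, LocalTransactionState outcome) {
            outcomes.put(businessKey, outcome);
        }

        @Override
        public LocalTransactionState checkLocalTransactionState(String businessKey) {
            // No record yet: the transaction may still be running, so report UNKNOW.
            return outcomes.getOrDefault(businessKey, LocalTransactionState.UNKNOW);
        }
    }

    public static void main(String[] args) {
        InMemoryCheckListener listener = new InMemoryCheckListener();
        listener.record("order-1", LocalTransactionState.COMMIT_MESSAGE);
        System.out.println(listener.checkLocalTransactionState("order-1"));
        System.out.println(listener.checkLocalTransactionState("order-2"));
    }
}
```

An in-memory map does not survive a producer restart, which is one of the cases the check exists for. In practice the outcome would more likely be read from the same database the local transaction wrote to.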
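Summary
This article walked through how RocketMQ implements transactional messages.
- To send a transactional message, the Producer first sends a half message and registers a transaction check listener.
- After the local transaction commits, the Producer sends a commit request for the half message.
- If the local transaction rolls back, the Producer sends a rollback request for the half message.
- If the Broker does not receive a commit or rollback for a long time, for example because of network problems, it initiates a transactional message check.
- On receiving the check request, the Producer runs the listener registered when the transactional message was created, queries the state of the local transaction, and decides from that state whether to Commit or Rollback the transactional message.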