Let's analyze the following demo, shown together with its console output, to explore how RocketMQ transactional messages work.
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

// The class name is illustrative; the original snippet omits the enclosing class.
public class TransactionProducerDemo {
    public static final String PRODUCER_GROUP = "tran-test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "Test";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));
                // Always report UNKNOW so that the broker keeps checking back
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",
                    msg.getTransactionId(), msg.getCommitLogOffset(),
                    msg.getQueueOffset(), msg.getMsgId()));
                return LocalTransactionState.UNKNOW;
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        Message msg = new Message(TOPIC, "test".getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",
            sendResult.getTransactionId(), sendResult.getOffsetMsgId(),
            sendResult.getQueueOffset(), sendResult.getMsgId()));
        // Block forever so the producer stays alive to receive check requests
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
}
executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4
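The output shows that commitLogOffset, queueOffset, and msgId change on every checkLocalTransaction call, while tranId stays the same. So what is actually happening inside the broker?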
How transactional messages work
When the client sends a transactional message, it sets the property MessageConst.PROPERTY_TRANSACTION_PREPARED to "true" to mark the message as transactional.
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
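When the broker receives the message, it reads traFlag from the properties. If traFlag is true, and the broker is not configured to reject transactional messages, the message is handed to TransactionalMessageService.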
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
long beginTimeMillis = this.brokerController.getMessageStore().now();
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
    CompletableFuture<PutMessageResult> asyncPutMessageFuture;
    if (sendTransactionPrepareMessage) {
        // Handle the transactional (prepare) message
        asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
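When TransactionalMessageService stores the message, it replaces the original topic with RMQ_SYS_TRANS_HALF_TOPIC and keeps the original topic in the message properties. The message is therefore persisted first, but consumers cannot see it yet.
When the broker receives COMMIT_MESSAGE from the TransactionMQProducer, it reads the message back from RMQ_SYS_TRANS_HALF_TOPIC, restores the original topic, and writes it again. At the same time it writes a record to RMQ_SYS_TRANS_OP_HALF_TOPIC.
The broker decides whether a transactional message has finished by checking whether it is present in both RMQ_SYS_TRANS_OP_HALF_TOPIC and RMQ_SYS_TRANS_HALF_TOPIC.
When the result is UNKNOW rather than COMMIT_MESSAGE, TransactionalMessageCheckService periodically calls back the producer's checkLocalTransaction (defined on its TransactionListener) to query the local transaction state, at most 15 times by default.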
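The topic swap and the half/op comparison described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not RocketMQ code: the class HalfTopicSketch, its methods, and the property keys REAL_TOPIC_KEY and HALF_OFFSET_KEY are hypothetical names chosen for the example, and each queue is reduced to a plain list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of the half/op topic bookkeeping; not RocketMQ code. */
final class HalfTopicSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    // Hypothetical property keys used only by this sketch.
    static final String REAL_TOPIC_KEY = "realTopic";
    static final String HALF_OFFSET_KEY = "halfOffset";

    /** A stored message: the topic it was written under plus its properties. */
    static final class Msg {
        final String topic;
        final Map<String, String> props;

        Msg(String topic, Map<String, String> props) {
            this.topic = topic;
            this.props = props;
        }
    }

    // One append-only list per topic stands in for a queue.
    private final Map<String, List<Msg>> topics = new HashMap<>();

    private int append(Msg m) {
        List<Msg> queue = topics.computeIfAbsent(m.topic, t -> new ArrayList<>());
        queue.add(m);
        return queue.size() - 1; // offset of the newly written message
    }

    /** Prepare: store under the half topic and remember the real topic in the properties. */
    int prepare(String realTopic) {
        Map<String, String> props = new HashMap<>();
        props.put(REAL_TOPIC_KEY, realTopic);
        return append(new Msg(HALF_TOPIC, props));
    }

    /** Commit: write the message again under its real topic and add an op record. */
    void commit(int halfOffset) {
        Msg half = topics.get(HALF_TOPIC).get(halfOffset);
        append(new Msg(half.props.get(REAL_TOPIC_KEY), new HashMap<>(half.props)));
        Map<String, String> op = new HashMap<>();
        op.put(HALF_OFFSET_KEY, String.valueOf(halfOffset));
        append(new Msg(OP_TOPIC, op));
    }

    /** A half message counts as finished once an op record points at its offset. */
    boolean isFinished(int halfOffset) {
        for (Msg op : topics.getOrDefault(OP_TOPIC, new ArrayList<>())) {
            if (String.valueOf(halfOffset).equals(op.props.get(HALF_OFFSET_KEY))) {
                return true;
            }
        }
        return false;
    }

    /** Number of messages a consumer of the given topic could see. */
    int visibleCount(String topic) {
        return topics.getOrDefault(topic, new ArrayList<>()).size();
    }

    public static void main(String[] args) {
        HalfTopicSketch broker = new HalfTopicSketch();
        int offset = broker.prepare("Test");
        System.out.println("visible before commit: " + broker.visibleCount("Test"));
        broker.commit(offset);
        System.out.println("visible after commit: " + broker.visibleCount("Test"));
    }
}
```

In this model a consumer subscribed to "Test" sees nothing until commit is called, which is the point of parking the message under the half topic first.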
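TransactionalMessageCheckService
TransactionalMessageCheckService is a thread that runs inside the broker. By default it runs once per minute, detects half messages whose transaction has timed out, and triggers a new check for them.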
@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // Every half queue has a corresponding op queue
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Current offset of the half queue (messages not yet finished)
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // Current offset of the op queue (operations already finished)
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }
            ......
            // single thread
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            long i = halfOffset;
            long nextOpOffset = pullResult.getNextBeginOffset();
            int putInQueueCount = 0;
            int escapeFailCnt = 0;
            while (true) {
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                if (removeMap.containsKey(i)) {
                    ......
                } else {
                    // Fetch the half message from RMQ_SYS_TRANS_HALF_TOPIC
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        ......
                    ......
                    // Should the message be discarded?
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    ......
                    // Determine whether the previous check has timed out
                    boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                        || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                        || valueOfCurrentMinusBorn <= -1;
                    if (isNeedCheck) {
                        // Timed out
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        putInQueueCount++;
                        log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
                            msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                            msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                            msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
                        // Send a new check request to the TransactionListener
                        listener.resolveHalfMsg(msgExt);
                    ......
                ......
            ......
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
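Three details in the code above deserve attention: needDiscard, putBackHalfMsgQueue, and listener.resolveHalfMsg.
needDiscard: after a message is read from the half queue, this method checks whether its TRANSACTION_CHECK_TIMES property has reached the limit (15 by default). While it is below the limit, the property value is incremented by 1. Once the limit is reached, the message is dropped from RMQ_SYS_TRANS_HALF_TOPIC and stored, via listener.resolveDiscardMsg, in TRANS_CHECK_MAX_TIME_TOPIC for manual handling.
putBackHalfMsgQueue: inserts a new copy of the message into RMQ_SYS_TRANS_HALF_TOPIC. Because the CommitLog is append-only, the original message cannot be modified in place, so the message has to be appended again. This is why queueOffset, commitLogOffset, and msgId all change.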
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
    PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        // Copy the new position of the re-appended message back onto msgExt
        msgExt.setQueueOffset(
            putMessageResult.getAppendMessageResult().getLogicsOffset());
        msgExt.setCommitLogOffset(
            putMessageResult.getAppendMessageResult().getWroteOffset());
        msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        log.debug(
            "Send check message, the offset={} restored in queueOffset={} "
                + "commitLogOffset={} "
                + "newMsgId={} realMsgId={} topic={}",
            offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
            msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
            msgExt.getTopic());
        return true;
    }
    ......
}
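To see why every check produces a new queueOffset, and why the demo stops after 15 checks, the counter and the put-back step can be modelled together. The following is a minimal sketch, assuming every check comes back as UNKNOW and that the counter is compared against the limit before each check; PutBackSketch, HalfMsg, and runChecks are hypothetical names, and a plain list stands in for the append-only half queue.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of re-appending a half message on every check; not RocketMQ code. */
final class PutBackSketch {
    /** Immutable stored record: once appended it is never modified. */
    static final class HalfMsg {
        final String transactionId;
        final int checkTimes;

        HalfMsg(String transactionId, int checkTimes) {
            this.transactionId = transactionId;
            this.checkTimes = checkTimes;
        }
    }

    // Append-only: records are only ever added at the end.
    private final List<HalfMsg> halfQueue = new ArrayList<>();

    /** Appends a record and returns its queue offset. */
    long append(HalfMsg msg) {
        halfQueue.add(msg);
        return halfQueue.size() - 1;
    }

    HalfMsg get(long offset) {
        return halfQueue.get((int) offset);
    }

    /**
     * Repeats the check for the message at startOffset until the limit is reached.
     * Returns the queue offsets of the re-appended copies, one per check.
     */
    List<Long> runChecks(long startOffset, int checkMax) {
        List<Long> offsets = new ArrayList<>();
        long current = startOffset;
        while (true) {
            HalfMsg msg = get(current);
            if (msg.checkTimes >= checkMax) {
                break; // limit reached: a real broker would discard the message here
            }
            // The stored record cannot be edited, so append a copy with the counter bumped.
            long newOffset = append(new HalfMsg(msg.transactionId, msg.checkTimes + 1));
            offsets.add(newOffset);
            current = newOffset;
        }
        return offsets;
    }

    public static void main(String[] args) {
        PutBackSketch queue = new PutBackSketch();
        long first = queue.append(new HalfMsg("tx-1", 0));
        System.out.println("offsets seen by the checks: " + queue.runChecks(first, 15));
    }
}
```

Each check sees a fresh offset while the transaction id stays constant, which matches the pattern in the demo output: one send followed by 15 checks at consecutive queue offsets.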
listener.resolveHalfMsg: the resolveHalfMsg callback sends the check request to the TransactionMQProducer again.
public void resolveHalfMsg(final MessageExt msgExt) {
    if (executorService != null) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    } else {
        LOGGER.error("TransactionalMessageCheckListener not init");
    }
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
    // Restore the real topic and queue id before sending the message back to the producer
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        // Pick a Netty channel connected to the broker for this producer group and send the check message over it
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
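Because sendCheckMessage sets the transaction id from the unique client message id, the id stays the same across all checks, so the producer can use it as a key to look up what happened locally. The sketch below shows one way a check could be answered instead of always returning UNKNOW as the demo does. It is an assumption-laden illustration in plain Java: CheckResolverSketch, its State enum, record, and check are hypothetical stand-ins for a TransactionListener and LocalTransactionState, not part of the RocketMQ API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of answering broker checks from locally recorded state; not RocketMQ code. */
final class CheckResolverSketch {
    /** Stand-in for LocalTransactionState, keeping its spelling of UNKNOW. */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Outcome of each local transaction, keyed by transaction id.
    private final Map<String, State> localStates = new ConcurrentHashMap<>();

    /** Called by business code once the local transaction has finished. */
    void record(String transactionId, boolean success) {
        localStates.put(transactionId, success ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE);
    }

    /** Plays the role of checkLocalTransaction: answer from recorded state, else UNKNOW. */
    State check(String transactionId) {
        return localStates.getOrDefault(transactionId, State.UNKNOW);
    }

    public static void main(String[] args) {
        CheckResolverSketch resolver = new CheckResolverSketch();
        System.out.println(resolver.check("tx-1")); // nothing recorded yet
        resolver.record("tx-1", true);
        System.out.println(resolver.check("tx-1")); // local transaction succeeded
    }
}
```

An in-memory map is lost when the producer restarts, so a real application would more likely derive the answer from durable state such as its own database.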