RocketMQ事务消息源码解析

news2024/11/16 23:51:06

RocketMQ提供了事务消息的功能,采用2PC(两阶段协议)+补偿机制(事务回查)的分布式事务功能,通过这种方式能达到分布式事务的最终一致。

一. 概述

  • 半事务消息:指的是发送至broker但是还没被commit的消息,在半事务消息被确认之前都是无法被消费者消费的。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,broker 通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(commit 或是 rollback),该询问过程即消息回查。

二. 交互流程

img

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至broker。

  2. broker将消息持久化成功之后,向发送方返回 Ack,确认消息已经发送成功,此时消息为半事务消息。

  3. 发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向服务端提交二次确认(commit 或是 rollback),服务端收到 commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 rollback 状态则“删除”半事务消息,订阅方将不会接受该消息。

  5. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

三. 示例代码

public static void main(String[] args) throws MQClientException, InterruptedException {
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    producer.setExecutorService(executorService);
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 执行业务代码 ....

            // 最终返回业务代码的事务状态
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 回查本地事务状态
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    });
    producer.start();

    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 10; i++) {
        try {
            Message msg =
                new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    for (int i = 0; i < 100000; i++) {
        Thread.sleep(1000);
    }
    producer.shutdown();
}

TransactionMQProducer 支持事务消息的生产者,继承自 DefaultMQProducer,在默认生产者上进行了扩展,支持发送事务消息。它拥有一个线程池 executorService 用来异步执行本地事务和回查事务,还需要注册 TransactionListener 事务监听器,里面包含了执行本地事务和回查事务的逻辑。

三. 源码分析

整个事务消息的处理流程,可以分为以下五个步骤:

  1. Half消息发送
  2. Half消息存储
  3. 提交事务状态
  4. 处理事务状态
  5. 事务回查

3.1 Half消息发送

除事务回查外,事务消息的时序图大致如下:

TransactionMQProducer 是RocketMQ提供的,支持发送事务消息的生产者,它继承自 DefaultMQProducer,也是一个外观类,代码非常的简单,核心逻辑依然在 DefaultMQProducerImpl,属性如下:

public class TransactionMQProducer extends DefaultMQProducer {
    // 事务回查监听
    private TransactionCheckListener transactionCheckListener;
    // 回查线程池最小线程数
    private int checkThreadPoolMinSize = 1;
    // 回查线程池最大线程数
    private int checkThreadPoolMaxSize = 1;
    // 最大回查请求数,阻塞队列容量
    private int checkRequestHoldMax = 2000;
    // 执行本地事务/事务回查的线程池
    private ExecutorService executorService;
    // 事务监听器:本地事务、事务回查逻辑
    private TransactionListener transactionListener;
}

启动 TransactionMQProducer,必须先注册 TransactionListener,实现本地事务的执行逻辑和事务回查逻辑,Producer在发送Half消息成功后会自动执行executeLocalTransaction,在Broker请求事务回查时自动执行checkLocalTransaction

然后,Producer就可以启动了,在启动默认Producer之前,会对checkExecutorcheckRequestQueue进行初始化,如果没有设置线程池会自动创建。

public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        // 事务回查请求队列
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        // 事务回查线程池
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}

初始化完成以后,就是Producer的正常启动逻辑,这里不再赘述。

发送事务消息对应的方法是sendMessageInTransaction

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // 判断检查本地事务Listener是否存在
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    // 检查消息内容
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 标识当前消息为事务消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 发送消息
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    // 发送消息成功,执行本地事务
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,
        // 半消息发送成功且本地事务执行成功则告诉broker提交半消息
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

   // .......
}

它主要做了以下事情:

  1. 设置属性TRAN_MSG=true
  2. 同步发送Half消息
  3. 消息发送成功,执行本地事务
  4. 提交本地事务状态

Tips:事务消息不支持延时,会在发送前,自动忽略延迟级别。

if (msg.getDelayTimeLevel() != 0) {
    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Broker判断是否是事务消息的依据是通过Properties的TRAN_MSG属性判断的,Producer在发送消息前会进行设置。

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

消息属性设置完毕,调用send方法同步发送给Broker,并获取发送状态。

SendResult sendResult = this.send(msg);	

3.2 Half消息存储

Half消息发送到Broker,Broker要负责存储,且此时消息对Consumer是不可见的,看看它是如何处理的。

SendMessageProcessor 会对接收到的消息进行判断,如果是事务消息,会转交给TransactionalMessageService 处理,普通消息直接转交给 MessageStore 处理。

// org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage

// 是否是事务消息?通过TRAN_MSG属性判断
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
    // 事务消息的处理
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}

TransactionalMessageService 使用了桥接模式,大部分操作会交给桥接类 TransactionalMessageBridge 执行。在处理Half消息时,为了不让Consumer可见,会像处理延迟消息一样,改写Topic和queueId,将消息统一扔到RMQ_SYS_TRANS_HALF_TOPIC这个Topic下,默认的queueId为0。同时,为了后续消息Commit时重新写入正常消息,必须将真实的Topic和queueId等属性先保留到Properties中。

// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 真实的Topic和queueId存储到Properties
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                                String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // 改写Topic为:RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

消息的Topic被改写后,正常写入CommitLog,但不会对Consumer可见。

3.3 提交事务状态

Broker将消息写入CommitLog后,会返回结果SendResult,如果发送成功,Producer开始执行本地事务:

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
if (sendResult.getTransactionId() != null) {
    // 设置事务ID
    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
    msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
    // 老的本地事务处理,已经被废弃
    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
    // 执行本地事务,获取事务状态
    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
    localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
    log.info("executeLocalTransactionBranch return {}", localTransactionState);
    log.info(msg.toString());
}

将本地事务执行状态 LocalTransactionState 提交到 Broker,方法是endTransaction。先根据 MessageQueue 找到Broker的主机地址,然后构建提交事务请求头 EndTransactionRequestHeader 并设置相关属性,请求头属性如下:

public class EndTransactionRequestHeader implements CommandCustomHeader {
    // 生产者组
    private String producerGroup;
    // ConsumeQueue Offset
    private Long tranStateTableOffset;
    // 消息所在CommitLog偏移量
    private Long commitLogOffset;
    // 事务状态
    private Integer commitOrRollback;
    // 是否Broker发起的回查?
    private Boolean fromTransactionCheck = false;
    // 消息ID
    private String msgId;
    // 事务ID
    private String transactionId;
}

事务状态 commitOrRollback 用数字表示,8代表Commit、12代表Rollback、0代表未知状态。请求头构建好以后,通过Netty发送数据包给Broker,对应的RequestCode为END_TRANSACTION

public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 解析MessageId,内含消息Offset
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // 获取MessageQueue所在Broker的Master主机地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    // 创建请求头
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    // 设置事务ID和偏移量
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 设置事务状态
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    // 执行钩子函数
    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 发送请求
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                                                                   this.defaultMQProducer.getSendMsgTimeout());
}

3.4 处理事务状态

Broker通过 EndTransactionProcessor 类来处理 Producer 提交的事务请求,首先做校验,确保是Master处理该请求,因为Slave是没有写权限的

// org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
    LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
    return response;
}

然后,解析请求头,获取事务状态,如果是commit操作,则根据请求头里的 CommitLogOffset 读取出完整的消息,从 Properties 中恢复消息真实的 TopicqueueId 等属性,再调用 sendFinalMessage 方法将消息重新写入 CommitLog,稍后构建好 ConsumeQueue 消息对 Consumer 就可见了,最后调用deletePrepareMessage方法删除Half消息。

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 提交事务消息
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 创建新的Message,恢复真实的Topic、queueId等属性,重新写入CommitLog
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                /**
                         * 事务消息提交,删除Half消息
                         * Half消息不会真的被删除,通过写入Op消息来标记它被处理。
                         */
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}

如果是Rollback,处理就更加简单了,因为消息本来就对Consumer是不可见的,只需要删除Half消息即可。

else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 回滚事务消息,事实上没做任何处理
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 写入Op消息,代表Half消息被处理
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

实际上,Half消息并不会删除,因为CommitLog是顺序写的,不可能删除单个消息。「删除Half消息」仅仅是给该消息打上一个标记,代表它的最终状态已知,不需要再回查了。RocketMQ通过引入Op消息来给Half消息打标记,Half消息状态确认后,会写入一条消息到Op队列,对应的Topic为MQ_SYS_TRANS_OP_HALF_TOPIC,反之Op队列中不存在的,就是状态未确认,需要回查的Half消息。

3.5 事务回查

Broker端发起消息事务回查的时序图如下:

Half消息写入成功,可能因为种种原因没有收到Producer的事务状态提交请求。此时,Broker会主动发起事务回查请求给Producer,以决定最终将消息Commit还是Rollback。

Half消息最终状态有没有被确认,是通过Op队列里的消息判断的。Broker服务启动时,会开启TransactionalMessageCheckService线程,每隔60秒进行一次消息回查。为了避免消息被无限次的回查,RocketMQ通过transactionCheckMax属性设置消息回查的最大次数,默认是15次。

// org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {
    // 回查超时
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 回查最大次数
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    // 开始回查
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}

回查Half消息时,首先要获取Half主题下的所有消息队列。

// 获取RMQ_SYS_TRANS_HALF_TOPIC下所有队列
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}

然后遍历所有的MessageQueue,按个处理所有队列里的待回查的消息。怎么判断消息需要回查呢?前面说过了,通过Op队列判断,因此还需要定位到HalfQueue对应的OpQueue,以及它们的ConsumeQueue偏移量。

// 获取对应的 Op队列
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取ConsumeQueue Offset
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

然后,从CommitLog读取出完整的消息。

// 根据offset去CommitLog读取出消息
private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
    GetResult getResult = new GetResult();

    PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
    getResult.setPullResult(result);
    List<MessageExt> messageExts = result.getMsgFoundList();
    if (messageExts == null) {
        return getResult;
    }
    getResult.setMsg(messageExts.get(0));
    return getResult;
}

判断回查次数是否已达上限,如果是的话,就统一扔到TRANS_CHECK_MAX_TIME_TOPIC下。

if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
    // 回查次数超过15,丢弃消息,扔到TRANS_CHECK_MAX_TIME_TOPIC
    listener.resolveDiscardMsg(msgExt);
    newOffset = i + 1;
    i++;
    continue;
}

如果判断消息确实需要回查,会调用AbstractTransactionalMessageCheckListener的sendCheckMessage方法,恢复消息真实的Topic、queueId等属性,然后发回给Producer进行事务的回查确认。

// 发送事务回查消息给Producer
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    // 获取消息生产的GroupId
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // 轮询出一台Producer实例
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        // 发送回查请求
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

Broker将回查请求发送给Producer后,Producer会执行checkLocalTransaction方法检查本地事务,然后将事务状态再发送给Broker,重复上述流程。

四. 总结

RocketMQ实现事务消息的原理和实现延迟消息的原理类似,都是通过改写Topic和queueId,暂时将消息先写入一个对Consumer不可见的队列中,然后等待Producer执行本地事务,提交事务状态后再决定将Half消息Commit或者Rollback。同时,可能因为服务宕机或网络抖动等原因,Broker没有收到Producer的事务状态提交请求,为了对二阶段进行补偿,Broker会主动对未确认的Half消息进行事务回查,判断消息的最终状态是否确认,是通过Op队列实现的,Half消息一旦确认事务状态,就会往Op队列中写入一条消息,消息内容是Half消息所在ConsumeQueue的偏移量。

本文参考转载至:

【RocketMQ】事务消息实现原理分析 - 掘金 (juejin.cn)

RocketMq之事务消息实现原理 - 掘金 (juejin.cn)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1276857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】24、文件系统、磁盘 IO

文章目录 一、文件系统1.1 索引节点和目录项1.2 虚拟文件系统 VFS1.3 文件系统 I/O1.5 性能观测1.5.1 容量1.5.2 缓存1.5.3 find 命令的缓存 二、磁盘 I/O2.1 通用块层2.2 I/O 栈2.3 磁盘性能指标2.3.1 磁盘 I/O 观测2.3.2 进程 I/O 观测 2.4 案例&#xff1a;找到打大量日志的…

UiPath学习笔记

文章目录 前言RPA介绍UiPath下载安装组件内容 前言 最近有一个项目的采集调研涉及到了客户端的采集&#xff0c;就取了解了一下RPA和UIPATH&#xff0c;记录一下 RPA介绍 RPA&#xff08;Robotic Process Automation&#xff1a;机器人处理自动化&#xff09;&#xff0c;是…

聊聊什么是IO流

目录 Java IOIO 基础Java IO 流了解吗&#xff1f; IO 设计模式1、装饰器模式2、适配器模式适配器模式和装饰器模式有什么区别呢&#xff1f;3、工厂模式4、观察者模式 IO 模型有哪些常见的 IO 模型&#xff1f;BIO(Blocking I/O)NIO (Non-blocking/New I/O)AIO (Asynchronous …

Java包(package)

1、概念 为了更好的组织类&#xff0c;用于区别类名的命名空间&#xff0c;其实就是基于工程的一个文件路径&#xff0c;如&#xff1a; 2、作用 三个作用&#xff1a; 1&#xff09;区分相同名称的类。 2&#xff09;能够较好地管理大量的类。 3&#xff09;控制访问范围。 在…

网站实现验证码功能

一、验证码 一般来说&#xff0c;网站在登录的时候会生成一个验证码来验证是否是人类还是爬虫&#xff0c;还有一个好处是防止恶意人士对密码进行爆破。 二、流程图 三、详细说明 3.1 后端生成验证码 Override public Result<Map<String, String>> getVerifica…

国内哪个超声波清洗机品牌比较好?质量好超声波清洗机总结

近年来超声波清洗机可以说是非常火爆&#xff0c;可以清洗化妆刷、眼镜、牙套等等一些小物件&#xff0c;大物件物品可以入手大型超声波清洗机&#xff0c;总之现在超声波清洗机已经衍生到可以在家使用&#xff0c;不再是在眼镜店看到它的身影或者是一些工业领域上&#xff0c;…

第二节:服务拆分(案例)

一、服务拆分注意事项 1.1 拆分原则 每个微服务&#xff0c;不要重复开发相同业务&#xff08;例如在单体项目中用到了一个查询&#xff0c;这个查询功能能够查询出订单信息、商品信息、用户信息&#xff0c;那么在拆分微服务时就不要将其写在一起了&#xff0c;订单的微服务只…

1、RocketMQ源码分析(一)

RocketMQ简单介绍 RabbitMQ的底层是使用erlang语言编写的&#xff0c;不便分析其底层&#xff0c;RocketMQ作为原阿里下经历阿里双十一严格考验的中间件&#xff0c;同时也是使用我们熟悉的java语言编写&#xff0c;我们先把入门的基础必备了解透&#xff0c;然后在去分析源码…

基于WebSocket实现客户聊天室

目录 一、实现聊天室原理 二、聊天室前端代码 三、聊天室后端代码&#xff08;重点&#xff09; 四、聊天室实现效果展示 一、实现聊天室原理 1.1 介绍websocket协议 websocket是一种通信协议&#xff0c;再通过websocket实现弹幕聊天室时候&#xff0c;实现原理是客户端首…

使用K-means把人群分类

1.前言 K-mean 是无监督的聚类算法 算法分类&#xff1a; 2.实现步骤 1.数据加工&#xff1a;把数据转为全数字&#xff08;比如性别男女&#xff0c;转换为0 和 1&#xff09; 2.模型训练 fit 3.预测 3.代码 原数据类似这样(source&#xff1a;http:img-blog.csdnimg.cn…

vmware 安装 AlmaLinux OS 8.6

选择系统镜像 选择镜像 选择安装位置和修改名称 可以自定义硬件&#xff0c;也可以不选择&#xff0c;后面可以再设置 自定义硬件可以设置内存和cpu等信息 安装虚拟机系统 密码如果简单的话需要点击两次done 才能保存

集成开发环境PyCharm的使用【侯小啾python领航计划系列(三)】

集成开发环境 PyCharm 的使用【侯小啾python领航计划系列(三)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ�…

Stable Diffusion AI绘画系列【10】:AI眼中的美丽清晨

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【数电笔记】卡诺图绘制(逻辑函数的卡诺图化简)

目录 说明&#xff1a; 最小项卡诺图的组成 1. 相邻最小项 2. 卡诺图的组成 2.1 二变量卡诺图 2.2 三表变量卡诺图 2.3 四变量卡诺图 3. 卡诺图中的相邻项&#xff08;几何相邻&#xff09; 说明&#xff1a; 笔记配套视频来源&#xff1a;B站 最小项卡诺图的组成 1. …

算法通关村第十四关-青铜挑战认识堆

大家好我是苏麟 , 今天带大家认识认识堆 . 堆 堆是将一组数据按照完全二叉树的存储顺序&#xff0c;将数据存储在一个一维数组中的结构。 堆有两种结构&#xff0c;一种称为大顶堆&#xff0c;一种称为小顶堆 : 大顶堆 大顶堆的任何一个父节点的值&#xff0c;都大于或等于…

nginx设置用户密码

1.官网 https://nginx.org/en/docs/http/ngx_http_auth_basic_module.html2.语法 3.创建密码 [rootlocalhost ~]# yum install httpd-tools -y4.创建密码文件 完毕&#xff01; [rootlocalhost ~]# htpasswd -b -c /etc/nginx/auth-passwd xp xp666-c 创建passwdfile &#…

免费使用优彩云采集器,3分钟学会优彩云采集器使用【2023最新】

如何高效地采集并聚合原创内容成为了一项关键任务&#xff1f;在这个背景下&#xff0c;本文将深入研究优彩云采集和147SEO采集&#xff0c;实现原创文章采集。 147SEO采集器 对于许多从业者而言&#xff0c;147SEO采集并不陌生。作为一款专注于原创内容采集的工具&#xff0c…

avue页面布局 api 引用

展示 index.vue <template><basic-container><avue-crud :option"option":table-loading"loading":data"data":page"page":permission"permissionList":search.sync"search":before-closebefore…

【LeeCode】24. 两两交换链表中的节点

给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4] 输出&#xff1a;[2,1,4…

【LeeCode】19.删除链表的倒数第N个节点

给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&#xff1a;[] 示例 3&#xf…