RocketMQ底层源码解——事务消息的实现

news2024/9/28 23:37:15

1. 简介

RocketMQ自身实现了事务消息,可以通过这个机制来实现一些对数据一致性有强需求的场景,保证上下游数据的一致性。
在这里插入图片描述

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

引入RocketMQ之后保证上下游数据的一致性。
在这里插入图片描述
使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述
事务消息详细步骤:

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

2. 实战

分别启动:namesrv、broker

producer示例代码:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
		producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

消费者示例代码:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

       
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");

       
        consumer.subscribe("TopicTest1234", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
   
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

将producer和consumer的topic都设置为:TopicTest1234。然后再启动consumer和producer。可以看到TransactionProducer控制台输出日志:
在这里插入图片描述
Consumer控制台输出日志:
在这里插入图片描述

3. 原理分析

分别启动namesrv和broker服务,随后运行:org.apache.rocketmq.example.transaction.TransactionProducer.java,构建出一个TransactionListenerImpl对象之后,添加到TransactionProducer中,为半消息的发送以及本地事务校验做做准备。

核心入口:TransactionProducer#sendMessageInTransaction()

    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        // 包装Topic,判断是否是重试Topic以及DLQ的Topic
        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        // 发送事务消息
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

DefaultMQProducerImpl#sendMessageInTransaction()

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // 获取事务监听器,就程序开头的时候传入的TransactionListenerImpl对象
        TransactionListener transactionListener = getCheckListener();
        // 新版本推荐事务监听器,已不使用localTransactionExecuter
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 设置DelayTimeLevel 参数
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        // 消息校验
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 设置属性:TRAN_MSG为true
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 设置属性:PGROUP为producerGroup
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
        	// ① 发送半消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    // 事务ID不为空
                    if (sendResult.getTransactionId() != null) {
                        // 设置事务id
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                    	// 执行本地事务检查
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                    	// ② 执行本地事务检查                        
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
        	// ③ 调用endTransaction判断是否结束事务
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

		// 构建事务消息发送结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

在该方法中核心步骤分别位于①②③标识处。

①:调用链路为最终抵达:DefaultMQProducerImpl#sendDefaultImpl(),也就是producer发消息核心方法。
在sendDefaultImpl()中有一处比较关键的代码:

				// 事务
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

此处代码给这条消息标识上事务消息的标志,最后通过mQClientAPIImpl往broker发送一条RPC的请求。

Broker端接受到请求之后经过请求code的分发,这条请求将由SendMessageProcessor#asyncSendMessage进行处理。

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
		// 相关代码省略...
		
        // 事务消息标识,从请求头中获取
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事务消息处理
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 将消息内容存储到CommitLog文件
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

asyncSendMessage方法处会从消息属性中获取到transFlag标识,判断到这条消息是一条半消息,然后调用this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner)。

方法调用链最终来到TrasactionalMessageBridge#asyncPutHalfMessage(),

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }
   /**
     * 消息事务半消息
     * @param msgInner
     * @return
     */
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // 设置真正要发送的Topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // 设置真正要发送的queueId
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置半消息Topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 设置半消息queueId
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

parseHalfMessageInner方法做了两件事,将这条半消息真正要发送的Topic以及queueId另外存起来,然后将这条半消息要发送到指定存储半消息的的RMQ_SYS_TRANS_HALF_TOPIC中,然后将queueId设置为0。(存储半消息的Topic只有1个queueId)
最后将这条半消息存入到CommitLog中,则步骤①执行完成。

② 步骤①发送半消息到CommitLog中存储之后,此时消息发送结果为SEND_OK,接着将调用TransactionListener#executeLocalTransaction()方法,检查本地事务的状态,此处由开发者自行实现代码逻辑。

③ 检查本地事务结束之后,会调用endTransaction()方法来尝试结束此次的事务消息。

public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        // 获取消息事务id
        String transactionId = sendResult.getTransactionId();
        // 获取broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        // 设置消息comimtlog偏移量
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:	// 提交消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:	// 回滚消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:	// 未知状态
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 往broker发送endTransactionOneway请求
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

broker端EndTransactionProcessor#processRequest接收到endTransaction请求:

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
		// 判断是事务check类型
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 这里提交requestHeader,实际是从commitlog中获取halfMessage
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 检查halfMessage状态
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                	// ④ 半消息处理成功之后,调用endMessageTransaction()发起事务消息结束
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 清楚事务标识
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // ⑤ 发送真正的完整的消息
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    	// ⑦ 删除半消息
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // ⑥ 发起消息回滚,获取要回滚的半消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 查询半消息
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // ⑦  删除半消息
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }

④ 调用endMessageTransaction

private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // 取出真正要发送是Topic
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        // 取出真正要发送的queueid
        msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);
        msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        msgInner.setSysFlag(msgExt.getSysFlag());
        TopicFilterType topicFilterType =
            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
        return msgInner;
    }

该方法将这条半消息设置成真正发送的Topic以及queueId。

⑤ 调用EndTransactionProcessor#sendFinalMessage(),发送真正的消息。最终调用MessageStore#putMessage()将该条消息存入CommitLog中。

⑥ 调用TransactionalMessageServiceImpl#rollbackMessage()对半消息进行回滚,但该方法实则是读取到半消息,然后再检查这条半消息。

⑦ 发送真正的消息或者是回滚半消息成功之后,随后调用TransactionalMessageServiceImpl#deletePrepareMessage()删除半消息。

经过①~⑦的步骤,整个RocketMQ的事务消息流程也就结束了,但是这里有一个关键点还没有讲解,当半消息发送成功了,本地事务执行成功发送本地事务状态时发生了broker断电或者是本地事务状态没有发送成功时,该如何保证整个流程能够正常运行呢?答案就是broker端会在启动时启动一个定时任务区检查本地事务的状态,也就是方法:TransactionListener#checkLocalTransaction()。

也就是Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

参考

  • RocketMQ官网-事务消息发送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339417.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux高级命令之压缩和解压缩命令

压缩和解压缩命令学习目标能够使用tar命令完成文件的压缩和解压缩1. 压缩格式的介绍Linux默认支持的压缩格式:.gz.bz2.zip说明:.gz和.bz2的压缩包需要使用tar命令来压缩和解压缩.zip的压缩包需要使用zip命令来压缩&#xff0c;使用unzip命令来解压缩压缩目的:节省磁盘空间2. ta…

如何在VMware虚拟机上安装运行Mac OS系统(详细图文教程)

一、安装前准备 虚拟机运行软件&#xff1a;VMware Workstation Pro&#xff0c;版本&#xff1a;16.0.0 。VMware Mac OS支持套件&#xff1a;Unlocker。Mac OS系统镜像。 如果VMware 在没有安装Unlocker的情况下启动&#xff0c;在选择客户机操作系统时没有支持Mac OS的选项…

Mock.js初步使用(浏览器端)

Mock.js&#xff1a;生成随机数据&#xff0c;拦截 Ajax 请求。官方地址&#xff1a;http://mockjs.com/第一个demodemo.html<!DOCTYPE html> <html> <head><meta charset"utf-8"><title>mockjs demo</title> </head> <…

STM32单片机OLED显示

OLED接口电路STM32单片机OLED显示程序源代码#include "sys.h"#define OLED_RST_Clr() PCout(13)0 //RST#define OLED_RST_Set() PCout(13)1 //RST#define OLED_RS_Clr() PBout(4)0 //DC#define OLED_RS_Set() PBout(4)1 //DC#define OLED_SCLK_Clr()PCout(15)0 //SCL…

详解Python文件pyinstaller打包

本文python文件打包用到的是pyinstaller库并且以如下格式的文件为例 其中bird.py用到了images文件夹当中的png pyinstaller有两种打包方式: 方法1:文件夹模式 onedir 在终端用命令 pyinstaller -D flappybird.py执行完后文件格式如下 可以看到多了.idea,pycache,build,dis…

Linux系列 备份与分享文档

作者简介&#xff1a;一名在校云计算网络运维学生、每天分享网络运维的学习经验、和学习笔记。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.备份与分享文档 1.使用压缩和解压缩工具 &#xff08;1&…

Java零基础教程——数据类型

目录数据类型数据类型的分类运算符算术运算符符号做连接符的识别自增、自减运算符赋值运算符关系运算符逻辑运算符短路逻辑运算符三元运算符运算符优先级数据类型 数据类型的分类 引用数据类型&#xff08;除基本数据类型之外的&#xff0c;如String &#xff09; 基本数据类…

【STM32】【HAL库】遥控关灯2 分机

相关连接 【STM32】【HAL库】遥控关灯0 概述 【STM32】【HAL库】遥控关灯1主机 【STM32】【HAL库】遥控关灯2 分机 【STM32】【HAL库】遥控关灯3 遥控器 需求 接收RF433和红外信号,根据信号内容控制舵机 硬件设计 主控采用stm32F103c6 STM32 433接收 其他接口 软件设计 接…

[SSD固态硬盘技术 14] GC垃圾回收太重要了

今天介绍臭名昭著的垃圾收集 过程(或“GC”),maybe 这是对JAVA 工程师而言。当遇到GC导致速度降低时候, 他们真的想跳脚。 我想到我的小孩打疫苗,哭的哇哇叫, 在他的眼里疫苗应该也是讨厌的吧, 但事实真的如此吗? 但首先,让我们考虑一下如果根本没有 GC,闪存系统会发…

【Shell1】shell语法,ssh/build/scp/upgrade,环境变量,自动升级bmc,bmc_wtd,

文章目录1.shell语法&#xff1a;Shell是用C语言编写的程序&#xff0c;它是用户使用Linux的桥梁&#xff0c;硬件>内核(os)>shell>文件系统1.1 变量&#xff1a;readonly定义只读变量&#xff0c;unset删除变量1.2 函数&#xff1a;shell脚本传递的参数中包含空格&am…

微信小程序 学生选课系统--nodejs+vue

系统分为学生和管理员&#xff0c;教师三个角色 学生小程序端的主要功能有&#xff1a; 1.用户注册和登陆系统 2.查看选课介绍信息 3.查看查看课程分类 4.查看课程详情&#xff0c;在线选课&#xff0c;提交选课信息 5.在线搜索课程信息 6.用户个人中心修改个人资料 7.用户查看…

数据中心的 TCP-Delay ACK 与 RTO, RACK

TCP 对 RTO 有个最小值限制&#xff0c;一般限制为 MIN_RTO 200ms。之所以有这个限制&#xff0c;在于要适应 Delay ACK&#xff0c;而 Delay ACK 的意义&#xff0c;不多说&#xff0c;摘自 RFC1122&#xff1a; MIN_RTO 应该足够大&#xff0c;以覆盖 Delay ACK 的影响&…

【Day6】合并两个排序链表与合并k个已排序的链表,java代码实现

前言&#xff1a; 大家好&#xff0c;我是良辰丫&#x1f680;&#x1f680;&#x1f680;&#xff0c;今天与大家一起做两道牛客网的链表题&#xff0c;好久写关于链表题的博客了&#xff0c;这两道题可以帮大家巩固一下链表知识&#xff0c;我把两道题的链接放到下面&#xf…

【C++之容器篇】造轮子:模拟实现vector类

目录前言一、项目结构1. vector的简介2. 项目结构二、vector的底层结构三、默认成员函数1. 构造函数(1)无参构造函数2. 拷贝构造函数3. 析构函数4. 赋值运算符重载函数四、迭代器1. 普通对象的正向迭代器2. const 对象的正向迭代器五、容量接口1. size()2. capacity()3. reserv…

分布式-分布式理论笔记

分布式系统的特点 分布式系统技术就是用来解决集中式架构的性能瓶颈问题&#xff0c;来适应快速发展的业务规模&#xff0c;是建立在网络之上的硬件或者软件系统&#xff0c;彼此之间通过消息等方式进行通信和协调。 特点 具有可扩展性&#xff0c;不出现单点故障、服务或者…

Hadoop之——WordCount案例与执行本地jar包

目录 一、WordCount代码 (一)WordCount简介 1.wordcount.txt (二)WordCount的java代码 1.WordCountMapper 2.WordCountReduce 3.WordCountDriver (三)IDEA运行结果 (四)Hadoop运行wordcount 1.在HDFS上新建一个文件目录 2.新建一个文件&#xff0c;并上传至该目录下…

商品秒杀接口压测及优化

目录一、生成测试用户二、jmeter压测三、秒杀接口优化1、优化第一步&#xff1a;解决超卖2、优化第二步&#xff1a;Redis重复抢购3、优化第三步&#xff1a;Redis预减库存①商品初始化②预减库存一、生成测试用户 将UserUtils工具类导入到zmall-user模块中&#xff0c;运行生…

【STM32】【HAL库】遥控关灯1主机

相关连接 【STM32】【HAL库】遥控关灯0 概述 【STM32】【HAL库】遥控关灯1主机 【STM32】【HAL库】遥控关灯2 分机 【STM32】【HAL库】遥控关灯3 遥控器 需求 主机需要以下功能: 接收来自物联网平台的命令发送RF433信号给从机接收RF433信号和红外信号驱动舵机动作 方案设计…

【计算机网络期末复习】第二章 物理层

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4e3;专栏定位&#xff1a;为想复习学校计算机网络课程的同学提供重点大纲&#xff0c;帮助大家渡过期末考~ &#x1f4da;专栏地址&#xff1a; ❤️如果有收获的话&#xff0c;欢迎点…

Verilog语法之数学函数

Verilog-2005支持一些简单的数学函数&#xff0c;其参数的数据类型只能是integer和real型。 Integer型数学函数 $clog2是一个以2为底的对数函数&#xff0c;其结果向上取整&#xff0c;返回值典型的格式&#xff1a; integer result; result $clog2(n); 最典型的应用就是通过…