RocketMQ底层源码解析——事务消息的实现

news2024/9/25 4:36:18

1. 简介

RocketMQ自身实现了事务消息,可以通过这个机制来实现一些对数据一致性有强需求的场景,保证上下游数据的一致性。
在这里插入图片描述

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

引入RocketMQ之后保证上下游数据的一致性。
在这里插入图片描述
使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述
事务消息详细步骤:

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

2. 实战

分别启动:namesrv、broker

producer示例代码:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
		producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

    static class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

消费者示例代码:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

       
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");

       
        consumer.subscribe("TopicTest1234", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
   
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

将producer和consumer的topic都设置为:TopicTest1234。然后再启动consumer和producer。可以看到TransactionProducer控制台输出日志:
在这里插入图片描述
Consumer控制台输出日志:
在这里插入图片描述

3. 原理分析

分别启动namesrv和broker服务,随后运行:org.apache.rocketmq.example.transaction.TransactionProducer.java,构建出一个TransactionListenerImpl对象之后,添加到TransactionProducer中,为半消息的发送以及本地事务校验做做准备。

核心入口:TransactionProducer#sendMessageInTransaction()

    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        // 包装Topic,判断是否是重试Topic以及DLQ的Topic
        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        // 发送事务消息
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

DefaultMQProducerImpl#sendMessageInTransaction()

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // 获取事务监听器,就程序开头的时候传入的TransactionListenerImpl对象
        TransactionListener transactionListener = getCheckListener();
        // 新版本推荐事务监听器,已不使用localTransactionExecuter
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 设置DelayTimeLevel 参数
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        // 消息校验
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 设置属性:TRAN_MSG为true
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 设置属性:PGROUP为producerGroup
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
        	// ① 发送半消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    // 事务ID不为空
                    if (sendResult.getTransactionId() != null) {
                        // 设置事务id
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                    	// 执行本地事务检查
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                    	// ② 执行本地事务检查                        
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
        	// ③ 调用endTransaction判断是否结束事务
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

		// 构建事务消息发送结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

在该方法中核心步骤分别位于①②③标识处。

①:调用链路为最终抵达:DefaultMQProducerImpl#sendDefaultImpl(),也就是producer发消息核心方法。
在sendDefaultImpl()中有一处比较关键的代码:

				// 事务
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

此处代码给这条消息标识上事务消息的标志,最后通过mQClientAPIImpl往broker发送一条RPC的请求。

Broker端接受到请求之后经过请求code的分发,这条请求将由SendMessageProcessor#asyncSendMessage进行处理。

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
		// 相关代码省略...
		
        // 事务消息标识,从请求头中获取
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事务消息处理
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 将消息内容存储到CommitLog文件
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

asyncSendMessage方法处会从消息属性中获取到transFlag标识,判断到这条消息是一条半消息,然后调用this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner)。

方法调用链最终来到TrasactionalMessageBridge#asyncPutHalfMessage(),

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }
   /**
     * 消息事务半消息
     * @param msgInner
     * @return
     */
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // 设置真正要发送的Topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // 设置真正要发送的queueId
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置半消息Topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 设置半消息queueId
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

parseHalfMessageInner方法做了两件事,将这条半消息真正要发送的Topic以及queueId另外存起来,然后将这条半消息要发送到指定存储半消息的的RMQ_SYS_TRANS_HALF_TOPIC中,然后将queueId设置为0。(存储半消息的Topic只有1个queueId)
最后将这条半消息存入到CommitLog中,则步骤①执行完成。

② 步骤①发送半消息到CommitLog中存储之后,此时消息发送结果为SEND_OK,接着将调用TransactionListener#executeLocalTransaction()方法,检查本地事务的状态,此处由开发者自行实现代码逻辑。

③ 检查本地事务结束之后,会调用endTransaction()方法来尝试结束此次的事务消息。

public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        // 获取消息事务id
        String transactionId = sendResult.getTransactionId();
        // 获取broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        // 设置消息comimtlog偏移量
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:	// 提交消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:	// 回滚消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:	// 未知状态
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 往broker发送endTransactionOneway请求
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

broker端EndTransactionProcessor#processRequest接收到endTransaction请求:

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
		// 判断是事务check类型
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 这里提交requestHeader,实际是从commitlog中获取halfMessage
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 检查halfMessage状态
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                	// ④ 半消息处理成功之后,调用endMessageTransaction()发起事务消息结束
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 清楚事务标识
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // ⑤ 发送真正的完整的消息
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    	// ⑦ 删除半消息
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // ⑥ 发起消息回滚,获取要回滚的半消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 查询半消息
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // ⑦  删除半消息
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }

④ 调用endMessageTransaction

private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // 取出真正要发送是Topic
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        // 取出真正要发送的queueid
        msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);
        msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        msgInner.setSysFlag(msgExt.getSysFlag());
        TopicFilterType topicFilterType =
            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
        return msgInner;
    }

该方法将这条半消息设置成真正发送的Topic以及queueId。

⑤ 调用EndTransactionProcessor#sendFinalMessage(),发送真正的消息。最终调用MessageStore#putMessage()将该条消息存入CommitLog中。

⑥ 调用TransactionalMessageServiceImpl#rollbackMessage()对半消息进行回滚,但该方法实则是读取到半消息,然后再检查这条半消息。

⑦ 发送真正的消息或者是回滚半消息成功之后,随后调用TransactionalMessageServiceImpl#deletePrepareMessage()删除半消息。

经过①~⑦的步骤,整个RocketMQ的事务消息流程也就结束了,但是这里有一个关键点还没有讲解,当半消息发送成功了,本地事务执行成功发送本地事务状态时发生了broker断电或者是本地事务状态没有发送成功时,该如何保证整个流程能够正常运行呢?答案就是broker端会在启动时启动一个定时任务区检查本地事务的状态,也就是方法:TransactionListener#checkLocalTransaction()。

也就是Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

参考

  • RocketMQ官网-事务消息发送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于javaee的电影碟片租赁管理系统的设计

技术&#xff1a;Java、JSP、框架等摘要&#xff1a;随着信息技术在管理中的广泛应用&#xff0c;管理信息系统(MIS)的实施在技术上逐渐成熟。为了适应时代的发展&#xff0c;降低管理成本&#xff0c;提高工作效率&#xff0c;企业需要加强对内部资源(人、钱、物)的有效管理&a…

Android测试包安装方式汇总

背景&#xff1a;作为一名测试&#xff0c;尤其是移动端测试&#xff0c;掌握app的安装方式是必备的基本技能&#xff0c;因此将Android测试包不同格式不同方式的安装方式进行一个总结分享​&#xff0c;仅供大家学习参考。 一、设备调试准备 1、设备打开开发者模式&#xff…

医学生考研考博太卷,一篇文章轻松助力上岸(一)

考研考博太卷了&#xff0c;卷不过&#xff0c;想没想过本科发一篇文章呢&#xff1f; 330分考研人淘汰390分考研人这个故事&#xff0c;大家应该都知道吧。 本专栏带你六个月内&#xff0c;搞定一篇文章&#xff0c;本科生发文章也很容易。 在卷考研的同时&#xff0c;再卷…

应用场景一:西门子PLC通过桥接器连接MQTT服务器

应用场景描述&#xff1a; 云平台、MES等数据采集、设备管理系统&#xff0c;需要通过MQTT的方式&#xff0c;上传和下发数据&#xff0c;MQTT服务器可以获取PLC的实时状态数据&#xff0c;也可以下发控制指令。桥接器提供4G、WIFI和有线三种连接方式。 网络拓扑&#xff1a;…

GRBL源码简单分析

结构体说明 GRBL里面的速度规划是带运动段前瞻的&#xff0c;所以有规划运动段数据和微小运动段的区分 这里的“规划运动段”对应的数据结构是plan_block_t&#xff0c;前瞻和加减速会使用到&#xff0c;也就是通过解析G代码后出来的直接直线数据或是圆弧插补出来的拟合直线数据…

【链式二叉树】数据结构链式二叉树的(万字详解)

前言&#xff1a; 在上一篇博客中&#xff0c;我们已经详解学习了堆的基本知识&#xff0c;今天带大家进入的是二叉树的另外一种存储方式----“链式二叉树”的学习&#xff0c;主要用到的就是“递归思想”&#xff01;&#xff01; 本文目录1.链式二叉树的实现1.1前置说明1.2结…

【蓝桥杯单片机】Keil5中怎么添加STC头文件;从烧录软件中添加显示添加成功后新建工程时依旧找不到

蓝桥杯单片机的芯片型号&#xff1a;IAP15F2K61S2 添加头文件&#xff1a;STC15F2K60S2.H 【1】如何通过烧录软件添加STC头文件&#xff1a; 从ATC-ISP的Keil仿真设置中添加&#xff08;同时自动下载仿真驱动&#xff09;仔细阅读添加说明 KEIL5添加STC芯片库_Initdev的博客-…

UVa The Morning after Halloween 万圣节后的早晨 双向BFS

题目链接&#xff1a;The Morning after Halloween 题目描述&#xff1a; 给定一个二维矩阵&#xff0c;图中有障碍物和字母&#xff0c;你需要把小写字母移动到对应的大写字母位置&#xff0c;不同的小写字母可以同时移动&#xff08;上下左右四个方向或者保持不动 &#xff0…

概论_第8章_假设检验的基本步骤__假设检验的类型

一. 假设检验的基本步骤如下&#xff1a;第1步 根据实际问题提出原假设 及备择假设 , 要求 与 有且仅有一个为真&#xff1b;第2步 选取适当的检验统计量&#xff0c; 并在原假设 成立的条件下确定该检验统计量的分布&#xff1b;第3步 按问题的具体要求&#xff0c; 选取适当…

【java】OpenFeign源码解析学习

本文主要针对 spring-cloud-starter-openfeign 的 2.2.3.RELEASE 版本进行源码的解析。 OpenFeign是什么&#xff1f; 作为Spring Cloud的子项目之一&#xff0c;Spring Cloud OpenFeign以将OpenFeign集成到Spring Boot应用中的方式&#xff0c;为微服务架构下服务之间的调用提…

SQL Serve 日志体系结构

SQL Server 事务日志记录着 undo 及 redo 日志&#xff0c;为了保证数据库在崩溃后恢复&#xff0c;或者在正常数据库操作期间进行回滚&#xff0c;保证数据库事务完整性和持久化。如果没有事务日志记录&#xff0c;数据库在事务上将不一致&#xff0c;并且在数据库崩溃后可能导…

ThinkPHP5酒店预订管理系统

有需要请私信或看评论链接哦 可远程调试 ThinkPHP5酒店预订管理系统一 介绍 此酒店预订管理系统基于ThinkPHP5框架开发&#xff0c;数据库mysql&#xff0c;采用了ueditor富文本编辑器。系统角色分为用户&#xff0c;员工和管理员。用户可注册登录并预订酒店和评论等&#xff…

SpringCloud AlibabaSeata1.5.2的安装

目录 一、分布式问题 二、Seate简介 &#xff08;一&#xff09;官网 &#xff08;二&#xff09;Seate分布式事务的过程 &#xff08;三&#xff09; 分布式事务处理过程 &#xff08;四&#xff09;下载地址 三、Seata-Server安装 &#xff08;一&#xff09;官网 …

《Spring源码深度分析》第8章 数据库连接JDBC

目录标题前言一、数据库连接方式1.JDBC连接数据库2.Spring Jdbc连接数据库(JdbcTemplate)二、JdbcTemplate源码分析1.update/save功能的实现源码分析入口(关键)基础方法execute1.获取数据库连接池2.应用用户设定的输入参数3. 调用回调函数处理4. 资源释放Update中的回调函数2.q…

TreeSet 与 TreeMap And HashSet 与 HashMap

目录 Map TreeMap put()方法 : get()方法 : Set> entrySet() (重) : foreach遍历 : Set 哈希表 哈希冲突 : 冲突避免 : 冲突解决 ---- > 比散列(开放地址法) : 开散列 (链地址法 . 开链法) 简介 : 在Java中 , TreeSet 与 TreeMap 利用搜索树实现 Ma…

【项目精选】javaEE健康管理系统(论文+开题报告+答辩PPT+源代码+数据库+讲解视频)

点击下载源码 javaEE健康管理系统主要功能包括&#xff1a;教师登录退出、教师饮食管理、教师健康日志、体检管理等等。本系统结构如下&#xff1a; &#xff08;1&#xff09;用户模块&#xff1a; 实现登录功能 实现用户登录的退出 实现用户注册 &#xff08;2&#xff09;教…

运筹系列78:cbc使用介绍

1. 上手 1.1 快速使用 首先是简单的调用测试&#xff0c;在mac上首先安装clp的库&#xff1a;brew install coin-or-tools/coinor/cbc&#xff0c;然后新建项目进行调用&#xff0c;各项配置如下&#xff0c;注意要添加的library和directory比较多&#xff1a; 1.2 命令行方…

锁机制面试题

你是怎么理解乐观锁和悲观锁的&#xff0c;具体怎么实现呢&#xff1f; 悲观锁认为多个线程访问同一个共享变量冲突的概率较大 , 在每次访问共享变量之前都去真正加锁. 乐观锁认为多个线程访问同一个共享变量冲突的概率不大.,并不会真的加锁, 而是直接尝试访问数据 . 在访问的…

AcWing 165. 小猫爬山(DFS + 剪枝优化)

AcWing 165. 小猫爬山&#xff08;DFS 剪枝优化&#xff09;一、问题二、分析1、贪心想法的误区2、正解三、代码一、问题 二、分析 这道题其实总结下来&#xff0c;就是一句话&#xff0c;让更多的小猫坐在一辆车上&#xff0c;从而减少车的数量。 1、贪心想法的误区 这道题…

Spring Security in Action 第四章 SpringSecurity处理密码的相关讨论

本专栏将从基础开始&#xff0c;循序渐进&#xff0c;以实战为线索&#xff0c;逐步深入SpringSecurity相关知识相关知识&#xff0c;打造完整的SpringSecurity学习步骤&#xff0c;提升工程化编码能力和思维能力&#xff0c;写出高质量代码。希望大家都能够从中有所收获&#…