RocketMQ5.0.0事务消息

news2024/11/25 2:59:53

目录

一、事务消息概览

二、事务消息实现机制

1. 事务消息发送流程

        1):发送事务消息类图

        2):生产端发送事务消息

        3):Broker存储事务消息

2. 生产者提交或回滚事务消息

        1):生产者发送提交或回滚事务请求

        2):Broker处理生产者请求

3. Broker回查事务状态

        1):Broker执行回查任务

        2):线程池执行异步回查

三、事务消息实例

1. 事务消息监听类

2. 事务消息生产者

3. 事务消息消费者

4. 业务代码

四、参考资料


一、事务消息概览

        RocketMQ事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,消费者可以消费事务提交的消息,如下图所示。事务消息的作用:确保本地业务与消息在一个事务内,本地业务成功执行,消费者才能消费消息

事务消息实现流程

        事务消息实现流程分3个部分:

  • 应用程序:事务内完成相关业务数据入库后,需要同步调用RocketMQ消息发送接口,发送状态为prepare消息。消息发送成功后,RocketMQ服务器会回调RocketMQ消息发送者的事件监听器,记录消息的本地事务状态,事务状态与本地业务操作同属一个事务,确保消息发送与本地事务的原子性
  • Broker服务器存储消息:存储器Broker收到类型为prepare的消息时,会首先备份消息的原主题与原消息消费队列到消息扩展属性中,然后修改消息存储主题RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
  • Broker定时任务:存储器Broker开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息发送端(应用程序)发起消息事务状态回查,应用程序根据保存的事务状态反馈消息服务器事务的状态(提交、回滚、未知),如果是提交或回滚,则消息服务器提交或回滚消息,如果是未知,待下一次回查, RocketMQ 允许设置一条消息的回查间隔与回查次数,如果超过回查次数后依然无法获知消息的事务状态,则默认回滚消息。 

二、事务消息实现机制

1. 事务消息发送流程

        1):发送事务消息类图

        org.apache.rocketmq.client.producer.TransactionMQProducer事务消息生产者类,其UML类图如下。

        org.apache.rocketmq.client.producer.TransactionListener是事务监听器接口,用户必须自定义事务监听器实现类复写两个方法:

  • TransactionListener#executeLocalTransaction():执行本地事务(如:保存事务消息ID),与业务逻辑在一个事务内,供状态回查使用。
  • TransactionListener#checkLocalTransaction():回查本地事务状态,即:事务是否提交或回滚或未知,若是未知,则继续下次查询。

        下图是发送事务消息总流程图,下面小节详细介绍消息的发送过程。

        2):生产端发送事务消息

        org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction是发送事务消息的入口方法,其调用链如下。

        org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction是发送事务消息的核心方法,其代码如下。注意事项:

  • 设置消息为prepare:为后续发送消息前改变消息topic,而原topic和消费队列添加到消息扩展属性中;
  • 设置消息生产组:为后续Broker定时查询事务状态时,从生产组中随机选择一个即可;
  • 调用结束事务endTransaction():调用该方法时,本地业务逻辑及事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成,才定期回查事务状态
/**
 * 发送事务消息核心逻辑
 * step1:获取生产者的事务监听器,自定义实现类,接口是{@link TransactionListener}
 * step2:检查消息延迟级别,若是事务消息则忽略延迟参数
 * step3:检查消息是否符合规则,如:topic规范、消息体不能为空、消息体长度默认不能大于4MB
 * step4:设置消息为prepare消息 + 设置消息生产组(目的:监听器查询事务状态时,从生产组中随机选择一个即可)
 * step5:发送事务(与普通消息发送流程相同)并返回结果
 *        {@link DefaultMQProducerImpl#send(Message)}
 * step6:消息发送成功后,获取事务ID和记录本地事务状态(保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据)
 * step7:结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查
 *        {@link DefaultMQProducerImpl#endTransaction()}
 * @param msg 事务消息内容
 * @param localTransactionExecuter 本地事务Executer
 * @param arg 传入事务监听器的参数
 * @return
 */
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // 获取生产者的事务监听器
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter 忽略延迟消息参数
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    // 检查消息是否符合规则
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 设置消息为prepare消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 设置消息生产组,目的:监听器查询事务状态时,从生产组中随机选择一个即可
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 发送事务(与普通消息发送流程相同)
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    // 根据消息发送结果,作响应的本地事务状态处理
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        /*
            消息发送成功,获取事务ID和记录本地事务状态
            注意:保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据
         */
        case SEND_OK: {
            try {
                // 从消息发送结果获取事务ID
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }

                // 记录本地事务状态
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

        org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl发送消息,若是prepare消息时,添加事务消息标识到系统标识中。

/**
 * 发送消息
 * step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;
 * step2:不是批量发送消息,则为消息分配一个全局消息ID;
 * step3:设置消息是否压缩,消息体 > 4KB,则压缩;
 * step4:是否是事务Prepare消息,若是写入事务prepare标签;
 * step5:执行发送前钩子函数;
 * step6:根据不同发送方式,发送消息;
 * step7:执行发送后钩子函数;
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

            ......

            // 是否是事务Prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签
            }       
            ......
}

        3):Broker存储事务消息

        普通消息发送与Broker存储消息参考资料  《RocketMQ5.0.0消息发送》、《RocketMQ5.0.0消息存储<二>_消息存储流程》。org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage该方法是普通消息的处理,解析系统标识中是否有事务消息标签,若是:

  • 判断Broker是否禁止事务消息存储
  • 事务消息时,执行事务消息存储逻辑:异步TransactionalMessageService#asyncPrepareMessage、同步TransactionalMessageService#prepareMessage
/**
 * 存储之前,对消息的处理
 * step1:预发送处理,如:检查消息、主题是否符合规范
 * step2:发送消息时,是否指定消费队列,若没有则随机选择
 * step3:消息是否进入重试或延迟队列中(重试次数失败)
 * step4:消息是否是事务消息,若是则存储为prepare消息
 * step5:BrokerConfig#asyncSendEnable是否开启异步存储,默认开启true
 *        (异步存储、同步存储)
 */
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
    
    ......

    // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 事务标签
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        // Broker禁止事务消息存储
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }

    long beginTimeMillis = this.brokerController.getMessageStore().now();

    // 消息是否异步存储
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        if (sendTransactionPrepareMessage) { // 事务prepare操作
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

        final int finalQueueIdInt = queueIdInt;
        final MessageExtBrokerInner finalMsgInner = msgInner;
        asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
            RemotingCommand responseFuture =
                handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
                    ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
            if (responseFuture != null) {
                doResponse(ctx, request, responseFuture);
            }
            sendMessageCallback.onComplete(sendMessageContext, response);
        }, this.brokerController.getPutMessageFutureExecutor());
        // Returns null to release the send message thread
        return null;
    }
    // 消息同步存储
    else {
        PutMessageResult putMessageResult = null;
        // 事务消息存储
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            // 同步存储消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}

        org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner该方法是事务消息与普通消息的主要区别。将事务消息原主题、消息队列ID放入消息扩展属性中,然后将消息topic变更为RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID为0之后再存储消息,目的是:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费。如下代码所示。

/**
 * 这里是事务消息与普通消息的主要区别:
 * 将事务消息的原始topic、消息队列ID 存放到消息属性中,
 * 并更改为:topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0
 * 所以:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费
 */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 消息的原始topic、消息队列ID 存放到消息属性中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));

    // topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

2. 生产者提交或回滚事务消息

        1):生产者发送提交或回滚事务请求

        根据上小节发送事务消息的核心方法DefaultMQProducerImpl#sendMessageInTransaction看出,生产者会调用DefaultMQProducerImpl#endTransaction来结束消息事务。原因是本地业务逻辑及本地事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成(UNKNOW),才定期回查事务状态。如下代码所示。

        发送消息时,默认本地事务状态为未知(UNKNOW),然后发送消息成功后,执行监听器的executeLocalTransaction()方法并返回本地事务状态。当生产者发送请求来结束事务消息时,本地事务状态可能处于提交(COMMIT_MESSAGE)、回滚(ROLLBACK_MESSAGE)、未知(UNKNOW)

/**
 * 生产者根据本地事务状态触发Broker的消息提交或回滚
 * 注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查;而提交或回滚时,事务消息从prepare阶段到提交或回滚阶段
 *         prepare阶段:topic为:RMQ_SYS_TRANS_HALF_TOPIC
 *         已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
 * Broker端处理请求入口:{@link org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest}
 */
public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    // 消息ID
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    // 事务ID
    String transactionId = sendResult.getTransactionId();
    // 根据发送结果返回的消息队列ID(修改后的消息队列ID为0)的broker名称
    final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));
    // 获取Broker地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
    // 组装结束事务请求头
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 根据本地事务状态,设置请求头事务状态
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    // 结束前调用钩子函数
    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

    // 单向发送结束事务请求到目的Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

        2):Broker处理生产者请求

        org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest是Broker处理结束事务消息请求的核心方法,其代码如下,注意事项:

  • 本地事务状态为UNKNOW时,Broker不做处理,后续定时任务回查事务状态来结束事务
  • 事务提交或回滚的区别联系:

        区别:提交状态时,还原原始消息并提交到Commitlog文件内存映射,后删除prepare消息;

                   回滚状态时,直接删除prepare消息;

        联系:提交、回滚状态时都要删除prepare消息(删除表示该事务消息已处理),删除不是物理删除,而是prepare阶段的topic为RMQ_SYS_TRANS_HALF_TOPIC,修改为RMQ_SYS_TRANS_OP_HALF_TOPIC

/**
 * 处理生产者结束本地事务请求
 * 生产者结束本地事务总入口:{@link DefaultMQProducerImpl#endTransaction}
 * step1:判断Broker为从服务器,则直接返回
 * step2:提交:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}
 *            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}
 *            还原原始消息:{@link EndTransactionProcessor#endMessageTransaction}
 *            存储到Commitlog,供消费者消费:{@link EndTransactionProcessor#endMessageTransaction}
 *            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}
 *       回滚:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}
 *            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}
 *            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}
 * 注意:
 *  a.提交或回滚的区别:是否还原原始消息,并存储到Commitlog文件 供消费者消费
 *  b.prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC
 *    已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    // Broker为从服务器,则直接返回
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }

    // 事务检查,fromTransactionCheck默认false
    if (requestHeader.getFromTransactionCheck()) {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());

                break;
            }

            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        // 事务类型(依据是生产者本地事务状态)
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }

            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    // 生产者本地事务提交
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // 根据偏移量,获取事务消息(prepare阶段)
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        // 查找prepare事务消息成功
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // 检查prepare事务消息
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // 还原原始消息(prepare)
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // 原始消息,存储到Commitlog文件,供消费者消费
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // 存储成功后,删除prepare阶段的消息
                // prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    }
    // 生产者本地事务回滚
    else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // 根据偏移量,获取事务消息(prepare阶段)
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            // 删除prepare阶段的消息
            // prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

3. Broker回查事务状态

        根据上两小节介绍,本地事务状态为UNKNOW时,生产者结束本地事务请求时,Broker不做任何处理,而是Broker定时任务回查事务状态。TransactionalMessageCheckService线程默认1min定时周期检测topic为RMQ_SYS_TRANS_HALF_TOPIC消息,回查事务状态。broker.conf配置transactionChecklnterval来改变默认值,单位为毫秒。如下所示是Broker事务消息UML图。

        下图所示是事务消息回查状态流程图。 

        1):Broker执行回查任务

        org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run执行检查任务,每1min执行周期,调用onWaitEnd()方法。如下代码所示,其中两个重要参数:

  • transactionTimeOut:事务过期时间,默认6s。只有当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查,否则待下次查询。
  • transactionCheckMax:回查状态最大次数,默认15。大于该值则不会继续回查,丢弃(相当于回滚)。
/**
 * 默认1min检查一次事务状态,事务到期后,调用生产者的事务状态
 */
@Override
public void run() {
    log.info("Start transaction check service thread!");
    while (!this.isStopped()) {
        // 检查时间间隔,默认1min,通过broker.conf配置transactionCheckInterval
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        // 等待1min,执行onWaitEnd()方法
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    // 事务过期时间(默认6s):当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 事务回查最大次数(默认15),大于该值则不会继续回查,丢弃(相当于回滚)
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 事务状态回查实现逻辑
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

        org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check是状态回查的核心逻辑方法,如下代码所示。注意事项:

  • fillOpRemoveMap()方法目的: 根据当前的处理进度(opOffset)依次从已处理队列拉取32条消息,方便判断当前处理的消息是否已经处理过,如果处理过则无须再次发送事务状态回查请求,避免重复发送事务回查请求(异步回查状态)
  • 判断是否需要发送事务状态回查isNeedCheck:

                a. 已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间;

                b. 已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间。

  • putBackHalfMsgQueue()方法目的:发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件。原因是:

                a. 发送事务状态回查是异步请求,无法立刻知道处理结果;

                b. MQ是顺序写,已经修改检查次数的当前消息,修改已存储的消息,性能无法保证

  • MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS:消息事务消息回查请求的最晚时间,单位为秒,指的是程序发送事务消息时,可以用户指定该事务消息的有效时间,只有在这个时间内收到回查消息才有效,默认为null。 
/**
 * 事务状态回查实现逻辑
 * step1:根据prepare阶段的事务消息topic,获取消息队列集合并遍历
 * step2:根据消息队列messageQueue,获取已处理队列opQueue
 * step3:根据当前处理进度 从已处理的消费队列 拉取32条消息
 *        {@link TransactionalMessageServiceImpl#fillOpRemoveMap}
 * step4:判定一个周期是否超出执行最长总时间、消息已被处理
 * step5:获取prepare阶段的待处理消息
 *        needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)
 *        needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间
 * step6:判定prepare消息是否在用户指定有效时间内或是否超出事务超时时间
 * step7:判断是否需要发送事务状态回查
 *        a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间
 *        b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间
 * step7:需要回查,先存储新的消息到prepare,再回查状态
 *        {@link AbstractTransactionalMessageCheckListener#resolveHalfMsg}
 * step8:不需要回查,继续step3
 * step9:更新prepare阶段事务消息队列的回查进度
 *        更新已处理消息队列的进度
 */
@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        // prepare阶段的事务消息topic
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // 获取对应的消费队列集合
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // 根据prepare阶段消费队列获取已处理的消费队列
            MessageQueue opQueue = getOpQueue(messageQueue);
            // prepare阶段消费队列的处理偏移量
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // 已处理的消费队列的处理偏移量
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            // 已处理消息偏移量集合
            List<Long/* opOffset */> doneOpOffset = new ArrayList<>();
            // halfOffset 与 opOffset映射关系
            HashMap<Long/* halfOffset */, Long/* opOffset */> removeMap = new HashMap<>();
            // 根据opOffset 从已处理的消费队列 往后拉取32条消息
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            int getMessageNullCount = 1; // 空消息次数
            long newOffset = halfOffset; // 当前处理prepare阶段消费队列的最新进度
            long i = halfOffset;         // 当前处理prepare阶段消费队列的队列偏移量
            while (true) {
                // 周期内的执行最长总时间(默认60s),超出时则跳出,待一下周期执行
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // 消息已被处理
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                }
                // 消息未被处理
                else {
                    // 获取prepare阶段的待处理消息
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    // 消息为null
                    if (msgExt == null) {
                        // 空消息次数 > 最大空消息重试次数
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        // 没有新的消息,导致消息为null
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        }
                        // 其他异常时,设置参数重新拉取
                        else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)
                    // needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    // 存储时间 > 当前开始检测时间
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    // prepare事务消息已存储的时间
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // 立即检查事务状态的时间,默认值MQ内设置事务超时时间transactionTimeout,则回查事务状态
                    long checkImmunityTime = transactionTimeout;
                    // 用户指定回查事务状态的最晚时间,默认为null(超出时,事务回滚,即:查询状态必须在最晚时间内有效)
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) { // 用户设置
                        // 获取回查事务状态时间
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        // 已存储的时间 在 立即回查事务状态时间 范围内,则说明用户指定查询状态在最晚时间内
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            // 发送事务状态回查
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    }
                    // 用户未设置,采用MQ内置transactionTimeout事务超时时间
                    else {
                        // 已存储的时间 不到事务超时时间,则下一次查询
                        if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    // 获取已处理的32条消息
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();

                    /*
                     * 判断是否需要发送事务状态回查
                     * a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间
                     * b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间
                     */
                    boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                        || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                        || valueOfCurrentMinusBorn <= -1;

                    // 需要回查
                    if (isNeedCheck) {
                        // 发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件
                        if (!putBackHalfMsgQueue(msgExt, i)) { // 存储失败,则继续
                            continue;
                        }
                        // 存储成功,异步发送回查事务状态请求
                        listener.resolveHalfMsg(msgExt);
                    }
                    // 不需要回查
                    else {
                        // 继续拉取已处理的消息
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }

            // 更新prepare阶段事务消息队列的回查进度
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }

            // 更新已处理消息队列的进度
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }

}

        代码看出(下小节讲述),使用线程池来异步发送回查消息,为了回查消息进度保存的简化,只要发送了回查消息,当前回查进度会向前推动。如果回查失败,回查前重新增加的prepare消息将可以再次发送回查消息;那如果回查消息发送成功,会不会下一次又重复发送回查消息呢?根据已处理队列中的消息来判断是否重复,如果回查消息发送成功并且Broker完成提交或回滚操作,该消息会发送到已处理队列中(删除的prepare消息),然后首先会通过fillOpRemoveMap()根据处理进度获取一批已处理的消息,来与消息判断是否重复。由于fillOpRemoveMap()一次拉取32条消息, 那又如何保证一定能拉取到与当前消息的处理记录呢?其实就是isNeedCheck为false时,如果此批消息最后一条未超过事务延迟消息,则继续拉取更多消息进行判断。

        2):线程池执行异步回查

        org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg是执行事务状态回查入口方法,回查任务添加到线程池异步执行回查状态。sendCheckMessage()发送事务回查状态的核心方法,如下代码。

// 异步发送回查事务状态请求
public void resolveHalfMsg(final MessageExt msgExt) {
    if (executorService != null) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // prepare事务消息回查事务状态
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    } else {
        LOGGER.error("TransactionalMessageCheckListener not init");
    }
}


/**
 * prepare事务消息回查事务状态
 * 生产者接受请求的总入口:{@link ClientRemotingProcessor#processRequest}
 * @param msgExt prepare事务消息
 */
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    // 组装事务状态回查请求头
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    // prepare事务消息还原为原始消息
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    // 获取生产组
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // 从生产组内获取一个生产者通道
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        // 发送回查请求
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

三、事务消息实例

1. 事务消息监听类

        实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法: 

  • executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态。
  • checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息。        
package com.common.instance.demo.config.rocketmq;

import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

/**
 * @description 订单事务消息监听器实现类
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:44
 **/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {

    @Resource
    private TMessageTransactionService tMessageTransactionService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 组装事务
        TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);

        // 保存事务中间表
        tMessageTransactionService.insert(tMessageTransaction);

        // 推荐返回UNKNOW状态,待事务状态回查
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 获取用户属性tabId
        String tabId = messageExt.getUserProperty("tabId");

        // 查询事务消息
        List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);

        if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // 组装事务
    private TMessageTransaction packageTMessageTransaction(Message message) {
        TMessageTransaction tMessageTransaction = new TMessageTransaction();

        // 获取用户属性tabId
        String tabId = message.getUserProperty("tabId");
        // 事务ID
        String transactionId = message.getTransactionId();
        tMessageTransaction.setTabId(tabId);
        tMessageTransaction.setTransactionId(transactionId);
        tMessageTransaction.setCreateBy("auto");
        tMessageTransaction.setCreateTime(new Date());

        return tMessageTransaction;
    }

}

2. 事务消息生产者

package com.common.instance.demo.config.rocketmq;

import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @description 订单事务消息生产者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:54
 **/
@Component
public class OrderTransactionProducer {

    @Resource
    private OrderProducerProperties orderProducerProperties;

    @Resource
    private OrderTransactionListenerImpl orderTransactionListener;

    private TransactionMQProducer orderTransactionMQProducer;

    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order transactionProducer");
            orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
            orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
            orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
            // 注册事务监听器
            orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
            orderTransactionMQProducer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
        }
    }

    public void sendTransactionMessage(WcPendantTab data) {
        sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
    }

    public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
        try {
            // 消息内容
            byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            // 消息对象
            Message message = new Message(topic, tags, keys, msgBody);
            message.putUserProperty("tabId", data.getTabId());

            // 发送事务消息
            orderTransactionMQProducer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
        }
    }

    @PreDestroy
    public void stop() {
        if (orderTransactionMQProducer != null) {
            orderTransactionMQProducer.shutdown();
        }
    }

}

3. 事务消息消费者

package com.common.instance.demo.config.rocketmq;

import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @description 订单消费者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 14:29
 **/
@Component
public class OrderConsumer implements MessageListenerConcurrently {

    @Resource
    private OrderConsumerProperties orderConsumerProperties;

    private DefaultMQPushConsumer orderMQConsumer;

    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order consumer");
            orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
            orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
            orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
            orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
            orderMQConsumer.registerMessageListener(this); // 注册监听器
            orderMQConsumer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
        }
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index = 0;
        try {
            for (; index < msgs.size(); index++) {
                // 完整消息
                MessageExt msg = msgs.get(index);
                // 消息内容
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);

                LogUtil.info("消费组消息内容:" + messageBody);
            }
        } catch (Exception e) {
            LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
        } finally {
            if (index < msgs.size()) {
                // 消费应答
                context.setAckIndex(index + 1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @PreDestroy
    public void stop() {
        if (orderMQConsumer != null) {
            orderMQConsumer.shutdown();
        }
    }

}

4. 业务代码

package com.common.instance.demo.service.impl;

import com.alibaba.fastjson.JSON;
import com.common.instance.demo.config.rocketmq.OrderProducer;
import com.common.instance.demo.config.rocketmq.OrderTransactionProducer;
import com.common.instance.demo.dao.WcPendantTabDao;
import com.common.instance.demo.entity.WcPendantTab;
import com.common.instance.demo.service.WcPendantTabService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.List;
import java.util.UUID;

/**
 *
 * @author tcm
 * @since 2021-01-14 15:02:08
 */
@Service
public class WcPendantTabServiceImpl implements WcPendantTabService {


    @Transactional
    @Override
    public void testTransactionMessage(WcPendantTab tab) {
        // 保存tab
        wcPendantTabDao.insert(tab);
        // 发送事务消息
        orderTransactionProducer.sendTransactionMessage(tab);

        Integer.valueOf("abc");
    }


}

        代码执行结果:事务回滚,事务消息发送后回查事务状态,则回滚事务消息,消费者消费不到消息。

java.lang.NumberFormatException: For input string: "abc"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.valueOf(Integer.java:766)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl.testTransactionMessage(WcPendantTabServiceImpl.java:68)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$FastClassBySpringCGLIB$$9543cf63.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$EnhancerBySpringCGLIB$$a7272112.testTransactionMessage(<generated>)
	at com.common.instance.demo.controller.WcPendantTabController.testTransactionMessage(WcPendantTabController.java:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at brave.servlet.TracingFilter.doFilter(TracingFilter.java:68)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at brave.servlet.TracingFilter.doFilter(TracingFilter.java:87)
	at org.springframework.cloud.sleuth.instrument.web.LazyTracingFilter.doFilter(TraceWebServletAutoConfiguration.java:139)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
	at java.lang.Thread.run(Thread.java:748)

2023-02-09 17:26:16.693 ERROR [,,,] 6316 --- [pool-1-thread-1] LOG_ERROR                                : 127.0.0.1||^orderTransaction rollBack, tabId: fde7c1d4cad049c89612afb6c2c29791

四、参考资料

RocketMQ 发送事务消息原理分析和代码实现_扎瓦江石的博客-CSDN博客

彻底看懂RocketMQ事务实现原理 - 知乎

RocketMQ5.0.0消息发送_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<二>_消息存储流程_rocketmq 消息写入流程_爱我所爱0505的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383806.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【谷粒学院】微信扫码支付(224~238)

224.项目第十五天内容介绍 225.课程评论实现过程分析 226.课程支付功能需求分析 1、课程支付说明 &#xff08;1&#xff09;课程分为免费课程和付费课程&#xff0c;如果是免费课程可以直接观看&#xff0c;如果是付费观看的课程&#xff0c;用户需下单支付后才可以观看 &am…

CAD轴测图怎么画?

很多新手设计师小伙伴&#xff0c;不知道CAD轴测图怎么画&#xff1f;其实很简单&#xff0c;浩辰CAD中的超级轴测功能&#xff0c;可以方便地将CAD平面图转化为轴侧图&#xff0c;是绘制管线系统图的好帮手。今天就和小编一起来看看在浩辰CAD软件中通过调用超级轴测命令来绘制…

[数据结构]:08-顺序查找(顺序表指针实现形式)(C语言实现)

目录 前言 已完成内容 顺序查找实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-PSeqListFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容&#xff0c;除其中使用到C引用外&#xff0c;全为C语言代码。使用C引用主要是为了简化指针的使用&a…

webpack生产环境配置

3 webpack生产环境配置 由于笔记文档没有按照之前的md格式书写&#xff0c;所以排版上代码上存在问题&#x1f622;&#x1f622;&#x1f622;&#x1f622; 09 提取css成单独文件 使用下载插件 npm i mini-css-extract-plugin0.9.0 -D webpack配置此时a,b提取成单独文件,并且…

1378:最短路径(shopth)

1378&#xff1a;最短路径(shopth) 时间限制: 1000 ms 内存限制: 65536 KB 【题目描述】 给出一个有向图G(V, E)&#xff0c;和一个源点v0∈V&#xff0c;请写一个程序输出v0和图G中其它顶点的最短路径。只要所有的有向环权值和都是正的&#xff0c;我们就允许图的边有…

有趣的小知识(二)浏览器内的秘密:了解Cookie基础知识

一、简介 Cookie是一种小型的文本文件&#xff0c;由Web服务器发送给Web浏览器&#xff0c;并存储在用户的计算机硬盘上。它通常用于记录用户的偏好、登录状态、购物车信息等&#xff0c;以便在用户下次访问该网站时能够提供更好的用户体验。Cookie通常包含网站的名称、Cookie的…

2023年工业自动化,机器人与控制工程国际会议(IARCE 2023)

2023年工业自动化&#xff0c;机器人与控制工程国际会议&#xff08;IARCE 2023&#xff09; 重要信息 会议网址&#xff1a;www.iarce.org 会议时间&#xff1a;2023年10月27-29日 召开地点&#xff1a;中国成都 截稿时间&#xff1a;2023年9月21日 录用通知&#xff1a;…

[创业之路-57] :商业计划书BP如何书写?总体框架!

引言&#xff1a;BP (Buiness Plan) &#xff0c;即商业计划书&#xff0c;本质上还是一份计划&#xff0c;是一份商业计划&#xff0c;即一种关于如何赚钱的计划&#xff0c;是一份通过组建公司&#xff0c;运营项目&#xff0c;进而赚钱的项目计划。什么是商业&#xff1f;商…

【计算机网络】ISO/OSI参考模型与TCP/IP模型

ISO/OSI参考模型与TCP/IP模型 一、ISO/OSI参考模型&#xff08;七层&#xff09; 一个理论模型&#xff0c;并未商用。 OSI参考模型有七层&#xff0c;自上而下分别为 应用层 &#xff1a; 所有能产生网络流量的应用程序都属于是应用层 典型协议&#xff1a;FTP&#xff08…

分布式锁详解

文章目录分布式锁是什么基于 Redis 实现分布式锁如何基于Redis实现一个简单的分布式锁为什么要给锁设置一个过期时间&#xff1f;如何实现锁的优雅续期&#xff1f;如何实现可重入锁&#xff1f;Redis 如何解决集群情况下分布式锁的可靠性&#xff1f;分布式锁是什么 java单机…

从客户端的角度来看移动端IM即时通讯的消息可靠性和送达机制

如何确保IM 不丢消息是个相对复杂的话题&#xff0c;从客户端发送数据到服务器&#xff0c;再从服务器抵达目标客户端&#xff0c;最终在 UI 成功展示&#xff0c;其间涉及的环节很多&#xff0c;这里只取其中一环「接收端如何确保消息不丢失」来探讨&#xff0c;粗略聊下我接触…

crmeb商城部署(踩坑及解决方法)

源码地址&#xff1a; https://gitee.com/ZhongBangKeJi/CRMEB 原版是PHP版&#xff0c;我也不懂PHP&#xff0c;但看到功能很全&#xff0c;而且有docker-compose的脚本可以部署&#xff0c;并且教程也很完善&#xff0c;就拿来部署试下。 所以也适用于和我一样&#xff0c;…

【NodeJs】NodeJs专题之理解企业BFF框架原理

BFF—服务于前端的后端 一、什么是BFF框架 BFF框架指的是一种逻辑分成,而非一种新技术即 Backend For Frontend&#xff08;服务于前端的后端&#xff09;&#xff0c;也就是服务器设计 API 时会考虑前端的使用&#xff0c;并在服务端直接进行业务逻辑的处理&#xff0c;又称…

每日记录自己的Android项目(一)

Jetpack之Navigation今天新创建一个项目&#xff0c;选的是这个。首先映入眼帘的是一个这样的界面。由ViewBinding绑定好XML布局和根布局和标题栏。这个应该是Navigationg里面的内容还有个字段private AppBarConfiguration appBarConfiguration;Overrideprotected void onCreat…

C# 序列化时“检测到循环引用”错误的彻底解决方案

目录 一&#xff0c;问题表现 二、没有技术含量的解决方案 三、本人彻底的解决方案 简要说明 贴代码 思路解析 思路 一&#xff0c;问题表现 示例代码如下&#xff1a; [Serializable] public class NodeTest {public NodeTest (){new List<NodeTest> ();}p…

SpringBoot之@ConfigurationProperties、@EnableConfigurationProperties

ConfigurationProperties 这个注解不仅可以为yml某个类注入还可以为第三方bean绑定属性 为yml某个类注入 只要将对应的yml类对象声明实体pojo并交给spring容器管理&#xff0c;再在类上使用ConfigurationProperties绑定对应的类名即可 涉及到两个知识点&#xff0c;这个类对…

C语言中的数据储存规则

写在开头 关于复习的相关内容其实从一开始就列出了大纲&#xff0c;但是迟迟没有开始复习&#xff0c;一方面是因为学校学业却是繁忙&#xff0c;另一方面还是内心对旧知识掌握不熟练需要再学一遍的畏惧和懒惰&#xff0c;但如今&#xff0c;复习必须开始了。今天我从C语言的最…

Linux/MacOS 生成双击可执行文件

双击可执行文件包含两种&#xff1a;终端shell脚本 Unix可执行文件 1.终端shell脚本 随意新建一个文件&#xff08;可使用command键N&#xff0c;前提是有已打开的文件&#xff09;&#xff0c;输入shell格式的测试代码&#xff0c;比如&#xff1a; #! /bin/sh echo “h…

双喜临门|炼石荣获2023年中国网络和数据安全高峰论坛双奖项

2023年2月23日-24日&#xff0c;工业和信息化部、四川省人民政府联合主办以“新征程 新思路 高质量发展”为主题的“2023年中国网络和数据安全产业高峰论坛”在成都隆重召开。工信安全中心第一届网络安全高成长性企业“勇攀之星”正式揭晓&#xff0c;炼石以高成长性、高创新性…

【vue】iframe相关问题

一、刷新iframe页面iframe的地址没有改变的话&#xff0c;每打开一次iframe页面&#xff0c;都不会主动更新页面的。以下有几种方法&#xff0c;都可以实现&#xff0c;每打开一次页面&#xff0c;就刷新一下给iframe添加key<template><div id"Iframe">&…