RocketMQ5.0--事务消息

news2025/1/3 2:39:57

RocketMQ5.0–事务消息

一、事务消息概览
RocketMQ事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,消费者可以消费事务提交的消息,如下图所示。事务消息的作用:确保本地业务与消息在一个事务内,本地业务成功执行,消费者才能消费消息。

在这里插入图片描述
事务消息实现流程分3个部分:

  • 应用程序:事务内完成相关业务数据入库后,需要同步调用RocketMQ消息发送接口,发送状态为prepare消息。消息发送成功后,RocketMQ服务器会回调RocketMQ消息发送者的事件监听器,记录消息的本地事务状态,事务状态与本地业务操作同属一个事务,确保消息发送与本地事务的原子性。
  • Broker服务器存储消息:存储器Broker收到类型为prepare的消息时,会首先备份消息的原主题与原消息消费队列到消息扩展属性中,然后修改消息存储主题RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中。
  • Broker定时任务:存储器Broker开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息发送端(应用程序)发起消息事务状态回查,应用程序根据保存的事务状态反馈消息服务器事务的状态(提交、回滚、未知),如果是提交或回滚,则消息服务器提交或回滚消息,如果是未知,待下一次回查,RocketMQ 允许设置一条消息的回查间隔与回查次数,如果超过回查次数后依然无法获知消息的事务状态,则默认回滚消息。

在这里插入图片描述
二、事务消息实现机制

1. 事务消息发送流程

1):发送事务消息类图

org.apache.rocketmq.client.producer.TransactionMQProducer事务消息生产者类,其UML类图如下。
在这里插入图片描述
org.apache.rocketmq.client.producer.TransactionListener是事务监听器接口,用户必须自定义事务监听器实现类复写两个方法:

  • TransactionListener#executeLocalTransaction():执行本地事务(如:保存事务消息ID),与业务逻辑在一个事务内,供状态回查使用。
  • TransactionListener#checkLocalTransaction():回查本地事务状态,即:事务是否提交或回滚或未知,若是未知,则继续下次查询。

下图是发送事务消息总流程图,下面小节详细介绍消息的发送过程。
在这里插入图片描述
2):生产端发送事务消息
org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction是发送事务消息的入口方法,其调用链如下。
在这里插入图片描述
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction是发送事务消息的核心方法,其代码如下。注意事项:

  • 设置消息为prepare:为后续发送消息前改变消息topic,而原topic和消费队列添加到消息扩展属性中;
  • 设置消息生产组:为后续Broker定时查询事务状态时,从生产组中随机选择一个即可;
  • 调用结束事务endTransaction():调用该方法时,本地业务逻辑及事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成,才定期回查事务状态。
/**
 * 发送事务消息核心逻辑
 * step1:获取生产者的事务监听器,自定义实现类,接口是{@link TransactionListener}
 * step2:检查消息延迟级别,若是事务消息则忽略延迟参数
 * step3:检查消息是否符合规则,如:topic规范、消息体不能为空、消息体长度默认不能大于4MB
 * step4:设置消息为prepare消息 + 设置消息生产组(目的:监听器查询事务状态时,从生产组中随机选择一个即可)
 * step5:发送事务(与普通消息发送流程相同)并返回结果
 *        {@link DefaultMQProducerImpl#send(Message)}
 * step6:消息发送成功后,获取事务ID和记录本地事务状态(保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据)
 * step7:结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查
 *        {@link DefaultMQProducerImpl#endTransaction()}
 * @param msg 事务消息内容
 * @param localTransactionExecuter 本地事务Executer
 * @param arg 传入事务监听器的参数
 * @return
 */
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // 获取生产者的事务监听器
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
 
    // ignore DelayTimeLevel parameter 忽略延迟消息参数
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
 
    // 检查消息是否符合规则
    Validators.checkMessage(msg, this.defaultMQProducer);
 
    SendResult sendResult = null;
    // 设置消息为prepare消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 设置消息生产组,目的:监听器查询事务状态时,从生产组中随机选择一个即可
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 发送事务(与普通消息发送流程相同)
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
 
    // 根据消息发送结果,作响应的本地事务状态处理
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        /*
            消息发送成功,获取事务ID和记录本地事务状态
            注意:保存事务ID与处理业务处在同一事务中,供服务端事务消息状态回查提供依据
         */
        case SEND_OK: {
            try {
                // 从消息发送结果获取事务ID
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
 
                // 记录本地事务状态
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
 
                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
 
    try {
        // 结束事务,事务状态:提交、回滚、不处理,注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
 
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl发送消息,若是prepare消息时,添加事务消息标识到系统标识中。

/**
 * 发送消息
 * step1:从MQClientInstance获取broker地址,若没有,则从NameServer获取,再没有则抛出异常;
 * step2:不是批量发送消息,则为消息分配一个全局消息ID;
 * step3:设置消息是否压缩,消息体 > 4KB,则压缩;
 * step4:是否是事务Prepare消息,若是写入事务prepare标签;
 * step5:执行发送前钩子函数;
 * step6:根据不同发送方式,发送消息;
 * step7:执行发送后钩子函数;
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 
            ......
 
            // 是否是事务Prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 事务消息标签
            }       
            ......
}

3):Broker存储事务消息
普通消息发送与Broker存储消息参考资料 《RocketMQ5.0.0消息发送》、《RocketMQ5.0.0消息存储<二>_消息存储流程》。org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage该方法是普通消息的处理,解析系统标识中是否有事务消息标签,若是:

  • 判断Broker是否禁止事务消息存储
  • 事务消息时,执行事务消息存储逻辑:异步
    TransactionalMessageService#asyncPrepareMessage、同步TransactionalMessageService#prepareMessage
/**
 * 存储之前,对消息的处理
 * step1:预发送处理,如:检查消息、主题是否符合规范
 * step2:发送消息时,是否指定消费队列,若没有则随机选择
 * step3:消息是否进入重试或延迟队列中(重试次数失败)
 * step4:消息是否是事务消息,若是则存储为prepare消息
 * step5:BrokerConfig#asyncSendEnable是否开启异步存储,默认开启true
 *        (异步存储、同步存储)
 */
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
    
    ......
 
    // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 事务标签
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        // Broker禁止事务消息存储
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }
 
    long beginTimeMillis = this.brokerController.getMessageStore().now();
 
    // 消息是否异步存储
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        if (sendTransactionPrepareMessage) { // 事务prepare操作
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
 
        final int finalQueueIdInt = queueIdInt;
        final MessageExtBrokerInner finalMsgInner = msgInner;
        asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
            RemotingCommand responseFuture =
                handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
                    ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
            if (responseFuture != null) {
                doResponse(ctx, request, responseFuture);
            }
            sendMessageCallback.onComplete(sendMessageContext, response);
        }, this.brokerController.getPutMessageFutureExecutor());
        // Returns null to release the send message thread
        return null;
    }
    // 消息同步存储
    else {
        PutMessageResult putMessageResult = null;
        // 事务消息存储
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            // 同步存储消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner该方法是事务消息与普通消息的主要区别。将事务消息原主题、消息队列ID放入消息扩展属性中,然后将消息topic变更为RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID为0之后再存储消息,目的是:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费。如下代码所示。

/**
 * 这里是事务消息与普通消息的主要区别:
 * 将事务消息的原始topic、消息队列ID 存放到消息属性中,
 * 并更改为:topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0
 * 所以:不会被消费者消费,而是定时任务查询生产者事务状态后,恢复原始消息供消费者消费
 */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 消息的原始topic、消息队列ID 存放到消息属性中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
 
    // topic变更为:RMQ_SYS_TRANS_HALF_TOPIC,消息队列ID:0
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

2. 生产者提交或回滚事务消息
1):生产者发送提交或回滚事务请求
根据上小节发送事务消息的核心方法DefaultMQProducerImpl#sendMessageInTransaction看出,生产者会调用DefaultMQProducerImpl#endTransaction来结束消息事务。原因是本地业务逻辑及本地事务可能完成,直接执行结束事务后,就不用后续定期回查事务状态;若本地事务没有完成(UNKNOW),才定期回查事务状态。如下代码所示。

发送消息时,默认本地事务状态为未知(UNKNOW),然后发送消息成功后,执行监听器的executeLocalTransaction()方法并返回本地事务状态。当生产者发送请求来结束事务消息时,本地事务状态可能处于提交(COMMIT_MESSAGE)、回滚(ROLLBACK_MESSAGE)、未知(UNKNOW)。

/**
 * 生产者根据本地事务状态触发Broker的消息提交或回滚
 * 注意:状态UNKNOW时,则业务事务并没有提交,需要状态回查;而提交或回滚时,事务消息从prepare阶段到提交或回滚阶段
 *         prepare阶段:topic为:RMQ_SYS_TRANS_HALF_TOPIC
 *         已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
 * Broker端处理请求入口:{@link org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest}
 */
public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    // 消息ID
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    // 事务ID
    String transactionId = sendResult.getTransactionId();
    // 根据发送结果返回的消息队列ID(修改后的消息队列ID为0)的broker名称
    final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue()));
    // 获取Broker地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
    // 组装结束事务请求头
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 根据本地事务状态,设置请求头事务状态
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
 
    // 结束前调用钩子函数
    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
 
    // 单向发送结束事务请求到目的Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

2):Broker处理生产者请求
org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest是Broker处理结束事务消息请求的核心方法,其代码如下,注意事项:

  • 本地事务状态为UNKNOW时,Broker不做处理,后续定时任务回查事务状态来结束事务;

  • 事务提交或回滚的区别联系:
    区别:提交状态时,还原原始消息并提交到Commitlog文件内存映射,后删除prepare消息;

    回滚状态时,直接删除prepare消息;

    联系:提交、回滚状态时都要删除prepare消息(删除表示该事务消息已处理),删除不是物理删除,而是prepare阶段的topic为RMQ_SYS_TRANS_HALF_TOPIC,修改为RMQ_SYS_TRANS_OP_HALF_TOPIC。

/**
 - 处理生产者结束本地事务请求
 - 生产者结束本地事务总入口:{@link DefaultMQProducerImpl#endTransaction}
 - step1:判断Broker为从服务器,则直接返回
 - step2:提交:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}
 -            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}
 -            还原原始消息:{@link EndTransactionProcessor#endMessageTransaction}
 -            存储到Commitlog,供消费者消费:{@link EndTransactionProcessor#endMessageTransaction}
 -            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}
 -       回滚:获取prepare阶段消息:{@link TransactionalMessageServiceImpl#commitMessage}
 -            检查prepare事务消息:{@link EndTransactionProcessor#checkPrepareMessage}
 -            删除prepare消息(修改topic):{@link TransactionalMessageServiceImpl#deletePrepareMessage}
 - 注意:
 -  a.提交或回滚的区别:是否还原原始消息,并存储到Commitlog文件 供消费者消费
 -  b.prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC
 -    已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    // Broker为从服务器,则直接返回
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
 
    // 事务检查,fromTransactionCheck默认false
    if (requestHeader.getFromTransactionCheck()) {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
 
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
 
                break;
            }
 
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        // 事务类型(依据是生产者本地事务状态)
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
 
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }
 
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    // 生产者本地事务提交
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // 根据偏移量,获取事务消息(prepare阶段)
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        // 查找prepare事务消息成功
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // 检查prepare事务消息
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // 还原原始消息(prepare)
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // 原始消息,存储到Commitlog文件,供消费者消费
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // 存储成功后,删除prepare阶段的消息
                // prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    }
    // 生产者本地事务回滚
    else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // 根据偏移量,获取事务消息(prepare阶段)
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            // 删除prepare阶段的消息
            // prepare阶段的topic:RMQ_SYS_TRANS_HALF_TOPIC,改为 已处理(提交或回滚)的topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

3. Broker回查事务状态
根据上两小节介绍,本地事务状态为UNKNOW时,生产者结束本地事务请求时,Broker不做任何处理,而是Broker定时任务回查事务状态。TransactionalMessageCheckService线程默认1min定时周期检测topic为RMQ_SYS_TRANS_HALF_TOPIC消息,回查事务状态。broker.conf配置transactionChecklnterval来改变默认值,单位为毫秒。如下所示是Broker事务消息UML图。
在这里插入图片描述
下图所示是事务消息回查状态流程图。
在这里插入图片描述
1):Broker执行回查任务
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run执行检查任务,每1min执行周期,调用onWaitEnd()方法。如下代码所示,其中两个重要参数:

  • transactionTimeOut:事务过期时间,默认6s。只有当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查,否则待下次查询。
  • transactionCheckMax:回查状态最大次数,默认15。大于该值则不会继续回查,丢弃(相当于回滚)。
/**
 * 默认1min检查一次事务状态,事务到期后,调用生产者的事务状态
 */
@Override
public void run() {
    log.info("Start transaction check service thread!");
    while (!this.isStopped()) {
        // 检查时间间隔,默认1min,通过broker.conf配置transactionCheckInterval
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        // 等待1min,执行onWaitEnd()方法
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
 
@Override
protected void onWaitEnd() {
    // 事务过期时间(默认6s):当前时间 > 存储时间 + 事务过期时间 时,才执行事务状态回查
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 事务回查最大次数(默认15),大于该值则不会继续回查,丢弃(相当于回滚)
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 事务状态回查实现逻辑
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check是状态回查的核心逻辑方法,如下代码所示。注意事项:

  • fillOpRemoveMap()方法目的:
    根据当前的处理进度(opOffset)依次从已处理队列拉取32条消息,方便判断当前处理的消息是否已经处理过,如果处理过则无须再次发送事务状态回查请求,避免重复发送事务回查请求(异步回查状态)。

  • 判断是否需要发送事务状态回查isNeedCheck:
    a. 已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间;
    b. 已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间。

  • putBackHalfMsgQueue()方法目的:发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件。原因是:
    a. 发送事务状态回查是异步请求,无法立刻知道处理结果;
    b. MQ是顺序写,已经修改检查次数的当前消息,修改已存储的消息,性能无法保证。

  • MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS:消息事务消息回查请求的最晚时间,单位为秒,指的是程序发送事务消息时,可以用户指定该事务消息的有效时间,只有在这个时间内收到回查消息才有效,默认为null。

/**
 * 事务状态回查实现逻辑
 * step1:根据prepare阶段的事务消息topic,获取消息队列集合并遍历
 * step2:根据消息队列messageQueue,获取已处理队列opQueue
 * step3:根据当前处理进度 从已处理的消费队列 拉取32条消息
 *        {@link TransactionalMessageServiceImpl#fillOpRemoveMap}
 * step4:判定一个周期是否超出执行最长总时间、消息已被处理
 * step5:获取prepare阶段的待处理消息
 *        needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)
 *        needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间
 * step6:判定prepare消息是否在用户指定有效时间内或是否超出事务超时时间
 * step7:判断是否需要发送事务状态回查
 *        a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间
 *        b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间
 * step7:需要回查,先存储新的消息到prepare,再回查状态
 *        {@link AbstractTransactionalMessageCheckListener#resolveHalfMsg}
 * step8:不需要回查,继续step3
 * step9:更新prepare阶段事务消息队列的回查进度
 *        更新已处理消息队列的进度
 */
@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        // prepare阶段的事务消息topic
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // 获取对应的消费队列集合
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // 根据prepare阶段消费队列获取已处理的消费队列
            MessageQueue opQueue = getOpQueue(messageQueue);
            // prepare阶段消费队列的处理偏移量
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // 已处理的消费队列的处理偏移量
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }
 
            // 已处理消息偏移量集合
            List<Long/* opOffset */> doneOpOffset = new ArrayList<>();
            // halfOffset 与 opOffset映射关系
            HashMap<Long/* halfOffset */, Long/* opOffset */> removeMap = new HashMap<>();
            // 根据opOffset 从已处理的消费队列 往后拉取32条消息
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            int getMessageNullCount = 1; // 空消息次数
            long newOffset = halfOffset; // 当前处理prepare阶段消费队列的最新进度
            long i = halfOffset;         // 当前处理prepare阶段消费队列的队列偏移量
            while (true) {
                // 周期内的执行最长总时间(默认60s),超出时则跳出,待一下周期执行
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // 消息已被处理
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                }
                // 消息未被处理
                else {
                    // 获取prepare阶段的待处理消息
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    // 消息为null
                    if (msgExt == null) {
                        // 空消息次数 > 最大空消息重试次数
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        // 没有新的消息,导致消息为null
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        }
                        // 其他异常时,设置参数重新拉取
                        else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }
 
                    // needDiscard()方法:判定prepare事务消息回查次数是否大于最大回查次数,超出则丢弃(事务回滚)
                    // needSkip()方法:判定prepare事务消息已存储的时间是否超出文件过期时间
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    // 存储时间 > 当前开始检测时间
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }
 
                    // prepare事务消息已存储的时间
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // 立即检查事务状态的时间,默认值MQ内设置事务超时时间transactionTimeout,则回查事务状态
                    long checkImmunityTime = transactionTimeout;
                    // 用户指定回查事务状态的最晚时间,默认为null(超出时,事务回滚,即:查询状态必须在最晚时间内有效)
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) { // 用户设置
                        // 获取回查事务状态时间
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        // 已存储的时间 在 立即回查事务状态时间 范围内,则说明用户指定查询状态在最晚时间内
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            // 发送事务状态回查
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    }
                    // 用户未设置,采用MQ内置transactionTimeout事务超时时间
                    else {
                        // 已存储的时间 不到事务超时时间,则下一次查询
                        if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    // 获取已处理的32条消息
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
 
                    /*
                     * 判断是否需要发送事务状态回查
                     * a.已处理事务消息为空 + prepare消息已存储的时间大于立即检查时间
                     * b.已处理事务消息不为空 + 已处理事务消息(32条)最后一个消息的已存储的时间大于事务超时时间
                     */
                    boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                        || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                        || valueOfCurrentMinusBorn <= -1;
 
                    // 需要回查
                    if (isNeedCheck) {
                        // 发送事务状态回查前,再次将消息存储到prepare阶段的Commitlog文件
                        if (!putBackHalfMsgQueue(msgExt, i)) { // 存储失败,则继续
                            continue;
                        }
                        // 存储成功,异步发送回查事务状态请求
                        listener.resolveHalfMsg(msgExt);
                    }
                    // 不需要回查
                    else {
                        // 继续拉取已处理的消息
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
 
            // 更新prepare阶段事务消息队列的回查进度
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
 
            // 更新已处理消息队列的进度
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
 
}

代码看出(下小节讲述),使用线程池来异步发送回查消息,为了回查消息进度保存的简化,只要发送了回查消息,当前回查进度会向前推动。如果回查失败,回查前重新增加的prepare消息将可以再次发送回查消息;那如果回查消息发送成功,会不会下一次又重复发送回查消息呢?根据已处理队列中的消息来判断是否重复,如果回查消息发送成功并且Broker完成提交或回滚操作,该消息会发送到已处理队列中(删除的prepare消息),然后首先会通过fillOpRemoveMap()根据处理进度获取一批已处理的消息,来与消息判断是否重复。由于fillOpRemoveMap()一次拉取32条消息, 那又如何保证一定能拉取到与当前消息的处理记录呢?其实就是isNeedCheck为false时,如果此批消息最后一条未超过事务延迟消息,则继续拉取更多消息进行判断。

2):线程池执行异步回查
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg是执行事务状态回查入口方法,回查任务添加到线程池异步执行回查状态。sendCheckMessage()发送事务回查状态的核心方法,如下代码。

// 异步发送回查事务状态请求
public void resolveHalfMsg(final MessageExt msgExt) {
    if (executorService != null) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // prepare事务消息回查事务状态
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    } else {
        LOGGER.error("TransactionalMessageCheckListener not init");
    }
}
 
 
/**
 * prepare事务消息回查事务状态
 * 生产者接受请求的总入口:{@link ClientRemotingProcessor#processRequest}
 * @param msgExt prepare事务消息
 */
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    // 组装事务状态回查请求头
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    // prepare事务消息还原为原始消息
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    // 获取生产组
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // 从生产组内获取一个生产者通道
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        // 发送回查请求
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

三、事务消息实例

1. 事务消息监听类

实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:

  • executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态。
  • checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息。
package com.common.instance.demo.config.rocketmq;
 
import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
 
/**
 * @description 订单事务消息监听器实现类
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:44
 **/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {
 
    @Resource
    private TMessageTransactionService tMessageTransactionService;
 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 组装事务
        TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);
 
        // 保存事务中间表
        tMessageTransactionService.insert(tMessageTransaction);
 
        // 推荐返回UNKNOW状态,待事务状态回查
        return LocalTransactionState.UNKNOW;
    }
 
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 获取用户属性tabId
        String tabId = messageExt.getUserProperty("tabId");
 
        // 查询事务消息
        List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);
 
        if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
 
        LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
 
    // 组装事务
    private TMessageTransaction packageTMessageTransaction(Message message) {
        TMessageTransaction tMessageTransaction = new TMessageTransaction();
 
        // 获取用户属性tabId
        String tabId = message.getUserProperty("tabId");
        // 事务ID
        String transactionId = message.getTransactionId();
        tMessageTransaction.setTabId(tabId);
        tMessageTransaction.setTransactionId(transactionId);
        tMessageTransaction.setCreateBy("auto");
        tMessageTransaction.setCreateTime(new Date());
 
        return tMessageTransaction;
    }
 
}

2. 事务消息生产者

package com.common.instance.demo.config.rocketmq;
 
import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
 
/**
 * @description 订单事务消息生产者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:54
 **/
@Component
public class OrderTransactionProducer {
 
    @Resource
    private OrderProducerProperties orderProducerProperties;
 
    @Resource
    private OrderTransactionListenerImpl orderTransactionListener;
 
    private TransactionMQProducer orderTransactionMQProducer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order transactionProducer");
            orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
            orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
            orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
            // 注册事务监听器
            orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
            orderTransactionMQProducer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    public void sendTransactionMessage(WcPendantTab data) {
        sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
    }
 
    public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
        try {
            // 消息内容
            byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            // 消息对象
            Message message = new Message(topic, tags, keys, msgBody);
            message.putUserProperty("tabId", data.getTabId());
 
            // 发送事务消息
            orderTransactionMQProducer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
        }
    }
 
    @PreDestroy
    public void stop() {
        if (orderTransactionMQProducer != null) {
            orderTransactionMQProducer.shutdown();
        }
    }
 
}

3. 事务消息消费者

package com.common.instance.demo.config.rocketmq;
 
import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
 
/**
 * @description 订单消费者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 14:29
 **/
@Component
public class OrderConsumer implements MessageListenerConcurrently {
 
    @Resource
    private OrderConsumerProperties orderConsumerProperties;
 
    private DefaultMQPushConsumer orderMQConsumer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order consumer");
            orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
            orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
            orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
            orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
            orderMQConsumer.registerMessageListener(this); // 注册监听器
            orderMQConsumer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index = 0;
        try {
            for (; index < msgs.size(); index++) {
                // 完整消息
                MessageExt msg = msgs.get(index);
                // 消息内容
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
 
                LogUtil.info("消费组消息内容:" + messageBody);
            }
        } catch (Exception e) {
            LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
        } finally {
            if (index < msgs.size()) {
                // 消费应答
                context.setAckIndex(index + 1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
 
    @PreDestroy
    public void stop() {
        if (orderMQConsumer != null) {
            orderMQConsumer.shutdown();
        }
    }
 
}

4. 业务代码

package com.common.instance.demo.service.impl;
 
import com.alibaba.fastjson.JSON;
import com.common.instance.demo.config.rocketmq.OrderProducer;
import com.common.instance.demo.config.rocketmq.OrderTransactionProducer;
import com.common.instance.demo.dao.WcPendantTabDao;
import com.common.instance.demo.entity.WcPendantTab;
import com.common.instance.demo.service.WcPendantTabService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import javax.annotation.Resource;
import java.util.List;
import java.util.UUID;
 
/**
 *
 * @author tcm
 * @since 2021-01-14 15:02:08
 */
@Service
public class WcPendantTabServiceImpl implements WcPendantTabService {
 
 
    @Transactional
    @Override
    public void testTransactionMessage(WcPendantTab tab) {
        // 保存tab
        wcPendantTabDao.insert(tab);
        // 发送事务消息
        orderTransactionProducer.sendTransactionMessage(tab);
 
        Integer.valueOf("abc");
    }
 
 
}

代码执行结果:事务回滚,事务消息发送后回查事务状态,则回滚事务消息,消费者消费不到消息。

java.lang.NumberFormatException: For input string: "abc"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.valueOf(Integer.java:766)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl.testTransactionMessage(WcPendantTabServiceImpl.java:68)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$FastClassBySpringCGLIB$$9543cf63.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
	at com.common.instance.demo.service.impl.WcPendantTabServiceImpl$$EnhancerBySpringCGLIB$$a7272112.testTransactionMessage(<generated>)
	at com.common.instance.demo.controller.WcPendantTabController.testTransactionMessage(WcPendantTabController.java:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at brave.servlet.TracingFilter.doFilter(TracingFilter.java:68)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at brave.servlet.TracingFilter.doFilter(TracingFilter.java:87)
	at org.springframework.cloud.sleuth.instrument.web.LazyTracingFilter.doFilter(TraceWebServletAutoConfiguration.java:139)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
	at java.lang.Thread.run(Thread.java:748)
 
2023-02-09 17:26:16.693 ERROR [,,,] 6316 --- [pool-1-thread-1] LOG_ERROR                                : 127.0.0.1||^orderTransaction rollBack, tabId: fde7c1d4cad049c89612afb6c2c29791

四、参考资料
https://blog.csdn.net/weixin_38305440/article/details/107384969
https://zhuanlan.zhihu.com/p/249233648

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/726963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成功解决:java file outside of source root

前言 我复制一个很小项目的代码&#xff0c;然后重新命名后。用IDEA打开&#xff0c;发现.java文件的左下方有个橘色的标志。 1、问题文件 这里显示 Java file outside of source root。 查阅资料发现&#xff1a;这个问题是指Java文件不在源代码根目录之内。这可能会导致…

使用pytest命令行实现环境切换

目录 前言 pytest_addoption(parser, pluginmanager) 在conftest.py文件中定义命令行参数 获取命令行参数 设置不同环境的全局变量 定义测试类及测试方法 测试验证 前言 在自动化测试过程中经常需要在不同的环境下进行测试验证&#xff0c;所以写自动化测试代码时需要考…

Android12之IBinder中[[clang::lto_visibility_public]]作用(一百六十)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

MySQL自治平台建设的内核原理及实践(上)

总第565篇 2023年 第017篇 本文整理自美团技术沙龙第75期的主题分享《美团数据库攻防演练建设实践》&#xff0c;系超大规模数据库集群保稳系列&#xff08;内含4个议题的PPT及视频&#xff09;的第4篇文章。 本文作者在演讲后根据同学们的反馈&#xff0c;补充了很多技术细节&…

如何绘制「UML类图」

一、UML类图简介 类图以反映类的结构(属性、操作)以及类之间的关系为主要目的&#xff0c;描述了软件系统的结构&#xff0c;是一种静态建模方法。类图用来描述系统中有意义的概念&#xff0c;包括具体的概念、抽象的概念、实现方面的概念等&#xff0c;是对现实世界中事物的抽…

设计模式--------结构型模式

结构型模式 结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式&#xff0c;前者采用继承机制来组织接口和类&#xff0c;后者釆用组合或聚合来组合对象。 由于组合关系或聚合关系比继承关系耦合度低&#xff0c;满足“合成复用原则”…

web 页面布局:(三)position 坐标系布局

web 页面布局&#xff1a;&#xff08;三&#xff09;position 坐标系布局 页面坐标系position 设置fixedrelativeabsolutesticky 应用场景 页面坐标系 因为所有文本内容&#xff0c;是从左向右读&#xff0c;自上而下读&#xff0c;和书写习惯&#xff0c;读书习惯一致&#…

惊呆了!这个银行管理技巧也太厉害了吧

在金融行业&#xff0c;蓄电池监控是一项至关重要的任务。随着金融机构的数字化转型和依赖电力的增加&#xff0c;蓄电池成为保障金融系统正常运行的关键组成部分。 因此&#xff0c;对蓄电池的状态进行监控和维护&#xff0c;确保其高效可靠地工作&#xff0c;对金融行业的稳定…

深度学习06-pytorch从入门到精通

文章目录 概述环境准备安装cuda和cudnn安装pytorch 基础张量定义numpy转换数学函数随机数计算函数矩阵处理函数 自动梯度案例计算图 torchvision模块TransformsDataSetDataLoader自定义数据集 nn模块nn.ModuleCNN图像处理层nn.Conv2dnn.AvgPool2d和nn.MaxPool2dnn.Linearnn.Bat…

TransFuse

方法 Transformer分支 将不同尺度的特征图 t 0 、 t 1 和 t 2 t^0、t^1和t^2 t0、t1和t2保存起来&#xff0c;之后与对应的CNN分支的特征图融合。 CNN分支 以基于ResNet的模型为例&#xff0c;模型通常有五个块&#xff0c;每个块对特征图进行两倍下采样。我们获取第 4( g 0…

vue父组件调用子组件的Form表单校验

需求1&#xff1a;父组件调用子组件的表单验证方法&#xff0c;验证成功&#xff0c;继续进行接下来的操作&#xff0c;验证失败&#xff0c;提示用户并返回。 需求2&#xff1a;父组件校验多个子组件的表单验证方法&#xff0c;验证成功&#xff0c;继续进行接下来的操作&…

cookie/session/token(持续更新)

1.cookie 1.1概念 cookie是服务器产生的保存在客户端的一小段文本信息,格式是字典形式,键值对形式 cookie有两类: 1.会话级cookie:保存在内存,随浏览器关闭自动消息 2.持久化cookie:保存在硬盘,浏览器关闭不会直接消失,生命周期取决于失效时间 1.2如何查看cookie以及格式…

vue3+vite+element pro + pnpm 创建的monorepo项目

vue3+vite+element pro + pnpm 创建的monorepo项目 欢迎使用河码桌面技术说明界面欢迎使用河码桌面 欢迎使用河码桌面,河码桌面是一个基于vue3+vite+element pro + pnpm 创建的monorepo项目,项目采用的是类操作系统的web界面,操作起来简单又方便,符合用户习惯,又没有操作…

GlusterFs部署及使用

目录 一、部署和使用 1. clusterfs服务器初始化 2. 部署glusterfs 3. 创建volume 4. 客户端挂载和使用 5. k8s使用glusterfs作为后端存储&#xff08;静态供给glusterfs存储&#xff09; 5.1 集群所有节点安装glusterfs客户端 5.2 k8s创建资源对象使用glusterfs存储 5.…

Spring Boot 中的声明式事务是什么,如何使用

Spring Boot 中的声明式事务是什么&#xff0c;如何使用 简介 在数据库操作中&#xff0c;事务是一组操作的集合&#xff0c;这些操作在一个逻辑单元内执行&#xff0c;如果其中任何一个操作失败&#xff0c;则整个事务都会被回滚&#xff0c;使得数据库回到事务执行之前的状…

不看产品、不重销售,聊聊Forrester推荐的CLG战略

近日&#xff0c;Forrester的分析师们发现&#xff0c;企业&#xff08;尤其是B2B企业&#xff09;对于CLG战略的关注空前增加。而且&#xff0c;越来越多SaaS企业选择CLG战略来取代以前的PLG&#xff08;产品导向型增长&#xff09;或SLG&#xff08;销售导向型增长&#xff0…

SpringSecurity之基本原理

目录 核心过滤器 FilterSecurityInterceptor ExceptionTranslationFilter UsernamePasswordAuthenticationFilter BasicAuthenticationFilter 过滤器加载过程 重要接口 UserDetailsService接口 PasswordEncoder接口 springSecurity本质上就是一个过滤器链&#xff0c;…

RabbitMQ系列(16)--用SpringBoot整合RabbitMQ

1、新建项目 2、新建Spring项目 3、选择合适的SpringBoot版本&#xff0c;依赖在这里可以先不选&#xff0c;可以在项目生成后在pom.xml文件里批量的导入依赖 4、设置项目的Maven (1)打开设置 (2)在搜索框里输入Maven搜索Maven设置&#xff0c;然后根据自己的实际情况设置Mave…

优化篇--Vue模版语法做动态渲染

Vue模版语法做动态渲染&#xff0c;随便记录一下 <a-row v-for"(row, rowIndex) in footerConfig" :key"rowIndex"><a-col span"4" v-for"(col, colIndex) in row" :key"colIndex"><span class"total-l…

新来的资深java不会lambda表达式,中级开发都笑拉了--lambda流这么难吗,教你怎么玩早学早吃香

因为一个人就全体 虽然没提名字但是说的意思和报身份证没区别, 我自己看着都尴尬… 标题就是事情的经过,和同事的聊天记录在最下面 前言 Lambda表达式是优化代码的工具&#xff0c;使得代码更简洁、易读&#xff0c;符合现代开发的趋势,以及推动函数式编程在Java开发中的流行…