RocketMQ事务消息原理简析

news2025/1/21 15:38:20

零、业务场景

在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等,交易系统要将订单落入DB和发送订单消息保证一致,不能本地事务回滚,订单没有生成但是发送了创建订单消息,下游系统产生脏数据,也不能订单已经创建,但是下游系统没有感知继而无法履约,影响用户体验。
如果让我们自己实现的话,当然也是有办法的。比如在业务数据库中建立一张消息表用于存储消息,将业务数据和消息数据放在同一个事务中进行存储,就可以利用数据库事务保证同时原子性。后续可以定时扫描消息表,将消息数据再发送出去。
当然也可以用现成的解决方案,RocketMQ从4.3.0版本开始,支持事务消息。我们只需要编写对应的本地事务执行方法executeLocalTransaction和本地事务执行结果检查方法checkLocalTransaction,RocketMQ会自动调用本地事务执行。如果本地事务执行成功,下游才能消费到消息,如果本地事务执行失败,下游是无法感知到这条消息的

一、使用方法

使用RocketMQ发送事务消息,只有消息发送和普通消息发送有所区别。参见官方示例:
// TransactionProducer.java
// 需要自定义一个TransactionListener用于执行事务executeLocalTransaction和事务执行结果回查checkLocalTransaction 代码在下面
TransactionListener transactionListener = new TransactionListenerImpl();
// 事务消息发送producer
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 创建一个线程池 用于Broker回查本地事务执行状态 如果这里没有创建,RocketMQ会自动创建一个线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
  try {
    Message msg =
      new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                  ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);

    Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
    e.printStackTrace();
  }
}
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 本地事务执行状态回查
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
本地事务的执行状态,有三种结果:
  • LocalTransactionState.COMMIT_MESSAGE:事务执行成功,Broker会处理消息供下游消费
  • LocalTransactionState.ROLLBACK_MESSAGE:事务被回滚,Broker会删除消息,下游感知不到消息
  • LocalTransactionState.UNKNOW:事务的执行结果未知,比如事务还在执行中,稍后Broker会回重复回查,直到超过最大时间或者最大次数

二、原理解析

0、整体流程

在这里插入图片描述

  1. Producer发送事务消息
  2. Broker端SendMessageProcessor收到消息后,判断如果是一条事务消息,会将消息原来的topic和队列id存储到消息拓展中,设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC然后 进行存储,然后通知Producer
  3. Producer收到Broker消息发送成功后,开始执行本地事务
  4. 本地事务执行完毕,Producer将事务执行状态通知Broker
  5. Broker端EndTransactionProcessor收到事务执行状态,从RMQ_SYS_TRANS_HALF_TOPIC中取出消息。如果事务执行成功,则从消息拓展中取出原本的topic和队列id,存储到真实的topic和队列id中,存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中;如果是事务回滚,只把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中
  6. 如果Broker没有收到Producer事务执行状态的通知,Broker端TransactionalMessageCheckService会主动定时从RMQ_SYS_TRANS_HALF_TOPIC中捞取消息,判断是否有需要回查的消息
  7. 如果有需要回查的消息,Broker端TransactionalMessageCheckService会向Producer回查事务状态
  8. Producer执行TransactionListener的checkLocalTransaction方法,查询事务执行状态
  9. Producer查询本地事务状态之后再执行上述第4步和第5步

1、Producer发送消息

// 编写TransactionListener实现类用于执行本地事务和本地事务回查
TransactionListener transactionListener = new TransactionListenerImpl();
// 发送事务消息专用的TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);

producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 新建一个线程池用于异步执行从Broker过来的事务回查 如果这里不新建 Broker也会自动创建一个
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
    Thread thread = new Thread(r);
    thread.setName("client-transaction-msg-check-thread");
    return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
    try {
        Message msg =
            new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送事务消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);

        Thread.sleep(10);
    } catch (MQClientException | UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
// 发送事务消息
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // 校验是否设置TransactionListener 发送事务消息必须要有TransactionListener
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        // 事务消息不支持延迟发送
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 标记prepare消息 Broker根据这个判断是否是一条事务消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 设置消息生产者组 为了查询事务消息本地事务状态时 从该生产者组中随机选择一个消息生产者即可
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    // ......
    // 对发送结果的处理稍后解析
}

2、Broker接收事务消息

// asyncSendMessage方法
CompletableFuture<PutMessageResult> putMessageResult = null;
// 依据消息是否有MessageConst.PROPERTY_TRANSACTION_PREPARED判断是否事务消息
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
    // 处理事务消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // 保存事务消息
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // 保存消息
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 备份原本的topic和队列
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // 设置新的topic为RMQ_SYS_TRANS_HALF_TOPIC
	msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // 队列是写死的 只有一个 也就是说是顺序处理的
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

3、Producer处理发送消息结果

// sendMessageInTransaction方法
try {
    // 事务消息发送结果
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 处理事务消息发送结果
switch (sendResult.getSendStatus()) {
    // 发送成功的情况
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                // 这个已经废弃了 不会进入这里
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                // 这里执行本地事务
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            // 有catch逻辑 是考虑到事务执行异常的场景
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

4、Producer通知Broker事务执行状态

try {
    // 事务消息收尾工作 通知Broker干活
    this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 设置本地事务执行状态
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 发送消息给Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
public void endTransactionOneway(
    final String addr,
    final EndTransactionRequestHeader requestHeader,
    final String remark,
    final long timeoutMillis
) throws RemotingException, InterruptedException {
    // 指定Broker使用EndTransactionProcessor处理
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

    request.setRemark(remark);
    // 单向消息 不考虑发送结果 
    // 也就是说 是可能发送失败的 发送失败之后Broker会回查
    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}

5、Broker处理事务执行状态通知

// processRequest方法
// 上面的代理是Broker向Producer回查事务后的处理 稍后解析
else {
    // 发送半消息之后产生的调用
    switch (requestHeader.getCommitOrRollback()) {
        // 如果事务执行不是commit或者rollback 直接返回 不再进行下面的逻辑
        case MessageSysFlag.TRANSACTION_NOT_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            return null;
        }

        // 如果事务执行是commit 接着下面的处理
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
            break;
        }
    	// 如果事务执行是rollback 打印异常日志 接着下面的处理
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            break;
        }
        default:
            return null;
    }
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 根据偏移量 取出topic是RMQ_SYS_TRANS_HALF_TOPIC的消息
    // 第2步Broker保存消息之后 会把偏移量通知Producer Producer再传到这里
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    // 事务执行状态是commit
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 从消息中取出原来的topic和队列等信息 构建真实消息
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 清除事务消息相关标记 防止循环处理
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 保存真实消息
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 删除消息RMQ_SYS_TRANS_HALF_TOPIC
                // 实际上是投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 并标记删除 
                // 这里为什么还要投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中 不直接删除呢 后面还需要根据这个判断是否是重复处理等
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 如果事务状态是rollback 删除消息RMQ_SYS_TRANS_HALF_TOPIC 投递到RMQ_SYS_TRANS_OP_HALF_TOPIC中并标记删除
    // 比commit少了一个投递真实主题的步骤
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

6、Broker主动捞取消息

TransactionalMessageCheckService类实现Runnable接口,在Broker启动的时候,回调用BrokerController的start方法,在start方法中,会调用TransactionalMessageCheckService的start方法启动线程,run方法是一个死循环,默认每6秒执行一次

public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
循环中实际执行的是这个方法
protected void onWaitEnd() {
    // 事务过期时间 只有当消息存储时间加上这个过期时间大于系统当前时间 才对消息执行事务回查 否则在下一次周期中执行事务回查操作
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 事务回查最大检测次数 如果超过最大检测次数还是无法获知消息的事务状态 不会再会回查 直接丢弃相当于回滚事务
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
    try {
        // 获取事务半消息主题下的全部队列 然后依次处理
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            MessageQueue opQueue = getOpQueue(messageQueue);
            // RMQ_SYS_TRANS_HALF_TOPIC消息消费进度
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // 收到事务消息提交或者回滚请求后的MQ_SYS_TRANS_OP_HALF_TOPIC中消息消费进度
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            HashMap<Long, Long> removeMap = new HashMap<>();
            // 根据当前的处理进度 依次从已处理队列MQ_SYS_TRANS_OP_HALF_TOPIC拉取32条消息
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            // 获取空消息的次数
            int getMessageNullCount = 1;
            // 当前处理半消息的进度
            long newOffset = halfOffset;
            // 当前处理消息的队列偏移量
            long i = halfOffset;
            while (true) {
                // 超过时常等待下次调度
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // 如果该消息已被处理 继续处理下一条消息
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                } else {
                    // 获取消息
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        // 超过重试次数 直接跳出 结束该消息队列的事务回查状态
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        // 没有新的消息而返回 结束该消息队列的事务回查
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            // 其它原因 重新拉取
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // needDiscard 如果该消息回查的次数超过允许回查的最大次数 该消息将被丢弃 事务消息提交失败 每回查一次 在消息属性中+1 默认回查最大次数为5
                    // needSkip 如果事务消息超过文件的过期时间 默认72小时 则跳过该消息
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    // 消息已存储时间 当前系统时间减去消息存储时间
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // checkImmunityTime 立即检测事务消息的时间 设计的意义是 应用程序在发送事务消息后 事务不会马上提交 该时间就是假设事务消息发送成功后 应用程序事务提交时间 在这段时间内 RocketMQ任务事务未提交 不应该在这个时间段内向应用程序发送回查请求
                    // transactionTimeout 事务消息的超时时间 这个时间是从OP拉取消息的最后一条消息存储时间与check方法开始的时间 如果时间差超过了transactionTimeout 就算时间小于checkImmunityTime 也发送事务回查指令
                    long checkImmunityTime = transactionTimeout;
                    // PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS 消息事务消息回查的最晚时间 单位为秒 指的是程序发送事务消息 可以指定该事务消息的有效时间 只有在这个时间内收到回查消息才有效 默认为null
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) {
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        // 如果当前时间没过应用程序事务结束时间 跳出本次处理
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // 如果OP队列中没有已处理消息并且已经超过应用程序事务结束时间transactionTimeout
                    // 或者
                    // 操作队列不为空并且最后一条消息的存储时间已经超过transactionTimeout
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);

                    if (isNeedCheck) {
                        // 这里回查是异步处理的 所以在回查之前 需要把消息重新投递到队列中 以便下次check
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // 执行回查逻辑
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        // 如果无法判断是否发送回查消息 则加载更多的已处理消息进行筛选
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                // 保存半消息的回查进度
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                // 保存OP进度
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }

}

7、Broker主动回查事务状态

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // 从同一个生产者组中选择一个Producer进行回查
    // 所以同一个生产者组中如果部分机器出现宕机、发布重启等问题 也不会影响回查
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    // 给Producer发送消息时 指定类型是CHECK_TRANSACTION_STATE
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
                group, messageExt.getMsgId(), e.toString());
    }
}

9、Producer本地事务回查

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            // 判断是Broker事务回查 检查本地事务执行状态
            return this.checkTransactionState(ctx, request);
        // ......省略部分代码
    }
    return null;
}
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        //...省略部分代码 下文解析
    };

    // 这里正是用新建TransactionMQProducer时创建的线程池异步执行提高效率
    this.checkExecutor.submit(request);
}
public void run() {
    TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
    TransactionListener transactionListener = getCheckListener();
    if (transactionCheckListener != null || transactionListener != null) {
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable exception = null;
        try {
            if (transactionCheckListener != null) {
                localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
            } else if (transactionListener != null) {
                // 检查本地事务
                log.debug("Used new check API in transaction message");
                localTransactionState = transactionListener.checkLocalTransaction(message);
            } else {
                log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
            }
        } catch (Throwable e) {
            log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
            exception = e;
        }

        // 将本地事务状态通知Broker
        // 和第四步Producer第一次尝试通知Broker一样 也是单向发送 可能发送失败
        this.processTransactionState(
            localTransactionState,
            group,
            exception);
    } else {
        log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/161081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何选择显示检波器

之所以介绍频谱仪的显示检波器&#xff0c;是因为在宽带信号功率测试、功率谱密度测试及相噪测试等应用中&#xff0c;对显示检波器的选择有一定的要求。如果选择的检波器不合适&#xff0c;那么将无法准确完成测试。本文的目的也是想让初学者对这一块内容有更多的认识&#xf…

新手运营适合哪个跨境电商平台

很多企业的网站被收录却没有排名&#xff0c;关键词优化不上去&#xff0c;网站也没有什么流量&#xff0c;不断更新文章&#xff0c;即使是原创&#xff0c;也排不上去&#xff0c;这究竟是由于哪些原因造成的呢&#xff1f;米贸搜作为专业的SEO平台&#xff0c;整理了以下几种…

茶叶为啥那么贵?

60000个嫩芽才制作一斤好茶 茶叶采摘成本太高 如何通过机器人采茶&#xff1f; 趣讲大白话&#xff1a;茶叶贵是有道理滴 *********** 浙江理工大学智能采茶机器人 能自主识别茶树芽叶 控制机械臂进行精准采摘 芽叶识别准确率能达到82%左右 平均采摘速度2.5秒/颗 采摘成功率达…

【NI Multisim 14.0原理图的设计——简单电路设计】

目录 &#x1f95d;&#x1f95d;序言 &#x1f34d;1.使用菜单命令 &#x1f34d;2.右键快捷命令 &#x1f34d;3. 使用快捷键 &#x1f95d;&#x1f95d;一、放置导线 &#x1f34d;1.自动连线 &#x1f34d;2. 手动连线 &#x1f34d; 3.设置导线的属性 &#x1f3…

单目ADAS系列教程-相机基础篇

文章目录前言相机相关的4大坐标系像素坐标系与图像坐标系的转换图像坐标系与相机坐标系的转换相机坐标系与世界坐标系的转换相机畸变标定方法小结前言 PS&#xff1a;本文仅讨论针孔模型相机&#xff0c;其余类型相机并不涉及&#xff01; 相机基础包括相机内参&#xff0c;相…

(1)WireShark

1.工具简介(1)定义WireShark是一个网络封包分析软件。网络封包分析软件的功能是抓取网络封包&#xff0c;并尽可能显示出最为详细的网络封包资料。使用WinPACA作为接口&#xff0c;直接与网卡进行数据报文交换。(2)嗅探器工作原理收集&#xff1a;从网络线缆上收集原始二进制数…

Redis客户端命令基础操作二

目录 Redis中五种常用的结构&#xff1a; 字符串&#xff1a;String 字符串可以存储三种类型的值:字节串、整数、浮点数 列表&#xff1a;List Redis中五种常用的结构&#xff1a; 字符串(string)、列表(list)、集合(set)、散列(hash)、有序集合(zset)客户端基础命令操作 字…

Java项目:旅游网站管理系统设计和实现(java+springboot+jsp+mysql+spring)

源码获取&#xff1a;博客首页 "资源" 里下载&#xff01; 运行环境: java jdk 1.8 IDE环境&#xff1a; IDEA tomcat环境&#xff1a; Tomcat 7.x,8.x,9.x版本均可 主要功能说明&#xff1a; 管理员角色包含以下功能&#xff1a;管理员登录,用户管理,旅游路线管…

集装箱号识别率99.98%+实时返回结果高泛化,全球领先飞瞳引擎集装箱识别检测云服务全球三千企业用户,集装箱信息识别铅封识别免费

飞瞳引擎™AI集装箱识别检测云服务全球三千企业使用&#xff0c;顶尖AI科技集装箱号识别率99.98%以上高泛化性高鲁棒性&#xff0c;可二次开发或小程序拍照使用&#xff0c;集装箱号铅封号识别API免费实时返回结果。CIMCAI是全球规模领先应用范围领先&#xff0c;核心技术领先的…

帆软数据决策平台连接SAP RFC实例

一、介绍由于SAP ABAP开发出来的报表很单一&#xff0c;形式很有限&#xff0c;而且调整报表格式和形式都显得特别的鸡肋&#xff0c;所以现在将SAP系统通过RFC接口模式接入到帆软报表数据决策平台下展示。本文将详细介绍如何将数据从SAP传输到帆软平台上。二、准备工作首先得先…

色氨酸代谢与肠内外健康稳态

谷禾健康 色氨酸&#xff08;Tryptophan&#xff0c;简称 Try&#xff09;是人体必需氨基酸&#xff0c;也是唯一含有吲哚结构的氨基酸&#xff0c;由食物尤其膳食蛋白质提供&#xff0c;是正常细胞稳态所必需的&#xff0c;是维持细胞生长和协调机体对环境和饮食线索的反应&am…

Java基础06——字符串

Java基础06——字符串一、String1. 字符串特点2. 创建字符串对象的两种方式直接赋值new3. 字符串常用方法a. 比较b. 遍历c. 截取d. 替换二、StringBuilder1. StringBuilder概述2. StringBuilder构造方法3. StringBuilder常用方法三、StringJoiner1. StringJoiner概述2. StringJ…

mysqldump binlog增量恢复会导致数据重复

1. mysqldump时间很长&#xff0c;导出第一个表和导出最后一个表的时间可能过去几个小时&#xff0c;如果期间不锁库&#xff0c;使用binlog增量恢复的时候&#xff0c;如果从备份开始的binlog开始恢复&#xff0c;备份期间别的表的改动通过应用binlog日志会再次被应用一次。导…

如何做好舆情管控,TOOM舆情监控服务工作经验总结

网络舆情监测剖析是实时控制网络舆情动态的一项基本工作&#xff0c;也是妥当处置网络有害信息&#xff0c;制定有效宣扬策略&#xff0c;准确引诱舆论导向的主要前提与根据 。接下来简单了解如何做好舆情管控&#xff0c;TOOM舆情监控服务工作经验总结。 一、如何做好舆情管控…

priority_queue 优先级队列(堆) 的模拟实现

目录 一、优先级队列的模板参数列表 二、优先级队列的构造函数&#xff08;建堆 nlogn&#xff09; AdjustDown() 向下调整&#xff1a; 建堆的时间复杂度&#xff1a; 三、pop()接口 &#xff08;堆顶元素的删除&#xff1a; logn&#xff09; 四、push()接口 &#xff…

算法第九期——DFS(深度优先搜索)对树的应用

树 树是一种特殊的图 。 特点&#xff1a; 若树有n个点,则有n-1条边。树有连通性但没有回路。从一个点出发可以到达任意一个&#xff0c;而且路径是唯一的。树的重心u&#xff08;最平衡的点&#xff09;: 以树上任意一个结点为根计算它的子树的结点数&#xff0c;如果结点…

1578_AURIX_TC275_MTU中的ECC检测、错误追踪以及运行模式

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) EOV其实是体现了一个错误递增的概念&#xff0c;而且这个是积累到了一定的度。至于具体的规则&#xff0c;其实后面还有更加详细的信息。关于ECC错误纠正使能&#xff0c;相应的处理可能跟…

产品更新!数维图编辑器超10项功能升级

新的一年我们加紧了更新迭代的速度&#xff0c;覆盖数维图三大可视化编辑器产品&#xff0c;超10项功能升级优化。我们将继续保持每天更新的产品升级节奏&#xff0c;满足不同行业用户的更多需求&#xff0c;为用户带来极致的产品使用体验。以下是主要的亮点功能更新汇总&#…

用R语言理解连续性和导数

文章目录微分1 连续性2 求导微分 1 连续性 众所周知微分的几何意义是斜率&#xff0c;然而斜率最初的定义只涉及直线&#xff0c;指的是ykxbykxbykxb中的kkk&#xff0c;而对任意曲线yf(x)yf(x)yf(x)而言&#xff0c;若想谈其斜率&#xff0c;就必须先做出其切线&#xff0c;…

#A. 毛毛虫树

Description给你一棵树希望你找出一条链来&#xff0c;这条链上的点&#xff0c;及这些点直接相连的点&#xff0c;加起来点数尽可能的多FormatInput第一行两个整数N&#xff0c;M&#xff0c;分别表示树中结点个数和树的边数。接下来M行&#xff0c;每行两个整数a, b表示点a和…