RocketMQ 消费者之顺序消费和流程详解附源码解析

news2025/1/13 2:57:48

1. 背景

本文是 RocketMQ 消费者系列的第六篇,上一篇主要介绍并发消费,而本片主要介绍 RocketMQ 顺序消费的设计和流程。

我把 RocketMQ 消费分成如下几个步骤

  1. 重平衡
  2. 消费者拉取消息
  3. Broker 接收拉取请求后从存储中查询消息并返回
  4. 消费者消费消息

顺序消息逻辑主要在步骤 4 当中承载,然而为了能够实现顺序的消费,在步骤 1 和 2 中就需要有相应的操作以锁定消费队列,确保顺序消费。


RocketMQ 支持 2 种消费方式,即并发消费和顺序消费。

并发消费是比较常用的一种消费方式。并发有两层含义:

  1. 队列间并发消费:一个 Topic 可以有 N 个队列,每个队列都可以被一个消费者消费,最多可以有 N 个消费者同时进行消费
  2. 队列中的多个消息并发消费:消费者执行消费逻辑时,使用一个消费线程池进行消费,该线程池默认有 20 个线程同时进行消费,所以也有可能并发消费一个队列中的多个消息

由于 RocketMQ 的 Topic 和队列的设计,让同一 Topic 下的并发消费无法避免,因为只要有多个队列让多个消费者同时消费,就必定存在并发消费。所以 RocketMQ 仅支持单个队列中的消息的顺序消费。

如果要做到 Topic 维度严格的顺序,那么需要将 Topic 的队列数设为 1。

2. 概要设计

顺序消费的设计的要点主要有两个:

  1. 保证消费速度,即消费线程池同时消费多个队列的消息。

  2. 保证顺序性,即一个队列同一时间只能被一个消费者的一个线程消费。

第一点可以在并发消费的基础上修改消费线程池的逻辑,第二点需要用到多个锁来保证。

2.1 消费线程池逻辑

2.1.1 消费线程池顺序消费

并发消费的消费线程池,每个线程的消费任务是:消费一批(默认一条)消息。消费任务由拉取任务提交,存在同时运行的多个任务都是消费同一个队列的可能。

顺序消费的消费任务也由拉取任务提交,逻辑改成了:持续消费一个队列的消息,直到该队列的消息消费完或者超过最大消费时间(1分钟)。这样可以保证每个队列最多被一个消费线程处理,每个消费线程也最大程度上被使用。1 分钟的超时时间是为了防止某个队列消息过多或消费过慢,让其他队列也有被消费到的机会。

2.1.2 保证一个队列同时仅被一个消费线程消费

在上述逻辑的基础上,可能存在同时有多个同一队列的消费任务被提交到消费线程池的情况,需要保证一个队列最多仅被一个线程消费。

RocketMQ 为每个消息队列都初始化一把锁(消息队列锁),消费任务开始时获取锁,消费任务结束时释放锁,保证就算有多个线程同时消费一个队列,但同时最多只有一个线程真正在执行消费(其他线程都在等待锁释放)。

这个实现可能导致消费线程池中多个线程同时等待一把锁,似乎是一个可以优化的点。比如通过修改消息拉取线程, 保证该队列消费完之前不拉取新的消息(拉取消息前判断该队列的消息是否被消费完,如果未被消费完则跳过拉取操作,将该拉取请求放入拉取队列末尾)

2.2 重平衡时顺序消费

消费任务中添加的消息队列锁保证了同一消费者中的顺序消费,但保证消费线程池顺序消费就可以实现顺序消费了吗?NO。顺序消费还有一个敌人——重平衡。

重平衡可能会将一个队列的消费负载从一个消费者切换到另一个消费者。老消费者的消费任务可能还没有结束,新消费者开始消费,这样也存在两个消费者同时消费一个队列的情况。


2.2.1 Broker 管理的分布式锁

为了跨消费者进行消费队列负载的协调,引入分布式锁是最好的选择,RocketMQ 让 Broker 来管理和保存该锁,我们姑且把这个锁叫做 Broker锁

消费者在重平衡后,对于新分配到的队列,会尝试向 Broker 请求 Broker锁 ;丢弃的队列则向 Broker 释放锁。如果请求到锁才开始拉取,请求不到则等到下次重平衡再次请求。

Broker锁 在 Broker 端被设了过期时间,1 分钟不更新则过期。顺序消费服务的定时任务会每 20s 向 Broker 发送请求,请求和更新自己负载的消费队列的 Broker锁

在消息拉取和顺序消费服务每消费一批消息前,都会判断是否持有 Broker锁,来保证没有多个消费者在重平衡后同时消费一个队列。

还有一个消费锁,用来保证在有消息正在被顺序消费时,不能释放 Broker锁

3. 详细设计

3.1 Broker锁

3.1.1 设计

Broker锁 是在 Broker 上保存和管理的分布式锁,它的存在是为了解决重平衡后可能的不同消费者并行消费问题。由 Broker 端的 RebalanceLockManager 管理。

每个消息队列都可能会有一个锁,每个消费组的消费应该隔离,所以每个消费组都有一组锁。锁的容器是两层 Map:

ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry/* client ID & update timestamp */>> mqLockTable

为了避免一些异常情况,如消费者异常退出,没有主动解锁,导致队列一直上锁,这里为 Broker锁 设置了超时时间(默认 1 分钟)。

由消费者向 Broker 发请求进行加解锁、查询是否加锁等操作。

alt
3.1.2 加解锁时机

消费者重平衡后,调用 updateProcessQueueTableInRebalance,如上图粉色部分所示。

对于要丢弃的队列,尝试释放 Broker锁;对于对分配到的队列尝试请求 Broker锁

3.1.3 加解锁流程

加锁的过程是向 Broker 发请求,锁管理器将锁创建后放入锁容器 mqLockTable 并更新加锁时间;解锁则是将 LockEntry 从锁容器中移除。

上图展示了加锁流程:

  1. 重平衡后更新处理队列,尝试加锁(或者更新现有锁),向 Broker 发请求
  2. MQClientAPIImpl 中准备锁定请求,调用 NettyRemotingClient 向 Broker 发送请求
  3. Broker 端 AdminBrokerProcessor 处理加锁请求
  4. RebalanceLockManager 最终处理加锁逻辑,先获取锁容器的改动锁。然后为没有加锁的队列新建 LockEntry,为已有的 LockEntry 更新加锁时间和客户端 ID。最后释放锁容器的锁。
3.1.4 边界条件:消费消息时重平衡

存在一种边界条件,消费消息耗时较长的情况下,可能存在重平衡后要释放某个队列的 Broker锁,但是这个队列仍有消息正在被消费。

这种情况下不能贸然释放 Broker锁,因为如果释放,其他消费者会获取 Broker锁并开始消费,可能会重复且并发地消费那批没有消费完的消息,这里就存在并发问题。

RocketMQ 的做法是如果队列有消息正被消费,则不会释放它的 Broker锁,等到下一次重平衡(默认 20s 周期)再判断是否可以释放。

具体的做法是引入一个新的锁 消费锁,一批消息消费时加锁,消费完后解锁。

重平衡后尝试释放 Broker锁 前,先试着获取 消费锁,如果 1s 内获取不到,则跳过。

3.2 消费线程池顺序消费逻辑

3.2.1 消费线程池顺序消费设计

每个消费者都有一个消费线程池(默认 20 个线程)来处理消息拉取任务拉取成功后提交的消费任务。

消息拉取任务将拉取到的消息都放到消息队列对应的处理队列中,然后将处理队列整个放入消费任务,提交给消费线程池。

每个消费任务都持续消费一个队列,直到消费时间到期(1 分钟)或者处理队列中消息消费完。

为了保证消费线程池中没有多个线程消费同一个队列,为每个消费队列初始化消费队列锁(MQ Lock),并在消费任务执行时加锁

3.2.2 消费线程池顺序消费任务流程
alt
  1. 检查处理队列是否丢弃(是否仍由该消费者负载该队列)
  2. 获取消费队列锁(保证消费线程池中消费这个队列的最多只有一个线程)
  3. 保证处理队列的 Broker锁 已经获取且未过期
  4. 从处理队列中取一批消息(默认 1 条)
  5. 对于重试消息,从消息属性中获取它真正的 Topic 并恢复
  6. 执行消费前钩子函数
  7. 消费锁,执行消费逻辑,释放 消费锁
  8. 根据消费结果设置返回状态
  9. 执行消费后钩子函数
  10. 处理这批消息的消费结果
    1. 检查消费结果
    2. 如果消费失败,广播模式:打印错误日志;集群模式:将消息发回 Broker,等待重新消费
    3. 将消息从处理队列移除
    4. 提交消费进度

3.3 顺序消费流程

alt
  1. 消费流程以重平衡为起点。重平衡后向 Broker 发送请求,释放和申请 Broker锁
  2. 为该消费者负载的队列新建消息拉取请求,进行消息拉取
  3. pullMessage:拉取前再次判断队列 Broker锁 的状态。消息拉取线程从拉取任务队列中获取任务并执行拉取
  4. 拉取成功后的 PullCallback 将拉取到的消息放入处理队列,然后给顺序消费服务提交顺序消费任务
  5. 顺序消费服务的消费线程池执行消费任务,执行逻辑上面讲过

4. 源码解析

4.1 Broker锁

4.1.1 Broker锁 管理器
public class RebalanceLockManager {
    // 锁容器读写锁
    private final Lock lock = new ReentrantLock();
    // 消息队列锁定状态表
    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
    /**
     * 批量锁定消息队列
     *
     * @param group 消费组
     * @param mqs 要锁定的消息队列
     * @param clientId 客户端 ID
     * @return 锁定成功的消息队列
     */

    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId)
 
{
        // 要锁定的队列中已经锁定的队列
        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
        // 之前没有锁定,需要锁定的队列
        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());

        for (MessageQueue mq : mqs) {
            if (this.isLocked(group, mq, clientId)) {
                lockedMqs.add(mq);
            } else {
                notLockedMqs.add(mq);
            }
        }

        // 锁定需要锁定的队列
        if (!notLockedMqs.isEmpty()) {
            try {
                this.lock.lockInterruptibly();
                try {
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<>(32);
                        this.mqLockTable.put(group, groupValue);
                    }

                    for (MessageQueue mq : notLockedMqs) {
                        // 为队列新建锁定标识,加入锁定状态表
                        LockEntry lockEntry = groupValue.get(mq);
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                        }

                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }

                        String oldClientId = lockEntry.getClientId();

                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
        }
        return lockedMqs;
    }
}
4.1.2 消费者解锁/加锁
4.1.2.1 重平衡后更新处理队列

重平衡时,队列分配完成后对队列进行解锁和加锁

// RebalanceImpl.java
/**
 * 重平衡后更新 ProcessQueue 表
 * 丢弃不再消费的队列,为新增的队列新建 ProcessQueue 和 PullRequest
 *
 * @param topic 主题
 * @param mqSet 重平衡后该消费者新分配到的的消息队列
 * @param isOrder 是否顺序消费
 * @return
 */

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                   final boolean isOrder)
 
{
    boolean changed = false;

    // 遍历本地缓存的消费者分到的消息队列,判断要丢弃的队列并丢弃
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            // 如果新分配到的消息队列集合中不含有老的消息队列,丢弃老的处理队列
            if (!mqSet.contains(mq)) {
                // 该 ProcessQueue 中不会有消息被消费
                pq.setDropped(true);
                // 移除消费队列,并释放 Broker锁,移除前持久化。
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;

                }
            } else if (pq.isPullExpired()) {
                /**
                 * 如果Reblance之后的mq集合包含该MessageQueue,但是ProcessQueue已经太久没有拉取数据(上次拉取消息的时间距离现在超过设置时间)
                */

                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        /**
                         * PushConsumer为被动消费
                         * 如果是PUSH,则丢弃ProcessQueue
                         * 同时删除MessageQueue
                         */

                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    // 遍历新分配的 MessageQueue,对于新分配的,创建 PullRequest 启动拉取
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    // 为每个 MessageQueue 新建一个 PullRequest
    for (MessageQueue mq : mqSet) {
        // 本地缓存的 ProcessQueue 中不包含,表示新增队列
        if (!this.processQueueTable.containsKey(mq)) {
            // 如果是顺序消费,尝试给处理队列加锁:尝试向 Broker 发送锁定队列请求。如果加锁失败则跳过,在下一次重平衡时尝试加锁
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                // 顺序消费,锁定处理队列失败,跳过拉取
                continue;
            }

            // 从内存中移除该 MessageQueue 的消费进度(老的进度不需要)
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();

            // 计算当前 MessageQueue 应该从哪里开始拉取消息
            long nextOffset = -1L;
            try {
                // 计算并从偏移量存储中读取下次拉取的偏移量
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }

            if (nextOffset >= 0) {
                // 添加 MessageQueue 和 ProcessQueue 的映射关系
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    // 添加成功,创建新的 PullRequest
                    // 唯一的创建 PullRequest 的地方
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // 将新建的 PullRequest 加入消息拉取线程 PullMessageService,开始拉取
    this.dispatchPullRequest(pullRequestList);

    return changed;
}
4.1.2.2 消息队列解锁
// RebalancePushImpl.java
/**
 * 将重平衡后丢弃的消费队列移除
 * 移除前持久化消费的消费进度
 *
 * @param mq 消息队列
 * @param pq 处理队列
 * @return
 */

@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    // 持久化消费进度,然后移除
    this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
    this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
    if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
        && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        try {
            // 如果是顺序消费,尝试获取队列的消费锁,最多等待 1s
            if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                // 获取成功,表示该队列没有消息正被消费,可以向 Broker 发请求解锁该队列
                try {
                    return this.unlockDelay(mq, pq);
                } finally {
                    pq.getConsumeLock().unlock();
                }
            } else {
                // 获取消费锁失败,表示该队列有消息正被消费,且消费时长大于 1s,那么本次无法将该队列解锁
                // 该队列新分配到负载的 Broker 由于拿不到该队列的锁,也无法开始消费,需要等待下一次重平衡时再尝试解锁
                log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                         mq,
                         pq.getTryUnlockTimes());
                // 增加解锁尝试次数
                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            log.error("removeUnnecessaryMessageQueue Exception", e);
        }

        return false;
    }
    return true;
}
4.1.2.3 消息队列加锁
// RebalanceImpl.java
/**
 * 锁定消息队列,向 Broker 发请求
 * 顺序消费时使用
 *
 * @param mq 要锁定的消息队列
 * @return 是否锁定成功
 */

public boolean lock(final MessageQueue mq) {
    // 查询 Broker 地址
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        // 构造锁定消息队列请求
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            // 调用 API 向 Broker 发送锁定队列请求,返回锁定成功的队列集合
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            for (MessageQueue mmqq : lockedMq) {
                // 将锁定成功队列的处理队列加锁
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}",
                     lockOK ? "OK" : "Failed",
                     this.consumerGroup,
                     mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }

    return false;
}

4.2 消息拉取

4.2.1 消息拉取

消息拉取操作,对于顺序消费情况,提交拉取请求前会先判断处理队列的锁定状态。

  • 锁定:如果是第一次锁定(第一次拉取),计算并设置本次拉取任务消息队列的拉取偏移量
  • 未锁定:等待 3s 后再次执行拉取请求
/**
 * 拉取消息入口
 * 
 * @param pullRequest 拉取请求,包含需要拉取的消费队列信息
 */

public void pullMessage(final PullRequest pullRequest) {
    // 获取待拉取的消费队列
    final ProcessQueue processQueue = pullRequest.getProcessQueue();

    // ...

    if (!this.consumeOrderly) {
  // ...
    } else {
        // 处理顺序消费,如果处理队列已经被锁定
        if (processQueue.isLocked()) {
            // 如果之前没有被锁定过(是第一次拉取)
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    // 计算拉取偏移量
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                } catch (Exception e) {
                    // 计算异常,等待 3s 后再次判断
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                         pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                             pullRequest, offset);
                }

                // 设置之前已经锁定过
                pullRequest.setPreviouslyLocked(true);
                // 设置要拉取的偏移量,在方法后面的逻辑中立即执行拉取
                pullRequest.setNextOffset(offset);
            }
        } else {
            // 如果处理队列没有被锁定,等待 3s 后将 PullRequest 放入拉取请求队列,再次判断锁定情况尝试拉取
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    
    // ...
}
4.2.2 消息拉取成功
public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
        // 将消息字节数组解码成消息列表并填充msgFoundList;对消息进行TAG模式过滤
        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                     subscriptionData);

        switch (pullResult.getPullStatus()) {
            // 找到对应消息
            case FOUND:
                // 上次请求偏移量
                long prevRequestOffset = pullRequest.getNextOffset();
                // 更新下一次拉取的偏移量
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                // 计算和记录拉取用时
                long pullRT = System.currentTimeMillis() - beginTimestamp;
                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                   pullRequest.getMessageQueue().getTopic(), pullRT);
                // 如果msgFoundList为空,马上进行下次拉取
                // msgFoundList为空的情况:因为根据TAG过滤时在服务端只验证了TAG的哈希码,客户端要再次对消息进行了过滤(见上),可能会出现为空的情况
                long firstMsgOffset = Long.MAX_VALUE;
                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                } else {
                    // 获取返回结果中第一条消息的消费队列offset
                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                    // 将拉取到的消息存入ProcessQueue
                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                    // 将拉取到的消息提交到ConsumeMessageService中供消费者消费(异步)
                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                        pullResult.getMsgFoundList(),
                        processQueue,
                        pullRequest.getMessageQueue(),
                        dispatchToConsume);

                    // 等待pullInterval毫秒后重新拉取,或立即重新拉取
                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                               DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                    } else {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    }
                }

                // ...
        }
        // ...
    }
}

4.3 顺序消费任务

// ConsumeMessageOrderlyService.java
// class ConsumeRequest
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}"this.messageQueue);
        return;
    }

    // 从锁容器中获取消息队列消费锁
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    // 消费时申请独占消息队列消费锁对象
    synchronized (objLock) {
        // 如果是广播模式;或者集群模式下处理队列被锁定且锁未超时,直接消费
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            // 是否继续本次消费请求的消费任务
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}"this.messageQueue);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}"this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}"this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                // 计算本次消费请求任务执行时长,如果超过 60s,结束本次消费任务,(顺序消费任务以时长计算)
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    // 如果本次消费任务执行时长超过 60s,等到 10ms 后继续执行,以便让其他队列有机会消费
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }

                // 每次从处理队列获取的一批消息数量,默认为 1
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                // 从处理队列中取一批(默认 1 条)消息
                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                // 为重试消息恢复真正的 Topic
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                if (!msgs.isEmpty()) {
                    // 如果获取的这批消息不为空
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    // 消费前钩子函数
                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // 消费消息锁
                        this.processQueue.getConsumeLock().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                     this.messageQueue);
                            break;
                        }

                        // 消息消费监听器执行真正的消费业务逻辑
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                               RemotingHelper.exceptionSimpleDesc(e),
                                               ConsumeMessageOrderlyService.this.consumerGroup,
                                               msgs,
                                               messageQueue), e);
                        hasException = true;
                    } finally {
                        // 消费消息锁解锁
                        this.processQueue.getConsumeLock().unlock();
                    }

                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                 ConsumeMessageOrderlyService.this.consumerGroup,
                                 msgs,
                                 messageQueue);
                    }

                    // 判断消费状态
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }

                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }

                    // 消息消费后钩子函数
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }

                    // 更新统计数据(消息消费时间)
                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                    // 消费结果处理
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    // 获取的这批消息为空,说明处理队列为空,结束本次消费请求的消费
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}"this.messageQueue);
                return;
            }

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1921165.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法学习day10(贪心算法)

贪心算法&#xff1a;由局部最优->全局最优 贪心算法一般分为如下四步&#xff1a; 将问题分解为若干个子问题找出适合的贪心策略求解每一个子问题的最优解将局部最优解堆叠成全局最优解 一、摆动序列&#xff08;理解难&#xff09; 连续数字之间的差有正负的交替&…

GO channel 学习

引言 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。 虽然可以使用共享内存进行数据交换&#xff0c;但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性&#xff0c;必须使用互斥量对内存进行加锁&#…

视频语音转文字工具用哪个好?推荐6款优质的视频转文字工具

在沉浸于电影情节时&#xff0c;周遭的喧嚣往往成了享受视听的障碍&#xff0c;这时&#xff0c;字幕的重要性便不言而喻。 字幕的作用远不止于此&#xff0c;它是听力受限观众的桥梁&#xff0c;也是语言学习者的得力助手。幸运的是&#xff0c;将视频语音转文字字幕现已变得…

红酒与未来科技:传统与创新的碰撞

在岁月的长河中&#xff0c;红酒以其深邃的色泽、丰富的口感和不同的文化魅力&#xff0c;成为人类文明中的一颗璀璨明珠。而未来科技&#xff0c;则以其迅猛的发展速度和无限的可能性&#xff0c;领着人类走向一个崭新的时代。当红酒与未来科技相遇&#xff0c;一场传统与创新…

电脑自动重启是什么原因呢?99%人都不知道的解决办法,直接打破循环

当你的电脑突然毫无预警地自动重启&#xff0c;不仅打断了工作流程&#xff0c;还可能导致未保存的数据丢失&#xff0c;这无疑是一件令人沮丧的事情。那么&#xff0c;电脑自动重启是什么原因呢&#xff1f;有什么方法可以解决呢&#xff1f;别担心&#xff0c;在大多数情况下…

对象与键值对数组的相互转换Object.entries与Object.fromEntries

Object.entries是JavaScript中的一个内置方法&#xff0c;它可以将一个对象的属性和值转换为一个包含键值对的数组。 let obj {name: mike,age: 18,sex: man } Object.entries(obj)Object.entries的使用场景: 1、动态更新对象属性 let obj {name: mike, age: 18, sex: man…

《昇思25天学习打卡营第02天|qingyun201003》

日期 心得 通过这次的学习&#xff0c;主要是了解过张量的基础概念&#xff0c;同时也知道有关构造张量的方法。通过索引查询张量&#xff0c;张量的运算。通过concat\stack 将张量进行维度链接。Tensor与NumPy的互相转换&#xff0c;但是我似乎并不了解什么它们的概念。也认知…

电脑视频去水印软件哪个好用,电脑上视频去水印的软件

在数字化时代&#xff0c;视频创作已成为许多人展示才华和创意的重要途径。然而&#xff0c;视频中的水印常常让人感到头疼&#xff0c;尤其是当水印影响了视频的整体美观时。本文将为你揭秘如何在电脑上使用各种软件去除视频水印&#xff0c;让你的作品更加专业&#xff01; 方…

时光穿梭机:AI如何让老照片焕发新生,跃然“动”起来

在岁月的长河中&#xff0c;每一张老照片都是时间的低语&#xff0c;承载着过往的记忆与温情。它们静静地躺在相册的角落&#xff0c;或是泛黄的相纸上&#xff0c;定格了某个瞬间的欢笑与泪水&#xff0c;却也因此失去了那份生动的活力。然而&#xff0c;随着人工智能&#xf…

如何做一个迟钝不受伤的打工人?

一、背景 在当前激烈的职场环境中&#xff0c;想要成为一个相对“迟钝”且不易受伤的打工人&#xff0c;以下是一些建议&#xff0c;但请注意&#xff0c;这里的“迟钝”并非指智力上的迟钝&#xff0c;而是指在应对复杂人际关系和压力时展现出的豁达与钝感力&#xff1a; 尊重…

如何在Linux系统安装openGauss数据库并使用固定公网地址远程连接

文章目录 前言1. Linux 安装 openGauss2. Linux 安装cpolar3. 创建openGauss主节点端口号公网地址4. 远程连接openGauss5. 固定连接TCP公网地址6. 固定地址连接测试 前言 本文主要介绍如何在Linux系统如何安装openGauss数据库管理系统&#xff0c;并结合cpolar内网穿透工具生成…

docker基础知识以及windows上的docker desktop 安装

记录以供备忘 基础概念&#xff1a; 什么是docker 将程序和环境一起打包&#xff0c;以在不同操作系统上运行的工具软件 什么是基础镜像 选一个基础操作系统和语言后&#xff0c;将对应的文件系统、依赖库、配置等打包为一个类似压缩包的文件&#xff0c;就是基础镜像 什么是…

【银河麒麟高级服务器操作系统】数据中心系统异常卡死分析处理建议

了解银河麒麟操作系统更多全新产品&#xff0c;请点击访问&#xff1a;https://product.kylinos.cn 1.服务器环境以及配置 【机型】浪潮NF5280M5 处理器&#xff1a; Intel 内存&#xff1a; 1T 【内核版本】 4.19.90-24.4.v2101.ky10.x86_64 【OS镜像版本】 银河麒麟…

python 屏幕显示一个文本窗口,文本窗口显示在当前鼠标在的位置,该文字窗口跟随鼠标移动,并且始终保持最前面显示 ,可以根据文字的多少来自动调节窗口大小

python 屏幕显示一个文本窗口,我有一段文字需要显示,鼠标在那里,文本窗口就在哪里显示,该文字窗口需要跟随鼠标移动,并且始终保持最前面显示,可以根据文字的多少来自动调节窗口大小 仅仅使用 tkinter # -*- coding:utf-8 -*-import tkinter as tkdef update_position(e…

安卓微信8.0之后如何利用缓存找回的三天之前不可见的朋友圈图片

安卓微信8.0之后如何利用缓存找回的三天之前不可见的朋友圈图片 复习了下安卓程序的知识&#xff0c;我们会了解到&#xff0c;安卓程序清楚数据的时候有两个选项 一个是清除全部数据一个是清除缓存。 清除全部数据表示清除应用数据缓存。 对于安卓微信8.0之后而言&#xff0…

动态规划|最长不下降子序列(自用)

参考视频小电视&#xff1a;【动态规划3-1&#xff1a;最长不下降序列】https://www.bilibili.com/video/BV1fK411L751?vd_source8e5e1c8c2ec3b6912ce3d9aa0c73f879 题目 题目分析 使用下标进行存储访问 int a[201]; int f[201];//从第i个数开始的长度 int p[201];//位置访…

centos磁盘空间满了-问题解决

报错问题解释&#xff1a; CentOS系统在运行过程中可能会出现磁盘空间不足的错误。这通常发生在以下几种情况&#xff1a; 系统日志文件或临时文件过大导致磁盘空间不足。 安装了大量软件或文件而没有清理无用文件。 有可能是某个进程占用了大量磁盘空间。 问题解决方法&a…

【C++杂货铺】C++11新特性

目录 &#x1f308; 前言&#x1f308; &#x1f4c1; C11介绍 &#x1f4c1; 统一初始化列表 &#x1f4c1; 声明 &#x1f4c2; auto &#x1f4c2; decltype &#x1f4c2; 返回类型后置 &#x1f4c2; 范围for &#x1f4c2; 模板别名 &#x1f4c2; nullptr &#x1…

彻底开源,免费商用,上海AI实验室把大模型门槛打下来

终于&#xff0c;业内迎来了首个全链条大模型开源体系。 大模型领域&#xff0c;有人探索前沿技术&#xff0c;有人在加速落地&#xff0c;也有人正在推动整个社区进步。 就在近日&#xff0c;AI 社区迎来首个统一的全链条贯穿的大模型开源体系。 虽然社区有LLaMA等影响力较大…

链表二 链表常见算法题

目录 ListNode代码 链表中倒数最后k个结点 描述 示例1 示例2 解题思路 AC代码 不借助指针 快慢指针 删除链表中重复的结点 描述 示例1 示例2 解题思路 AC代码 可测试运行代码(需要ListNode代码&#xff0c;在文章开头)&#xff1a; 运行结果 ​​​​​​​链表…