RocketMQ5.0消息消费<一> _ PUSH模式的消息拉取

news2024/12/23 15:05:16

RocketMQ5.0消息消费<一> _ PUSH模式的消息拉取

一、消息消费概述

消息消费以组的模式开展,一个消费组内有多个消费者,每一个消费者可订阅多个主题,消费组之间有两种消费模式:集群模式(默认)、广播模式:

  • 集群模式(默认):主题下的同一条消息只允许被相同消费组的其中一个消费者消费。消费偏移量存储在Broker端。
  • 广播模式:主题下的同一条消息将被集群内的所有消费者消费。消费偏移量存储在消费端。

消息服务器与消费者之间的消息传送有两种模式:推模式(默认)、拉模式。RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

  • PUSH推模式(默认):消息到达Broker后,推送给消息消费者。
  • PULL拉模式:消费者主动发起拉消息请求。

RocketMQ支持局部顺序消息消费,即保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费, 若实现某一主题的全局顺序消息消费,可以将该主题的消费队列数设置为1 ,牺牲高可用性。

RocketMQ支持两种消息过滤模式:表达式(TAG、SQL92),类过滤模式。

消息拉模式主要是由客户端调用消息拉取API,而消息推模式是消息服务器Broker主动将消息推送到消息消费者,本章节介绍推模式下消息消费原理。

在这里插入图片描述
二、消费者启动流程

1. 订阅主题

如下所示,是消费者启动前的订阅主题信息代码。

// consumer.subscribe(TOPIC, "TAG_ORDER || TAG_ALL"); // TAG模式过滤消息,多个TAG用||隔开
consumer.subscribe(TOPIC, MessageSelector.bySql("(orderId > 5)")); // SQL表达式过滤信息

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)根据消息选择器订阅消息的方法,其调用链及代码如下。注意事项:

  • 根据订阅主题构建订阅信息{@link SubscriptionData};
  • 主题订阅信息添加到负载均衡中,后创建拉取消息任务。
    在这里插入图片描述
/**
 * 订阅topic
 * step1:构建订阅信息与消息过滤表达式{@link SubscriptionData}
 * step2: 放入订阅信息负载均衡实现
 */
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
    try {
        if (messageSelector == null) {
            subscribe(topic, SubscriptionData.SUB_ALL);
            return;
        }
 
        // 构建订阅信息
        SubscriptionData subscriptionData = FilterAPI.build(topic,
            messageSelector.getExpression(), messageSelector.getExpressionType());
 
        // 订阅添加到负载均衡中,后创建拉取消息任务
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            // 加锁定时发送到所有Broker(类过滤源代码发送到过滤服务器上)
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

2. 消费者启动

下图是消费者UML图。org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是PUSH模式消费者实现类,该类的关键属性如下代码所示。
在这里插入图片描述

// 消费组
private String consumerGroup;
/**
 * 消费组下的不同消费模式:
 * 集群模式:默认,topic的同一消息只能被组内的一个消费者消费
 * 广播模式:topic的同一消息被组内的所有消费者消费
 */
private MessageModel messageModel = MessageModel.CLUSTERING;
 
/**
 * 消费策略
 * CONSUME_FROM_LAST_OFFSET:从最新的消息开始消费(默认)
 * CONSUME_FROM_FIRST_OFFSET:从最早的消息开始消费
 * CONSUME_FROM_TIMESTAMP:从消费者启动时间戳开始消费
 */
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 集群模式下消费队列的负载均衡策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// 订阅主题信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息业务监听器 
private MessageListener messageListener;
// 消费进度存储器
private OffsetStore offsetStore;
// 最小消费者线程数量
private int consumeThreadMin = 20;
// 最大消费者线程数量
private int consumeThreadMax = 20;
 
/**
 * 消费队列中消息最大跨度:最大消息的偏移量 - 最小消息的偏移量
 * 跨度默认2000,超出时延迟50ms再拉取消息
 */
private int consumeConcurrentlyMaxSpan = 2000;
// 消息拉取间隔时间
private long pullInterval = 0;
// 并发消费时一次消费消息条数
private int consumeMessageBatchMaxSize = 1;
// 每次拉取消息所拉取的消息条数,默认32
private int pullBatchSize = 32;
// 每次拉取消息时是否更新订阅信息,默认false
private boolean postSubscriptionWhenPull = false;
// 最大消费重试次数
private int maxReconsumeTimes = -1;
// 消息延迟到消费线程的时间,默认1s
private long suspendCurrentQueueTimeMillis = 1000;
// 消费超时时间,默认15min
private long consumeTimeout = 15;

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start是消费者启动的核心逻辑方法,而且是个synchronized修饰的方法,避免一个MQClientInstance实例下多个消费者同时启动,如下代码所示。注意事项:

  • 获取主题订阅信息,即:构建{@link SubscriptionData}。订阅关系的来源:

    a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe();

    b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题。

  • 初始化消费进度offsetStore。广播模式:消费进度存储在消费端;集群模式:消费进度存储在Broker端。

  • 创建消息消费线程池并启动consumeMessageService,负责消费消息:顺序消息、并发消息(默认)。

  • 启动MQClientInstance实例时,会启动拉取消息服务{@link PullMessageService} +
    启动消费者负载均衡{@link RebalanceService}。

/**
 * 消费者启动的核心方法
 * step1:消费者默认状态CREATE_JUST,查看{@link DefaultMQPushConsumerImpl#serviceState}属性参数
 * step2:检查配置信息,如:消费组、消费模式、主题订阅信息、消费偏移量等内容
 * step3:获取主题订阅信息:构建{@link SubscriptionData},主题的负载均衡
 *       订阅关系的来源:
 *          a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe()
 *          b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题
 * step4:初始化:MQClientInstance + 消息重新负载实现类 + 消费进度
 *        广播模式:消费进度存储在消费端
 *        集群模式:消费进度存储在Broker端
 * step5:创建消息消费线程池并启动,负责消费消息:顺序消息、并发消息(默认)
 * step6:向MQClientInstance注册消费者并启动MQClientInstance
 *        注意:同一个JVM所有生产者、消费者共同持有一个相同MQClientInstance,并只会启动一次
 * step7:启动MQClientInstance实例,会启动拉取消息服务{@link PullMessageService} + 启动消费者负载均衡{@link RebalanceService}
 */
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
 
            // 检查配置信息,如:消费组、消费模式、主题订阅信息、消费偏移量等内容
            this.checkConfig();
 
            /*
             * 获取主题订阅信息:构建{@link SubscriptionData},主题的负载均衡
             * 订阅关系的来源:
             *      a. 消费启动时订阅主题,即:DefaultMQPushConsumerImpl#subscribe()
             *      b. 订阅重试主题,即:主题为:%RETRY% + 消费组名称,不是消费者订阅的主题
             */
            this.copySubscription();
 
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
 
            // 初始化MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
 
            // 初始化消息重新负载实现类
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 
            if (this.pullAPIWrapper == null) {
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            }
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 
            // 初始化消费进度
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    // 广播模式,存储在消费端
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    // 集群
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
 
            // 创建消息消费线程池并启动,负责消费消息
            // 顺序消息
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                //POPTODO reuse Executor ?
                this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            }
            // 并发消息
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                //POPTODO reuse Executor ?
                this.consumeMessagePopService =
                    new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
 
            this.consumeMessageService.start();
            // POPTODO
            this.consumeMessagePopService.start();
 
            /*
                向MQClientInstance注册消费者并启动MQClientInstance
                注意:同一个JVM所有生产者、消费者共同持有一个相同MQClientInstance,并启动一次
             */
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                // 注册失败:改变状态、关闭消费线程
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
 
            // 启动MQClientInstance
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
 
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}

三、PUSH模式的消息拉取机制

如下图所示,是PUSH模式的消息拉取流程图。RocketMQ没有真正实现推模式,而是消费者主动向Broker拉取消息,RocketMQ推模式是消费者循环向Broker端发送消息拉取请求。
在这里插入图片描述
消费端拉取的消息存储在org.apache.rocketmq.client.impl.consumer.ProcessQueue类中,该类是消费端重现Broker的消费队列的快照(拉取消息的存放在msgTreeMap) ,其关键属性如下。

// 重入读写锁,控制并发修改msgTreeMap、consumingMsgOrderlyTreeMap
private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
// 消息容器(拉取消息存储属性)
private final TreeMap<Long/* 消息在ConsumeQueue中的偏移量 */, MessageExt /* 消息内容 */> msgTreeMap = new TreeMap<Long, MessageExt>();
// msgTreeMap中的消息数量
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock consumeLock = new ReentrantLock();
/**
 * 处理顺序消息使用,从{@link ProcessQueue#msgTreeMap}取出时,将其存入该参数中
 * A subset of msgTreeMap, will only be used when orderly consume
 */
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
// msgTreeMap中最大队列偏移量
private volatile long queueOffsetMax = 0L;
// 当前对象是否被丢弃
private volatile boolean dropped = false;
// 上次消息拉取时间戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
// 消息消息消费时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
// 顺序消息时,待处理队列是否被锁定
private volatile boolean locked = false;

org.apache.rocketmq.client.impl.consumer.PullRequest拉取请求对象,关键属性如下。

// 消费组
private String consumerGroup;
// 待拉取消费队列
private MessageQueue messageQueue;
// 消息处理队列
private ProcessQueue processQueue;
// 待拉取的messageQueue的偏移量
private long nextOffset;
// 是否锁定
private boolean previouslyLocked = false;

1. 消费端拉取消息

MQClientInstance实例启动时,会启动拉取消息服务org.apache.rocketmq.client.impl.consumer.PullMessageService,该类是一个线程,负责从broker拉取消息并提交到线程池消费。下图是run()方法的调用链。
在这里插入图片描述
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage是拉取消息的核心逻辑方法,代码如下所示。注意事项:

  • org.apache.rocketmq.client.impl.consumer.ProcessQueue:该类是Broker的消费队列的重现和快照(消息的存放),

a. 默认从broker每次拉取32条消息,按队列偏移量存入ProcessQueue;

b. PullMessageService维护一个线程池,将拉取请求任务提交到线程池;

c. 消息消费成功后,从ProcessQueue中移除

  • org.apache.rocketmq.client.consumer.PullCallback:定义拉取消息成功或失败时的处理逻辑。根据拉取消息结果状态,作不同处理。其中拉取结果为FOUND时的处理:

step1:获取下一次拉取的偏移量,矫正后的偏移量;
step2:本次拉取消息为空,则立即执行下次拉取(TAG具体内容过滤导致可能为空);
step3:拉取的消息,存放到ProcessQueue.msgTreeMap中;
step4:本次拉取消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成。

/**
 * 拉取消息
 * step1:消息处理队列是否被丢弃{@link ProcessQueue};
 * step2:检查当前消费者状态:消费者是否被挂起;
 * step3:拉取消息流控:消息总条数、消息总大小、消息最大/最小间隔等流控,并每1000次打印流控信息;
 * step4:构建消息拉取的sysFlag;
 * step5:从Broker服务器拉取消息{@link PullAPIWrapper#pullKernelImpl};
 * step6:定义拉取成功后处理,即:异步拉取回调函数{@link PullCallback};
 *        异步回调函数{@link PullCallback}把拉取的消息提交消费消息{@link ConsumeMessageService#submitConsumeRequest)}
 * @param pullRequest 消息拉取请求{@link PullRequest}
 */
public void pullMessage(final PullRequest pullRequest) {
    // 获取消息处理队列
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    // 该处理队列被丢弃,则返回
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
 
    // 本次拉取时间设置到上次拉取时间戳
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
 
    try {
        // 消费者状态
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
 
    // 消费者被挂起,则本次延迟1s再放入拉取任务队列中
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
 
    long cachedMessageCount = processQueue.getMsgCount().get(); // 消息总条数
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 消息总大小
 
    // 消息总条数 > 阈值1000条,则流控,忽略本次拉取,延迟50ms下次拉取
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // 流控次数每1000次,打印流控日志
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
 
    // 消息总大小 > 阈值100MB,则流控,忽略本次拉取,延迟50ms下次拉取
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
 
    // 并发消息
    if (!this.consumeOrderly) {
        // 消息跨度 > 阈值2000条,则流控,忽略本次拉取,延迟50ms下次拉取
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    }
    // 顺序消息
    else {
        // 处理队列被锁住
        if (processQueue.isLocked()) {
            // 拉取请求是否锁定,默认不锁定false
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    // 获取消费队列的消费进度,若进度<0时,则根据配置矫正消费进度(DefaultMQPushConsumer.consumeFromWhere配置)
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                    if (offset < 0) {
                        throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                    }
                } catch (Exception e) {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }
 
                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        }
        // 处理队列未被锁住,则延迟
        else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
 
    // 获取topic订阅信息,为空,则忽略本次拉取,延迟3s下次拉取
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
 
    final long beginTimestamp = System.currentTimeMillis();
 
    // 拉取消息成功后处理(内部实现类)
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // 消息字节数组填充msgList;TAG具体内容过滤
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
 
                // 根据拉取消息结果状态,作不同处理
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        // 获取下一次拉取的偏移量,矫正后的偏移量
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);
 
                        long firstMsgOffset = Long.MAX_VALUE;
                        // 本次拉取消息为空,则立即执行下次拉取(TAG具体内容过滤导致可能为空)
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
 
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
 
                            // 拉取的消息,存放到ProcessQueue.msgTreeMap中
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            // 本次拉取消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
 
                            // 判断两次拉取时间间隔,执行下一次拉取消息
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
 
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }
 
                        break;
                    case NO_NEW_MSG: // 没有新消息
                    case NO_MATCHED_MSG: // 没有匹配的消息
                        // 服务器broker矫正的nextBeginOffset作为下一次拉取的偏移量
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
 
                        // 立即拉取下一轮消息
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL: // offset不合规
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        // 服务器broker矫正的nextBeginOffset作为下一次拉取的偏移量
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                        // 丢弃ProcessQueue,其中里面的消息立即停止消费
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
 
                            @Override
                            public void run() {
                                try {
                                    // 更新消费者消费进度
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);
 
                                    // 持久化消费进度
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
 
                                    // 当前消费队列从负载均衡RebalanceImpl移除
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
 
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
 
        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
 
            // 延迟拉取消息
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
 
    // 构建消息拉取的sysFlag
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
 
    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
 
        classFilter = sd.isClassFilterMode();
    }
 
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend 没有找到消息时,是否挂起broker标记
        subExpression != null, // subscription 表达式过滤
        classFilter // class filter 类过滤消息模式
    );
    try {
        // 从broker服务端拉取消息
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
            sysFlag,
            commitOffsetValue,
            // broker未找到消息时,broker配置了开启长轮询模式
            BROKER_SUSPEND_MAX_TIME_MILLIS, // broker挂起超时时间,PUSH模式默认15s,PULL模式默认20s
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // broker挂起时,消费者超时时间,默认30s
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl从broker服务端拉取消息执行方法,代码如下。

/**
 * 从broker服务端拉取消息
 * step1:从本地MQClientInstance中根据消费队列mq获取brokerName、brokerId来查找broker信息;
 * step2:本地没有broker信息,则从NameServer注册中心获取;
 * step3:组装拉取请求头{@link PullMessageRequestHeader};
 * step4:过滤模式是类模式,则从broker获取FilterServer,从FilterServer上拉取消息;
 * step5:执行拉取消息,并返回。
 * Broker端处理拉取请求:
 * {@link org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest}
 * @param mq 从哪个消费队列拉取消息
 * @param subExpression 消息过滤表达式
 * @param expressionType 过滤表达式类型:TAG、SQL92
 * @param subVersion 订阅版本
 * @param offset 消息拉取偏移量
 * @param maxNums 最大拉取消息数量,默认32
 * @param maxSizeInBytes 最大字节数
 * @param sysFlag 拉取系统标记
 * @param commitOffset 当前MessageQueue的消费进度(内存中)
 * @param brokerSuspendMaxTimeMillis 允许Broker挂起时间,默认15s
 * @param timeoutMillis 拉取超时时间
 * @param communicationMode 拉取模式,默认异步拉取
 * @param pullCallback 拉取回调函数
 * @return 拉取结果
 */
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int maxSizeInBytes,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 从本地MQClientInstance中根据消费队列mq获取brokerName、brokerId来查找broker信息
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
            this.recalculatePullFromWhichNode(mq), false);
    // 本地没有,则根据topic从NameServer(Broker注册中心)
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                this.recalculatePullFromWhichNode(mq), false);
    }
 
 
    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;
 
        // broker是从,则sysFlagInner清除FLAG_COMMIT_OFFSET标记
        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
 
        // 组装拉取请求头
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setMaxMsgBytes(maxSizeInBytes);
        requestHeader.setExpressionType(expressionType);
        requestHeader.setBname(mq.getBrokerName());
 
        // 过滤模式是类模式,则从broker获取FilterServer,从FilterServer上拉取消息
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            // 随机获取Broker上的过滤服务器注册表中的一个过滤服务器
            brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
 
        // 执行拉取消息,并返回结果
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);
 
        return pullResult;
    }
 
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

2. Broker处理拉取请求
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest是Broker处理拉取消息请求的核心逻辑方法,代码如下。注意事项:

  • org.apache.rocketmq.store.MessageStore#getMessage(java.lang.String,
    java.lang.String, int, long, int,
    org.apache.rocketmq.store.MessageFilter):查找消息,默认一次查找32条记录(消费端定义)。

  • Broker是否开启slaveReadEnable:参数决定下次拉取使用从消息队列(主从Broker切换的逻辑代码)。

  • 没有找到消息时,broker是否开启长轮询查找消息(默认true):
    org.apache.rocketmq.broker.processor.DefaultPullMessageResultHandler#handle,即:没有找到消息时,挂起拉取请求,监听是否有消息到达,若有则重启挂起的请求。

/**
 * 处理客户端拉取消息请求
 * 消费者拉取消息入口请求broker:{@link org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage}
 * step1:broker是否读权限、订阅信息是否存在、topic存在且允许读
 * step2:消息队列是否合规、有表达式过滤消息
 * step3:查找消息{@link DefaultMessageStore#getMessage
 * step4:设置nextBeginOffset、minOffset、maxOffset等
 * step5:查找消息结果 转换成 拉取响应状态码
 * step6:建议下次拉取使用从消息队列
 * step7:消息消费函数钩子
 * step8:更新消息消费进度
 * step9:处理拉取请求,并响应,如:没有找到消息时,broker是否开启长轮询查找消息 {@link DefaultPullMessageResultHandler#handle}
 * @param channel {@link Channel}
 * @param request 客户端消息拉取请求
 * @param brokerAllowSuspend broker是否允许挂起
 * @return
 * @throws RemotingCommandException
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    // 解码获取拉取消息请求头
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
 
    response.setOpaque(request.getOpaque());
 
    LOGGER.debug("receive PullMessage request command, {}", request);
 
    // broker是否具有读权限
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }
 
    // 简单拉取模式下,是否开启
    if (request.getCode() == RequestCode.LITE_PULL_MESSAGE && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
        return response;
    }
 
    // 订阅信息是否存在
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }
 
    // 订阅信息存在,是否允许消费
    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.GROUP_FORBIDDEN);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }
 
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
 
    // topic配置
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        LOGGER.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }
 
    // topic存在,是否允许读
    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        responseHeader.setForbiddenType(ForbiddenType.TOPIC_FORBIDDEN);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }
 
    TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
 
    {
        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
        if (rewriteResult != null) {
            return rewriteResult;
        }
    }
 
    // 消费队列是否合规
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        LOGGER.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }
 
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    // 有表达式过滤消息
    if (hasSubscriptionFlag) {
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            // 不是TAG模式,即:SQL2过滤
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                // 构建过滤数据
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            LOGGER.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }
 
        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.BROADCASTING_DISABLE_FORBIDDEN);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }
 
        boolean readForbidden = this.brokerController.getSubscriptionGroupManager().getForbidden(//
            subscriptionGroupConfig.getGroupName(), requestHeader.getTopic(), PermName.INDEX_PERM_READ);
        if (readForbidden) {
            response.setCode(ResponseCode.NO_PERMISSION);
            responseHeader.setForbiddenType(ForbiddenType.SUBSCRIPTION_FORBIDDEN);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] is forbidden for topic[" + requestHeader.getTopic() + "]");
            return response;
        }
 
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }
 
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            LOGGER.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }
 
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }
 
    // 消息过滤
    MessageFilter messageFilter;
    // 支持重试消息过滤
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
    // 不支持重试消息过滤,即:若是重试消息,则isMatchedByCommitLog直接返回true
    else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
 
    // 查找消息
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        // 设置下次拉取的偏移量
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        // this does not need to be modified since it's not an accurate value under logical queue.
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
        responseHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        responseHeader.setGroupSysFlag(subscriptionGroupConfig.getGroupSysFlag());
 
        // 查找消息结果转换成拉取响应状态码
        switch (getMessageResult.getStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);
                break;
            case MESSAGE_WAS_REMOVING:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                if (0 != requestHeader.getQueueOffset()) {
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                    LOGGER.info("the broker stores no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                        requestHeader.getQueueOffset(),
                        getMessageResult.getNextBeginOffset(),
                        requestHeader.getTopic(),
                        requestHeader.getQueueId(),
                        requestHeader.getConsumerGroup()
                    );
                } else {
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                }
                break;
            case NO_MATCHED_MESSAGE:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case OFFSET_FOUND_NULL:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_OVERFLOW_BADLY:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                // XXX: warn and notify me
                LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}",
                    requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                break;
            case OFFSET_OVERFLOW_ONE:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_TOO_SMALL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                LOGGER.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                    getMessageResult.getMinOffset(), channel.remoteAddress());
                break;
            default:
                assert false;
                break;
        }
 
        // 建议使用从消费队列
        if (this.brokerController.getBrokerConfig().isSlaveReadEnable() && !this.brokerController.getBrokerConfig().isInBrokerContainer()) {
            // consume too slow ,redirect to another machine
            // 主Broker繁忙,建议下一次到从Broker拉取
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
 
        if (this.brokerController.getBrokerConfig().getBrokerId() != MixAll.MASTER_ID && !getMessageResult.isSuggestPullingFromSlave()) {
            if (this.brokerController.getMinBrokerIdInGroup() == MixAll.MASTER_ID) {
                LOGGER.debug("slave redirect pullRequest to master, topic: {}, queueId: {}, consumer group: {}, next: {}, min: {}, max: {}",
                    requestHeader.getTopic(),
                    requestHeader.getQueueId(),
                    requestHeader.getConsumerGroup(),
                    responseHeader.getNextBeginOffset(),
                    responseHeader.getMinOffset(),
                    responseHeader.getMaxOffset()
                );
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                if (!getMessageResult.getStatus().equals(GetMessageStatus.FOUND)) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                }
            }
        }
 
        // 消费消息钩子函数
        if (this.hasConsumeMessageHook()) {
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            String authType = request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE);
            String ownerParent = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT);
            String ownerSelf = request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF);
 
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getConsumerGroup());
            context.setTopic(requestHeader.getTopic());
            context.setQueueId(requestHeader.getQueueId());
            context.setAccountAuthType(authType);
            context.setAccountOwnerParent(ownerParent);
            context.setAccountOwnerSelf(ownerSelf);
            context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
 
            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                    int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
 
                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                    context.setCommercialRcvTimes(incValue);
                    context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                    context.setCommercialOwner(owner);
 
                    context.setRcvStat(BrokerStatsManager.StatsType.RCV_SUCCESS);
                    context.setRcvMsgNum(getMessageResult.getMessageCount());
                    context.setRcvMsgSize(getMessageResult.getBufferTotalSize());
                    context.setCommercialRcvMsgNum(getMessageResult.getMsgCount4Commercial());
 
                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (!brokerAllowSuspend) {
 
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
 
                        context.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setRcvMsgNum(0);
                        context.setRcvMsgSize(0);
                        context.setCommercialRcvMsgNum(0);
                    }
                    break;
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                case ResponseCode.PULL_OFFSET_MOVED:
                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                    context.setCommercialRcvTimes(1);
                    context.setCommercialOwner(owner);
 
                    context.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
                    context.setRcvMsgNum(0);
                    context.setRcvMsgSize(0);
                    context.setCommercialRcvMsgNum(0);
                    break;
                default:
                    assert false;
                    break;
            }
 
            try {
                this.executeConsumeMessageHookBefore(context);
            } catch (AbortProcessException e) {
                response.setCode(e.getResponseCode());
                response.setRemark(e.getErrorMessage());
                return response;
            }
        }
 
        //rewrite the response for the
        RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
        if (rewriteResult != null) {
            response = rewriteResult;
        }
 
        // 处理拉取请求,并响应,如:没有找到消息时,broker是否开启长轮询查找消息
        response = this.pullMessageResultHandler.handle(
            getMessageResult,
            request,
            requestHeader,
            channel,
            subscriptionData,
            subscriptionGroupConfig,
            brokerAllowSuspend,
            messageFilter,
            response
        );
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }
 
    // 允许broker拉取消息挂起 + 有commitlog标记时:更新消息消费进度
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    if (storeOffsetEnable) {
        // 更新消息消费进度
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}

Broker端处理拉取消息后,响应到消费端,根据消费端发送拉取消息请求(详细见上小节)时,定义的org.apache.rocketmq.client.consumer.PullCallback处理拉取的消息,提交到ConsumeMessageService,供消费者消费(异步提交),本次拉取消息完成。消息消费参考《RocketMQ5.0.0消息消费<三> _ 消息消费》。

四、消息拉取长轮询机制

通过上述两小节介绍,RocketMQ推模式是消费者循环向Broker端发送消息拉取请求,如果消费者向Broker发送消息拉取时,此时消息并未到达消费队列,是否开启长轮询机制做不同处理。长轮询模式使得消息拉取能实现准实时。

  • 不启用长轮询模式:会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已到达消息队列,如果消息未到达则提示消息拉取客户端
    PULL_NOT_FOUND(消息不存在);
  • 开启长轮询模式(默认):RocketMQ一方面会每5s轮询检查一次消息是否到达,与此同时有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,则从commitlog文件提取消息返回给消息消费者,否则直到挂起超时。

挂起超时时间由消费者在消息拉取时封装在请求参数中,PUSH模式默认为15s,PULL模式DefaultMQPullConsumer#brokerSuspendMaxTimeMillis设置。

RocketMQ长轮询机制由两个线程共同来完成:

  • PullRequestHoldService线程:消费者拉取消息未找到消息时,Broker挂起每个拉取消息的请求,并添加到该线程中,每隔5s重试再次拉取消息;

  • DefaultMessageStore.ReputMessageService线程:新消息执行异步转发(新消息构建消费队列)时,主动唤醒消费者的拉取请求,沉睡1ms继续下一次检查。

1. PullRequestHoldService线程

1):拉取请求添加到PullRequestHoldService线程

参考第二节《PUSH模式的消息拉取机制#Broker处理拉取请求》,org.apache.rocketmq.broker.processor.DefaultPullMessageResultHandler#handle处理拉取请求实现类,其状态ResponseCode.PULL_NOT_FOUND(未找到消息)时的处理逻辑,代码如下。

@Override
public RemotingCommand handle(final GetMessageResult getMessageResult,
                              final RemotingCommand request,
                              final PullMessageRequestHeader requestHeader,
                              final Channel channel,
                              final SubscriptionData subscriptionData,
                              final SubscriptionGroupConfig subscriptionGroupConfig,
                              final boolean brokerAllowSuspend,
                              final MessageFilter messageFilter,
                              RemotingCommand response) {
 
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
 
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            ......
        case ResponseCode.PULL_NOT_FOUND: // 没有找到消息
            // 消费者拉取请求的是否有挂起标志
            final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
            // 消费者拉取请求的Broker挂起超时时间,默认15s
            final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
 
            // Broker是否支持挂起 + 消费者拉取请求的是否有挂起标志
            if (brokerAllowSuspend && hasSuspendFlag) {
                // Broker挂起超时时间,默认15s
                long pollingTimeMills = suspendTimeoutMillisLong;
                // Broker没有开启长轮询挂起模式,默认超时1s
                if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                }
 
                String topic = requestHeader.getTopic();
                long offset = requestHeader.getQueueOffset();
                int queueId = requestHeader.getQueueId();
                // 再次组装消息拉取请求
                PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                // 提交到PullRequestHoldService线程
                this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                // 响应设置null
                return null;
            }
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            ......
 
            break;
        default:
            log.warn("[BUG] impossible result code of get message: {}", response.getCode());
            assert false;
    }
 
    return response;
}

2):PullRequestHoldService线程周期执行

PullRequestHoldService该线程完成消费者拉取消息未找到消息时,Broker挂起每个拉取消息的请求,并添加到该线程中,每隔5s重试再次拉取消息。PullRequestHoldService#run方法5s周期执行消息拉取任务,其调用链如下。
在这里插入图片描述
org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving是拉取消息的核心方法,代码如下。注意事项:

  • 判断是否有新消息:当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息。
/**
 * 处理轮询机制的拉取请求
 * step1:获取key下所有挂起的拉取消息请求;
 * step2:复制并清空当前key的所有挂起拉取消息任务,
 *       采用synchronized,避免并发访问 {@link org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService}也会唤醒,尝试拉取消息
 * step3:当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息;
 * step4:有匹配消息或拉取请求挂起超时时,拉取请求唤醒并返回给消息拉取的消费者
 *       {@link PullMessageProcessor#executeRequestWhenWakeup}
 * @param topic
 * @param queueId
 * @param maxOffset 当前消费队列最大偏移量
 * @param tagsCode
 * @param msgStoreTime 消息存储时间
 * @param filterBitMap
 * @param properties
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    // 获取key下所有挂起的拉取消息请求
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        // 复制并清空当前key的所有挂起拉取消息任务
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
 
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }
 
                // 当前消费队列最大偏移量 > 待拉取偏移量,说明有新消息
                if (newestOffset > request.getPullFromThisOffset()) {
                    // 消息是否匹配
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
 
                    if (match) {
                        try {
                            // 匹配,则将消费者拉取请求唤醒,将消息返回给消息拉取的消费者
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error(
                                "PullRequestHoldService#notifyMessageArriving: failed to execute request when "
                                    + "message matched, topic={}, queueId={}", topic, queueId, e);
                        }
                        continue;
                    }
                }
 
                // 消费者拉取请求挂起超时,则直接返回给消费者,未找到消息
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // 将消费者拉取请求唤醒,将消息返回给消息拉取的消费者
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error(
                            "PullRequestHoldService#notifyMessageArriving: failed to execute request when time's "
                                + "up, topic={}, queueId={}", topic, queueId, e);
                    }
                    continue;
                }
 
                replayList.add(request);
            }
 
            // 下次循环拉取的消息
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

2. DefaultMessageStore.ReputMessageService线程

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService该线程完成新消息执行异步转发时(新消息构建消费队列),主动唤醒消费者的拉取请求,沉睡1ms继续下一次检查。

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput执行异步转发的核心逻辑(参考《RocketMQ5.0.0消息存储<三>_消息转发与恢复机制》),其如下部分代码是主动唤醒消费者拉取请求。

/**
 * 消息提交到Commitlog时消息转发,构建ConsumeQueue、index文件服务 的核心方法
 * step1:消息转发偏移量 > 最小偏移量时,则最小偏移量赋值给消息转发偏移量
 * step2:消息转发偏移量 <= 最小偏移量时,获取转发偏移量开始的全部有效数据
 * step3:循环转发每条消息,获取每条消息的转发请求对象{@link DispatchRequest}
 * step4:消息解析成功后,转发消息{@link DefaultMessageStore#doDispatch(DispatchRequest)}
 *        注意:转发消息时遍历LinkedList<CommitLogDispatcher> dispatcherList,集合中有消息队列、索引文件执行转发的实现类
 * step5:更新消息转发偏移量
 */
private void doReput() {
                            ......
 
                            // 消息转发
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
 
                            // 开启长轮询时,唤醒PullRequestHoldService线程,执行被挂起的拉取消息请求
                            if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                    && DefaultMessageStore.this.messageArrivingListener != null) {
                                // 唤醒被挂起的拉取消息请求,再次拉取消息
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                // 多个消息队列,唤醒被挂起的拉取消息请求,再次拉取消息
                                notifyMessageArrive4MultiQueue(dispatchRequest);
                            }
 
                            ......
}

五、参考资料

  1. https://blog.csdn.net/liqiuman180688/article/details/88390667
  2. https://baijiahao.baidu.com/s?id=1740665653045957262&wfr=spider&for=pc
  3. https://blog.csdn.net/aideserter/article/details/118713781

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/737340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

文字识别(OCR)介绍与开源方案对比

目录 文字识别&#xff08;OCR&#xff09;介绍与开源方案对比 一、OCR是什么 二、OCR基本原理说明 三、OCR基本实现流程 四、OCR开源项目调研 1、tesseract 2、PaddleOC 3、EasyOCR 4、chineseocr 5、chineseocr_lite 6、cnocr 7、商业付费OCR 1&#xff09;腾讯…

vue+Element 设置头部固定,并解决遮罩层显示问题

通过整体框架代码可以看到&#xff0c;其实element-ui已经实现了头部固定 找到这个fixedHeader&#xff0c;发现直接在全局设置文件里 这里如果设置为false&#xff0c;就表示头部不固定&#xff1b;改为true&#xff0c;则表示头部固定。 上述更改完后&#xff0c;就可以实…

关于索引应用的一些问题

索引是啥:加快检索速度的数据结构 索引的优点和缺点 索引的优点: 1.建立索引后,数据库检索数据速度直线上升(使用正确的话),数据量越大越明显 2.分组和排序的时候,可以利用索引加快速度 3.通过建立唯一索引可以确保数据唯一,不需要加其他限制条件(既建立了索引 又保证了唯…

火山引擎 DataLeap 套件下构建数据目录(Data Catalog)系统的实践

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 摘要 Data Catalog 产品&#xff0c;通过汇总技术和业务元数据&#xff0c;解决大数据生产者组织梳理数据、数据消费者找数和理解数的业务场景&#xff0c;并服务于…

【综述003】面向未来的语义通信:基本原理与实现方法

摘要 0.引言 张平&#xff1a;提出“智简&#xff08;Intellicise&#xff09;”理念&#xff0c;提出模型驱动的语义通信框架&#xff0c;实现通信系统由传统传输比特演进为传输“模型”。如&#xff1a;语义基&#xff08;Seb&#xff09;牛凯&#xff1a;研究了从经典通信…

Three.js 三维模型(一)

简介 今天主要给搭建介绍下three.js的基本使用&#xff0c;本篇是基于笔者在16年给做的一个项目的demo版进行讲解的&#xff0c;笔者当时采用Html5和JS进行编写的。可能大家会问有没有vue、React 、angular版的。这些笔者后面有时间的时候一定会给大家介绍。 其实编程的本源在…

牌桌玩家越来越少?国产替代进程加速,中国企业要做好选择

现在“国产替代”这四个字热度很高。 可以说我们现在关注的问题&#xff0c;技术引进、自主创新、中国制造、卡脖子、全球竞争等&#xff0c;都可以用国产替代作为线索串联起来&#xff01; 其实这也不是21世纪之后中国刚刚遇到的问题。这是过去一百年中国人一直在奋斗的目标…

机器人动力学与控制学习笔记(十七)——基于名义模型的机器人滑模控制

十七、滑模控制器设计原理 滑模运动包括趋近运动和滑模运动两个过程。系统从任意初始状态趋向切换面&#xff0c;直到到达切换面的运动称为趋近运动&#xff0c;即趋近运动为的过程。根据滑模变结构原理&#xff0c;滑模可达性条件仅保证由状态空间任意位置运动点在有限时间内到…

NLP领域再创佳绩!阿里云机器学习平台 PAI 多篇论文入选 ACL 2023

近期&#xff0c;阿里云机器学习平台PAI主导的多篇论文在ACL 2023 Industry Track上入选。ACL是人工智能自然语言处理领域的顶级国际会议&#xff0c;聚焦于自然语言处理技术在各个应用场景的学术研究。该会议曾推动了预训练语言模型、文本挖掘、对话系统、机器翻译等自然语言处…

Trimble RealWorks处理点云数据(九)之点云分类后将地面导入Arcgis生成DEM

效果 步骤 1、las导入Trimble RealWorks 2、对点云数据预处理 可以参考这篇文章 TrimbleRealWorks点云数据预处理 我这边是把点云做了分类,而后将地面数据导出las 点云做为三维数据,后续步骤在arcscene中操作,能实时显示出来 3、arcscene创建las数据集

【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(7 月 10 日论文合集)

文章目录 一、分割|语义相关(6篇)1.1 Unsupervised Segmentation of Fetal Brain MRI using Deep Learning Cascaded Registration1.2 Tranfer Learning of Semantic Segmentation Methods for Identifying Buried Archaeological Structures on LiDAR Data1.3 To pretrain or …

Vue2----Uniapp自定义弹窗

对于擅长后端的程序员&#xff0c;在编写前端时常常回去找库&#xff0c;比如elementUI&#xff0c;uview之类&#xff0c;但是往往这些库较为冗杂&#xff0c;有些功能比较强大&#xff0c;基本用不到&#xff0c;不好理解。这时候&#xff0c;如果可以自定义组件可能会对开发…

C++ STL常见算法

目录 1 各种常见算法的用法 1.1 非可变序列算法 1.2 可变序列算法 1.3 Partitions 1.4 排序算法 1.5 查找算法 1.6 集合算法 1.7 堆算法 1.8 最大最小值算法 1.9 其他算法 1 各种常见算法的用法 STL算法部分主要由头文件<algorithm>,<numeric>,<func…

uniapp 获取状态栏及小程序右侧胶囊信息(用于设置全屏小程序)

1.获取信息: //获取状态栏高度(px) this.statusBarHeight uni.getSystemInfoSync().statusBarHeight; //获取小程序胶囊信息 this.menuButtonInfo uni.getMenuButtonBoundingClientRect() 如下: 2.动态设置style样式: <view:style"{ paddingTop: menuButtonIn…

Oracle-RAC集群安装root.sh报错问题

问题背景: 在redhat 7.8上安装Oracle11G RAC集群&#xff0c;在节点一执行root.sh脚本时发生错误Disk Group OCRDG creation failed with the following message:ORA-15018: diskgroup cannot be created 问题分析: 从报错信息来看错误是在执行创建OCRDG磁盘组时失败&#xff0…

Python读取指定的TXT文本文件并从中提取指定数据的方法

本文介绍基于Python语言&#xff0c;遍历文件夹并从中找到文件名称符合我们需求的多个.txt格式文本文件&#xff0c;并从上述每一个文本文件中&#xff0c;找到我们需要的指定数据&#xff0c;最后得到所有文本文件中我们需要的数据的合集的方法。 首先&#xff0c;我们来明确一…

进度网络图详解

关键路径&#xff1a;总工期最长的那一条路径&#xff1a;可能不止一条。&#xff08;1条或多条&#xff09; 虚工作&#xff1a;不占用任何时间和资源的&#xff0c;只是为了让逻辑关系更加明确&#xff0c;网络图更加美观。 最早开始时间&#xff08;ES&#xff09;- 左上 最…

BT 种子,磁力链接是个啥?

[科普向] BT 种子、磁力链接到底是什么&#xff1f; BitTorrent 我们平时所说的 BT 种子&#xff0c;实际上指的是由 BitTorrent 协议所生成的一个包含资源信息的文件。与传统的网络传输协议不同&#xff0c;BitTorrent 协议是一种以 Peer-To-Peer&#xff08;P2P&#xff09…

【KingbaseES】查看表空间大小

查询单表空间大小 SELECT sys_size_pretty(sys_tablespace_size(sys_default))查看所有表空间大小&#xff08;不包含系统表空间&#xff0c;包含默认表空间&#xff09; SELECT oid,spcname AS "Name",sys_size_pretty(sys_tablespace_size(spcname)) AS "Lo…

2. SpringBoot快速回顾(@value读取配置文件)

目录 1.定义配置文件2. 定义Controller类3. 测试4. 优化4.1 封装实体类4.3 定义controller类4.2 测试 本文将介绍如何使用value读取配置文件的内容。 在实际项目中&#xff0c;往往会在配置文件中写项目部署需要配置的环境信息&#xff08;数据库驱动&#xff0c;数据库账号密码…