30.RocketMQ之消费者拉取消息源码

news2024/12/22 15:24:25

highlight: arduino-light

消息拉取概述

消息消费模式有两种模式:广播模式与集群模式。

广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。本文重点讲解集群模式。

在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。

消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费者消费,一个消息消费者可以同时消费多个消息队列,并且对每个分配给自己的队列加锁。

image.png

从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取,并且这个拉取消息的动作是1个死循环。

```java @Override public void run() { log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
    try {
        PullRequest pullRequest = this.pullRequestQueue.take();
        this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
        log.error("Pull Message Service Run Method exception", e);
    }
}

log.info(this.getServiceName() + " service end");

} ```

pullRequestQueue的类型是LinkedBlockingQueue,LinkedBlockingQueue是一个线程安全的阻塞队列,先进先出。 LinkedBlockingQueue作为生产者消费者的首选,可以指定容量也可以不指定,不指定的话默认最大是Integer.MAX_VALUE。

主要用到put和take方法,take和put都是阻塞式的操作,两者协作完成生产者和消费者的线程模型。 put方法将一个对象放到队列尾部,在队列满的时候会阻塞,直到有队列成员被消费。 take方法从head取一个对象,在队列为空的时候会阻塞,直到有队列成员被放进来。

this.pullRequestQueue.take()就是从pullRequestQueue中获取一个pullRequest,使用pullRequest去拉取消息。

**既然有take,那么必然有put!

pullRequestQueue中pullRequest的从哪来的呢?

拉取消息是在负载均衡服务时,给当前消费者分配了一些队列,将这些队列构建成一个一个的pullRequest。

放入了pullRequestQueue

理一下pullRequestQueue和PullRequest以及ProcessQueue的关系

pullRequestQueue的类型是LinkedBlockingQueue,是阻塞队列,实现了生产消费模型,里面放的是一个一个的PullRequest。

PullRequest中封装了ProcessQueue。

ProcessQueue是MessageQueue在消费端的重现、快照。

消息拉取流程

1.消费者每次做完负载均衡之后,都会构建PullRequest放到pullRequestQueue也就是LinkedBlockingQueue。

java //构建PullRequest PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); //拉取的消费进度 pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); //将新加入构建PullRequest的放入pullRequestList pullRequestList.add(pullRequest); changed = true;

2.PullRequest中封装了ProcessQueue,ProcessQueue是MessageQueue在消费端的重现、快照。

3.PullMessageService线程调用LinkedBlockingQueue的take方法从pullRequestQueue获取PullRequest。

4.PullMessageService线程使用PullRequest从消息服务器默认每次拉取32条消息,按消息的队列偏移顺序存放在ProcessQueue中。

5.PullMessageService线程将消息提交到消费者消费线程池,消息成功消费后ProcessQueue中移除。

上面就是消息拉取的整体流程。

消息拉取源码

上一篇文章我们讲到了RebalancePushImpl#dispatchPullRequest。

java @Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { //遍历所有的pullRequest for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl .executePullRequestImmediately(pullRequest); } }

那么我们这一篇文章就接着往下看。

1.放入ProcessQueue

PullMessageService#executePullRequestImmediately

```java public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }

//把PullRequest放到pullRequestQueue后就会被拉取消息的线程服务取走
//做真正的拉取操作
 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //生产者消费者模型
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

```

发现只是将PullRequest做了入队操作,代码走到这里发现已经点不下去了。

所以我们搜索一下pullRequestQueue相关的代码。

2.拉取消息定时任务取出pullRequest开始拉取消息

默认是CommunicationMode.ASYNC,异步拉取消息。

PullMessageService#run

```java @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //从pullRequestQueue获取pullRequest PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } }

log.info(this.getServiceName() + " service end");

} ```

PullMessageService是MQClientInstance#start方法执行this.pullMessageService.start();时启动。

从pullRequestQueue获取pullRequest,然后调用pullMessage(pullRequest);

PullMessageService#pullMessage

java private void pullMessage(final PullRequest pullRequest) { //根据消费组选择消费者 //MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer); //如果prev不为空 只会打印日志 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer (pullRequest.getConsumerGroup()); // 使用消费者拉取消息 if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest,drop it"); } }

this.consumerTable.putIfAbsent(group, consumer);是根据消费组选择消费者,点进去看看。

java public MQConsumerInner selectConsumer(final String group) { return this.consumerTable.get(group); }

consumerTableMQClientInstance的1个属性,源码如下:

java ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

可以看到consumerTable的类型是1个ConcurrentHashMap,key是consumerGroup,value是MQConsumerInner。

但是为什么选择的消费者只有1个?

在消费者启动的时候会调用consumerTable.putIfAbsent(group, consumer);往map中放入消费者组和消费者。

假如在main方法中for循环启动2个消费者,因为每个消费者启动都会执行以下代码:

```java public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) { if (null == group || null == consumer) { return false; }

MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
    log.warn("the consumer group[" + group + "] exist already.");
    return false;
}

return true;

} 在registerConsumer方法会执行以下代码 java consumerTable.putIfAbsent(group, consumer); ```

所以第一个消费者启动的时候,执行putIfAbsent成功。

第二个消费者启动的时候,执行putIfAbsent失败。

也就是在1个JVM中同1个消费者组只有1个消费者。

2.1流控:拉取次数&拉取流量

  • processQueue 的消息数量 大于 1000, processQueue 的消息大小 大于 100 MB,将延迟 50 毫秒后拉取消息
  • processQueue 中偏移量最大的消息与偏移量最小的消息的跨度超过 2000 则延迟 50 毫秒再拉取消息。
  • 根据主题拉取订阅的消息,如果为空,延迟 3 秒,再拉取。

2.3如果是顺序消费要先加锁

顺序消费需要先加锁,如果本地的消费进度大于远程broker的消费进度。

会使用远程的broker的消费进度进行拉取

2.4注册回调函数

2.5发送拉取消息请求

2.6执行回调函数,将拉取的消息放到线程池消费

对于顺序消费

status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);

DefaultMQPushConsumerImpl#pullMessage

```java public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } //设置上次更新时间 用于移除不重要的mq pullRequest.getProcessQueue() .setLastPullTimestamp(System.currentTimeMillis());

//判断当前消费者是否还在RUNNING
        //PULL_TIME_DELAY_MILLS_WHEN_SUSPEND=3
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            //延迟任务:3秒以后执行executePullRequestImmediately 
            //将pullRequest重新放入pullRequestQueue
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }

        //PULL_TIME_DELAY_MILLS_WHEN_SUSPEND=3
        //如果在在暂停状态
        //延迟任务:3秒以后执行executePullRequestImmediately 
        //将pullRequest重新放入pullRequestQueue
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later");
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        //private int pullThresholdForQueue = 1000;
        //流量控制:
        //拉取次数大于1000次   
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

       if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            //PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50 
            //50毫秒以后在拉取
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            //如果正好是1000的倍数次还要打印日志
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn("exceeds the threshold {}, so do flow control");
            }
            return;
        }

        //private int pullThresholdForQueue = 1000;
        //流量控制:
        //拉取的数据累计大于100兆
        if (cachedMessageSizeInMiB > 
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            //50毫秒以后在拉取
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            //如果正好是1000的倍数次还要打印日志
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn("exceeds the threshold {} MiB, so do flow control");
            }
            return;
        }

        //如果不是顺序消费
        if (!this.consumeOrderly) {
            //则检查ProcessQueue对象的msgTreeMap。
            //TreeMap<Long,MessageExt>变量的第一个key值与最后一个key值之间的差额
            //该key值表示查询的队列偏移量queueoffset
            //若差额大于阈值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定
            //默认是2000)
            //则调用PullMessageService.executePullRequestLater方法,
            //在50毫秒之后重新将该PullRequest请求
            //放入PullMessageService.pullRequestQueue队列中;
            //并跳出该方法;
            if (processQueue.getMaxSpan() >         
                    this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                //延迟50毫秒在拉取
                this.executePullRequestLater
                    (pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);                         
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn("the queue's messages, span too long, flow control");
                }
                return;
            }
        } else {
            //如果是顺序消费
            //负载均衡时加的锁
            if (processQueue.isLocked()) {
                //不是第一次拉取
                if (!pullRequest.isLockedFirst()) {
                    //计算从哪里开始拉取消息即拉取消息的偏移量
                    //默认是CONSUME_FROM_LAST_OFFSET
                    final long offset =     
                            this.rebalanceImpl
                                .computePullFromWhere(pullRequest.getMessageQueue());
                    //本地的偏移量大于服务器的偏移量
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    //重新设置本地的偏移量为从broker拉取的偏移量
                    //这里也会重复消费吧
                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, 
                                             PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                return;
            }
        }

        final SubscriptionData subscriptionData 
                                =   this.rebalanceImpl  
                                        .getSubscriptionInner()
                                        .get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            //3秒后重新放入
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed");
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();
        /*
         注意这里只是注册了1个回调函数
         功能就是把拉取到的消息保存到processqueue上,然后进行客户端实际业务消费,
         最后把pullRequest重新添加到阻塞队列供pullmessageservice服务线程重新拉取
         */
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    //如果拉取的结果是FOUND 即 拉取到了数据
                    //那么解析二进制数据为具体的消息列表
                    pullResult=  DefaultMQPushConsumerImpl
                                    .this.pullAPIWrapper.processPullResult
                                            (pullRequest.getMessageQueue(), 
                                                    pullResult,subscriptionData);

                    //转换拉取结果
                    switch (pullResult.getPullStatus()) {
                        //消息拉取结果,消息拉取到了
                        case FOUND:
                            //拉取到的消息的位置,相对于consumer queue
                            long prevRequestOffset 
                                = pullRequest.getNextOffset();
                            //下次待拉取的消息在consumer queue的位置
                            pullRequest.setNextOffset
                                        (pullResult.getNextBeginOffset());
                            //拉取消息花费的时间
                            long pullRT = 
                                    System.currentTimeMillis() - beginTimestamp;
                            //累加响应时间
                            DefaultMQPushConsumerImpl
                                .this
                                .getConsumerStatsManager()
                                .incPullRT(pullRequest.getConsumerGroup(),
                                 pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            //如果拉取的消息列表为空
                            if (pullResult.getMsgFoundList() == null 
                                ||pullResult.getMsgFoundList().isEmpty()) {                                         //放入拉取队列再拉一次
                                defaultMQPushConsumerImpl.this
                                    .executePullRequestImmediately(pullRequest);
                            //如果拉取的消息列表不为空
                            } else {
                                //拉取到的消息中的第一个消息在commitlog的位置
                                //获取第一个消息的偏移量
                                firstMsgOffset = 
                                   pullResult
                                    .getMsgFoundList()
                                    .get(0)
                                    .getQueueOffset();
                                //累加响应时间
                                DefaultMQPushConsumerImpl
                                    .this.getConsumerStatsManager()
                                    .incPullTPS
                                    (pullRequest.getConsumerGroup(),
                                     pullRequest.getMessageQueue().getTopic(),          
                                     pullResult.getMsgFoundList().size());
                                //重要代码★:
                                //将消息放入processQueue的treemap
                                //把拉取到的消息保存到ProcessQueue.msgTreeMap
                                boolean dispatchToConsume =             
                                    processQueue.putMessage
                                            (pullResult.getMsgFoundList());

                                //提交给consumeMessageService
                                //客户端消费并发执行 ConsumeRequest.run()
                                //客户端消费并发执行 ConsumeRequest.run()
                                //客户端消费并发执行 ConsumeRequest.run()
                                //消费的代码要重点看
                                DefaultMQPushConsumerImpl
                                    .this
                                    .consumeMessageService
                                    .submitConsumeRequest
                                        (pullResult.getMsgFoundList(),
                                            processQueue,
                                                    pullRequest.getMessageQueue(),
                                                                dispatchToConsume);
                                //  private long pullInterval = 0;
                                //  如果interval大于0 那么延迟interval毫秒后拉取
                                //  默认是0这个应该是可以配置的 设置PullInterval
                                //  可以在拉取消息完成后 等待一段时间在拉取
                                if (DefaultMQPushConsumerImpl.this.
                                    defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this
                                        .executePullRequestLater(
                                            pullRequest,
                                            DefaultMQPushConsumerImpl.this
                                            .defaultMQPushConsumer
                                            .getPullInterval());

                                } else {
                                    //把PullRequest重新保存到
                                    //PullMessageService.pullRequestQueue阻塞队列
                                    //供消费线程继续执行消息继续拉取
                                    DefaultMQPushConsumerImpl
                                        .this
                                        .executePullRequestImmediately(pullRequest);
                                }
                            }
                            //下一次请求的偏移量应该大于前一次的偏移量
                            //第一个消息的偏移量应该大于前一次的偏移量
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                            || firstMsgOffset < prevRequestOffset) {
                                log.warn("pull message result maybe data wrong");
                            }

                            break;

                         //未拉取到消息
                        case NO_NEW_MSG:
                            //拉取下一个新的offset 这个偏移量是服务器端获取到的
                            pullRequest.setNextOffset
                                (pullResult.getNextBeginOffset());
                            //本次拉取到的消息总size==0,则更新消费端本地的offset
                            DefaultMQPushConsumerImpl
                                .this
                                .correctTagsOffset(pullRequest);
                            //把pullRequest重新保存到pullmessageservice的阻塞队列
                            //供拉取线程重新执行
                            DefaultMQPushConsumerImpl
                                .this.executePullRequestImmediately(pullRequest);
                            break;

                         //消息拉取到了但是不匹配tag,broker进行tag过滤
                        case NO_MATCHED_MSG:
                            //拉取下一个新的offset 这个偏移量是服务器端获取到的
                            pullRequest.setNextOffset
                                (pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl
                                .this.correctTagsOffset(pullRequest);
                            //把pullRequest重新保存到pullmessageservice的
                            //阻塞队列供拉取线程重新执行
                            DefaultMQPushConsumerImpl
                                .this.executePullRequestImmediately(pullRequest);
                            break;

                         //offset非法,那么该pullRequest不会被重新进行拉取
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal");
                            //拉取下一个新的offset 这个偏移量是服务器端获取到的
                            pullRequest
                                .setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl
                                .this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl
                                            .this.offsetStore
                                            .updateOffset
                                            (pullRequest.getMessageQueue(),         
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore
                                            .persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this
                                            .rebalanceImpl
                                            .removeProcessQueue
                                            (pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset");
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic()
                                .startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this
                        .executePullRequestLater
                            (pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
       if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset
                                    (pullRequest.getMessageQueue(), 
                                        ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner()                                             .get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() 
                && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );

        //真正拉取消息的逻辑在这里
        //上面的代码是拉取到消息的回调pullCallback
        //private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
        //拉取消息的最大挂起时间是15秒
        //private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
        //消费端的超时时间
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                //注册的回调函数
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, 
                                         PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

```

PullAPIWrapper#pullKernelImpl

```java public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); }

if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            //是否允许挂起的标志
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            //15秒
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = 
                    computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult =             
                    this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                        brokerAddr,
                        requestHeader,
                        timeoutMillis,
                        communicationMode,
                        pullCallback);

            return pullResult;
        }

        throw new MQClientException
                    ("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

```

RebalancePushImpl#computePullFromWhere

3.消息进度如何计算的

```java @Override public long computePullFromWhere(MessageQueue mq) { long result = -1; //默认是从上一个OFFSET消费 默认策略,即跳过消费过的历史消息 //private ConsumeFromWhere consumeFromWhere = // ConsumeFromWhere.CONSUMEFROMLASTOFFSET; final ConsumeFromWhere consumeFromWhere =
this.defaultMQPushConsumerImpl .getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME
FROMLASTOFFSETANDFROMMINWHENBOOTFIRST: case CONSUMEFROMMINOFFSET: case CONSUMEFROMMAXOFFSET: //返回本地存储的消费偏移量获取上一次消费的偏移量 //注意集群消费模式offsetStore是RemoteBrokerOffsetStore //注意广播消费模式offsetStore是LocalFileOffsetStore#updateOffset case CONSUMEFROMLASTOFFSET: { //默认根据存储的消息偏移量消费 //根据消费模式 判断是从本地还是远程 //集群:远程 广播:本地 //第一次会从远程拉取 //后面都是从offsetTable读取 long lastOffset = offsetStore.readOffset (mq, ReadOffsetType.READFROMSTORE); //大于等于0说明是正常的消费进度 if (lastOffset >= 0) { //返回消费偏移量 result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { result = 0L; } else { try { //从Broker中拉取该队列对应的maxOffset result = this.mQClientFactory .getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 case CONSUMEFROMFIRSTOFFSET: { long lastOffset = offsetStore.readOffset (mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } //从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 case CONSUMEFROMTIMESTAMP: { long lastOffset = offsetStore.readOffset (mq, ReadOffsetType.READFROMSTORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) { try { result = this.mQClientFactory .getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate( this.defaultMQPushConsumerImpl .getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime()); result = this.mQClientFactory.getMQAdminImpl() .searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; }

default:
                break;
        }

        return result;
    }

```

CONSUME_FROM_LAST_OFFSET计算逻辑

```java case CONSUMEFROMLASTOFFSET: { //读取远程broker的偏移量 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READFROMSTORE);
//如果返回的偏移量大于等于0,则直接使用该offset //这个也能理解,大于等于0,表示查询到有效的消息消费进度,从该有效进度开始消费。 //但我们要特别留意lastOffset为0是什么场景 //因为返回0,并不会执行CONSUME
FROMLASTOFFSET(语义)。 if (lastOffset >= 0) {
result = lastOffset; //如果lastOffset为-1//表示当前并未存储其有效偏移量,可以理解为第一次消费
}else if (-1 == lastOffset) {
//如果是消费组重试主题,从重试队列偏移量为0开始消费;
if (mq.getTopic().startsWith(MixAll.RETRYGROUPTOPICPREFIX)) {
result = 0L; } else { try { // 如果是普通主题,则从队列当前的最大的有效偏移量开始消费 // 即CONSUME
FROMLASTOFFSET语义的实现。 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { //如果从远程服务拉取最大偏移量拉取异常或其他情况,则使用-1作为第一次拉取偏移量。 result = -1; } } } else { result = -1;
} break; }

@Override
    public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
        if (mq != null) {
            switch (type) {
                case MEMORY_FIRST_THEN_STORE:
                case READ_FROM_MEMORY: {
                    AtomicLong offset = this.offsetTable.get(mq);
                    if (offset != null) {
                        return offset.get();
                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                        return -1;
                    }
                }
                case READ_FROM_STORE: {
                    try {
                        //从broker读取进度
                        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                        AtomicLong offset = new AtomicLong(brokerOffset);
                        this.updateOffset(mq, offset.get(), false);
                        return brokerOffset;
                    }
                    // No offset in broker
                    catch (MQBrokerException e) {
                        return -1;
                    }
                    //Other exceptions
                    catch (Exception e) {
                        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                        return -2;
                    }
                }
                default:
                    break;
            }
        }

        return -1;
    }

```

接下来我们将以集群模式来查看一下消息消费进度的查询逻辑,集群模式的消息进度存储管理器实现为: RemoteBrokerOffsetStore,最终Broker端的命令处理类为:ConsumerManageProcessor。

```java ConsumerManageProcessor#queryConsumerOffset private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); //从消费消息进度文件中查询消息消费进度。 long offset = this.brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
//如果消息消费进度文件中存储该队列的消息进度,其返回的offset必然会大于等于0 //则直接返回该偏移量该客户端,客户端从该偏移量开始消费。 if (offset >= 0) {
responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { //如果未从消息消费进度文件中查询到其进度,offset为-1。 //则首先获取该主题、消息队列当前在Broker服务器中的最小偏移量。 long minOffset = this.brokerController.getMessageStore() .getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); //如果小于等于0 //返回0则表示该队列的文件还未曾删除过 //并且其最小偏移量对应的消息存储在内存中而不是存在磁盘中,则返回偏移量0 //这就意味着ConsumeFromWhere中定义的三种枚举类型都不会生效,直接从0开始消费
if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { //如果偏移量小于等于0,但其消息已经存储在磁盘中 //此时返回未找到,最终RebalancePushImpl#computePullFromWhere中得到的偏移量为-1。 response.setCode(ResponseCode.QUERYNOTFOUND); response.setRemark ("Not found, V306_SNAPSHOT maybe this group consumer boot first"); } } return response; }

参考链接:https://blog.csdn.net/prestigeding/article/details/96576932 查询文件消费进度:假如文件中是0:10那么下次就从10开始拉 java public long queryOffset(final String group, final String topic, final int queueId) { // topic@group String key = topic + TOPICGROUPSEPARATOR + group; ConcurrentMap map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; }

return -1;
    }

```

4.组装消息

image.png

PullMessageProcessor#processRequest

PullMessageProcessor#processReques

java //构建消息过滤器 MessageFilter messageFilter; if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager()); } else { messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager()); } //调用MessageStore.getMessage查找消息 final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage( requestHeader.getConsumerGroup(), //消费组名称 requestHeader.getTopic(), //主题名称 requestHeader.getQueueId(), //队列ID requestHeader.getQueueOffset(), //待拉取偏移量 requestHeader.getMaxMsgNums(), //最大拉取消息条数 messageFilter //消息过滤器 );

DefaultMessageStore#getMessage

```java public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; }

if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden");
            return null;
        }

        long beginTime = this.getSystemClock().now();

        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        //offset是待拉取偏移量
        //查找下一次队列偏移量
        long nextBeginOffset = offset;
        //当前消息队列最小偏移量
        long minOffset = 0;
        //当前消息队列最大偏移量
        long maxOffset = 0;
        //预定义返回的响应结果
        GetMessageResult getResult = new GetMessageResult();
        //获取commitLog最大偏移量
        final long maxOffsetPy = this.commitLog.getMaxOffset();
        //根据主题名称和队列编号获取消息消费队列ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            //消息偏移量异常情况校对下一次拉取偏移量
            if (maxOffset == 0) {
                //表示当前消息队列中没有消息
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                //待拉取消息的偏移量小于队列的最小偏移量
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                //待拉取偏移量为队列最大偏移量
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                //偏移量越界
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                SelectMappedBufferResult bufferConsumeQueue 
                                    = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;




                        //获取最大可以拉取的字节数
                        //最大每次可以拉取的字节数是16000
                        //最大每次可以拉取800条
                        //maxMsgNums默认是32,每个消费队列的索引大小是20字节,也就是640字节
                        final int maxFilterMessageCount 
                            = Math.max(16000, maxMsgNums * 
                                        ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        final boolean diskFallRecorded 
                            = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit 
                                = new ConsumeQueueExt.CqExtUnit();
                        //for循环 根据偏移量从CommitLog中拉取32条消息
                        //maxFilterMessageCount  = 640 
                        //每次拉取后增加20字节
                        for (; i < bufferConsumeQueue.getSize() && 
                             i < maxFilterMessageCount; 
                             i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //读取偏移量8个字节
                            long offsetPy = 
                                    bufferConsumeQueue.getByteBuffer().getLong();
                            //读取消息长度4个字节
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //读取8个字节的tag的hashcode
                            long tagsCode = 
                                    bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }

                            boolean isInDisk
                                = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            if (this.isTheBatchFull(sizePy, maxMsgNums, 
                                getResult.getBufferTotalSize(), 
                                getResult.getMessageCount(),
                                isInDisk)) {
                                break;
                            }

                            boolean extRet = false, isTagsCodeLegal = true;
                            if (consumeQueue.isExtAddr(tagsCode)) {
                                extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                if (extRet) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    log.error("can't Find Consume queue");
                                    isTagsCodeLegal = false;
                                }
                            }

                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue
                                    (isTagsCodeLegal ? tagsCode : null, extRet ? 
                                     cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                            //根据消息的偏移量和消息的长度查询消息对应的MappedBuffer
                            SelectMappedBufferResult selectResult = 
                                this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = 
                                    this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }
                            //判断消息过滤器是否存在
                            //如果存在使用消息过滤器过滤本条消息是否符合
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog
                                    (selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService
                                .getGetMessageTransferedMsgCount()
                                .incrementAndGet();
                            //将获取到的消息添加到getResult
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        if (diskFallRecorded) {
                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                            brokerStatsManager.recordDiskFallBehindSize
                                        (group, topic, queueId, fallBehind);
                        }
                        //获取下一次请求要拉取的偏移量
                        //假如本次offset是从第0条开始拉取,拉取32条
                        //i是累计的拉取的偏移量的总和640
                        //那么下次拉取的偏移量是:0+640/20=32 也就是从32开始拉取
                        nextBeginOffset=offset+(i/ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig
                               .getAccessMessageInMemoryMaxRatio() / 100.0));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection
                        (offset, consumeQueue.rollNextFile(offset));
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
      //返回响应
        return getResult;
    }

```

PullMessageProcessor#processRequest

```java //根据拉取结果填充responseHeader response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

//判断如果存在主从同步慢,设置下一次拉取任务的ID为主节点
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        break;
    case SLAVE:
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
}
...
//GetMessageResult与Response的Code转换
switch (getMessageResult.getStatus()) {
    case FOUND:         //成功
        response.setCode(ResponseCode.SUCCESS);
        break;
    case MESSAGE_WAS_REMOVING:  //消息存放在下一个commitLog中
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);  //消息重试
        break;
    case NO_MATCHED_LOGIC_QUEUE:    //未找到队列
    case NO_MESSAGE_IN_QUEUE:   //队列中未包含消息
        if (0 != requestHeader.getQueueOffset()) {
            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
            requestHeader.getQueueOffset(),
            getMessageResult.getNextBeginOffset(),
            requestHeader.getTopic(),
            requestHeader.getQueueId(),
            requestHeader.getConsumerGroup()
            );
        } else {
            response.setCode(ResponseCode.PULL_NOT_FOUND);
        }
        break;
    case NO_MATCHED_MESSAGE:    //未找到消息
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
        break;
    case OFFSET_FOUND_NULL: //消息物理偏移量为空
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_OVERFLOW_BADLY: //offset越界
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        // XXX: warn and notify me
        log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
        break;
    case OFFSET_OVERFLOW_ONE:   //offset在队列中未找到
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_TOO_SMALL:  //offset未在队列中
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        requestHeader.getConsumerGroup(), 
        requestHeader.getTopic(), 
        requestHeader.getQueueOffset(),
        getMessageResult.getMinOffset(), channel.remoteAddress());
        break;
    default:
        assert false;
        break;
}
...
//如果CommitLog标记可用,并且当前Broker为主节点,则更新消息消费进度
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

```

5.Broker返回响应

image.png

MQClientAPIImpl#processPullResponse

java private PullResult processPullResponse(    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {    PullStatus pullStatus = PullStatus.NO_NEW_MSG;   //判断响应结果    switch (response.getCode()) {        case ResponseCode.SUCCESS:            pullStatus = PullStatus.FOUND;            break;        case ResponseCode.PULL_NOT_FOUND:            pullStatus = PullStatus.NO_NEW_MSG;            break;        case ResponseCode.PULL_RETRY_IMMEDIATELY:            pullStatus = PullStatus.NO_MATCHED_MSG;            break;        case ResponseCode.PULL_OFFSET_MOVED:            pullStatus = PullStatus.OFFSET_ILLEGAL;            break; ​        default:            throw new MQBrokerException(response.getCode(), response.getRemark());   } //解码响应头    PullMessageResponseHeader responseHeader =       (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); //封装PullResultExt返回    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),        responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/722113.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

split()分割字符串【JavaScript】

分割字符串 在JavaScript中&#xff0c;我们可以使用split&#xff08; &#xff09;方法把一个字符串分割成一个数组&#xff0c; 这个数组存放的是原来字符串的所有字符片段。 有多少个片段&#xff0c;数组元素个数就是多少。 语法 字符串名.split&#xff08;"分割…

TypeScript——简介、开发环境搭建、基本类型、编译选项、webpack、babel、类、面向对象的特点、接口、泛型

文章目录 第一章 快速入门0、TypeScript简介1、TypeScript 开发环境搭建2、基本类型3、编译选项4、webpack5、Babel 第二章&#xff1a;面向对象1、类&#xff08;class&#xff09;2、面向对象的特点3、接口&#xff08;Interface&#xff09;4、泛型&#xff08;Generic&…

6、架构:组件与物料设计

本章节主要是物料组件的开发设计&#xff0c;之前提到了物料的结构与构成&#xff0c;但是并没有做明确的解释。作为低代码编辑器中核心的模块之一。 物料即承担了一个提供者的角色&#xff0c;通过对编辑器注入物料组件来完成页面的渲染和可视化编辑器的编排&#xff0c;最终…

服务无法注册进Eureka

相同的配置&#xff0c;在demo里能注册&#xff0c;在自己项目的无法注册&#xff0c;眼睛都快盯出老花眼了&#xff0c;还是不行&#xff0c;果然出现的问题只有在发现问题以后才觉得简单&#xff08;虽然确实是小问题&#xff0c;但是排查了一整天&#xff0c;值得记录一下&a…

IntelliJ IDEA 控制台中文乱码和错误: 非法字符: ‘\ufeff‘

一、问题描述&#xff1a; 最近在 Windows 电脑上使用 IntelliJ IDEA 运行 Java 程序时&#xff0c;发现运行报错且控制台显示乱码。如下图1&#xff1a; &#xfffd;&#xfffd;&#xfffd;&#xfffd;: &#xfffd;&#xfffd;&#xfffd;&#xfffd; GBK &#xff…

Xshell7连接Linux服务器的两种方式

文章目录 一、创建会话式连接二、直接在窗口中连接服务器 一、创建会话式连接 打开Xshell7之后&#xff0c;点击左上角的新建。 然后可以看到一下界面 在名称位置填入会话的名称&#xff0c;自己命名的&#xff0c;叫什么都可以。 主机那里需要填写服务器的ip地址&#xff0…

100种思维模型之黄金圈思维模型-90

黄金圈法则由西蒙.斯涅克&#xff08;Simon.sinek&#xff09;在TED演讲而被人所熟知&#xff0c;它是一种更好地思考问题的习惯。 西蒙.斯涅克说&#xff1a;“世界上所有伟大的领袖和组织——无论是苹果公司&#xff0c;马丁路德金&#xff0c;还是莱特兄弟&#xff0c;他们的…

SpringBoot 集成 xxl-job 实现定时任务管理

SpringBoot 集成 xxl-job 实现定时任务管理 摘要XXL-Job 优势集成XXL-Job操作环境运行XXL-Job1. 下载XXL-Job2. 创建数据库并导入数据3. 修改数据库连接配置4. 启动项目 项目集成1. 导入依赖2. 配置 application.yml 信息3. XxlJobConfig 配置类4. 创建 XxlJobTest 任务测试dem…

qt实现日历和天气显示(QCalendarWidget)

完成展示效果&#xff1a; 本项目主要有QCalendarWidget类和获取天气api 一、QCalendarWidget 关键代码&#xff1a; ui->mCalendarWidget->setHorizontalHeaderFormat(QCalendarWidget :: ShortDayNames);//星期一、二ui->mCalendarWidget->setVerticalHeaderFo…

基于Tars高并发IM系统的设计与实现--进阶篇2

基于Tars高并发IM系统的设计与实现–进阶篇2 消息时序 分为时间和序号 时间 分布式系统中&#xff0c;消息的时间一般都取服务端本地时间戳&#xff0c;一般IM系统服务主机不止一台&#xff0c;每台机器上时间可能会有差异&#xff0c;系统处理也会有延时&#xff0c;时间也…

neo4j删除Property Keys值方法

首先&#xff0c;停止neo4j服务 然后删除安装目录下面databases下面所有文件 重新运行neo4j&#xff0c;发现Property Keys值已经完全清干净了

Docker如何安装Nacos

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

邮件收发原理及部署postfix邮件系统

目录 一、邮件收发原理 1、原理图及名词解释 2、MTA功能介绍 3、POP和IMAP获取邮件介绍 二、部署postfix邮件系统 1、环境准备 2、DNS服务器部署 3、部署Postfix 4、部署Dovecot 三、使用Foxmail测试 1、修改DNS服务器 2、Foxmail登录测试账户 3、发送测试邮件 …

热力图问题

1.python画图怎么使用特殊符号 python画图怎么使用特殊符号&#xff1f;_python中怎么在绘图中加a b c d_sinysama的博客-CSDN博客python画图怎么使用特殊符号_python中怎么在绘图中加a b c dhttps://blog.csdn.net/QAQIknow/article/details/124390075?ops_request_misc%257…

OPPO手机便签怎么设置字体颜色?便签调整字体颜色方法

OPPO是一个非常受年轻人青睐的手机品牌&#xff0c;它的手机不仅外观设计时尚轻薄&#xff0c;而且拍照清晰、系统流畅&#xff0c;并且拥有高中低不同档次的价位可供消费者选择。虽然OPPO手机的使用体验非常不错&#xff0c;但是有一部分用户也遇到了一些问题&#xff0c;例如…

B. Cake Assembly Line - 思维

分析&#xff1a; 推公式&#xff0c;需要每一块蛋糕的范围完全覆盖巧克力范围&#xff0c;假设蛋糕一共移动了距离d&#xff0c;那么则 对于每一个i都有a[i]-wd<b[i]-h<b[i]h<a[i]wd&#xff0c;解得b[i]h-a[i]-w<b[i]-h-a[i]w。只需要判断不等式是否成立就可以求…

平台使用篇 | RflySim飞控底层实验平台配置介绍

本课程提供的实验平台总体可以分成两个部分&#xff1a;硬件平台和软件平台。本讲简要介绍各个部分的基本组成及实验开发流程。 平台使用篇-RflySim飞控底层实验平台配置介绍 01 电脑配置 1.1推荐配置 •系统&#xff1a;Windows 10 x64系统&#xff08;版本大于等于1809&…

IIC通信原理(软件实现)-GD32

IIC通信原理-GD32 硬件连接 数据变换规则 起始信号和结束信号 应答信号 数据帧格式 #include "my_i2c_soft.h" #include "systick.h"void my_i2c_w_SDA(uint8_t bit_value) {gpio_bit_write(I2C_SOFT_PORT, I2C_SOFT_SDA_PIN, (bit_status)bit_val…

【软件下载】音频ASIO驱动下载

一&#xff0c;简介 在高速USB Audio使用中&#xff0c;需要再windows电脑上安装ASIO驱动&#xff0c;用来进行高速音频流的传输&#xff0c;本文主要介绍如何下载安装ASIO驱动。供参考。 二&#xff0c;安装步骤 2.1 软件下载 下载地址&#xff1a;http://www.asio4all.co…

【Python爬虫与数据分析】UDP/TCP通信协议

目录 一、网络编程基础 二、UDP协议 三、TCP协议 一、网络编程基础 数据编码与解码 str -> bytes&#xff1a;encode编码&#xff0c;发送信息的时候用encode编码bytes -> str&#xff1a;decode解码&#xff0c;打印接收的信息用decode解码 test 你好世界en_code…