Chapter11-最常用的消费类

news2025/4/13 22:05:42

11.1 整体流程

        我们使用 DefaultMQPushConsumer 的时候,一般流程是设置好 GroupName 、NameServer 地址 ,以及订阅的 Topic 名称, 然后填充Message 处理函数,最后调用 start () 。

        11.1.1 上层接口类

        DefaultMQPushConsumer 类在 org. apache.rocketmq. client. consumer 包中 ,这个类担任着上层接 口的角色,具体实现都在 DefaultMQPushConsumerlmpl 类中

    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        this.rpcHook = rpcHook;
        this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
    }

    /**
     * Constructor specifying consumer group.
     *
     * @param consumerGroup Consumer group.
     */
    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }

        我们常用的是最后这个构造函数,只传入一个 consumer Group 名称作为参数,这个构造函数会把 RPCHook 设为空,把负载均衡策略设置成平均策略。在构造函数的实现中 ,主要工作是创建 DefaultMQPushConsumerlmpl 对象。

        11.1.2 DefaultMQPushConsumer 的实现者 

         DefaultMQPushConsumerlmpl 具体实 现了 DefaultMQPushConsumer 的业务逻辑, DefaultMQPushConsumerlmpl.java 在 org. apache.rocketmq.client.impl. consumer 这个包里,本节接下来从 start 方法着手分析。首先是初始化 MQClientinstance ,并且设置好 rebalance 策略和 pullAPIWraper ,有这些结构后才能发送 pull 请求获取消息。

//参见org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
//  switch 的CREATE_JUST 语句块

// 初始化MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                // 初始化 PullAPIWrapper
if (this.pullAPIWrapper == null) {
    this.pullAPIWrapper = new PullAPIWrapper(
        mQClientFactory,
        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 确定 OffsetStore 。 OffsetStore 里存储的是当前消费者所消费的消息
在队列中的偏移量
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

        根据消费消息方式的不同 , OffsetStore 的类型也不同。 如果是 BROADCASTING模式,使用的是 LocalFileOffsetStore, Offset 存到本地;如果是 CLUSTERING模式,使用的是 RemoteBrokerOffsetStore, Offset 存到 Broker 机器上。

//然后是初始化 consumeMessageService ,根据对消息顺序需求的不同,使用不同的 Service 类型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService =
                        new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

this.consumeMessageService.start();
 // POPTODO
this.consumeMessagePopService.start();

        最后调用 MQClientlnstance 的 start 方法 ,开始获取数据。

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;

        11.1.3 获取消息逻辑

        获取消息的逻辑实现在 public void pullMessage ( finalPullRequestpullRequest) 函数中,这是一个很大的函数,前半部分是进行一些判断, 是进行流量控制的逻辑(见代码清单 11-5 );中间是对返回消息结果做处理的逻辑 ;后面是发送获取消息请求的逻辑。

        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        // 检查个数
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        // 检查总大小
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

通过判断未处理消息的个数和总大小来控制是否继续请求消息。 对于顺序消息还有一些特殊判断逻辑。

if (!this.consumeOrderly){
    
    }else{
    // 顺序消息
}

获取的消息返回后,根据返回状态,调用相应的回调处理方法

PullCallback pullCallback = new PullCallback() {
    ... ...};
// 具体判断见代码块
switch (pullResult.getPullStatus()) {
    case FOUND:
        ... ... 
         break;
    case NO_NEW_MSG:
    case NO_MATCHED_MSG:

最后是发送获取消息请求,

    try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }

这三个阶段不停地循环执行 ,直到程序被停止

11.2 消息的并发处理

        11.2.1 并发处理过程

         处理效率的高低是反应 Consumer 实现好坏的重要指标,本节以 Consume­MessageConcurrentlyService 类 为例来分析 RocketMQ 的实现方式。 Consume­MessageConcurrentlyService 类在 org.apache.rocketmq.client.impl.consumer 包中 。

这个类定义了三个线程池,一个主线程池用来正常执行收到的消息,用户可以自定义通过 consumeThreadMin 和 consumeThreadMax 来自定义线程个数。另外两个都是单线程的线程池,一个用来执行推迟消费的消息,另一个用来定期清理超时消息( 15 分钟)。

 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<>();

        String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
    }

        从 Broker 获取到一批消息以后,根据 BatchSize 的 设置 ,把一批消息封装到一个 ConsumeRequest 中 ,然后把这个 ConsumeRequest 提交到consumeExecutor 线程池中执行  。

    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

        消息的处理结果可能有不同的值,主要 的两个是 CONSUME SUCCESS 和 RECONSUME LATER 。 如果消费不成功,要把消息提交到上面说的scheduledExecutorService 线程池中, 5 秒后再执行;如果消费模式是  CLUSTERING模式,未消费成功的消息会先被发送回 Broker ,供这个 ConsumerGroup 里的其他 Consumer 消费,如果发送回 Broker 失败 , 再调用阻CONSUME_LATER

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;
        // 消息的处理结果可能有不同的值,主要的两个是 CONSUME_SUCCESS 和 RECONSUME_LATER
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // Maybe message is expired and cleaned, just ignore it.
                    if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
                        log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
                                + "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
                            msg.getQueueId(), msg.getQueueOffset());
                        continue;
                    }
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

        处理逻辑是用户自定义 的, 当消息量大的时候,处理逻辑执行效率的高低影 响系统的吞吐量。 可以把多条消息组合起来处理,或者提高线程数,以提高系统的吞吐量。

        11.2.2 ProcessQueue 对象

        在前面的源码中,有个 Process Queue 类型的对象,这个对象的功能是什么呢?从 Broker 获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态 , 比如如何获得当前消息堆积的数量,如何解决处理超时情况等 。RocketMQ 定义了一个快照类 Process Queue 来解决这些问题,在 Push Consumer 运行的时候,每个 Message Queue 都会有一个对应的 Process Queue 对象,保存了这个 Message Queue 消息处理状态的快照。Process Queue 对象里主要 的内容是一个 TreeMap 和 一个读写锁。 TreeMap里以 Message Queue 的 Offset 作为 Key,以消息内容的引用为 Value ,保存了所有从 MessageQueue 获取到但是还未被处理的消息,读写锁控制着多个线程对 TreeMap 对象的并发访问 。

        有了 ProcessQueue 对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息 。 顺序消息是通过 ConsumeMessage-OrderlyService 类实现的 ,主要流程和 ConsumeMessageConcurrentlyService 类似 ,区别只是在对并发消费的控制上 。

11.3 生产者消费者的底层类 

         MQClientinstance 是客户端各种类型的 Consumer 和 Producer 的底层类。这个类首先从 NameServer 获取并保存各种配置信息,比如 Topic 的 Route 信息 。 同时 MQClientlnstance 还会通过 MQClientAPIImpl 类实现消息的收发,也就是从 Broker 获取消息或者发送消息到 Broker 。既然 MQClientinstance 实现的是底层通信功能和获取并保存元数据的功能,就没必要每个 Consumer 或 Producer 都创建一个对象,一个 MQClientlnstance对象可以被多个 Consumer 或 Producer 公用 。 RocketMQ 通过一个工厂类达到共用 MQClientlnstance 的目的 。 

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start L737
//    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start L915
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

        注意, MQClientlnstance 是通过工厂类被创建的,并不是一个单例模式,有些情况下需要创建多个实例。 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // clientld 的 格式是“ clientlp ”+ @ +“ InstanceName ” InstanceName  有默认值
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

        普通情况下, 一个用到 RocketMQ 客户端的 Java 程序,或者说一个JVM 进程只要有 一个 MQClientlnstance 实例就够了 。 这时候创建一个或 多 个Consumer 或者 Producer , 底层使用的是同一个 MQClientlnstance 实例。

        在 quick start 文档中创建一个 DefaultMQPushConsumer 来接收消息,没有设置这个 Consumer 的 InstanceName 参数(通过 setlnstanceName 函数进行设置), 这个时候 InstanceName 的值是默认的 “ DEFAULT ” 。 实际创建 的MQClientlnstance 个数由设定的逻辑进行控制 。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();
                // MQC!ientlnstance 个数由设定的逻辑
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                if (this.pullAPIWrapper == null) {
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                }
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService =
                        new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();
                // POPTODO
                this.consumeMessagePopService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
        }
    }

        从 InstanceName 的创建逻辑就可以看出,如果创建 Consumer 或者 Producer类型的时候不手动指定 InstanceName ,进程中只会有一个 MQClientlnstance对象。

        有些情况下只有一个 MQClientlnstance 对象是不够的,比如一个 Java 程序需要连接两个 RoceketMQ 集群 ,从一个集群读取消息,发送到另一个集群, 一个 MQClientlnstance 对象无法支持这种场景 。 这种情况下一定要手动指定不同的 InstanceName ,底层会创建两个 MQClientlnstance 对象。

        11.3.2 MQClientlnstance 类的功能 

        首先来看一下 MQClientlnstance 类的 Start 函数,从 Start 函数中的逻辑能大致了解 MQClientlnstance 类的功能 

        Start 函数中的 MQClientAPIImpl 对 象用来负责底层消息通信 , 然后启动pullMessageService 和 rebalanceService 。在类的成员变量中,用 topicRouteTable 、brokerAddrTable 等来存储从 NameServer 中获得的集群状态信息,并通过一个Scheduled Task 来维护这些信息 。

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask 

        从代码中可以看出, MQClientlnstance 会定时进行如下几个操作:获 取NameServer 地址 、 更新 TopicRoute 信息 、清理离线的 Broker 和保存消费者的Offset 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/434016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt Quick - SplitView

Qt Quick - SplitView使用总结 一、概述二、属性介绍三、简单使用四、序列化SplitView的状态五、定制化 一、概述 SplitView是一个水平或垂直布局元素的控件&#xff0c;每个元素之间有一个可拖动的分配页面内容的滑块。很像IDE里面的那些窗口。就像下面的简单的布局内容一样。…

调度系统: Quartz

最近在做数据中台架构设计&#xff0c;整体架构设计完后发现数据中台最重要的就是元数据和调度系统。元数据设计参考了atlas、metcat、datahus&#xff0c;模型设计、数据架构、技术架构基本完成。现在设计调度系统&#xff0c;才发现调度系统不像别的系统&#xff0c;主要是理…

缓存与数据库双写一致性几种策略分析

作者&#xff1a;京东零售 于泷 一、背景 在高并发场景中&#xff0c;为防止大量请求直接访问数据库&#xff0c;缓解数据库压力&#xff0c;常用的方式一般会增加缓存层起到缓冲作用&#xff0c;减少数据库压力。引入缓存&#xff0c;就会涉及到缓存与数据库中数据如何保持一…

春天到了,讲讲Spring的工作原理

一、春天到了&#xff0c;讲讲Spring的工作原理 在致力于优质IT知识出版分享的异步社区&#xff0c;有这么一本书——两版累计销售了近10w本&#xff0c;它可是完完全全靠着自己过硬的内容实力打出的这片天&#xff01; 第二版已出版4年&#xff0c;基于Spring 5.x编写&#x…

PostGre数据库操作

菜鸟教程 PostgreSQL 教程 | 菜鸟教程PostgreSQL 教程 PostgreSQL 是一个免费的对象-关系数据库服务器(ORDBMS)&#xff0c;在灵活的BSD许可证下发行。 PostgreSQL 开发者把它念作 post-gress-Q-L。 PostgreSQL 的 Slogan 是 “世界上最先进的开源关系型数据库”。 参考内容&a…

新库上线 | CnOpenData中国汽车能源消耗量数据

中国汽车能源消耗量数据 一、数据简介 工业和信息化部组织制定的《乘用车燃料消耗量限值》强制性国家标准&#xff08;GB19578-2021&#xff09;于2021年7月1日起正式实施&#xff0c;该标准规定了燃用汽油或柴油燃料、最大设计总质量不超过3500kg的M1类车辆在今后一段时期的燃…

如何开启tiktok之旅

关于tiktok的用户规模&#xff0c;相比国内抖音而言的机会这里就不再多说了&#xff0c;我之所以研究tiktok&#xff0c;是因为有不少客户咨询了我们tiktok加速方案&#xff0c;我们自身是一家纯网络公司&#xff0c;只提供tiktok加速方案而已&#xff0c;但是遭不住需求量大。…

【设计模式】Java 的三种代理模式

文章目录 一、前言二、正文1、静态代理2、动态代理3、Cglib代理Spring中AOP使用代理 三、总结 一、前言 代理(Proxy)模式是一种结构型设计模式&#xff0c;提供了对目标对象另外的访问方式&#xff1b;即通过代理对象访问目标对象。 这样做的好处是&#xff1a;可以在目标对…

什么是转化率优化(CRO)?网站转化率不高,可以看看这篇文章

你是否将人们带到你的网站&#xff0c;但只是让他们中的一小部分人完成了该页面的目标&#xff1f;你可以每天有成千上万的网站访问者到达。但如果你的网站没有设置成鼓励转换&#xff0c;你就不会说服网站访问者去做。这使得他们的整个访问几乎毫无价值&#xff0c;特别是如果…

MySQL-中间件mycat(三)

目录 &#x1f341;高可用方案 &#x1f341;安装配置 HAProxy &#x1f342;安装 HAProxy &#x1f342;启动验证 &#x1f341;配置 Keepalived &#x1f342;安装 Keepalived &#x1f342;修改配置文件 &#x1f342;启动验证 &#x1f342;测试高可用 &#x1f341;mycat …

经典transformer视觉模型总结

Vision Transformer 模型 ViT: AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE 是 2020 年 Google 团队提出的将 Transformer 应用在图像分类的模型。 ViT 在 Transformer 架构的视觉模型的地位类似 ResNet 模型。因为其模型“简单”且效果好,可…

Doris单机版安装和初步使用

参考官方文档 https://doris.apache.org/zh-CN/docs/dev/get-starting/ 下载安装包 下载 - Apache Doris Index of /apache/doris/1.2/1.2.2-rc01 前置修改 #修改 /etc/security/limits.conf, 执行命令 vim /etc/security/limits.conf #添加以下 * soft nofile 204800 *…

【模电实验】基尔霍夫定律、叠加定理和戴维南定理验证实验

实验目的 验证基尔霍夫电流定律&#xff08;KCL&#xff09;和电压定律&#xff08;KVL&#xff09;加深对该定理的理解验证叠加定理&#xff0c;加深对该定理的理解验证戴维南定理&#xff0c;掌握有源二端口网络的开路电压&#xff0c;短路电流和入端等效电阻的测定方法通过实…

Pod探针解析及实战(k8s)

一、探针类型 1.1livenessProbe存活探针 用于判断容器是否存活&#xff08;running状态&#xff09;&#xff0c;如果LivenessProbe探针探测到容器不健康&#xff0c;则kubelet杀掉该容器&#xff0c;并根据容器的重启策略做相应的处理。如果一个容器不包含LivenessProbe探针…

cmake创建windows工程编译环境

1.1 为什么需要CMake 你或许听过好几种 Make 工具&#xff0c;例如 GNU Make &#xff0c;QT 的 QMake &#xff0c;微软的 MS NMake&#xff0c;BSD PMake&#xff0c;Makepp等等。这些 Make 工具遵循着不同的规范和标准&#xff0c;所执行的 Makefile 格式也千差万别。这样就…

ubuntu虚拟机增加磁盘后,虚拟机内部应该如何分配对应空间

fdisk -l 输入命令 parted /dev/sda 输入命令 unit s 设置Size单位&#xff0c;方便追加输入 输入命令 p free 查看详情 输入命令 resizepart 3 追加容量到sda3 输入命令 83886046s 空闲容量区间Free Space结束位置 输入命令 q 退出 输入命令 pvresize /dev/sda3 更新pv物…

【计算机网络】Linux 系统是如何收发网络包的?

【计算机网络】Linux 系统是如何收发网络包的&#xff1f; 文章目录 【计算机网络】Linux 系统是如何收发网络包的&#xff1f;网络模型Linux 网络协议栈Linux 接收网络包的流程Linux 发送网络包的流程总结 网络模型 为了使得多种设备能通过网络相互通信&#xff0c;和为了解决…

空格在科技类文章的排版中对于阅读体验的影响

© 2018 sparanoid © 2018-2023 Conmajia 第一部分援引自《中文文案排版指北》 研究显示&#xff0c;打字的时候不喜欢在中文和英文之间加空格的人&#xff0c;感情路都走得很辛苦&#xff0c;有七成的比例会在 34 岁的时候跟自己不爱的人结婚&#xff0c;而其余三成的…

分布式锁-Redisson

分布式锁 1、分布式锁1.1 本地锁的局限性1.1.1 测试代码1.1.2 使用ab工具测试(单节点)1.1.3 本地锁问题演示(集群情况) 1.2 分布式锁实现的解决方案1.3 使用Redis实现分布式锁(了解即可)1.3.1 编写代码1.3.2 压测 1.4 使用Redisson解决分布式锁1.4.1 实现代码1.4.1 压测1.4.2 可…

DS1302

DS1302时钟芯片简介 DS1302是DALLAS公司推出的涓流充电时钟芯片&#xff0c;内含一个实时时钟/日历和31字节静态RAM&#xff0c;可以通过串行接口与单片机进行通信。实时时钟/日历电路提供秒、分、时、日、星期、月、年的信息&#xff0c;每个月的天数和闰年的天数可自动调整&a…