11.1 整体流程
我们使用 DefaultMQPushConsumer 的时候,一般流程是设置好 GroupName 、NameServer 地址 ,以及订阅的 Topic 名称, 然后填充Message 处理函数,最后调用 start () 。
11.1.1 上层接口类
DefaultMQPushConsumer 类在 org. apache.rocketmq. client. consumer 包中 ,这个类担任着上层接 口的角色,具体实现都在 DefaultMQPushConsumerlmpl 类中
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
我们常用的是最后这个构造函数,只传入一个 consumer Group 名称作为参数,这个构造函数会把 RPCHook 设为空,把负载均衡策略设置成平均策略。在构造函数的实现中 ,主要工作是创建 DefaultMQPushConsumerlmpl 对象。
11.1.2 DefaultMQPushConsumer 的实现者
DefaultMQPushConsumerlmpl 具体实 现了 DefaultMQPushConsumer 的业务逻辑, DefaultMQPushConsumerlmpl.java 在 org. apache.rocketmq.client.impl. consumer 这个包里,本节接下来从 start 方法着手分析。首先是初始化 MQClientinstance ,并且设置好 rebalance 策略和 pullAPIWraper ,有这些结构后才能发送 pull 请求获取消息。
//参见org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// switch 的CREATE_JUST 语句块
// 初始化MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 初始化 PullAPIWrapper
if (this.pullAPIWrapper == null) {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 确定 OffsetStore 。 OffsetStore 里存储的是当前消费者所消费的消息
在队列中的偏移量
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
根据消费消息方式的不同 , OffsetStore 的类型也不同。 如果是 BROADCASTING模式,使用的是 LocalFileOffsetStore, Offset 存到本地;如果是 CLUSTERING模式,使用的是 RemoteBrokerOffsetStore, Offset 存到 Broker 机器上。
//然后是初始化 consumeMessageService ,根据对消息顺序需求的不同,使用不同的 Service 类型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService =
new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// POPTODO
this.consumeMessagePopService.start();
最后调用 MQClientlnstance 的 start 方法 ,开始获取数据。
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
11.1.3 获取消息逻辑
获取消息的逻辑实现在 public void pullMessage ( finalPullRequestpullRequest) 函数中,这是一个很大的函数,前半部分是进行一些判断, 是进行流量控制的逻辑(见代码清单 11-5 );中间是对返回消息结果做处理的逻辑 ;后面是发送获取消息请求的逻辑。
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 检查个数
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 检查总大小
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
通过判断未处理消息的个数和总大小来控制是否继续请求消息。 对于顺序消息还有一些特殊判断逻辑。
if (!this.consumeOrderly){
}else{
// 顺序消息
}
获取的消息返回后,根据返回状态,调用相应的回调处理方法。
PullCallback pullCallback = new PullCallback() {
... ...};
// 具体判断见代码块
switch (pullResult.getPullStatus()) {
case FOUND:
... ...
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
最后是发送获取消息请求,
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
这三个阶段不停地循环执行 ,直到程序被停止
11.2 消息的并发处理
11.2.1 并发处理过程
处理效率的高低是反应 Consumer 实现好坏的重要指标,本节以 ConsumeMessageConcurrentlyService 类 为例来分析 RocketMQ 的实现方式。 ConsumeMessageConcurrentlyService 类在 org.apache.rocketmq.client.impl.consumer 包中 。
这个类定义了三个线程池,一个主线程池用来正常执行收到的消息,用户可以自定义通过 consumeThreadMin 和 consumeThreadMax 来自定义线程个数。另外两个都是单线程的线程池,一个用来执行推迟消费的消息,另一个用来定期清理超时消息( 15 分钟)。
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<>();
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
}
从 Broker 获取到一批消息以后,根据 BatchSize 的 设置 ,把一批消息封装到一个 ConsumeRequest 中 ,然后把这个 ConsumeRequest 提交到consumeExecutor 线程池中执行 。
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
消息的处理结果可能有不同的值,主要 的两个是 CONSUME SUCCESS 和 RECONSUME LATER 。 如果消费不成功,要把消息提交到上面说的scheduledExecutorService 线程池中, 5 秒后再执行;如果消费模式是 CLUSTERING模式,未消费成功的消息会先被发送回 Broker ,供这个 ConsumerGroup 里的其他 Consumer 消费,如果发送回 Broker 失败 , 再调用阻CONSUME_LATER
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
// 消息的处理结果可能有不同的值,主要的两个是 CONSUME_SUCCESS 和 RECONSUME_LATER
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// Maybe message is expired and cleaned, just ignore it.
if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
+ "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
msg.getQueueId(), msg.getQueueOffset());
continue;
}
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
处理逻辑是用户自定义 的, 当消息量大的时候,处理逻辑执行效率的高低影 响系统的吞吐量。 可以把多条消息组合起来处理,或者提高线程数,以提高系统的吞吐量。
11.2.2 ProcessQueue 对象
在前面的源码中,有个 Process Queue 类型的对象,这个对象的功能是什么呢?从 Broker 获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态 , 比如如何获得当前消息堆积的数量,如何解决处理超时情况等 。RocketMQ 定义了一个快照类 Process Queue 来解决这些问题,在 Push Consumer 运行的时候,每个 Message Queue 都会有一个对应的 Process Queue 对象,保存了这个 Message Queue 消息处理状态的快照。Process Queue 对象里主要 的内容是一个 TreeMap 和 一个读写锁。 TreeMap里以 Message Queue 的 Offset 作为 Key,以消息内容的引用为 Value ,保存了所有从 MessageQueue 获取到但是还未被处理的消息,读写锁控制着多个线程对 TreeMap 对象的并发访问 。
有了 ProcessQueue 对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息 。 顺序消息是通过 ConsumeMessage-OrderlyService 类实现的 ,主要流程和 ConsumeMessageConcurrentlyService 类似 ,区别只是在对并发消费的控制上 。
11.3 生产者消费者的底层类
MQClientinstance 是客户端各种类型的 Consumer 和 Producer 的底层类。这个类首先从 NameServer 获取并保存各种配置信息,比如 Topic 的 Route 信息 。 同时 MQClientlnstance 还会通过 MQClientAPIImpl 类实现消息的收发,也就是从 Broker 获取消息或者发送消息到 Broker 。既然 MQClientinstance 实现的是底层通信功能和获取并保存元数据的功能,就没必要每个 Consumer 或 Producer 都创建一个对象,一个 MQClientlnstance对象可以被多个 Consumer 或 Producer 公用 。 RocketMQ 通过一个工厂类达到共用 MQClientlnstance 的目的 。
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start L737
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start L915
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
注意, MQClientlnstance 是通过工厂类被创建的,并不是一个单例模式,有些情况下需要创建多个实例。
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// clientld 的 格式是“ clientlp ”+ @ +“ InstanceName ” InstanceName 有默认值
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
普通情况下, 一个用到 RocketMQ 客户端的 Java 程序,或者说一个JVM 进程只要有 一个 MQClientlnstance 实例就够了 。 这时候创建一个或 多 个Consumer 或者 Producer , 底层使用的是同一个 MQClientlnstance 实例。
在 quick start 文档中创建一个 DefaultMQPushConsumer 来接收消息,没有设置这个 Consumer 的 InstanceName 参数(通过 setlnstanceName 函数进行设置), 这个时候 InstanceName 的值是默认的 “ DEFAULT ” 。 实际创建 的MQClientlnstance 个数由设定的逻辑进行控制 。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
// MQC!ientlnstance 个数由设定的逻辑
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
if (this.pullAPIWrapper == null) {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService =
new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// POPTODO
this.consumeMessagePopService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
从 InstanceName 的创建逻辑就可以看出,如果创建 Consumer 或者 Producer类型的时候不手动指定 InstanceName ,进程中只会有一个 MQClientlnstance对象。
有些情况下只有一个 MQClientlnstance 对象是不够的,比如一个 Java 程序需要连接两个 RoceketMQ 集群 ,从一个集群读取消息,发送到另一个集群, 一个 MQClientlnstance 对象无法支持这种场景 。 这种情况下一定要手动指定不同的 InstanceName ,底层会创建两个 MQClientlnstance 对象。
11.3.2 MQClientlnstance 类的功能
首先来看一下 MQClientlnstance 类的 Start 函数,从 Start 函数中的逻辑能大致了解 MQClientlnstance 类的功能
Start 函数中的 MQClientAPIImpl 对 象用来负责底层消息通信 , 然后启动pullMessageService 和 rebalanceService 。在类的成员变量中,用 topicRouteTable 、brokerAddrTable 等来存储从 NameServer 中获得的集群状态信息,并通过一个Scheduled Task 来维护这些信息 。
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
从代码中可以看出, MQClientlnstance 会定时进行如下几个操作:获 取NameServer 地址 、 更新 TopicRoute 信息 、清理离线的 Broker 和保存消费者的Offset 。