目录
一、前言
二、消费者启动(consumer start)流程
1、RocketMQPushConsumer初始化
1.1、InitializingBean的afterPropertiesSet() 实现
1.2、RocketMQPushConsumer初始化
2、DefaultMQPushConsumer#start()逻辑
3、defaultMQPushConsumerImpl.start()逻辑
3.1、预设置服务状态
3.2、校验配置信息
3.3、复制订阅关系
3.4、消息模式是否为集群通信
3.5、初始化MQClientInstance
3.6、初始化Consumer端负载均衡核心类RebalanceImpl的字段
3.7、指定Pull模式API包装器
3.8、指定消费进度(偏移量)
3.9、创建消息消费服务
3.10、注册到代理服务器Broker
3.11、启动MQClientInstance,下面分析
3.12、从名字服务器nameServer中获取监听的topic路由信息,若变更则修改
3.13、检查消费者是否注册到代理服务器broker中
3.14、向所有代理服务器broker发送心跳信息、并上传FilterClass的源文件
3.15、立即唤醒ReBalance服务线程
4、总结
一、前言
上一篇文章我们分析了消息中间件RocketMQ是如何集成到SpringBoot的,主要基于SpringBoot的自动装配机制来集成的。主要的工作就是完成了生产者、发送消息模板、监听器容器等Bean的装配,监听器容器的启动中涉及到一部分流程就是我们本篇博客要讲解的内容。从上一篇文章我们不用SpringBoot项目的例子中也可以看到我们是要启动消费者的,故这里我们将探究如下内容:RocketMQPushConsumer初始化时做了什么、其start()方法又做了什么?DefaultMQPushConsumerImpl.start()呢?
RocketMQ提供了两种消息消费模式,一种是pull拉模式(手动挡),一种是push模式(自动挡)。
Pull模式:
DefaultMQPullConsumer主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。
优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。
缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。
注意:在2022之后版本中它将被移除,取而代之的是DefaultLitePullConsumer。
Push模式:(推荐、默认使用)
Consumer消费的一种类型,应用不需要主动调用Consumer的拉消息方法,在底层已经封装了拉取的调用逻辑,在用户层面看来是broker把消息推送过来的,其实底层还是consumer去broker主动拉取消息。
二、消费者启动(consumer start)流程
1、RocketMQPushConsumer初始化
1.1、InitializingBean的afterPropertiesSet() 实现
@Override
public void afterPropertiesSet() throws Exception {
this.initRocketMQPushConsumer();
this.messageType = this.getMessageType();
log.debug("RocketMQ messageType: {}", this.messageType.getName());
}
DefaultRocketMQListenerContainer上一节已经分析了装配,它里面维护监听器及其注解、DefaultMQPushConsumer 、两个顺序消费与并发消费内部类等。实现了Spring里面的InitializingBean接口,Spring在完成属性设置后就会执行afterPropertiesSet()方法,该方法中完成DefaultMQPushConsumer 以及messageType初始化。
1.2、RocketMQPushConsumer初始化
private void initRocketMQPushConsumer() throws MQClientException {
// 非空校验,如果空抛异常
Assert.notNull(this.rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(this.consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(this.nameServer, "Property 'nameServer' is required");
Assert.notNull(this.topic, "Property 'topic' is required");
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment()
, this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
// 默认启用消息轨迹
boolean enableMsgTrace = this.rocketMQMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {
this.consumer = new DefaultMQPushConsumer(this.consumerGroup, rpcHook, new AllocateMessageQueueAveragely()
, enableMsgTrace, this.applicationContext
.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
this.consumer.setVipChannelEnabled(false);
this.consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, this.consumerGroup));
} else {
// 观察控制台调用了这里
log.debug("Access-key or secret-key not configure in " + this + ".");
this.consumer = new DefaultMQPushConsumer(this.consumerGroup, enableMsgTrace, this.applicationContext
.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
String customizedNameServer = this.applicationContext.getEnvironment()
.resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
this.consumer.setNamesrvAddr(customizedNameServer);
} else {
this.consumer.setNamesrvAddr(this.nameServer);
}
if (this.accessChannel != null) {
this.consumer.setAccessChannel(this.accessChannel);
}
// 最大消费线程数
this.consumer.setConsumeThreadMax(this.consumeThreadMax);
// 最小消费线程数
if (this.consumeThreadMax < this.consumer.getConsumeThreadMin()) {
this.consumer.setConsumeThreadMin(this.consumeThreadMax);
}
// 消费超时时间
this.consumer.setConsumeTimeout(this.consumeTimeout);
this.consumer.setInstanceName(this.name);
// 消息模式,默认集群模式
switch(this.messageModel) {
// 广播模式
case BROADCASTING:
this.consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
// 集群模式
case CLUSTERING:
this.consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch(this.selectorType) {
case TAG:
this.consumer.subscribe(this.topic, this.selectorExpression);
break;
case SQL92:
this.consumer.subscribe(this.topic, MessageSelector.bySql(this.selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
// 消费模式,默认并发模式
switch(this.consumeMode) {
// 顺序消费
case ORDERLY:
this.consumer.setMessageListener(new DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly());
break;
// 并发消费
case CONCURRENTLY:
this.consumer.setMessageListener(new DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
if (this.rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener)this.rocketMQListener).prepareStart(this.consumer);
}
}
首先进行非空校验,如果空抛异常。然后调用RocketMQPushConsumer的构造方法初始化一些字段,同时设置最大消费线程数、最小消费线程数、消息通信模式、消息选择器以及消息消费模式等
2、DefaultMQPushConsumer#start()逻辑
DefaultRocketMQListenerContainer中封装了DefaultMQPushConsumer并完成了初始化,然后调用其start()方法来到下面处理逻辑:
该启动方法主要做两件大事:
- 调用defaultMQPushConsumerImpl.start(),DefaultMQPushConsumer的内部实现,DefaultMQPushConsumer把大多数功能都委托给它。
- 由DefaultMQPushConsumer初始化时默认启用消息轨迹,故会传入namesrvAddr和 accessChannel启动消息轨迹跟踪消息。涉及到生产者发送消息、消息通信、执行器等,消息生产与消费都会用到,流程也不小后面看下时间是否允许补充篇文章分析下。
3、defaultMQPushConsumerImpl.start()逻辑
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// 1、将消费者服务状态预设置为 "启动失败"
this.serviceState = ServiceState.START_FAILED;
// 2、校验一堆配置,例如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费--CLUSTERING)、并发消费线程数量。
this.checkConfig();
// 3、copy订阅关系,监听重投队列%RETRY%TOPIC。
this.copySubscription();
// 4、如果消息通信方式是集群模式,将消费者实例的name 修改为PID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 5、初始化MQ客户端连接工厂,此处的MQClientManager使用了饿汉式单例模式
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this
.defaultMQPushConsumer, this.rpcHook);
// 6、初始化Consumer端负载均衡核心类RebalanceImpl的字段
// 设置消费组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 设置消息通信模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 消息队列分配策略算法(默认为:消息队列的平均分配算法),指定如何将消息队列分配给每个使用者客户端。
this.rebalanceImpl.setAllocateMessageQueueStrategy(this
.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置MQClient工厂
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 7、指定Pull模型请求包装器
if (this.pullAPIWrapper == null) {
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
// 注册消息过滤钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 8、指定消费进度(偏移量)
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播模式offset保存在本地
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this
.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 集群模式offset保存在服务器
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this
.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// 9、创建消费服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
// 顺序消费
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
// 并行消费
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
//POPTODO reuse Executor ?
this.consumeMessagePopService =
new ConsumeMessagePopConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// POPTODO
this.consumeMessagePopService.start();
// 向broker注册自己(consumer)
boolean registerOK = mQClientFactory.registerConsumer(this
.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
// 将上面启动的consumeMessageService关闭掉
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// 将consumer的状态修改为 "运行中"
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 从nameServer中获取监听的topic路由信息,若变更则修改
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查消费者是否注册到broker中
this.mQClientFactory.checkClientInBroker();
// 向所有broker发送心跳信息、并上传FilterClass的源文件
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒ReBalance服务线程
this.mQClientFactory.rebalanceImmediately();
}
ServiceState是一个枚举常量: CREATE_JUST服务刚刚创建,不是启动;RUNNING服务运行;SHUTDOWN_ALREADY服务关闭;START_FAILED服务启动失败。serviceState作为DefaultMQPushConsumerImpl的字段,初始化时为CREATE_JUST服务刚刚创建,不是启动状态。这时start()方法逻辑有点多,下面分步分析:
3.1、预设置服务状态
将消费者服务状态先设置为 "启动失败",在下面的case有可能处理
3.2、校验配置信息
校验一堆配置,例如:consumerGroup配置规则、消息通信方式不能为null(默认为集群消费--CLUSTERING)、并发消费线程数量。校验不通过则抛异常结束start()。
3.3、复制订阅关系
将订阅关系put维护到rebalanceImpl的subscriptionInner字段(map结构:key为topic主题,value为SubscriptionData订阅数据),在真正做负载平衡(doRebalance)时需用到这份数据
3.4、消息模式是否为集群通信
如果消息通信方式是集群模式,将消费者实例的name 修改为PID
3.5、初始化MQClientInstance
初始化MQ客户端连接工厂,此处的MQClientManager使用了饿汉式单例模式
3.6、初始化Consumer端负载均衡核心类RebalanceImpl的字段
设置RebalanceImpl的字段如:消费组、消息通信模式、消息队列分配策略算法【(默认为:消息队列的平均分配算法),指定如何将消息队列分配给每个使用者客户端。】、MQClient工厂。
3.7、指定Pull模式API包装器
指定Pull模式API包装器,然后注册消息过滤钩子。
3.8、指定消费进度(偏移量)
OffsetStore偏移存储接口有两个实现类,一个是本地存储实现LocalFileOffsetStore,另一个是远程存储实现RemoteBrokerOffsetStore。由于初始化时默认的消息通信模式是集群,故会使用RemoteBrokerOffsetStore的加载方法load()。
3.9、创建消息消费服务
ConsumeMessageService也有4个实现类,但可以归为两大类一个是顺序消费,另一个是并发消费。由于初始化时默认使用并发消费,故会初始化下面的两个并发消费服务,并调用其start()方法。至于里面的内容,我们后面另出文章再介绍。
3.10、注册到代理服务器Broker
如果注册失败,则重置服务状态为初始化状态,将上面启动的消费消息服务关闭掉,同时抛出异常结束掉start()。如果注册成功,则继续走下面逻辑,启动MQClientInstance下面分析,置consumer服务状态为 "运行中"。
3.11、启动MQClientInstance,下面分析
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// 如果nameserver地址为空,会去`http:// + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP`获取,
// WS_DOMAIN_NAME由配置参数rocketmq.namesrv.domain设置,WS_DOMAIN_SUBG由配置参数rocketmq.namesrv.domain.subgroup设置
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 开启请求和响应通道,即远程网络通信服务,生产者和消费者客户端处理消息发送和消费的API。
this.mQClientAPIImpl.start();
// Start various schedule tasks
/*
* 1.定时2min拉取最新的nameServer信息
* 2.默认定时30秒拉取最新的broker和topic路由信息(可配置)
* 3.默认定时30s向broker发送心跳包(可配置)
* 4.默认定时5s持久化consumer的offset(可配置)
* 5.定时1分钟,动态调整线程池线程数量
*/
this.startScheduledTask();
// Start pull service
// 启动消息拉取服务
this.pullMessageService.start();
// Start rebalance service
// 启动负载均衡服务
this.rebalanceService.start();
// Start push service
// 启动producer消息推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
主要逻辑:
- 开启请求和响应通道,即远程网络通信服务,生产者和消费者客户端处理消息发送和消费的API。
- 启动定时任务,如拉取最新的nameServer信息、拉取最新的broker和topic路由信息、向broker发送心跳包、持久化consumer的offset、动态调整线程池线程数量
- 开启消息拉取服务,客户端主动向代理服务器Broker拉取消息
- 启动负载均衡服务,该功能是Consumer端的负载均衡
3.12、从名字服务器nameServer中获取监听的topic路由信息,若变更则修改
3.13、检查消费者是否注册到代理服务器broker中
3.14、向所有代理服务器broker发送心跳信息、并上传FilterClass的源文件
3.15、立即唤醒ReBalance服务线程
4、总结
- DefaultRocketMQListenerContainer上一节已经分析了装配,它里面维护监听器及其注解、DefaultMQPushConsumer 、两个顺序消费与并发消费内部类等。实现了Spring里面的InitializingBean接口,Spring在完成属性设置后就会执行afterPropertiesSet()方法,该方法中完成DefaultMQPushConsumer 以及messageType初始化。
- DefaultMQPushConsumer,在大多数情况下,这是最推荐使用消息的类。从技术上讲,这个推客户端实际上是底层拉服务的包装器。具体来说,在从代理提取的消息到达时,它粗略地调用已注册的回调处理程序来提供消息。它是线程安全的类。
- DefaultMQPushConsumerImpl是RocketMQ的内部实现。DefaultMQPushConsumer封装了它,并将大多数功能都委托给它。其start()方法逻辑主要为复制订阅关系、初始化MQClientInstance、初始化Consumer端负载均衡核心类RebalanceImpl的字段、创建消息消费服务并启动、启动MQClientInstance
- MQClientInstance的底层涉及到RocketMQ的远程通信模块,其start()方法中的逻辑开启了启动请求响应通道MQClientAPIImpl(涉及远程通信模块)、启动定时任务【如拉取最新的nameServer信息、拉取最新的broker和topic路由信息、向broker发送心跳包、持久化consumer的offset、动态调整线程池线程数量】、向代理服务器Broker拉取消息服务PullMessageService、Consumer客户端负载均衡服务RebalanceService