文章目录
- 一、消息消费者模式
- 二、消费者启动流程
- 1、 push模式
- 1)类关系
- 2)类构造器
- 3)启动流程
- 2、pull模式
- 1)类关系
- 2)类构造器
- 3)启动流程
一、消息消费者模式
消息消费分两种模式:推(push)和拉(pull)
消费由接口MQConsumer
定义,由两个方法构成,代码如下:
/**
* Message queue consumer interface
*/
public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
/**
* Fetch message queues from consumer cache according to the topic
*
* @param topic message topic
* @return queue set
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}
二、消费者启动流程
1、 push模式
1)类关系
push模式接口定义为MQPushConsumer
,其继承自MQConsumer
接口,默认实现类DefaultMQPushConsumer
,类关系如下图:
2)类构造器
DefaultMQPushConsumer
类核心构造器如下
/**
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
//消费者所属组
this.consumerGroup = consumerGroup;
//生产者实例的命名空间
this.namespace = namespace;
//消息队列分配算法
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
//push模式消费内部实现
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
参数说明:
- consumerGroup :消费者所属组
- namespace :生产者实例的命名空间
- allocateMessageQueueStrategy :消息队列分配算法
- defaultMQPushConsumerImpl :push模式消费内部实现
其他类成员变量
- messageModel:消费模式,有集群模式(BROADCASTING)和广播模式(CLUSTERING)两种,默认CLUSTERING
- consumeFromWhere:从消息服务器拉取不到消息时使用的重新计算消费策略,默认CONSUME_FROM_LAST_OFFSET
- consumeThreadMin:最小消费线程数,默认20
- consumeThreadMax:最大消费线程数,默认20
- pullBatchSize:每次拉取消息的条数,默认32
- postSubscriptionWhenPull:是否每次拉取消息后更新订阅信息,默认false
3)启动流程
push模式下,消费者启动通过DefaultMQPushConsumer#start
方法启动,start方法代码如下:
/**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
*
* @throws MQClientException if there is any client error.
*/
@Override
public void start() throws MQClientException {
//设置消费者分组
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
//启动内部消费者
this.defaultMQPushConsumerImpl.start();
//异步数据分发器
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
可以看出,消费者启动时,start方法主要主要做了3个事情:设置消费者分组、调用内部消费者实现实例start方法、如果存在则启动消息异步分发器。
让我们来看看内部消费者实现(DefaultMQPushConsumerImpl)启动时做了哪些事情。
- 1、DefaultMQPushConsumerImpl在执行start方法时,先对本实例服务的状态(serviceState)进行判断。
//服务实例状态,有:CREATE_JUST、RUNNING、SHUTDOWN_ALREADY、START_FAILED,默认CREATE_JUST
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
如果是刚创建实例时(CREATE_JUST状态),则先将附实例状态更改为START_FAILED,然后进行一些列初始化并启动操作(检查配置、复制订阅信息、设置负载均衡实现、创建并加载消息进度offsetStore、创建并启动消息消费consumeMessageService服务、向消费者工厂注册自己),最后变更为RUNNING状态。
如果是其他状态,则放弃初始化并提示。
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
//启动前先将服务状态设置为failed
this.serviceState = ServiceState.START_FAILED;
//检查配置信息
this.checkConfig();
//复制订阅信息
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//广播模式,消息进度存储在消费端
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
//集群模式,消息进度存储在broker端
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//根据消息是否顺序创建消费服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
//注册消费者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//只会启动一次
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
- 2、更新订阅主题信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
- 3、向broker发送检查客户端命令
this.mQClientFactory.checkClientInBroker();
- 4、向所有broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- 5、立即启动负载均衡服务
this.mQClientFactory.rebalanceImmediately();
至此,push模式消费者启动完成
2、pull模式
pull模式原实现DefaultMQPullConsumer
已过时,官方退出新pull模式实现类DefaultLitePullConsumer
。
1)类关系
pull模式由LitePullConsumer
接口定义,默认实现类为DefaultLitePullConsumer
,类关系如下:
2)类构造器
DefaultLitePullConsumer
类构造器代码如下:
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
//生产者实例命名空间
this.namespace = namespace;
//消费者所属组
this.consumerGroup = consumerGroup;
//是否允许steam流请求类型
this.enableStreamRequestType = true;
//默认内部消费实现
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
参数说明:
- namespace :生产者实例的命名空间
- consumerGroup:消费者所属组
- enableStreamRequestType:是否允许steam流请求类型
- defaultLitePullConsumerImpl:pull模式默认内部消费实现
其他类成员变量
- messageModel:消费模式,默认CLUSTERING
- autoCommit:是否自动提交消息偏移量,默认true
- pullThreadNums:拉取消息线程数,默认20
- pullBatchSize:每次拉取消息的条数,默认10
- consumeFromWhere:从消息服务器拉取不到消息时使用的重新计算消费策略,默认CONSUME_FROM_LAST_OFFSET
3)启动流程
pull模式,消息启动入口为DefaultLitePullConsumer#start
方法,代码如下:
@Override
public void start() throws MQClientException {
//初始化异步数据分发器
setTraceDispatcher();
//设置消费者所属组
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
//启动内部消息消费实现
this.defaultLitePullConsumerImpl.start();
//启动异步数据分发器
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
可以看出,pull消费实现主要做了3个事情:初始化并启动消息异步数据分发器、设置消费者所属组、启动内部消息消费实现(defaultLitePullConsumerImpl)。
现在,让我们来看看内部消息消费实现(defaultLitePullConsumerImpl)
启动时做了哪些事情:
- 与push模式类似,pull模式
defaultLitePullConsumerImpl
执行start方法时,同样根据内部服务实例状态(serviceState)进行判断,如果是START_FAILED状态,则进行消费者初始化操作;如果其他状态则放弃并提示错误
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
//先设置为failed状态
this.serviceState = ServiceState.START_FAILED;
//检查配置
this.checkConfig();
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
//初始化MQClientInstance,注册消费者实例
initMQClientFactory();
//初始化消息消费负载均衡实现
initRebalanceImpl();
//初始化消息拉取包装器,注册消息过滤HOOK
initPullAPIWrapper();
//初始化消息消费进度
initOffsetStore();
//启动MQClientInstance
mQClientFactory.start();
//定时任务更新消息队列
startScheduleTask();
//服务实例状态更新为RUNNING
this.serviceState = ServiceState.RUNNING;
log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
//向broker发送检查客户端命令
operateAfterRunning();
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
至此,pull模式消费者启动流程完成。