文章目录
- 消费者的接收消息流程
消费者的接收消息流程
还是先把消费者接收消息的流程图贴出来,再细说代码流程:

首先先从消费者的业务调用出发
// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// ...
// 注册监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
那么我们就从 consumer.start() 进入,看一下消费者的启动逻辑,该方法的核心代码也就是:
this.defaultMQPushConsumerImpl.start();
那么进入到这个 start 方法,这里进行了一些配置以及客户端的启动:
- 通过
checkConfig()检查消费组的一些配置:名称是否符合规范、消费者的线程数、消费者的监听等等 - 之后再设置一些属性
- 通过
mQClientFactory.start()启动客户端
那么我们进入到启动客户端这个逻辑,我们猜测这里 start 之后,可能就可以进行消息的拉取了,那么在 start 这个方法中,看到了有下边这一行:
this.pullMessageService.start();
这不正是拉取消息的服务吗?点进去之后,发现就是启动了一个线程,这个线程呢就是 this,那么我们点进去这个 start 方法是定义在 ServiceThread 类中,这个类并没有定义 run 方法,因此呢,这个 run 方法应该是定义在了子类 PullMessageService 类中,点进去找到 run 方法,可以看到在 run 方法中就会不停地去 messageRequestQueue 中拉取数据:
MessageRequest messageRequest = this.messageRequestQueue.take();
既然在这里拉取数据了,那么数据是什么时候放到 messageRequestQueue 中的呢?
只需要搜一下哪里调用到了 this.messageRequestQueue.put 就可以知道了,找到之后呢,我们在这一行打个断点,再去启动生产者,就可以知道整个调用链了,

那么根据栈调用情况呢,可以发现这一行是通过 RebalanceService 的 run 方法进入的,那么这个 RebalanceService 一定是在哪里作为一个线程被启动了

那么呢,我们之前说了在启动客户端的时候,调用 this.pullMessageService.start() 启动了这个线程,那么在下一行就启动了 rebalanceService 这个线程:

因此呢,就通过 debug 的方式找到了向 messageRequestQueue 中存放消息就是在 RebalanceService 这个线程中做的


















