前言
这篇笔记记录consumer启动的逻辑
consumer主要是负责去broker中拉取消息,然后将拉取到的消息,交给消费者去处理
consumer本质上也是一个netty客户端,所以,在启动的时候,和producer有很多相似的点,但是有1个区别点:
1、producer的负载均衡是在真正去发送消息的时候,通过轮询的方式,选择其中的一个messageQueue;但是consumer是在启动的时候,通过负载均衡策略去分配consumerQueue
相同点:
1、producer和consumer对于broker来说,都说客户端,所以在启动时,都会启动nettyClient,nettyClient负责向broker发送消息、发送拉取消息的请求
源码
consumer的启动,入口层代码也是比较简单的,如图1所示
- 需要初始化一个consumer对象,这里的consumer对象和consumerGroup是存在一个映射关系的
- 然后订阅对应的topic即可,这里在订阅topic的时候,同时会把subExpression的信息存入到内存中,方便在后面接收到消息之后进行过滤
- 接着注册回调方法,这里的回调方法是真正处理消息的代码
- 然后调用start()方法启动即可
图1:
在其start()方法中,我们只关心消费者启动的代码,消息轨迹相关的,暂时先不关注,如图2所示
图2:
在start()方法中,会根据当前serviceState的状态,分别进行处理
在create_just的逻辑中,我目前已经看过,看懂的,都备注在注释中了,可以参考看下,如图3所示
- 就是会根据消费者注册的回调方法,生成不同的service,就是并行消费和顺序消费两个service
- 这里还会调用mQClientFactory.registerConsumer,去注册信息,这里是把group信息和consumer信息放到了内存中的一个map集合,在后面进行负载均衡和拉取消息的时候,会用到
需要核心关注的,还是mqClientFactory.start()方法
图3:
这里的mqClientFactory,之前在记录producer的笔记时,有说过,这里是共用的一个方法,但是在,消费者启动的时候,有两个方法是比较关键的,已经圈了出来;这里我们只需要知道,这两个service分别是用来去拉取消息、负载均衡的,后面单独起博客去学习
简单的记录下,pullMessageService中会启动一个线程,在run方法中,通过while判断,只有consumer没有stop,就一直去broke拉取消息
RebalanceService中:会根据负载均衡策略,给当前消费者分配messageQueue,因为一个topic有可能会有多个消费者,此时,多个消费者构成了消费者组,此时,多个消费者会分摊所有的messageQueue
总结
consumer端启动的时候,逻辑相对而言,比较简单,并且大部分是和producer共用的,这里需要着重关注的是,负载均衡和拉取消息的service,具体这两个service,后面会拆开详细记录,这篇就不再累述了