前言
上一篇博客,记录的是push模式,异步发送netty请求拉取消息的代码,这篇博客主要记录consumer发送同步netty请求,去拉取消息的逻辑,但是对于同步发送请求,需要结合LitePullConsumer来看
在Lite PullConsumer中有两种方式,分别是:subscribe和assign模式,这两种模式的区别,我的理解是:前者是mq帮我们进行负载均衡,后者我们可以按照自己的需求去进行负载均衡,给当前消费者分配messageQueue
但是这两种模式的相同点是:都是采用的pull模式,需要在消费者这一端,主动的去pull消息
对于使用同步发送请求的consumer,需要这样使用
所以可以看到,对于pull模式,就是在业务代码中自己去poll的,接着我们来看源码
源码
start()
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#start
如图1所示:在这个start()方法中,有两个方法需要着重关注下,就是mqClientFactory.start()和operateAfterRunning()
前面这个方法已经看了好多次了,后面这个方法是assign模式的时候,会取启动task任务,这个后面再说
图1:
我们先来看mqClientFactory.start(),和这篇博客有关系的,是这里面负载均衡的逻辑
这个调用链中间的逻辑,比较简答,就不介绍了,我们主要关注和pull模式拉取消息有关的逻辑
this.rebalanceService.start();
org.apache.rocketmq.client.impl.consumer.RebalanceService#run
org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#doRebalance
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
在这里进行负载均衡的时候,区分了集群模式和广播模式,至于如何进行负载均衡,后面会单独起一篇博客记录,这里只关系和pull模式拉取消息有关的逻辑
我们会发现,不管是集群模式,还是广播模式,都会调用中两个方法
第一个方法updateProcessQueueTableInRebalance 是更新当前consumer所需要处理的messageQueue
第二个方法messageQueueChanged,对于push模式,暂时看懂处理逻辑,但是对于pullLite模式,有很重的一个步骤
对于litePull模式,在这个方法中会调用updatePullTask()这个方法,这个方法,是启动了task任务,这个task任务是为了拉取消息的
这里在update的时候,可以看到,会先从内存中taskTable中取任务,如果是刚启动,这里肯定是null,所以关键的逻辑在startPullTask()中
可以看到,这里是根据messageQueue进行遍历,初始化pullTask对象,如果pullTask对象不在内存中,就先设置到内存中,然后再启动task任务;这里为什么只需要调用schedule调用一次呢?我们知道,拉取消息的话,是一个持久的逻辑,就是需要不停的去调用,那就不用想了,肯定是在task中,会再次调用schedule去拉取消息
前面铺垫了这么多,就是为了讲清楚这里的task任务是在什么时候启动的,现在我们可以看到,是在负载均衡之后,根据最新的负载均衡结果,每个messageQueue对应一个task,然后启动task,去拉取消息
对于PullTaskImpl的run()方法,不贴全部的代码了,看关键逻辑
这里圈起来的四块代码,是四块比较关键的逻辑
- 这里的pull请求,就是取发起同步netty请求的地方
- 第二部分是针对同步返回的pullResult进行处理
- 第三部分是更新当前messageQueue的offset,表示已经处理到哪里了,下次去拉取消息时会用到
- 重新调度当前task任务,继续拉取消息
pull() 发起同步netty请求
这里只需要一张截图,就全说明白了,如果看过前面一篇push模式的博客,就会发现,对于pull模式,这里设置的sync 同步模式发送请求
submitConsumeRequest
这里也是只需要一张截图,在从broker获取到返回参数之后,这里会把消息取出来,然后包装成ConsumeRequest对象;我们下面 消费者poll()拉取消息 这一节会看到,消费者去拉取消息的时候,是通过poll()方法拉取的,在底层,就是从这个cache中获取的
第三点和第四点就不说了,比较简单,没有什么特殊逻辑
operateAfterRunning()
前面在消费者启动的时候,除了调用mqClientFactory.start(),还会operateAfterRunning()调用这个方法
对于assign模式,这里会调用updateAssignPullTask();这个方法很重要,也是启动拉取消息任务的地方
对于updateAssign方法,也一样,会去调用startPullTask方法
但是在调用之前,会根据入参的mqNewSet进行remove的操作,也就是说,如果现在consumer处理的是messageQueue1、messageQueue2;但是现在consumer自己分配了messageQueue之后,可能consumer只处理messageQueue1,那在这里,就会把messageQueue2对应的task给停掉,因为这里把cancelled属性设置为了true
这里对于assign模式,一直有个疑问,明明在启动负载均衡策略的时候,会启动一次task,为什么还会接着再判断一遍呢?因为我感觉负载均衡逻辑执行了之后,已经启动了task,这里再判断应该也不会再启动一遍,因为task已经在taskTable中了
但是直到我看到了assign模式的使用方法之后,我明白了
这里可以看到,在start之后,assign模式,会自己再去根据自己的逻辑,去计算当前消费者要处理哪些messageQueue,然后调用assign方法更新
这里可以看到,在consumer自己重新计算分配了messageQueue之后,会调用assign方法更新consumer所需要处理的messageQueue,并且会对task任务进行一遍重新处理
消费者poll()拉取消息
这里poll的逻辑也很简单,就是从一个blockingQueue中拉取request信息,然后从request对象中获取到当前那要处理的消息,把解析到的消息,返回给consumer;
在前面 submitConsumeRequest 这一节中,有看到消息是怎么放到consumeRequestCache中的,所以对于pull的模式,就是这么简单