前言
在前面consumer启动的博客中,有说过,在启动过程中,有两个比较重要的逻辑,分别是负载均衡和拉取消息的service,这篇博客,主要记录拉取消息的service,因为前面的demo和这篇笔记中的demo,都是基于push模式来学习的,所以前面的笔记都是基于push模式的,但是最近看了下pull模式,和push模式的代码还是有点区别的,所以后面单独起一篇博客,记录pull模式的逻辑
源码
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
前面有说过,拉取消息,使用的是这个service,所以我们从这个service的入口处开始看起
这里会发现,只有两行代码
- 从一个queue中拉取到一个pullRequest请求体
- 然后调用pullMessage方法
这里的这个queue很重要,这个queue中存放的是拉取消息的请求,会再调用pullMessage()方法拉取到消息,各个消费者处理完之后,再放到pullRequestQueue中一个请求,所以这里就会在本次拉取消息的请求完成之后,接着取pullRequest,再次触发拉取消息的请求
这里有一个很重要的逻辑:既然拉取消息的请求,是从pullRequestQueue中开始的,那在consumer启动之后,第一次拉取消息的pullRequest是什么放进队列里面的呢?因为这里看到只是去取数据,总要有一个地方,先放进去一个请求,才会开始拉取消息;第一次把pullRequest放到queue中,是在负载均衡分配了messageQueue之后,会构建pullRequest,然后把请求放到queue中,在后面负载均衡的service笔记中会记录
我们接着来看pullMessage()的相关逻辑:
会发现,在这个方法中,也没有太多的逻辑,就是把当前消费者信息取出来,强转成功push类型的consumer,然后调用其pullMessage()
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
在这个方法中,会有几个简单的流控判断
这里有一个pullCallBack方法,我给先暂时折叠起来了,这个方法,并不会立即执行,而是在异步拉取消息的时候,接收到broker的回调时,会通过这里的callback方法去解析返回的消息信息,所以暂时先不看
接着会通过pullKernelimpl去发起拉取消息的netty请求
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
在pullKernelImpl的方法里面,主要是根据brokerName获取到brokerAddr,然后构建netty请求的请求体,我们就不再贴代码了,里面的代码比较简单,并且不复杂,我们直接看发送请求的代码
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
在这个方法中,会先拼接拉取消息的code码
然后根据当前拉取消息的模式,调用不同的方法拉取消息
同步请求(这个后面博客单独详细说)
可以看到,同步请求,没有回调,就是等拿到response结果之后,再去处理;对于同步请求,通常和pull模式有关联
上面所讲的顺序和并行消费,我感觉只有push模式,才会有这个区分,对于pull模式的话,是consumer自己去拉取消费者的,好像没有看到区分并行、顺序消费的逻辑
所以这里的同步请求,不做过多解释,在后面讲解pull模式的时候,再详细说
同步请求的处理逻辑中,就是根据当前返回结果,构建了一个pullResult对象,然后返回
对于同步请求,在这个方法中,我们先只需要知道,当consumer发起拉取消息请求的时候,同步请求会等待返回结果,然后返回pullResult对象
异步发送请求
可以看到,异步发送请求时,这里是把回调回来的pullResult交给pullCallBack方法去处理了,所以这里当异步回调回来之后,会有pullCallBack来处理,也就是前面我说的先暂时折叠起来的方法
所以,我们要回来,去看pullCallBack的方法,在callBack的onSuccess方法中,大致我分了三个逻辑
这三个逻辑和下面图中三个红框截出来的代码一一对应
- 根据tag进行过滤,这是consumer在订阅topic的时候,可以指定过滤条件,在这里会根据tag进行一层过滤
- 将消息信息放到了processQueue中,然后通过submitConsumeRequest()方法,将请求信息交给consumer的回调方法
- 在消费者处理完成之后,会再次把pullRequest请求信息,放到队列中,继续发起下次请求,在case FOUND:这个case中的第二行代码,会设置pullRequest的nextOffset属性,这样就确保了下次不会拉取到重复的消息
这里我们看下上面截图中第二个框圈起来的 submitConsumeRequest()方法,在这个方法中,分为顺序消费和并行消费
顺序消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
对于顺序消费,这里就是直接提交了一个任务到线程池中
在其task的run方法中,会先对当前messageQueue进行加锁
中间会for循环,遍历本次拉取到的消息,然后依次调用下面这个方法,去处理,这里的messageListener是consumer在启动之前,程序员自己注册的
这里我们可以看到,不管消息有多少,对于顺序消费的时候,只会有一个线程处理,并且这个线程在处理的时候,会对messageQueue加锁,只处理这个messageQueue中的消息,所以,对于一个messageQueue中的消息,一定是有序的
并行消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
对于并行消费,我们可以发现,这里通过for循环去提交consumeRequest任务,可能会存在并行处理的场景,因为for循环的时候,一个消息,就提交了一个任务到线程池中
可以看到,在异步消费的时候,并没有加锁的逻辑,也不可能有加锁的逻辑,因为是多个线程,在并行的处理消息,加锁就会有问题,加锁就不是并行消费了
总结
对于消费者有两种模式:
pull和push
但是在push模式中,又分为了顺序消费和并行消费
pull模式,就是消费者的业务代码中,自己去拉取消息,处理完了,继续去拉下一批消息,也就是我们常说的,根据自己的消费能力去处理消息
但是push模式,是在定时的去拉取broke中的消息,然后就回调业务上的处理逻辑
在push模式中,底层发送netty请求,是异步发送的,在接收到broker返回的response之后,会通过callBack方法进行处理,在处理的过程中,会进行tag的过滤,最后将解析到的msg,通过业务上注册的messageListener进行回调处理
对于push模式,在本次拉取消息的结果处理完之后,会继续发起下一次拉取消息的请求
下面这个是push模式拉取到消息之后的处理逻辑