前言
这篇笔记主要记录consumer在启动过程中,负载均衡的逻辑,多个消费者组成一个消费者组,对于集群模式,同一个消费者组中的多个消费者共同消费一个topic下的所有消息,所以每个consumer可能会处理N个messageQueue,至于哪个consumer消费哪个messageQueue,是由负载均衡策略决定的
源码
在消费者启动的时候,会通过负载均衡策略,来决定当前消费者处理哪几个messageQueue,入口是:
this.rebalanceService.start();
在run()方法中,会通过while循环,每20S,去进行一次负载均衡计算
无论是pull模式,还是push模式,都会调用到
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
这个方法,是按照topic维度,进行负载均衡
广播模式
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
在这个方法中,会先判断当前消费者的模式,是集群模式?还是广播模式
对于广播模式,无需进行负载均衡,因为广播模式,每个消费者都会消费所有的消息。也就是说,topic中的所有messageQueue,都是需要消费者去处理的
集群模式
对于集群模式,最重要的,就是这里根据负载均衡策略进行计算的逻辑
这里是根据负载均衡之后得到的结果,然后更新一些信息
这里更新的逻辑很重要,对于push模式,每个messageQueue会对应一个pullRequest请求,然后把pullRequest请求放到队列之后,线程会不停的从queue中拉取pullRequest,然后请求broker
updateProcessQueueTableInRebalance在这个方法中,就会去根据messageQueue,构建pullRequest请求,然后放到queue中
对于pull模式,是需要启动异步的pullTaskImpl任务,在这个任务中,会不停的去broker拉取消息,然后放到消费者主动拉取的队列中
messageQueueChanged() 这个方法,就会根据messageQueue,启动pullTaskImpl
所以,对于consumer,我们会发现,对于广播模式,无需进行负载均衡,每个消费者都会处理messageQueue中的消息,对于集群模式,同一个consumeGroup中的消费者,会分摊一个topic中所有的messageQueue
负载均衡策略
在consumer进行负载均衡时,默认提供了多个负载均衡策略;但是还没有仔细研究这几个负载均衡策略的细节,先列举出来
AllocateMachineRoomNearby
就近机房
AllocateMessageQueueAveragely
平均分配算法
AllocateMessageQueueAveragelyByCircle
平均轮询分配
AllocateMessageQueueByConfig
自定义配置
AllocateMessageQueueByMachineRoom
指定机房
AllocateMessageQueueConsistentHash
一致性hash算法