前言
这篇博客,主要记录consumer已经消费的offset是如何更新的
对于集群模式,offset是维护在broker中的;而广播模式,offset是存储在本地文件中(暂时没有验证具体存储的位置,是根据源码推测的)
不管是pull模式,还是push模式,都需要维护consumer当前已经消费的offset
更新offset的逻辑,大致是这样的:
1.client从broker拉取消息
2.然后client回调业务系统的消费者所注册的messageListener,对消息进行处理
3.在处理完消息之后,会先将offset更新到client的内存中,需要注意的是:我这里说的client,并不是业务系统的消费者,而是rocketmq框架中的consumer
4.consumer在启动的时候,会启动一个异步线程,去定时的获取consumer内存中每个messageQueue的offset,然后通过发送netty请求到broker去处理
5.broker在接收到客户端的更新offset的请求之后,会把client发送过来的offset,更新到内存中,在内存中,也是通过一个map集合来存储
6.broker在启动的时候,也会启动一个异步定时的线程,定时的去拉取内存中的offset数据,然后持久化到磁盘文件上,consumerOffset.json
这上面的第一步,第二步是拉取消息的逻辑,在前面的博客中,也有介绍过,这里就不做过多的介绍
client更新offset到内存中
这是上面第三点的逻辑
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
这个run方法,是将拉取到的消息,解析之后,依次回调业务系统中的消费者所注册的messageListener方法
在回调完之后,会接着进行其他逻辑的处理,其中,有一步很重要的操作:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
在这个方法中,会根据status,进行不同的逻辑处理,处理完成之后,会调用这个updateOffset方法
我们会发现:offsetStore有两个实现类,分别是local和remote,如果当前consumer设置的是集群模式,使用的是remote;如果设置的是广播模式,使用的是remote;我们以集群模式为例,来看源码,所以要看remote
这里可以看到,所谓的update逻辑,很简单,就是把messageQueue和offset保存到了内存中的一个map集合中
client定时任务,定期拉取offsetTable数据,发送netty请求
接着我们来证明第四点
org.apache.rocketmq.client.impl.factory.MQClientInstance#start
在前面的博客中,有说过,在consumer启动的时候,会调用这个start()方法,在这个方法中,当时有说到过,在红框圈出来的这个方法中,启动了N个异步线程
其中,有一个和更新offset相关的定时执行的线程,这个线程,每10S执行一次,我们来看下执行的逻辑
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll
在这个方法中,可以看到,从内存的offsetTable中,拉取信息,然后调用updateConsumeOffsetToBroker()方法,更新完成之后,从内存的map集合中remove对应的messageQueue和offset数据
我们接着来看发送netty请求的逻辑
上面可以看到,实际上,就是发送了一个netty请求,指定的code码是UPDATE_CONSUMER_OFFSET
broker接收到netty请求,处理逻辑
在broker这边,是由consumeManageProcessor这个类来处理更新offset请求的,具体原因就是下面这个截图
org.apache.rocketmq.broker.processor.ConsumerManageProcessor#processRequest
我们接着来看接收到请求之后的一些处理
在下面真正去commitOffset的时候,我们会发现,其实就是把offset信息更新到了内存的offsetTable中,这个offsetTable是在broker的内存中,此时消费者的offset信息只是保存到了broker的内存中
broker端异步线程定时更新内存数据到磁盘文件
org.apache.rocketmq.broker.BrokerStartup#createBrokerController
这是broker在启动的时候,创建brokerController的方法,在这个方法中,创建完brokerController之后,会调用一个initialize()方法
在initialize()方法中,会启动一个异步线程,去持久化内存中的offset数据,就是下面这个截图中的任务
下面这个截图,是开始持久化的逻辑,大致有三个步骤
- 先将当前内存中的offset数据,转换为String
- 然后获取到当前磁盘文件的fileName
- 然后进行持久化
encode
这里最开始在看encode的时候,其实没太看懂,因为这里直接把当前对象转换为json字符串返回了,这个不清楚为什么不只把offsetTable的数据,写到磁盘文件上
可以看上面的decode方法,也是把json字符串转换为对象,然后从对象中获取到offsetTable数据
configFilePath
持久化
这里持久化的逻辑,看起来是
- 先把数据,写到.tmp文件中
- 然后把当前数据进行备份,.bak文件
- 删除原磁盘上的文件
- 接着将tmp文件更名为fileName这个文件