rocketmq源码-consumer已消费offset更新逻辑

news2025/3/13 9:58:03

前言

这篇博客,主要记录consumer已经消费的offset是如何更新的

对于集群模式,offset是维护在broker中的;而广播模式,offset是存储在本地文件中(暂时没有验证具体存储的位置,是根据源码推测的)

不管是pull模式,还是push模式,都需要维护consumer当前已经消费的offset
更新offset的逻辑,大致是这样的:
1.client从broker拉取消息
2.然后client回调业务系统的消费者所注册的messageListener,对消息进行处理
3.在处理完消息之后,会先将offset更新到client的内存中,需要注意的是:我这里说的client,并不是业务系统的消费者,而是rocketmq框架中的consumer
4.consumer在启动的时候,会启动一个异步线程,去定时的获取consumer内存中每个messageQueue的offset,然后通过发送netty请求到broker去处理
5.broker在接收到客户端的更新offset的请求之后,会把client发送过来的offset,更新到内存中,在内存中,也是通过一个map集合来存储
6.broker在启动的时候,也会启动一个异步定时的线程,定时的去拉取内存中的offset数据,然后持久化到磁盘文件上,consumerOffset.json

这上面的第一步,第二步是拉取消息的逻辑,在前面的博客中,也有介绍过,这里就不做过多的介绍

client更新offset到内存中

这是上面第三点的逻辑

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

这个run方法,是将拉取到的消息,解析之后,依次回调业务系统中的消费者所注册的messageListener方法

在这里插入图片描述

在回调完之后,会接着进行其他逻辑的处理,其中,有一步很重要的操作:
在这里插入图片描述

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

在这个方法中,会根据status,进行不同的逻辑处理,处理完成之后,会调用这个updateOffset方法
在这里插入图片描述
我们会发现:offsetStore有两个实现类,分别是local和remote,如果当前consumer设置的是集群模式,使用的是remote;如果设置的是广播模式,使用的是remote;我们以集群模式为例,来看源码,所以要看remote
在这里插入图片描述

这里可以看到,所谓的update逻辑,很简单,就是把messageQueue和offset保存到了内存中的一个map集合中
在这里插入图片描述

client定时任务,定期拉取offsetTable数据,发送netty请求

接着我们来证明第四点

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

在前面的博客中,有说过,在consumer启动的时候,会调用这个start()方法,在这个方法中,当时有说到过,在红框圈出来的这个方法中,启动了N个异步线程
在这里插入图片描述
其中,有一个和更新offset相关的定时执行的线程,这个线程,每10S执行一次,我们来看下执行的逻辑
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll

在这个方法中,可以看到,从内存的offsetTable中,拉取信息,然后调用updateConsumeOffsetToBroker()方法,更新完成之后,从内存的map集合中remove对应的messageQueue和offset数据
在这里插入图片描述

我们接着来看发送netty请求的逻辑
在这里插入图片描述

在这里插入图片描述
上面可以看到,实际上,就是发送了一个netty请求,指定的code码是UPDATE_CONSUMER_OFFSET

broker接收到netty请求,处理逻辑

在broker这边,是由consumeManageProcessor这个类来处理更新offset请求的,具体原因就是下面这个截图
在这里插入图片描述

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#processRequest

我们接着来看接收到请求之后的一些处理
在这里插入图片描述
在这里插入图片描述

在下面真正去commitOffset的时候,我们会发现,其实就是把offset信息更新到了内存的offsetTable中,这个offsetTable是在broker的内存中,此时消费者的offset信息只是保存到了broker的内存中
在这里插入图片描述

broker端异步线程定时更新内存数据到磁盘文件

org.apache.rocketmq.broker.BrokerStartup#createBrokerController

这是broker在启动的时候,创建brokerController的方法,在这个方法中,创建完brokerController之后,会调用一个initialize()方法
在这里插入图片描述

在initialize()方法中,会启动一个异步线程,去持久化内存中的offset数据,就是下面这个截图中的任务
在这里插入图片描述

下面这个截图,是开始持久化的逻辑,大致有三个步骤

  1. 先将当前内存中的offset数据,转换为String
  2. 然后获取到当前磁盘文件的fileName
  3. 然后进行持久化

在这里插入图片描述

encode

在这里插入图片描述
这里最开始在看encode的时候,其实没太看懂,因为这里直接把当前对象转换为json字符串返回了,这个不清楚为什么不只把offsetTable的数据,写到磁盘文件上

可以看上面的decode方法,也是把json字符串转换为对象,然后从对象中获取到offsetTable数据

configFilePath

在这里插入图片描述
在这里插入图片描述

持久化

这里持久化的逻辑,看起来是

  1. 先把数据,写到.tmp文件中
  2. 然后把当前数据进行备份,.bak文件
  3. 删除原磁盘上的文件
  4. 接着将tmp文件更名为fileName这个文件

在这里插入图片描述

总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103323.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

socket,http和websocket的区别

HTTP协议是非持久化的,单向的网络协议,在建立连接后只允许浏览器向服务器发出请求后,服务器才能返回相应的数据。当需要即时通讯时,通过轮询在特定的时间间隔(如1秒),由浏览器向服务器发送Reque…

Delft3D水动力-富营养化模型

湖泊富营养化等水质问题严重威胁我国经济社会的发展,也是水环境和水生态领域科研热点之一。水环境模型是制定湖泊富营养化控制对策,预测湖泊水环境发展轨迹的重要工具,在环境影响评价、排污口论证等方面也有着广泛的应用。荷兰Delft研究所开发…

Oracle数据库:数据的仓库,永久就存储,Oracle安装教程,修改Oracle scott,sys,system用户的密码,查看表格

Oracle数据库:数据的仓库,永久就存储,Oracle安装教程,修改Oracle scott,sys,system用户的密码,查看表格 2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人…

Diffusion理论知识学习-预备

马尔科夫链 总体思想: 过去的所有信息都被保存在了现在的状态中,只使用现在状态就能预测到之后的状态,换句话说就是某个时刻的状态转移概率只依赖于它的前一个状态。 公式化表达:P(xt1∣xt,xt−1,...,x1,x0)P(xt1∣xt)P(x_{t1}|x_t,x_{t-1},...,x_1,x_0…

打造钢材行业智能制造新业态,B2B电商平台系统赋能企业交易全链路数字化

钢铁行业是制造业的“脊梁”,是工业的“粮食”,在我国由制造大国向制造强国挺进的过程中,钢铁行业由全球“老大”变“强大”至关重要,而数字化转型将发挥不可替代的作用,实施绿色制造、智能制造和精益制造,…

阿里中间件的源与流

目录前言从中间件->中台->阿里云从五彩石项目说起从HSF到EDAS从TDDL到PolarDB-XTDDL阶段DRDS阶段PolarDB-X 1.0阶段PolarDB-X 2.0阶段从Notify到RocketMQ阿里中间件一览参考链接前言 阿里中间件如雷贯耳,听上去高深莫测,那到底是哪几样神兵利器呢…

【MySQL】1.MySQL基础

文章目录1.0 数据库基本概念1.1 主流数据库介绍2.0 MySQL数据库架构2.1 MySQL软件分层2.1 MySQL存储引擎2.2 SQL语句分类1.0 数据库基本概念 想要理解数据库的概念,首先要了解数据库为何存在,它存在的价值是什么? 在之前的学习中&#xff0c…

用Python绑定调用C/C++/Rust库

用Python绑定调用C/C/Rust库 在《让你的Python程序像C语言一样快》我们学习了如何利用Python API来用C语言编写Python模块,通过将核心功能或性能敏感运算用C语言实现,Python程序可以运行地像C语言一样快。然而,很多时候我们需要的功能已经有…

Mycat(11):分片详解之字符串ID处理

1 找到conf/schema.xml并备份 2 字符串ID处理的分区算法 conf/rule.xml <tableRule name"jch"><rule><columns>id</columns><algorithm>jump-consistent-hash</algorithm></rule></tableRule><function name&qu…

css之grid布局个人学习笔记

前置 只是个人学习,内容只会记录自己想知道,有问题的知识点具体可以看看bilibili的耕耕技术宅-grid布局地址视频对应的耕耕技术宅-grid布局ppt地址学有余力的可以通关下这个小游戏通过给萝卜浇水&#xff0c;学习 CSS 网格布局CSS Grid 网格布局教程- 阮一峰 明确基本概念 下…

品牌创建百科怎么写?品牌百度百科怎么创建?

每年都会有很多新成立的品牌&#xff0c;但是能够被大众所熟知的却是寥寥无几&#xff0c;众多品牌都被淹没在了大千世界里。 一个品牌的创建&#xff0c;难的就在于宣传&#xff0c;宣传力度不到位&#xff0c;就没有用户会愿意为这个品牌买单。 那么怎么快速提升品牌的知名度…

肝完这在“牛客网”难倒万人的Java面试题后,已收获9个大厂offer

上周在牛客网看到了这几百道面试题之后&#xff0c;看到评论区全是太难了&#xff01;太难了&#xff0c;就深深被其吸引&#xff0c;索性直接花了一周的时间才把它们全部解析出来做成了这份文档&#xff0c;发给了最近面试的粉丝&#xff0c;他刷爆之后居然能拿到了好几个大厂…

IDEA集成docker-JDK11版本

IDEA集成docker 1. docker 服务器开启远程访问 登录 docker 所在的远程服务器&#xff0c;使用命令 vim /usr/lib/systemd/system/docker.service 修改配置文件&#xff0c;需注意&#xff0c;修改时确认自己的账户拥有相应权限 主要操作是找到 ExecStart/usr/bin/dockerd -H…

能量原理和变分法笔记1:变分法简介

上个学期在学校学了多体系统动力学的课&#xff0c;其中老师讲了变分原理&#xff0c;觉得很有启发&#xff0c;决定再学学相关的知识&#xff0c;在B站找到了一个这样的视频能量原理与变分法&#xff0c;做点笔记&#xff0c;加深一下理解。 第0章序言-微元、功和能(P2)第1章1…

常用的wxpython控件使用方法总结

写在开头&#xff1a;总结下现阶段我常用到wxpython控件的一些使用方法&#xff0c;便于记录和查询。 我一般是借助wxFormBuilder工具搭建基础的界面生成代码&#xff0c;这样做的好处自然是方便设计界面增加界面的美观度&#xff0c;再在.py文件手写代码设置控件的事件驱动&a…

【论文阅读总结】Batch Normalization总结

批量规范化&#xff1a;通过减少内部协变量转移加快深度网络训练1. 摘要2. 序言2.1 min-batches的优缺点2.2 批量归一化解决内部协变量转移的优点3.减少内部协变量转移实现思想3.1 白化的问题3.2 解决白化问题4.小批量统计进行标准化4.1.白化简化的两种方式4.1.1 对通道维度进行…

mybatis实现分页查询(两种方式:1pageHelper插件 2手写)

方法1&#xff1a;整合pageHelper分页插件 优点&#xff1a;快捷&#xff0c;只需要你有一个查询全部数据的方法即可 缺点&#xff1a;对于初学者来说&#xff0c;不了解内部的原理 前提&#xff1a;需要先实现一个最简单的 查询全部数据的方法&#xff0c;不会的可以先去搭建一…

C++零基础项目:俄罗斯方块!详细思路+源码分享

游戏介绍 这是使用 C 和 EasyX 写的一个俄罗斯方块小游戏&#xff0c;里面用到的 C 特性并不多。 游戏主要分成了两个类来实现&#xff1a;Game 和 Block 类&#xff0c;分别用来实现游戏逻辑和单独的俄罗斯方块&#xff0c;里面顶多就用到了静态成员函数和变量的特性&#x…

nexus上传自定义starter

nexus上传自定义starter1、starter上传简介2、上传方法2.1、setting.xml文件2.2、项目中的pom文件3、具体部署1、starter上传简介 在我们自定义了springboot的starter后&#xff0c;starter一般有是一个父子级maven工程&#xff0c;如下图所示&#xff0c;对于 autoconfigure 来…

H5 导航栏示例

本文是通过:hover更新元素样式&#xff0c;通过递归树形菜单渲染到页面。 效果 源码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"&…