RocketMQ源码学习笔记:消费者启动流程

news2024/11/24 0:46:45

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、前置知识
    • 1.1、pull和push型消费者
    • 1.2、消息CommitLog到ConsumeQueue
    • 1.3、自动创建的重试主题
    • 1.4、广播型消费和集群型消费中offset的存储位置
  • 2、消费中的启动流程
    • 2.1、Preview
    • 2.2、校验,配置,启动

1、前置知识

1.1、pull和push型消费者

消费者可分为两种,一种是pull另一种是push

pull是消费者主动到broker上拿消息,push则是设置监听等broker发信息过来。

当然,底层实现其实都是pullpush模式下broker只是发送一个通知,然后消费者到broker上拉取消息。


1.2、消息CommitLog到ConsumeQueue

消费者读取数据时不会去读取CommitLog,而是CunsumeQueue

生成者发送信息只会到CommitLog中发送消息,不管ConsumeQueue

Broker专门启动了一个后台线程从CommitLog中将消息同步到ConsumeQueue,就是ReputMessageService

这种与消息相关的服务基本上都在在DefaultMessageStore中进行构建和启动。
在这里插入图片描述


ReputMessageService这个类源码没什么特点或者技术亮点,代码一层套一层比较繁琐,没有必要太过深究,只需要知道它的功能即可。


1.3、自动创建的重试主题

消费消息失败时我们可以返回重试枚举return ConsumeConcurrentlyStatus.RECONSUME_LATER;

这样隔一段时间后消费者会再次尝试消费消息。

重试机制会为原来的topic创建一个信息重试topic

dashborad界面NORMAL就是我们平时创建的一般的topic,RETRY就是系统自动创建的重试topic,一般是在原topic前加上%RETRY%
在这里插入图片描述

1.4、广播型消费和集群型消费中offset的存储位置

offsetStore表示消息消费的偏移量的存储位置。

在这里插入图片描述
从上图可以看到,广播型和集群型的消费模式有不同的offsetStore的值。

查看源码会发现,集群型消费的偏移量需要从远程的broker中获取;而广播型的offsetStore只在本地叫offsets.json的文件中维护偏移量,下图是广播型消费创建offsetStore的源码。

在这里插入图片描述
这是因为广播型的消息是给每个messageQueue都塞了消息,不像集群型一条消息只塞给其中一个messageQueue,所以广播型消费的特点是不保证每个consumer都能消费到消息,而集群型需要严格保证这一点。

所以偏移量放在本地可以降低通讯成本,正好广播型不需要保证消息一定被消费,那即使消费者宕机offset丢失也是可以接受的。

而集群型要保证消息一定被消费,就需要将offset保存在远程的broker,这样即使消费者宕机消息在broker中也是未消费的状态。


2、消费中的启动流程

2.1、Preview

在这里插入图片描述

这是一个简单的Consumer的代码。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list
            , ConsumeConcurrentlyContext consumeConcurrentlyContext) {
       // todo
    }
});
consumer.start();

前面的设置结束后就会通过start()启动服务拉取消息,接下来会进入start()查看源码了解消费者的启动过程。

2.2、校验,配置,启动

刚启动时,状态是CREATE_JUST,我们进入这个分支查看源码。

在这里插入图片描述
服务启动前都要做一些校验,然后配置初始化必要的bean,其中就包括为以后可能的消费失败创建一个重试topic

这些配置和检验不细说。不过有一个配置offsetStore比较有意思。

广播型消费的offset存储在本地的offsets.json中,集群型消费的offset存储在远程。下面是相关代码。
在这里插入图片描述

各种配置之后开始启动消费者服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1930896.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

主流大数据调度工具DolphinScheduler之数据ETL流程

今天给大家分享主流大数据调度工具DolphinScheduler&#xff0c;以及数据的ETL流程。 一&#xff1a;调度工具DS 主流大数据调度工具DolphinScheduler&#xff0c; 其定位&#xff1a;解决数据处理流程中错综复杂的依赖关系 任务支持类型&#xff1a;支持传统的shell任务&a…

MBR40150FCT-ASEMI无人机专用MBR40150FCT

编辑&#xff1a;ll MBR40150FCT-ASEMI无人机专用MBR40150FCT 型号&#xff1a;MBR40150FCT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220F 批号&#xff1a;最新 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;40A 最大循环峰值反向电压&#xff08;VRRM&a…

shell脚本——编程规范与变量

目录 一、shell脚本 1、shell脚本概述 2、shell脚本的应用场景 3、shell脚本的作用——命令解释器 二、Shell 脚本编程规范 1、用户登录Shell 2、shell脚本的构成 3、执行shell脚本 三、重定向与管道操作 1、重定向 1.1、交互式硬件设备 1.2、重定向操作 2、重定向…

php相关

php相关 ​ 借鉴了小迪安全以及各位大佬的博客&#xff0c;如果一切顺利&#xff0c;会不定期更新。 如果感觉不妥&#xff0c;可以私信删除。 默认有php基础。 文章目录 php相关1. php 缺陷函数1. 与2. MD53. intval()4. preg_match() 2. php特性1. php字符串解析特性2. 杂…

数据结构-C语言-排序(3)

代码位置&#xff1a;test-c-2024: 对C语言习题代码的练习 (gitee.com) 一、前言&#xff1a; 1.1-排序定义&#xff1a; 排序就是将一组杂乱无章的数据按照一定的规律&#xff08;升序或降序&#xff09;组织起来。(注&#xff1a;我们这里的排序采用的都为升序) 1.2-排序分…

从汇编层看64位程序运行——栈保护

大纲 栈保护延伸阅读参考资料 在《从汇编层看64位程序运行——ROP攻击以控制程序执行流程》中&#xff0c;我们看到可以通过“微操”栈空间控制程序执行流程。现实中&#xff0c;黑客一般会利用栈溢出改写Next RIP地址&#xff0c;这就会修改连续的栈空间。而编译器针对这种场景…

集合媒体管理、分类、搜索于一体的开源利器:Stash

Stash&#xff1a;强大的媒体管理工具&#xff0c;让您的影音生活井井有条- 精选真开源&#xff0c;释放新价值。 概览 Stash是一个专为个人媒体管理而设计的开源工具&#xff0c;基于 Go 编写&#xff0c;支持自部署。它以用户友好的界面和强大的功能&#xff0c;满足了现代用…

16_网络IPC2-寻址

进程标识 字节序 采用大小模式对数据进行存放的主要区别在于在存放的字节顺序&#xff0c;大端方式将高位存放在低地址&#xff0c;小端方式将高位存放在高地址。 采用大端方式进行数据存放符合人类的正常思维&#xff0c;而采用小端方式进行数据存放利于计算机处理。到目前…

IDEA快速生成项目树形结构图

下图用的IDEA工具&#xff0c;但我觉得WebStorm 应该也可以 文章目录 进入项目根目录下&#xff0c;进入cmd输入如下指令&#xff1a; 只有文件夹 tree . > list.txt 包括文件夹和文件 tree /f . > list.txt 还可以为相关包路径加上注释

系统架构师考点--软件工程(下)

大家好。今天继续总结软件工程的知识点。 一、处理流程设计 业务流程重组BPR BPR是对企业的业务流程进行根本性的再思考和彻底性的再设计&#xff0c;从而获得可以用诸如成本、质量、服务和速度等方面的业绩来衡量的显著性的成就。BPR设计原则、系统规划和步骤如下图所示&am…

从 Pandas 到 Polars 十八:数据科学 2025,对未来几年内数据科学领域发展的预测或展望

我在2021年底开始使用Polars和DuckDB。我立刻意识到这些库很快就会成为数据科学生态系统的核心。自那时起&#xff0c;这些库的受欢迎程度呈指数级增长。 在这篇文章中&#xff0c;我做出了一些关于未来几年数据科学领域的发展方向和原因的预测。 这篇文章旨在检验我的预测能力…

日志的编写与线程池的结合

目录 一、认识日志 二、时间的等级划分 三、日志的输出端 3.1 保存至文件 四、日志的部分信息 4.1 日志等级 4.2 日志时间 五、加载日志 六、日志的宏编写 七、ThreadPool Log 一、认识日志 记录事件&#xff1a; 日志用于记录系统运行过程中发生的各种事件&…

word 设置多级混合标题自动更新

目录预览 一、问题描述二、原因分析三、解决方案四、参考链接 一、问题描述 有没有体会过多级标题&#xff0c;怎么设置都不听使唤的情况&#xff1f; 我想要的格式是&#xff1a; 二、原因分析 多级标题中发现&#xff0c;输入编号格式这里有个数字没有底纹,是了&#xff0…

解析 Mira :基于 Web3,让先进的 AI 技术易于访问和使用

“Mira 平台正在以 Web3 的方式解决当前 AI 开发面临的复杂性问题&#xff0c;同时保护 AI 贡献者的权益&#xff0c;让他们可以自主拥有并货币化自己的模型、数据和应用&#xff0c;以使先进的 AI 技术更加易于访问和使用。” AI 代表着一种先进的生产力&#xff0c;它通过深…

nginx代理缓存

在服务器架构中&#xff0c;反向代理服务器除了能够起到反向代理的作用之外&#xff0c;还可以缓存一些资源&#xff0c;加速客户端访问&#xff0c;nginx的ngx_http_proxy_module模块不仅包含了反向代理的功能还包含了缓存功能。 1、定义代理缓存规则 参数详解&#xff1a; p…

万字长文之分库分表里如何优化分页查询?【后端面试题 | 中间件 | 数据库 | MySQL | 分库分表 | 分页查询】

分库分表的一般做法 一般会使用三种算法&#xff1a; 哈希分库分表&#xff1a;根据分库分表键算出一个哈希值&#xff0c;根据这个哈希值选择一个数据库。最常见的就是数字类型的字段作为分库分表键&#xff0c;然后取余。比如在订单表里&#xff0c;可以按照买家的ID除以8的…

开发实战经验分享:互联网医院系统源码与在线问诊APP搭建

作为一名软件开发者&#xff0c;笔者有幸参与了多个互联网医院系统的开发项目&#xff0c;并在此过程中积累了丰富的实战经验。本文将结合我的开发经验&#xff0c;分享互联网医院系统源码的设计与在线问诊APP的搭建过程。 一、需求分析 在开发任何系统之前&#xff0c;首先要…

UPFC统一潮流控制器的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 UPFC统一潮流控制器的simulink建模与仿真。能够在不增加输电线路物理容量的情况下&#xff0c;显著提高电力系统的传输能力和稳定性。UPFC能够同时控制输电线路的有功功率、无…

技术速递|Let’s Learn .NET Aspire – 开始您的云原生之旅!

作者&#xff1a;James Montemagno 排版&#xff1a;Alan Wang Let’s Learn .NET 是我们全球性的直播学习活动。在过去 3 年里&#xff0c;来自世界各地的开发人员与团队成员一起学习最新的 .NET 技术&#xff0c;并参加现场研讨会学习如何使用它&#xff01;最重要的是&#…

微软研究人员为电子表格应用开发了专用人工智能LLM

微软的 Copilot 生成式人工智能助手现已成为该公司许多软件应用程序的一部分。其中包括 Excel 电子表格应用程序&#xff0c;用户可以在其中输入文本提示来帮助处理某些选项。微软的一组研究人员一直在研究一种新的人工智能大型语言模型&#xff0c;这种模型是专门为 Excel、Go…