面试集中营—rocketmq架构篇

news2025/1/23 17:40:24

一、基本定义

        Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

  • Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。
  • 生产者:负责生产消息并发送消息到 Topic 的角色。
  • 消费者:负责从 Topic 接收并消费消息的角色。
  • 消息:生产者向 Topic 发送的内容,会被消费者消费。
  • 消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Mesage Key 和 Tag 等。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

        Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

二、消息存储

1、基础

        分布式队列因为有可靠性的要求,一般要对消息进行持久化的处理。对于存储介质的选择:

        1、关系型数据库DB:如Avtivemq,关系型数据库在数据量变大之后性能会显著下降,同时还需要进行关系型数据库的连接,数据库持久化的操作,最终还是数据还是进入的文件系统。

        2、文件系统:如RocketMQ、Kafka、RabbitMQ。直接把数据保存在文件系统中,可靠快速。

2、消息存储和发送性能保证

        如果磁盘配置得当,磁盘的顺序写入速度可以达到600MB/s,这超过了一般网卡的传输速度。但是随机写的速度只有大概100KB/s,和顺序写入相比相差了大概6000倍。所以RocketMQ就使用了顺序写来保证消息的存储速度。

        默认情况下,从磁盘中读取文件或者通过网络向文件中写入数据,需要内核态与用户态的多次拷贝操作。RocketMQ使用了零拷贝的技术,省去了向用户态拷贝的步骤,提高了消息存盘和网络发送的速度。零拷贝机制在Java中有一个MappedByteBuffer来实现

       采用MappedByteBuffer技术对文件有大小的要求,故RocketMQ的默认的一个CommitLog日志数据文件大小为1g。

3、消息存储结构

       消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

  • commitLog:  存储消息的元数据,可能会有多个文件,每个文件默认大小1g。
  • consumequeue:存储消息在CommitLog的索引,这个消费逻辑队列。为了加快commitLog的读取速度。当我们创建了一个消息队列就会对应产生一个对应的consumequeue。
  • indexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种提供indexFile来查找消息的方法不影响发送和消费消息的主流程

        消息发送过来先存储到commitLog,为了加快commitLog读取速度,就出现了一个consumequeue,相当于消息的主键索引,而indexFile其他索引例如key或者时间

4、刷盘机制

        同步和异步刷屏如下图所示

        (1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

        (2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache(内存)即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

三、高可用机制

通信机制

        RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

        1、Broker启动后,向NameServer注册并每隔30s时间定时向NameServer上报Topic路由信息;

        2、消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息;

        3、消息生产者Producer根据第二步获得路由信息,选择一个消息队列(MessageQueue)进行消息的投递,消息的接收方是Broker,Broker接收消息并落盘;

        4、消息的消费者Consumer也根据第二步获得的路由信息,先进行客户端的负载均衡,然后选择一个或多个消息队列中的信息进行消费;

高可用集群架构

        如上图所示,生产者有生产者集群,消费者有消费者集群,Name Server也就Name Server集群。Broker集群一般采用多主多从的机制。一组主从节点的broker名称相同,其中master节点的brokerId为0,从节点的brokerId大于0;从节点只负责读的工作

        当消费者读取消息时,出现master节点故障,broker集群会立即把读取的目标转到slave节点中,保证消费的高可用;

        生产者发送消息时,即创建topic的时候,把topic中的多个messagequeue创建在多个broker组上,如果其中一个主节点挂了,还有一个主节点提供服务,此时保证生产者的高可用;

主从复制

        主从复制指的是broker集群中的主从节点的数据复制,包括同步复制和异步复制。同步复制牺牲了一部分的性能但是数据可靠性高基本不会都丢失数据;

        生产环境中,建议刷盘方式配置成异步SYNC_FLUSH,保证一个吞吐量,然后主从复制使用同步复制ASYNC_FLUSH组合来保证数据的安全性;

四、消息投递与消费       

负载均衡

        生产者负载均衡

         生产者在发送消息的时候,要根据topic的路由来完成消息的投递,投递的目的地是broker。一个topic关联了多个messageQueue,客户端正常会轮询的方式依次投递到不同的消息队列中,由于消息队列配置在不同的broker上,这样就完成了消息投递的负载均衡,我们看下官网中给出的解释:

        Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。

        具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

        消费者负载均衡

         在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

        在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量中;

        在Consumer实例的启动后,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。这个线程会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。

        集群模式

        1)获取该Topic主题下的消息消费队列集合;

        2)向Broker端发送获取该消费组下消费者Id列表;

        3)先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。

        广播模式

        Consumer实例把所有的消息队列中的消息都拉取过来

五、事务消息

        Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

事务消息流程概要

        分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程

        1.事务消息发送及提交:

        (1) 发送消息(half消息)。

        (2) 服务端响应消息写入结果。

        (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

        (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

        2.补偿流程:

        (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

        (2) Producer收到回查消息,检查回查消息对应的本地事务的状态

        (3) 根据本地事务状态,重新Commit或者Rollback

        其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

事务消息设计

        我们看到上图中有一个halfmsg,这就是事务消息的关键;

        1、事务消息在一阶段对用户不可见

         如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

        2、Commit和Rollback操作以及Op消息的引入

        在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。

        Rollback

        本身一阶段的消息对用户就是不可见的, 也就是无法消费的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的);但还是需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。

        OP消息

        RocketMQ将Op消息写入到全局一个特定的Topic中,这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

        Commit

        在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

六、消息消费幂等性

     必要性

       Rocketmq中的消息是会出现重复的,主要有以下三个内容

       1、发送时消息重复:生产者在成功发送了消息之后,此时由于网络抖动等问题造成服务端没有对客户端应答失败,故而生产者会重复投递一个messageId相同的消息;

       2、投递时消息重复:消费者成功消费之后,由于网络抖动等问题造成客户端没有给服务端应答,此时服务端基于投递策略(比如至少成功一次),会再次尝试投递之前已经成功的消息;

       3、扩容时rebalance造成的消息重复投递:当客户端重启,扩缩容时会造成rebanlance就是重新负载均衡,也会造成消息的重复投递;

     解决方案

       首先不建议使用messageId来做幂等性检查,但是rocketmq不保证默认消息id的唯一性;

       通常在消息中设置一个业务key,在消费者端通过判定业务key是否已经消费过了来判定幂等性;这个业务key可以通过数据库来存储,也可以通过redis等缓存工具来存储;

参考:

3.RocketMQ消息存储结构_哔哩哔哩_bilibili

分布式事务-阿里云MQ事务消息踩坑记录_localtransactionchecker-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1676680.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于yolov2深度学习网络的单人口罩佩戴检测和人脸定位算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ..............................................................I0 imresize…

详述进程的地址空间

进程的地址空间 合法的地址 (可读或可写) 代码 (main, %rip 会从此处取出待执行的指令),只读数据 (static int x),读写堆栈 (int y),读写运行时分配的内存 (???),读写动态链接库 (???) 非法的地址 NULL,导致 se…

Arduino-ILI9341驱动-SPI接口TFTLCD实现触摸功能系列之触控开关二

Arduino-ILI9341驱动-SPI接口TFTLCD实现触摸功能系列之触控开关二 1.概述 这篇文章在触摸屏上绘制一个开关,通过点击开关实现控制灯的开关功能。 2.硬件 硬件连接参考第一篇文章介绍 Arduino-ILI9341驱动-SPI接口TFTLCD实现触摸功能系列之获取触控坐标一 3.实现…

使用Caché管理工具

Cach通过一个web工具来对其进行系统管理和完成管理任务,该方法的一个好处是不必将Cach安装到用于管理的系统上。目前,通过网络远程管理和控制对站点的访问,这些都比较容易。因为数据及其格式信息都直接来自被管理的系统,因此,这也可以最小化跨版本的兼容问题。 本文将描述…

【知识碎片】2024_05_14

本篇记录了两道关于位运算的选择题,和一道有点思维的代码题。 C语言碎片知识 求函数返回值,传入 -1 ,则在64位机器上函数返回( ) int func(int x) {int count 0;while (x){count;x x&(x - 1);//与运算} return c…

Java医院绩效核算系统与his对接所需数据有哪些?java+springboot+MySQL医院绩效管理系统-构建智慧医疗生态

Java医院绩效核算系统与his对接所需数据有哪些?javaspringbootMySQL医院绩效管理系统-构建智慧医疗生态 医院绩效核算系统与his对接所需数据 1、诊察工作量绩效:信息系统-财务权限-统计报表-报表浏览-财务常用报表-门诊医生工作量报表 2、判读及操作工…

微信小程序 - - - - - 使用TDesign库(微信小程序UI库)

使用TDesign库 1. 初始化依赖2. 安装TDesgin3. npm构建3. 修改 app.json 1. 初始化依赖 npm init -y2. 安装TDesgin yarn add tdesign-miniprogram -S --productionor npm install tdesign-miniprogram -S --production3. npm构建 3. 修改 app.json 将 app.json 中的 “styl…

CTF如何学习?

CTF如何学习?打CTF有什么用 CTF本身有几个常见的领域 MISC WEB [逆向 密码学](https://www.zhihu.com/search?q逆向 密码学&search_sourceEntity&hybrid_search_sourceEntity&hybrid_search_extra{“sourceType”%3A"answer"%2C"sourc…

ubuntu 22.04 安装 RTX 4090 显卡驱动 GPU Driver(PyTorch准备)

文章目录 1. 参考文章2. 检查GPU是Nvidia3. 卸载已有驱动3.1. 命令删除3.2. 老驱动包 4. 官网下载驱动5. 运行5.1. 远程安装关闭交互界面5.2. 运行5.3. 打开交互界面 6. 检测与后续安装 1. 参考文章 https://blog.csdn.net/JineD/article/details/129432308 2. 检查GPU是Nvid…

【MySQL】Mysql——安装指南(Linux)

MySQL8.0.26-Linux版安装 1. 准备一台Linux服务器 云服务器或者虚拟机都可以; Linux的版本为 CentOS7; 2. 下载Linux版MySQL安装包 3. 上传MySQL安装包 4. 创建目录,并解压 mkdir mysqltar -xvf mysql-8.0.26-1.el7.x86_64.rpm-bundle.tar -C mysql5. 安装mysql的安装包 …

CAPL入门之使用CAPL记录测试Logging

0 前言 以往测试的log都是直接从trace导出,但是最近发现trace中能导出的数据是有限的,如果测试的时间过长,新的数据就会把之前的数据全部覆盖,并且对于长时间的测试,直接导出trace的内容也会造成查找效率低下的问题。因…

iOS 创建pch文件

1.参考链接(xcode8添加方法,之前的跟这个差不多): 参考链接 2.自我总结: (1)创建pch文件: 注意点:1)注意选中所有的targets(看图明义) 2&…

关于链表相关的OJ题

✨✨✨专栏:数据结构 🧑‍🎓个人主页:SWsunlight 一、 OJ题 返回倒数第K个节点: 1、遍历链表一遍:用2个指针,phead和ptail先让ptail先走k步,然后让2个指针一起走,快的走到NULL即…

计算机发展史故事【13】

微电脑先锋 与第一台电子计算机埃历阿克的命运相似,1974 年面世的“牛郎星”能否作为世界上第一台微电脑被载入史册,人们似乎也存在着分歧。 拥有微处理器发明权的英特尔公司,难道自己不会组装微电脑,非得罗伯茨来越俎代庖吗&…

从零开始:C++ String类的模拟实现

文章目录 引言1.类的基本结构2.构造函数和析构函数3.基本成员函数总结 引言 在C编程中,字符串操作是非常常见且重要的任务。标准库中的std::string类提供了丰富且强大的功能,使得字符串处理变得相对简单。然而,对于学习C的开发者来说&#x…

【考研数学】强化阶段,张宇《1000题》正确率达到多少算合格?

首次正确率在60%以上就算是合格! 张宇老师的1000题真挺难的,所以如果第一次做正确率不高,不要太焦虑,1000题不管是难度,综合度还是计算量,都比其他的题集高一截。 大家真实的做题情况下,如果正…

前端工程化 - 快速通关 - ES6

目录 ES6 1.1 let 1.2 const 1.3解构 1.4链判断 1.5参数默认值 1.6箭头函数 1.7模板字符串 1.8Promise 1.9Async 函数 1.10模块化 ES6 ●ECMAScript(ES) 是规范、 JavaScript 是 ES 的实现 ●ES6 的第一个版本 在 2015 年 6 月发布&#xff0c…

【C语言习题】12.扫雷游戏

文章目录 1.扫雷游戏分析和设计1.1 扫雷游戏的功能说明1.2游戏界面:1.3游戏的分析和设计1.2.1 数据结构的分析1.2.2 ⽂件结构设计 2.扫雷游戏的代码实现3.代码讲解 1.扫雷游戏分析和设计 1.1 扫雷游戏的功能说明 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现…

机器学习案例:加州房产价格(四)

参考链接:https://hands1ml.apachecn.org/2/#_12 数据探索和可视化、发现规律 通过之前的工作,你只是快速查看了数据,对要处理的数据有了整体了解,现在的目标是更深的探索数据。 首先,保证你将测试集放在了一旁&…

特征模态分解(FMD):一种小众而又新颖的分解方法

​ 声明:文章是从本人公众号中复制而来,因此,想最新最快了解各类智能优化算法及其改进的朋友,可关注我的公众号:强盛机器学习,不定期会有很多免费代码分享~ 今天为大家介绍一个小众而又新颖的信号分…