Kafka设计与原理详解

news2025/1/13 2:33:54

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

「RocketMQ本质上的设计思路和Kafka类似」,但是和Kafka不同的是其使用Java进行开发,由于在国内的Java受众群体远远多于Scala、Erlang,所以RocketMQ是很多以Java语言为主的公司的首选。同样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都非常高,项目更新迭代也非常快。

RocketMQ是阿里review kafka的java版,如果消息性能要求高,用 RocketMQ 与 Kafka 可以更优

消息队列在实际应用中常用的使用场景,包含「应用解耦、异步处理、流量削锋、消息通讯、日志处理」等。

RocketMQ运行原理

 

上面是RocketMQ运行的一个大致流程图。

在对Apache RocketMQ进行深入探索的过程中,我们首先需要理解其核心组件的作用:

NameServer:充当注册中心的角色,主要任务是管理Broker节点.

Broker:它是RocketMQ系统的核心部分,主要负责消息的存储。

Producer:这是消息的生成者,它创建消息并将其写入Broker。

Consumer:作为消息的接收者,负责从Broker读取消息并进行处理。

下面,让我们一起深入了解RocketMQ的运行流程:

1. 首先,Broker在启动后会向根据其配置向NameServer注册。

2. NameServer作为注册中心,管理着我们的Broker集群信息以及Topic路由信息。例如,一个特定的Topic具有哪些Broker主机以及队列信息。

3. 接着,由特定的业务系统中的生产者(Producer)生成消息,并发送到Broker的主节点。

4. 在Broker节点中,这些消息将被保存到本地磁盘的CommitLog中,以确保消息不会丢失。

5. 接下来,主节点Broker将这些消息同步到从节点Broker,这样可以实现负载均衡并增强系统的鲁棒性。

6. 最后,业务系统中的消费者(Consumer)会从Broker中取出消息并进行处理,这就完成了数据的完整生命周期。

我们的Producer写入消息前需要先选择Broker,那Producer是如何选择Broker的呢?

上面提到,NameServer 在 RocketMQ 架构中起到了注册中心的作用,它负责管理所有的 Broker 节点。每当 Broker 启动后,它就会自动的注册到 NameServer 中,并且会每隔30秒向 NameServer 发送一次心跳,以证明它依然在运行。NameServer 则会每隔10秒检查一次各个 Broker 节点是否还在线,如果有 Broker 在120秒内未发送心跳,那么 NameServer 就会判断该 Broker 已经宕机,进而将其从注册列表中移除。

在业务系统中,Producer 在发送消息之前,会先从 NameServer 中拉取需要的 topic 路由信息,这些信息将包含目标 topic 各个 queue 的详细信息,以及各 queue 分别存储在哪个 Broker 节点上。Producer 会将这些信息缓存到本地,并依此信息,通过一种负载均衡算法,选择从哪个 queue 中读取数据,以及找到该 queue 对应的 Broker 节点。

那么,如果某个 Broker 在 Producer 准备写入数据的时候突然宕机了,又该如何处理呢?

RocketMQ 设计了一套故障探测与处理机制。如果某个 Broker 宕机了,那么 Producer 进行写入操作时将会失败,此时,它会发起重试操作,并从可用的 Broker 列表中重新选择一个进行写入。并且,为避免持续向故障节点写入数据,Producer 会采取一种称为"故障退避"的策略,即在一段时间内停止向该 Broker 发送数据。值得注意的是,Broker 的故障并不会立即被 Producer 和 NameServer 感知,这样做是为了降低 NameServer 处理逻辑的复杂性。当 Broker 宕机后,由于本地的 topic 路由缓存并未更新,Producer 仍可能尝试向故障的 Broker 发送数据,然后备受失败并重试。只有当 NameServer 在检查心跳时发现该 Broker 已宕机,并从注册列表中移除后,Producer 在刷新本地缓存时,才会真正地感知到该 Broker 的宕机。

当我们的Producer基于负载均衡选择了Broker节点,它的消息是如何写入的呢?

 

在深入理解 RocketMQ 的存储机制时,我们需要知道,Producer 在写入消息时,默认会优先写到操作系统管理的 pageCache,这个过程是异步的,只要消息被写入 pageCache,写入操作就被认为是成功的。这种异步的处理方式极大地提高了 RocketMQ 的写入效率。

当消息被写入 pageCache 后,将有一个后台线程异步地将这些消息从 pageCache 刷入到磁盘文件 CommitLog 中,CommitLog 是消息的实际存储位置。同时,还会有一个专门的线程负责将 CommitLog 中的消息位置(物理偏移量)写入到 ConsumeQueue 中。

那么客户端如何读取存储在 CommitLog 中的消息呢?

当 Consumer 端的消费者需要读取消息时,它会先到ConsumeQueue,然后根据在 ConsumeQueue 中存储的 offset 信息找到 CommitLog 中的实际数据进行读取。

这样的存储方法是否可以支持高并发模式的写入呢?

当系统面临大量同时写入和读取的请求时,可能会遇到一种情况,即大量的读取请求通过 ConsumeQueue 去找 CommitLog 中的数据,但是此时数据可能还在 pageCache 中并未完成异步写入。这时,系统会通过 CommitLog 和 PageCache 的映射,找到 pageCache 中的消息进行读取。也就是说,大量的读取和写入请求都对 pageCache 进行操作。但是当并发量过高时,可能会出现 "Broker busy" 的异常,这是因为在极高的并发场景下,持续大量的读写操作可能会对系统性能造成影响。

简而言之,RocketMQ 的存储机制旨在为高并发高效的读写提供支持,但是在一些极端情况下,仍然需要额外的优化措施以提高稳定性和性能。

当并发量非常高时,出现Broker busy异常了,如何解决?

RocketMQ 在面对高并发场景时,为了改善 "Broker busy" 异常和提高吞吐量,可以启用 transientStorePool 机制。这种机制的实现方式是,Broker 在写入消息时,将消息直接写入由 JVM 管理的 offheap 堆外内存,这样的设计能有效提升并发性能。

那么,为什么启用 transientStorePool 可以提高并发处理能力呢?

当开启该机制后,消息首先写入 JVM 的 offheap内存,然后异步刷新到 pageCache,最终由 pageCache 异步刷新到 CommitLog。大量的写请求将向 JVM 的 heap 内存进行,而大量的读请求仍然从 pageCache 进行,这种读写分离的机制极大地提高了 RocketMQ 的并发性能。

但是为什么 transientStorePool 机制不作为默认机制呢?

虽然 transientStorePool 能显著提升并发性能,但其也存在风险。当消息写入到 JVM 管理的 offheap 堆外内存后,如果 JVM 进程重启或者宕机,那些尚未被及时落盘的消息就会丢失。但如果采用默认的写入方法,即先写入操作系统管理的 pageCache,那么在 JVM 进程重启后,那些保存在 pageCache 中的信息不会丢失,只有当整个服务器宕机重启时,pageCache 中的消息才有可能丢失。因此,数据最安全的处理方式是,将其直接写入到 CommitLog。

总结:开启 transientStorePool 机制可以极大地提高 RocketMQ 的并发处理能力,然而这可能会带来数据的丢失。因此,它更适合那些并发处理能力要求高、且可以接受部分数据丢失的场景。

如果我们想要写入数据不丢失,应该怎么处理?

在设计与金融场景以及其他要求数据不能丢失的环境中,我们会采用同步方式将数据写入CommitLog。成功执行写入操作后才返回,确保了数据的完整性和安全性。只有在broker的物理存储设备出现故障的情况下,才有可能导致数据丢失。为了提供进一步提高数据的安全性,也可以通过多台服务器进行数据备份。

但值得注意的是, 尽管实现了对数据的安全性提升, 使用同步写入CommitLog方式会降低系统的性能到几个数量级。

Rocketmq如何支持分布式事务消息

 场景

A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,

A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),

B和MQ保证事务一致(通过重试),从而达到最终事务一致性。

「原理:大事务 = 小事务 + 异步」

「1.MQ与DB一致性原理(两方事务)」

流程图

上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。

「MQ消息、DB操作一致性方案:」

1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。

2)执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。

3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。

4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。

「说明:」

上面以DB为例,其实此处可以是任何业务或者数据源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE 均是client jar提供的状态,在MQ服务器内部是一个数字。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者RocketMQ集群挂了的情况下。当RocketMQ集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用RocketMQ解决分布式事务问题,建议选择同步刷盘模式。

「2.多系统之间数据一致性(多方事务)」

当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过RocketMQ的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

「以上图交易系统为例:」

1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMQ事务性消息保证一致性

2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。

3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMQ的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。回滚任务失败,通过MQ的重发来重试。

以上是交易系统和其他系统之间保持最终一致性的解决方案。

「3.案例分析」

「1) 单机环境下的事务示意图」

如下为A给B转账的例子。如下为A给B转账的例子。

以上过程在代码层面,甚至可以简化到在一个事物中,执行两条sql语句。

「2) 分布式环境下事务」

和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事务来实现。

此时可以通过以下方式实现,将转账操作分成两个操作。

a) A账户

b) MQ消息

A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。

c) B账户

 

顺序消息 

RocketMq有3种消息类型

  1. 普通消费

  2. 顺序消费

  3. 事务消费

顺序消费场景:在网购的时候,我们需要下单,那么下单需要假如有三个顺序:

第一:创建订单 第二:订单付款 第三:订单完成

也就是这三个环节要有顺序,这个订单才有意义,RocketMQ可以保证顺序消费。

「RocketMQ 实现顺序消费的原理」:produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

「注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue」

「1. 顺序消息缺陷」

发送顺序消息,无法利用集群Fail Over特性,消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

「2. 原理」

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。

「3. 扩展」

可以通过实现发送消息的队列选择器方法,实现部分顺序消息。

举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为队列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1968643.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用kettle开源工具进行跨库数据同步

数据库同步可以用: 1、Navicat 2、Kettle 3、自己写代码 调用码神工具跨库数据同步 -连接 4、其它 实现 这里使用Kettle来同步,主要是开源的,通过配置就可以实现了 Kettle的图形化界面(Spoon)安装参考方法 ht…

Maven实战.依赖(依赖范围、传递性依赖、依赖调解、可选依赖等)

文章目录 依赖的配置依赖范围传递性依赖传递性依赖和依赖范围依赖调解可选依赖最佳实践排除依赖归类依赖优化依赖 依赖的配置 依赖会有基本的groupId、artifactld 和 version等元素组成。其实一个依赖声明可以包含如下的一些元素&#xff1a; <project> ...<depende…

单例模式及其思想

本文包括以下几点↓ 结论&#xff1a;设计模式不是简单地将一个固定的代码框架套用到项目中&#xff0c;而是一种严谨的编程思想&#xff0c;旨在提供解决特定问题的经验和指导。 单例模式&#xff08;Singleton Pattern&#xff09; 意图 旨在确保类只有一个实例&#xff…

Linux用户-用户组

作者介绍&#xff1a;简历上没有一个精通的运维工程师。希望大家多多关注我&#xff0c;我尽量把自己会的都分享给大家&#xff0c;下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 Linux是一个多用户多任务操作系统,这意味着它可以同时支持多个用户登录并使用系统。…

每日OJ_牛客HJ74 参数解析

目录 牛客HJ74 参数解析 解析代码1 解析代码2 牛客HJ74 参数解析 参数解析_牛客题霸_牛客网 解析代码1 本题通过以空格和双引号为间隔&#xff0c;统计参数个数。对于双引号&#xff0c;通过添加flag&#xff0c;保证双引号中的空格被输出。 #include <iostream> #i…

解决文件夹打不开难题:数据恢复全攻略

在日常的电脑使用过程中&#xff0c;遇到文件夹无法打开的情况无疑是令人头疼的。这不仅可能影响到我们的工作效率&#xff0c;还可能导致重要数据的丢失。本文将深入探讨文件夹打不开的原因&#xff0c;并为您提供两种高效的数据恢复方案&#xff0c;助您轻松应对这一难题。 一…

p33 指针详解(1)(2)(3)

指针的进阶 1.字符指针 void test(int arr[]) { int szsizeof(arr)/sizeof(arr[0]); printf("%d\n", sz); } int main() { int arr[10] {0}; test(arr); return 0; } 这个代码在64位计算机中是8/42 在32位计算机中的是4/41 int main() {char c…

vue2 搭配 html2canvas 截图并设置截图时样式(不影响页面) 以及 base64转file文件上传 或者下载截图 小记

下载 npm install html2canvas --save引入 import html2canvas from "html2canvas"; //使用 html2canvasForChars() { // 使用that来存储当前Vue组件的上下文&#xff0c;以便在回调函数中使用 let that this; // 获取DOM中id为"charts"的元素&…

3.1 拓扑排序

有向图的存储 邻接矩阵 邻接表 拓扑排序 有向无环图&#xff1a;不存在环的有向图 环&#xff1a; 在有向图中&#xff0c;从一个节点出发&#xff0c;最终回到它自身的路径被称为环 入度&#xff1a; 以节点x为终点的有向边的条数被称为x的入度 出度&#xff1a; 以节…

汽车配件销售系统2024

下载在最后,编号ssm007 技术栈: ssmmysqljsp 展示: 下载地址: CSDN现在上传有问题,有兴趣的朋友先收藏.正常了贴上下载地址 备注: 运行有问题请私信我,私信按钮在文章左边) 另外接各种定制系统,java,spring,c,c,python

upload-labs靶场(超详解)1-16关

pass1 从代码中可以看出&#xff0c;是通过js进行文件格式检查 <script type"text/javascript">function checkFile() {var file document.getElementsByName(upload_file)[0].value;if (file null || file "") {alert("请选择要上传的文件…

Nmap/DNS信息收集实验

​实验背景 在安全服务项目中&#xff0c;需要对网络结构进行分析评估&#xff0c;其中风险评估第一步就是信息收集&#xff0c;主要包括活跃主机发现、开放端口号、系统指纹信息等。 实验设备 一个网络 net:cloud0 一台模拟黑客 kali 主机 一台靶机 windows 主机 实验…

go 语言中 init() 函数是什么时候执行的?

文章目录 一、init() 函数什么时候执行&#xff1f;二、init() 函数特点三、代码执行顺序四、多个 init() 函数执行顺序1、一个源文件中多个 init() 函数2、一个包中多个 init() 函数3、多个包中多个 init() 函数&#xff08;不存在依赖&#xff09;4、多个包中多个 init() 函数…

MySQL--数据类型

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 在MySQL数据库管理系统中&#xff0c;可以通过存储引擎来决定表的类型。同时&#xff0c;MySQL数据库管理系统也提供了数据类型决定表存储数据的类型 …

记录导致计算轮廓面积出错的一个坑点

1.前言 计算轮廓面积是常见的几何算法话题&#xff0c;获取轮廓面积、计算轮廓法线等场景会涉及到。计算轮廓面积的方法有很多&#xff0c;一种常用的是微积分思路的分段求和办法&#xff0c;即组成轮廓的每条线段与X轴或Y轴进行有向投影&#xff0c;轮廓边线与X轴或Y轴的投影之…

【SQL Server】SQL Server基础知识概览

目录 第1章&#xff1a;SQL Server 概览 SQL Server 版本介绍 SQL Server 架构 SQL Server 组件 第1章&#xff1a;SQL Server 概览 SQL Server 版本介绍 SQL Server 是 Microsoft 开发的一款关系型数据库管理系统 (RDBMS)&#xff0c;广泛应用于企业级数据存储和处理场景…

Mysql学习-day15

Mysql学习-day15 1. 行列转换 在MySQL中&#xff0c;行列转换可以通过使用CASE语句结合聚合函数来实现。 表t_score数据如图所示 我们想要以学科为列名&#xff0c;展示每个学生的科目成绩&#xff0c;可以先用CASE语句来选出每科的成绩&#xff0c;再进行求和。 选择科目时…

【C++】模拟实现list

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 一.了解项目及其功能 &#x1f4cc;了解list官方标准 了解模拟实现list &#x1f4cc;了解更底层的list实现 二.list迭代器和vector迭代器的异同 &#x1f4cc;迭代…

SSH实现电脑VScode免密登录到虚拟机其原理

在网上想看一下这个原理。发现写的还是比较乱&#xff0c;所以自己总结了一份方便回顾 SSH免密登录的原理主要基于非对称密钥加密技术&#xff0c;比较常用的是RSA算法。 以下是SSH免密登录的详细步骤和原理&#xff1a; 1. 生成密钥对 在客户端上生成一对密钥&#xff0c;…

系统复习Java日志体系

一&#xff0c;我们采用硬编码体验一下几个使用比较多的日志 分别导入几种日志的 jar 包 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSch…