消息中间件之Kafka(一)

news2025/1/23 21:16:03

1.简介

  • 高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,并且内部使用了零拷贝技术,性能比较好
    Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术
  • Kafka是最初由Linkedin公司开发,是一个分布式、支持分区(parition)、多副本的(replica),
    基于Zookeeper协调的分布式消息系统,它最大的特性就是可以实时地处理大量数据以满足各种需求场景
    比如:基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写
  • 吞吐量在10w~100w

2.模型设计图

在这里插入图片描述

3.特点

  • 一个parition只能交给一个consumer消费,因为交给多个consumer让其进行poll拉取消息进行消费,会引起重复消费的问题
  • 服务端(broker)和客户端(producer、consumer)之间通信通过TCP协议来完成
  • Topic、Partition和Broker之间的关系.
    一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的.订单消息很可能时非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可能会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的parition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker
  • 同时未发布和订阅提供高吞吐量,Kafka的设计目标是以时间复杂度O(1)的方式提供消息持久化能力的,即使对TB级别以上数据也能保证常数时间的访问性能,即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
  • 消费消息采用Pull模式,消息被处理的状态是在Consumer端维护的,而不是由服务端维护,Broker无状态,Consumer自己保存offset

4.核心组件

  • Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
  • Parition:物理上的概念,一个topic可以分为多个partition,每个partition的内部时有序的
  • Broker: 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
  • ConsumerGroup: 每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,到那时一个ConsumerGroup中只能有一个Consumer能够消费该消息
  • Consumer:消息消费者,从Broker读取消息的客户端
  • Producer: 消息生产者,向Broker发送消息的客户端

5.设计机制

5.1 Kafka核心总控制器Controller

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),
它负责管理整个集群中所有分区和副本的状态
1.当某个分区的leader副本出现故障时,由控制器负责为该分区选举出新的leader副本
2.当检测到某个分区的ISR(In-Sync-Replica)集合发生变化时,由控制器负责通知所有的broker更新元数据信息
3.当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到

5.1.1 Controller选举机制

在Kafka集群启动的时候,会自动选举以太broker作为controller来管理整个集群,
选举的过程时集群中每个broker都会尝试在zookeeper上创建一个/controller的临时节点,
zookeeper会保证有且仅有一个broker能创建成功,这个broker就会成为集群的总控制器controller
当这个controller角色的broker宕机了,此时zookeeper临时节点就会消失,集群里其他broker
会一直监听这个临时节点,发现临时节点消失了,就竞争再次创建临时节点,这就是我们说的选举机制,zookeeper又会保证有一个broker成为新的controller

5.2 Partition副本选举Leader机制

controller感知到分区Leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活)
controller会从ISR(In-sync replica)列表(参数unclean.leader.election.enable=false)里挑第一个broker作为leader(第一个broker最先放进ISR列表,
可能时同步数据最多的副本)
如果参数unclean.leader.election.enable为true,代表ISR列表里所有副本都挂了的时候可以在ISR列表以外的副本中选leader
这种设置,可以提高可用性,但是选出的新leader有肯恩那个数据少很多,
副本进入ISR列表有两个条件:
1.副本节点不能那个产生分区,必须能与zk保持会话以及跟leader副本网络连通
2.副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步之后的副本,是由replica.lag.time.max.ms配置决定的,超过
这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)

5.3 消费者消费消息的offset记录机制

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:_consumer_offsets,
提交过去的时候,key时consumerGroupId+topic+分区号,value就是当前offset的值,
kafka会定期清理topic里的消息,最后就保留最新的那条数据,因为_consumer_offset可能会接收高并发的请求,
kafka默认给其分配50个分区(可以通过offsets.topic.num.paritions设置),这样可以通过加机器的方式抗大并发

5.4 消费者的rebalance机制

rebalance就是说如果消费组里的消费者数量有变化或者消费的分区数有变化,fafka会重新分配消费者消费分区的关系,比如consumer group中某个消费者挂了,此时会自动把分配给它的分区交给其他的消费者,如果它又重启了,那么,又会把一些分区重新交还给他
注意:relablance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebalance,如下情况可能会触发消费者realance:
1.消费组里的consumer增加或减少了
2.动态给topic增加了分区
3.消费者组订阅了更多的topic,rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,那rebalance可能会耗时极多,所以尽量避免在系统高峰期的rebalance发生

5.5 producer发布消息机制

1.写入方式
producer采用push模式将消息发布到broker,每条消息都被append到partition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)
2.消息路由
producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition,路由机制为:
2.1 指定了partition,则直接使用
2.2 未指定partition但指定key,通过对key的value进行hash选出一个partition
2.3 partition和key都未指定,使用轮询选出一个partition
3.写入流程
3.1 producer先从zookeeper的/brokers/…/state/节点找到该partition的leader
3.2 producer消息发送给该leader
3.3 leader将消息写入本地log
3.4 followers从leader pull消息,写入本地log后向leader发送ACK
3.5 leader收到所有的ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK

5.6 HW与LEO机制

5.6.1

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offse)作为HW,consumer最多只能消费到HW所在的位置,另外每个replica都有HW,leader和follower各自负责更新自己的状态.,对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replica同步后更新HW,此时消息才能被Consumer消费,这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取,对于来自内部broker的读取请求,没有HW的限制

5.6.2

由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种方式极大的影响了吞吐率。而异步复制的方式下,follower异步的从leader复制数据,数据只要被,leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据,而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率,还可以设置消息发送端对于发出消息持久化机制参数acks的设置

5.6.3 模型图解释

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5.7 日志分段存储

5.7.1

Kafka一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(Segment)存储,
每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的log文件
最大为1G,做这个限制的目的是为了方便把log文件加载到内存去操作

5.7.2 文件类型
  • index文件:部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index中,如果要定位消息的offset会在这个文件里快速定位,再去log文件里找具体消息
    00000000000000000000.index
  • log文件:消息存储文件,主要存offset和消息体 00000000000000000000.log
  • timeindex文件: 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息的发送时间戳与对应的,offset到timeindex文件,如果需要按照时间来定位消息的offset,会先在这个文件里找
    00000000000000000000.log
5.7.3

KafkaBroker有一个参数,log.setgment.bytes,限定了每个日志段文件的大小,最大就是1GB.
一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做log rolling,正在被写入的那个日志段文件,叫做active log segment

5.8 消费模式

  • 集群消费: 一个ConsumerGroup中,只能有一个消费者消费消息
  • 广播消费:一个ConsumerGroup中,每个消费者都可以消费到消息

6.线上规划

6.1概要设计在这里插入图片描述

6.2 JVM参数设置

export KAFKA_HEAP_OPTS="-Xmx16G 
-Xms16G -Xmn10G -XX:MetaspaceSize=256M 
-XX:+UseG1GC -XX:MaxGCPauseMillis=50
 -XX:G1HeapRegionSIze=16M"

Kafka是由Scala语言开发,运行在JVm上,需要对JVM参数合理设置
修改/bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可进行如上设置:

这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像Kafka、RocketMQ ES这些中间件,写数据到磁盘会用到操作系统的Page Cache,所以JVM内存不宜分配过大,需要给操作系统的缓存预留出几个G

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1397325.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

像操作本地文件一样操作linux文件 centos7环境下samba共享服务搭建详细教程

1.安装dnf yum -y install dnf 2.安装samba dnf install samba -y 3.配置 3.1创建并设置用户信息 #创建用户 useradd -M -s /sbin/nologin samba echo 123|passwd --stdin samba mkdir /home/samba chown -R samba:samba /home/samba smbpasswd -a samba smaba设置密码示…

nodejs下载安装

一、node下载安装 官网下载 官网 根据自己电脑系统选择合适的版本进行下载,我这里选择window 64 位 下载完点击安装 打开cmd查看安装 此处说明下:新版的Node.js已自带npm,安装Node.js时会一起安装,npm的作用就是对Node.js…

实现仿ChatGPT光标跟随效果

先看效果 实现效果 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>光标闪烁效果</title>…

使用 MinIO 和 PostgreSQL 简化数据事件

本教程将教您如何使用 Docker 和 Docker Compose 在 MinIO 和 PostgreSQL 之间设置和管理数据事件&#xff0c;也称为存储桶或对象事件。 您可能已经在利用 MinIO 事件与外部服务进行通信&#xff0c;现在您将通过使用 PostgreSQL 自动化和简化数据事件管理来增强数据处理能力…

机器人导纳控制实现框架

Safe, Stable and Intuitive Control for Physical Human-Robot Interaction - 知乎关于文章《Safe, Stable and Intuitive Control for Physical Human-Robot Interactio》的简记。 Safe, Stable and Intuitive Control for Physical Human-Robot Interaction目的根据力导数作…

设计一个网页爬虫

定义 User Case 和 约束 注意&#xff1a;没有一个面试官会阐述清楚问题&#xff0c;我们需要定义Use case和约束 Use cases 我们的作用域只是处理以下Use Case&#xff1a; Service 爬取一批 url 生成包含搜索词的单词到页面的反向索引给页面生成标题和片段– 标题和片段是…

ptrade 通过mysql的链接开发一个量化管理平台。

这里只写一下界面及想法。不进行代码的实现。因为对流程不是很熟 ###界面 数据库的链接&#xff1a; ptrade USER 可转债量化分析 PASSWORD 123456 MYSQL_HOST mysql.sqlpub.com MYSQL_PORT 3306 MYSQL_DB ptradedef get_mysql_conn():import pymysqltry:conn pym…

maven编译时依赖报错 Caused by: java.util.zip.ZipException: zip file is empty 错误。

出现这种报错时&#xff0c;可能是maven仓库下对应的依赖出现了问题&#xff0c;需要讲报错依赖位置的依赖进行删除&#xff0c;在编译的时候就会重新下载&#xff0c;就不会出现错误了。 rm -rf /Applications/software/env/repository/org/apache/orc/orc-core/1.9.1/

Yield Guild Games 宣布与区块链游戏中心 Iskra 建立战略合作伙伴关系

Yield Guild Games (YGG) 宣布将向 Iskra 引入其任务系统&#xff0c;Iskra 是一个 Web3 游戏中心和发布平台&#xff0c;拥有超过 400 万注册钱包和 10 万月度活跃用户 (MAU)。在 LINE、Kakao、Wemade 和 Netmarble 等公司的支持下&#xff0c;Iskra 将游戏玩家和游戏工作室聚…

验收测试的重要性:确保交付高质量产品

在软件开发生命周期中&#xff0c;验收测试扮演着至关重要的角色&#xff0c;它不仅是项目的最后一道关卡&#xff0c;更是确保交付高质量产品的关键步骤。本文将介绍验收测试的重要性&#xff0c;以及它在软件开发过程中的作用。 1. 确认功能符合需求 验收测试的首要任务是验证…

逆向思维,去重Cube计算优化新技巧

场景描述 在做数据汇总计算和统计分析时&#xff0c;最头疼的就是去重类指标计算&#xff08;比如用户数、商家数等&#xff09;&#xff0c;尤其还要带多种维度的下钻分析&#xff0c;由于其不可累加的特性&#xff0c;几乎每换一种统计维度组合&#xff0c;都得重新计算。数…

匿名发送短信

匿名发送短信 匿名发送短信啦&#xff01;不用程序猿&#xff0c;也能定制专属推送消息&#xff01;每日小惊喜&#xff01; 还可以领取课程资料&#xff01;软考中级软件设计师&#xff0c;高级信息系统项目管理师&#xff01; 先说在哪里 微信搜公众号&#xff1a;暮看云 微…

人工智能 | 自然语言处理的发展历程

github&#xff1a;https://github.com/MichaelBeechan CSDN&#xff1a;https://blog.csdn.net/u011344545 自然语言处理的发展 方向一&#xff1a;技术进步1. 基于规则的语法&#xff08;1950-1990&#xff09;2. 统计语言处理&#xff08;1990-2010&#xff09;3. 基于深度学…

ChatGPT正确打开方式与GPT-4.5的key最新获取方式

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言4.5key价格泄漏ChatGPT4.0使用地址ChatGPT正确打开方式最新功能语音助手存档…

计算机网络-标准化工作及相关组织与性能指标(标准分类 标准化工作 RFC 速率 带宽 吞吐量 时延 时延带宽积 RTT 利用率)

文章目录 标准化工作及相关组织标准化工作标准分类RFC流程标准化的相关组织小结 性能指标速率带宽吞吐量时延发送时延传播时延排队时延与处理时延补充 高速链路 时延带宽积往返时间RTT利用率小结 标准化工作及相关组织 标准化工作 即需要统一标准&#xff0c;这样才能兼容 …

Linux 时间同步 - Chrony服务

Linux 时间同步 - Chrony服务 引言一、简单使用二、详解2.1 chrony.conf2.2 chronyd2.3 chronyc 引言 为什么需要时间同步? 其意义可参考秦朝统一度量衡&#xff0c;车同轨&#xff0c;书同文。核心就是方便协同工作。 Chrony能更精确、更快的同步时钟&#xff0c;传统ntp需要…

014集:python访问互联网:网络爬虫实例—python基础入门实例

以pycharm环境为例&#xff1a; 首先需要安装各种库(urllib&#xff1a;requests&#xff1a;Openssl-python等) python爬虫中需要用到的库&#xff0c;大致可分为&#xff1a;1、实现 HTTP 请求操作的请求库&#xff1b;2、从网页中提取信息的解析库&#xff1b;3、Python与…

代码随想录算法训练营day13|239.滑动窗口最大值、347.前K个高频元素

239. 滑动窗口最大值 347.前 K 个高频元素 239. 滑动窗口最大值 &#xff08;一刷至少需要理解思路&#xff09; 之前讲的都是栈的应用&#xff0c;这次该是队列的应用了。 本题算比较有难度的&#xff0c;需要自己去构造单调队列&#xff0c;建议先看视频来理解。 题目链接/文…

css3轮播图案例

轮播图案例 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style>…

HTML111111111

在线编辑器 在线 HTML 空元素 没有内容的 HTML 元素被称为空元素。空元素是在开始标签中关闭的。 即使 在所有浏览器中都是有效的&#xff0c;但使用 其实是更长远的保障。 HTML 水平线 标签在 HTML 页面中创建水平线。 hr 元素可用于分隔内容。 HTML 折行 如果您希望…