Kafka生产者相关概念

news2025/1/23 10:45:56

文章目录

    • Kafka工作流程
    • Kafka文件存储
    • 生产者分区策略
    • 生产者ISR
    • 生产者ack机制
    • 数据一致性问题
    • ExactlyOnce

Kafka工作流程

Kafka中消息是以topic进行分类的,Producer生产消息,Consumer消费消息,都是面向topic的。

Kafka工作流程

Topic是逻辑上的概念,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件中存储的就是producer生产的数据。

写入方式
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞率)。

分区(Partition)
Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。通常,不同应用产生不同类型的数据,可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生成者写入的新消息。

Kafka文件存储

kafka集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题(topic)看作进行了分区的日志文件(partition log)。主题的每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫做偏移量(offset),这个偏移量能够唯一地定位当前分区中的每一条消息。

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

下图中的topic有3个分区,每个分区的偏移量都从0开始,不同分区之间的偏移量都是独立的,不会相互影响。
这里写图片描述

这里写图片描述

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。

发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置,这种情况下消息会均衡地分布到不同的分区。

Kafka文件存储也是通过本地落盘的方式存储的,主要是通过相应的log与index等文件保存具体的消息文件。

生产者不断的向log文件追加消息文件,为了防止log文件过大导致定位效率低下,Kafka的log文件以1G为一个分界点,当.log文件大小超过1G的时候,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,Kafka采取了分片和索引的机制来加速定位。

在kafka的存储log的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括.index和.log文件组成,分区目的是为了备份,所以同一个分区存储在不同的broker上,即当third-2存在当前机器kafka01上,实际上再kafka03中也有这个分区的文件(副本),分区中包含副本,即一个分区可以设置多个副本,副本中有一个是leader,其余为follower。

生产者分区策略

分区的原因

  • 方便在集群中扩展:每个partition通过调整以适应它所在的机器,而一个Topic又可以有多个partition组成,因此整个集群可以适应适合的数据
  • 可以提高并发:以Partition为单位进行读写。类似于多路。

分区的原则

  • 指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值。

  • 没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值。

  • 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。

生产者ISR

为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。
消息发送示意图

发送ack的时机

确保有follower与leader同步完成,leader在发送ack,这样可以保证在leader挂掉之后,follower中可以选出新的leader(主要是确保follower中数据不丢失)

follower同步完成多少才发送ack

半数以上的follower同步完成,即可发送ack
全部的follower同步完成,才可以发送ack
副本数据同步策略
半数follower同步完成即发送ack

优点是延迟低

缺点是选举新的leader的时候,容忍n台节点的故障,需要2n+1个副本(因为需要半数同意,所以故障的时候,能够选举的前提是剩下的副本超过半数),容错率为1/2

全部follower同步完成完成发送ack

优点是容错率搞,选举新的leader的时候,容忍n台节点的故障只需要n+1个副本即可,因为只需要剩下的一个人同意即可发送ack了

缺点是延迟高,因为需要全部副本同步完成才可

kafka选择的是第二种,因为在容错率上面更加有优势,同时对于分区的数据而言,每个分区都有大量的数据,第一种方案会造成大量数据的冗余。虽然第二种网络延迟较高,但是网络延迟对于Kafka的影响较小。

ISR(同步副本集)
猜想

采用了第二种方案进行同步ack之后,如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,此时需要如何解决这个问题呢?

解决

leader中维护了一个动态的ISR(in-sync replica set),即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定。当leader发生故障之后,会从ISR中选举出新的leader

生产者ack机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等到ISR中所有的follower全部接受成功。

Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡选择不同的配置。

ack参数配置

0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据

1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)

-1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复是因为没有收到,所以继续重发导致的数据重复)

producer返ack,0无落盘直接返,1只leader落盘然后返,-1全部落盘然后返

数据一致性问题

img

LEO(Log End Offset): 每个副本最后的一个offset
HW(High Watermark): 高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。
follower故障和leader故障

follower故障: follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

ExactlyOnce

将服务器的ACK级别设置为-1(all),可以保证producer到Server之间不会丢失数据,即At Least Once至少一次语义。将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once至多一次。

At Least Once可以保证数据不丢失,但是不能保证数据不重复,而At Most Once可以保证数据不重复,但是不能保证数据不丢失,对于重要的数据,则要求数据不重复也不丢失,即Exactly Once即精确的一次。

在0.11版本的Kafka之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多余多个下游应用的情况,则分别进行全局去重,对性能有很大影响。

0.11版本的kafka,引入了一项重大特性:幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据。幂等性结合At Least Once语义就构成了Kafka的Exactly Once语义。

启用幂等性,即在Producer的参数中设置enable.idempotence=true即可,Kafka的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。

但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Godot4.2】实现简单时钟组件

概述 Godot的CanvasItem类型提供的绘图函数&#xff0c;以及向量旋转&#xff0c;玩起来很有意思。 基于_draw的实现 用向量旋转的方法&#xff0c;可以实现时钟的分针、时针、秒针的旋转。再通过每帧调用_draw更新绘图就可以让时钟动起来了。 当然我们还需要从Time单例那里…

Amuse .NET application for stable diffusion

Amuse github地址&#xff1a;https://github.com/tianleiwu/Amuse .NET application for stable diffusion, Leveraging OnnxStack, Amuse seamlessly integrates many StableDiffusion capabilities all within the .NET eco-system Welcome to Amuse! Amuse is a profes…

【Maven】使用maven-jar、maven-assembly、maven-shade优雅的实现第三方依赖一同打Jar包

文章目录 一.前言二.常规Jar 打包&#xff1a;maven-jar-plugin三.Shade 打包&#xff1a;maven-shade-plugin1.如何使用2.将部分jar包添加或排除3.将依赖jar包内部资源添加或排除4.自动将所有不使用的类排除5.将依赖的类重命名并打包进来 &#xff08;隔离方案&#xff09;6.修…

使用ansible批量修改操作系统管理员账号密码

一、ansible server端配置 1、对于Linux主机配置免密登录ssh-copy-id -i ~/.ssh/id_rsa.pub rootremote_ip 2、在/etc/ansible/hosts文件中添加相应主机IP 3、对于Windows主机需要在/etc/ansible/hosts文件中进行以下配置 192.168.83.132 ansible_ssh_useradministrator an…

147 Linux 网络编程3 ,高并发服务器 --多路I/O转接服务器 - select

从前面的知识学习了如何通过socket &#xff0c;多进程&#xff0c;多线程创建一个高并发服务器&#xff0c;但是在实际工作中&#xff0c;我们并不会用到前面的方法 去弄一个高并发服务器&#xff0c;有更加好用的方法&#xff0c;就是多路I/O转接器 零 多路I/O转接服务器 多…

电脑硬盘误删怎么恢复,误删硬盘的文件能不能再恢复

误删硬盘的文件能不能再恢复&#xff1f;很多朋友都很关心这个问题&#xff0c;不用担心&#xff0c;误删硬盘文件是可以恢复的&#xff01;使用电脑不可避免会遇到一些糊涂的时刻&#xff0c;比如误删了重要的文件。当我们发现自己不小心将硬盘上的文件删除时&#xff0c;心里…

【STM32】读写BKP备份寄存器RTC实时时钟

目录 BKP BKP简介 BKP基本结构 BKP测试代码 RTC RTC简介 RTC框图 RTC基本结构 硬件电路 RTC操作注意事项 接线图 初始化 使用BKP解决只初始化一次时间 初始化参考代码 RTC设置时间 RTC读取时间 完整代码 MyRTC.c MyRTC.h main.c BKP BKP简介 BKP&#xff0…

Centos7部署单节点MongoDB(V4.2.25)

&#x1f388; 作者&#xff1a;互联网-小啊宇 &#x1f388; 简介&#xff1a; CSDN 运维领域创作者、阿里云专家博主。目前从事 Kubernetes运维相关工作&#xff0c;擅长Linux系统运维、开源监控软件维护、Kubernetes容器技术、CI/CD持续集成、自动化运维、开源软件部署维护…

Apipost数据模型上线,解决相似数据结构复用问题

在API设计和开发过程中&#xff0c;存在许多瓶颈&#xff0c;其中一个主要问题是在遇到相似数据结构的API时会产生重复性较多的工作&#xff1a;在每个API中都编写相同的数据&#xff0c;这不仅浪费时间和精力&#xff0c;还容易出错并降低API的可维护性。 为了解决这个问题&a…

乐优商城(九)数据同步RabbitMQ

1. 项目问题分析 现在项目中有三个独立的微服务&#xff1a; 商品微服务&#xff1a;原始数据保存在 MySQL 中&#xff0c;从 MySQL 中增删改查商品数据。搜索微服务&#xff1a;原始数据保存在 ES 的索引库中&#xff0c;从 ES 中查询商品数据。商品详情微服务&#xff1a;做…

【phoenix】flink程序执行phoenix,phoenix和flink-sql-connector-hbase包类不兼容

问题报错 Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener not org.apache.hadoop.hbase.client.ClusterStatusListener$Listener如下图&…

语音识别教程:Whisper

语音识别教程&#xff1a;Whisper 一、前言 最近看国外教学视频的需求&#xff0c;有些不是很适应&#xff0c;找了找AI字幕效果也不是很好&#xff0c;遂打算基于Whisper和GPT做一个AI字幕给自己。 二、具体步骤 1、安装FFmpeg Windows: 进入 https://github.com/BtbN/FF…

使用光标精灵更换电脑鼠标光标样式,一键安装使用

想要让自己在使用电脑时更具个性化&#xff0c;让工作和娱乐更加愉快&#xff0c;改变你的电脑指针光标皮肤可能是一个简单而有效的方法。很多人或许并不清楚如何轻松地调整电脑光标样式&#xff0c;下面我就来分享一种简单的方法。 电脑光标在系统里通常只有几种默认图案&…

支付宝手机网站支付,微信扫描二维码支付

支付宝手机网站支付 支付宝文档 响应示例 <form name"punchout_form" method"post" action"https://openapi.alipay.com/gateway.do?charsetUTF-8&methodalipay.trade.wap.pay&formatjson&signERITJKEIJKJHKKKKKKKHJEREEEEEEEEEEE…

软件的安装与卸载(YUM)

YUM&#xff1a;yum 是一个方便的"应用商店"&#xff0c;你可以通过它轻松地安装、更新和删除软件包&#xff0c;就像从应用商店中下载和安装应用程序一样。&#xff08;这个得用root身份&#xff0c;普通用户权限不够&#xff09; 常用命令&#xff1a; 1.安装软件…

提供数字免疫力:采取整体方法来优化您的网络

采用数字技术已成为许多美国企业的关键竞争优势&#xff0c;导致其在与新部署的云解决方案的安全连接方面的投资不断增加。然而&#xff0c;随着越来越多的关键应用程序迁移到云端&#xff0c;公司保护其敏感数据和资源变得更具挑战性&#xff0c;因为这些资产现在超出了内部防…

计算机网络相关

OSI七层模型 各层功能&#xff1a; TCP/IP四层模型 应用层 传输层 网络层 网络接口层 访问一个URL的全过程 在浏览器中输入指定网页的 URL。 浏览器通过 DNS 协议&#xff0c;获取域名对应的 IP 地址。 浏览器根据 IP 地址和端口号&#xff0c;向目标服务器发起一个 TCP…

c++ 指针大小

C的一个指针占内存几个字节&#xff1f; 结论&#xff1a; 取决于是64位编译模式还是32位编译模式&#xff08;注意&#xff0c;和机器位数没有直接关系&#xff09; 在64位编译模式下&#xff0c;指针的占用内存大小是8字节在32位编译模式下&#xff0c;指针占用内存大小是4字…

11种创造型设计模式(下)

观察者模式 我们可以比喻观察者模式是一种类似广播的设计模式 介绍 观察者模式&#xff1a;对象之间多对一依赖的一种设计方案&#xff0c;被依赖的对象是Subject&#xff0c;依赖的对象是Observer&#xff0c;Subject通知Observer变化。 代码 说明&#xff1a; WeatherStat…

sdsl库编译安装和使用

1. 下载和编译 git clone https://github.com/simongog/sdsl-lite.git cd sdsl-lite# 建一个conda环境 激活环境&#xff0c;安装cmake。 ./install.sh /usr/local/ 2. 示例代码 #include <sdsl/suffix_arrays.hpp> #include <fstream>using namespace sdsl;int…