Kafka的生产者和消费者机制

news2024/11/16 9:35:21

目录

1.基础的客户端

1.1消息发送者的主流程

1.2消息消费者主流程

2.客户端工作机制

2.1消费者分组消费机制

2.2生产者拦截器机制

2.3消息序列化机制

2.4消息分区路由机制

2.5生产者消息缓存机制

2.6发送应答机制

2.7生产者消息幂等性

(1)生产者消息幂等性介绍

(2)解决方案

2.8生产者消息事务机制

(1)事务引入原因

(2)具体流程

3.客户端流程总结


Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。

1.基础的客户端

1.1消息发送者的主流程

  1. 设置Producer核心属性
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送三种发送方式。

注意:

  • 单向发送:不关心服务端的应答;
  • 同步发送:获取到服务端应答消息前,会阻塞当前线程;
  • 异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数。

1.2消息消费者主流程

  1. 设置Consumer核心属性
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送

2.客户端工作机制

2.1消费者分组消费机制

offset偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。 

个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。

在示例当中,是通过业务端主动调用Consumer的commitAsync(异步)方法或者commitSync(同步)方法主动提交的,Kafka中自然也提供了自动提交Offset的方式。使用自动提交,只需要在Comsumer中配置ENABLE_AUTO_COMMIT_CONFIG属性即可。

(1)Offset是Kafka进行消息推送控制的关键之处

  1. Offset是根据Group、Partition分开记录的。消费者如果一个Partition对应多个Consumer消费者实例,那么每个Consumer实例都会往Broker提交同一个Partition的不同Offset,这时候Broker要听谁的?所以一个Partition最多只能同时被一个Consumer消费。也就是说,示例中四个Partition的Topic,那么同一个消费者组中最多就只能配置四个消费者实例
  2. 这么关键的Offset数据,保存在Broker端,但是却是由"不靠谱"的消费者主导推进,这显然是不够安全的。那么应该如何提高Offset数据的安全性呢?如果你有兴趣自己观察,会发现在Consumer中,实际上也提供了AUTO_OFFSET_RESET_CONFIG参数,来指定消费者组在服务端的Offset不存在时如何进行后续消费。(有可能服务端初始化Consumer Group的Offset失败,也有可能Consumer Group当前的Offset对应的数据文件被过期删除了。)这就相当于服务端做的兜底保障。

(2)消费者应该要如何保证offset的安全性

有两种方式:一种是异步提交。就是消费者在处理业务的同时,异步向Broker提交Offset。这样好处是消费者的效率会比较高,但是如果消费者的消息处理失败了,而offset又成功提交了。这就会造成消息丢失

另一种方式是同步提交。消费者保证处理完所有业务后,再提交Offset。这样的好处自然是消息不会因为offset丢失了。因为如果业务处理失败,消费者就可以不去提交Offset,这样消息还可以重试。但是坏处是消费者处理信息自然就慢了。另外还会产生消息重复因为Broker端不可能一直等待消费者提交。如果消费者的业务处理时间比较长,这时在消费者正常处理消息的过程中,Broker端就已经等不下去了,认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息,这就造成了消息重复处理

这类问题的根源在于Offset反映的是消息的处理进度。而消息处理进度跟业务的处理进度又是不同步的。所有我们可以换一种思路,将Offset从Broker端抽取出来,放到第三方存储比如Redis里自行管理。这样就可以自己控制用业务的处理进度推进Offset往前更新。

2.2生产者拦截器机制

拦截器机制一般用得比较少,主要用在一些统一添加时间等类似的业务场景。比如,用Kafka传递一些POJO,就可以用拦截器统一添加时间属性。但是我们平常用Kafka传递的都是String类型的消息,POJO类型的消息,Kafka可以传吗?这就要用到下面的消息序列化机制。

2.3消息序列化机制

Kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)

在Kafka中,对于常用的一些基础数据类型,都已经提供了对应的实现类。但是,如果需要使用一些自定义的消息格式,比如自己定制的POJO,就需要定制具体的序列化机制了。(需要考虑的是如何用二进制来描述业务数据

序列化机制的实现方法:

如对于一个通常的POJO类型,可以将他的属性拆分成两种类型:一种类型是定长的基础类型,比如Integer, Long,Double等。这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组,在反序列化时,只要按照定长去读取二进制数据就可以反序列化了。另一种是不定长的浮动类型,比如String,或者基于String的JSON类型等。这种浮动类型的基础数据转化成二进制数组,长度都是不一定的。对于这类数据,通常的处理方式都是先往二进制数组中写入一个定长的数据的长度数据(Integer或者Long类型),然后再继续写入数据本身。这样,反序列化时,就可以先读取一个定长的长度,再按照这个长度去读取对应长度的二进制数据,这样就能读取到数据的完整二进制内容。

2.4消息分区路由机制

Kafka默认提供了三种消费者的分区分配策略:

  1. range策略:比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer,4~6给一个Consumer,7~9给一个Consumer。
  2. round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer。
  3. sticky策略:粘性策略。这个策略有两个原则:
  • 开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。
  • 分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

也可以自定义实现分区路由机制。

2.5生产者消息缓存机制

Kafka生产者为了避免高并发请求对服务端造成过大压力,每次发消息时并不是一条一条发往服务端,而是增加了一个高速缓存,将消息集中到缓存后,批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。

Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件: accumulatorsender

RecordAccumulator,就是Kafka生产者的消息累加器。Kafka Producer要发送的消息都会在ReocrdAccumulator中缓存起来,然后再分批发送给kafka broker。在RecordAccumulator中,会针对每一个Partition,维护一个Deque双端队列,这些Dequeue队列基本上是和Kafka服务端的Topic下的Partition对应的。每个Dequeue里会放入若干个ProducerBatch数据。 (涉及到两个参数:BUFFER_MEMORY_CONFIG 是指RecordAccumulator缓冲区大小,BATCH_SIZE_CONFIG 是指缓冲区中每一个batch的大小)

KafkaProducer每次发送的消息,都会根据key分配到对应的Deque队列中。然后每个消息都会保存在这些队列中的某一个ProducerBatch中。而消息分发的规则,就是由上面的Partitioner组件完成的。

Sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到,每个KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。

nder也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。(LINGER_MS_CONFIG默认值是0 )

然后,Sender对读取出来的消息,会以Broker为key,缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会一 一发往Kafka对应的Broker中,直到收到Broker的响应,才会从队列中移除。这些队列也并不会无限缓存,最多缓存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默认值为5)个请求。

注意:生产者缓存机制的主要目的是将消息打包,减少网络IO频率。所以,在Sender的InflightRequest队列中,消息也不是一条一条发送给Broker的,而是一批消息一起往Broker发送。而这就意味着这一批消息是没有固定的先后顺序的。

最后,Sender会通过其中的一个Selector组件完成与Kafka的IO请求,并接收Kafka的响应。

补充:Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如如果你的消息体比较大,那么应该考虑加大batch.size,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection参数,这样能加大消息发送的吞吐量。

2.6发送应答机制

在Producer将消息发送到Broker后,要怎么确定消息是不是成功发到Broker上了呢?

这里涉及到的,就是在Producer端一个不太起眼的属性ACKS_CONFIG这个属性更大的作用在于保证消息的安全性,尤其在replica-factor备份因子比较大的Topic中,尤为重要

  • acks=0,生产者不关心Broker端有没有将消息写入到Partition,只发送消息就不管了。吞吐量是最高的,但是数据安全性是最低的。
  • acks=all or -1生产者需要等Broker端的所有Partiton(Leader Partition以及其对应的Follower Partition )都写完了才能得到返回结果,这样数据是最安全的,但是每次发消息需要等待更长的时间,吞吐量是最低的。
  • acks设置成1,则是一种相对中和的策略。Leader Partition在完成自己的消息写入后,就向生产者返回结果。

应用场景:在生产环境中,acks=0可靠性太差,很少使用。acks=1,一般用于传输日志等,允许个别数据丢失的场景。使用范围最广。acks=-1,一般用于传输敏感数据,比如与钱相关的数据。

注意:如果ack设置为all或者-1 ,Kafka也并不是强制要求所有Partition都写入数据后才响应。在Kafka的Broker 服务端会有一个配置参数min.insync.replicas,控制Leader Partition在完成多少个Partition的消息写入后,往Producer返回响应。这个参数可以在broker.conf文件中进行配置。

2.7生产者消息幂等性

(1)生产者消息幂等性介绍

当Producer的acks设置为1或-1时,Producer每次发送消息都是需要获取Broker端返回的RecordMetadate的。这个过程就需要两次跨网络请求。

如果要保证消息安全,那么对于每个消息,这两次网络请求就必须要求是幂等的。但是,网络是不靠谱的,在高并发场景下,往往没办法保证这两个请求是幂等的。Producer发送消息的过程中,如果第一步请求成功了, 但是第二步却没有返回。这时,Producer就会认为消息发送失败了。那么Producer必然会发起重试。重试次数由参数ProducerConfig.RETRIES_CONFIG,默认值是Integer.MAX。

这时问题就来了。Producer会重复发送多条消息到Broker中。Kafka如何保证无论Producer向Broker发送多少次重复的数据,Broker端都只保留一条消息,而不会重复保存多条消息呢?这就是Kafka消息生产者的幂等性问题。

(2)解决方案

分布式数据传递过程中的三个数据语义

at-least-once:至少一次;at-most-once:最多一次;exactly-once:精确一次。

通常意义上,at-least-once可以保证数据不丢失,但是不能保证数据不重复。而at-most-once保证数据不重复,但是又不能保证数据不丢失。这两种语义虽然都有缺陷,但是实现起来相对来说比较简单。但是对一些敏感的业务数据,往往要求数据即不重复也不丢失,这就需要支持Exactly-once语义。而要支持Exactly-once语义,需要有非常精密的设计。

Kafka为了保证消息发送的Exactly-once语义,增加了几个概念:

  • PID:每个新的Producer在初始化的过程中就会被分配一个唯一的PID。这个PID对用户是不可见的。
  • Sequence Numer: 对于每个PID,这个Producer针对Partition会维护一个sequenceNumber。这是一个从0开始单调递增的数字。当Producer要往同一个Partition发送消息时,这个Sequence Number就会加1。然后会随着消息一起发往Broker
  • Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当对应的SequenceNumber = SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过小就认为消息已经写入了,不需要再重复写入。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。

2.8生产者消息事务机制

(1)事务引入原因

通过生产者消息幂等性问题,能够解决单生产者消息写入单分区的的幂等性问题

但是,如果是要写入多个分区呢?比如像我们的示例中,就发送了五条消息,他们的key都是不同的。这批消息就有可能写入多个Partition,而这些Partition是分布在不同Broker上的。这意味着,Producer需要对多个Broker同时保证消息的幂等性

这时候,通过上面的生产者消息幂等性机制就无法保证所有消息的幂等了。这时候就需要有一个事务机制,保证这一批消息最好同时成功的保持幂等性。或者这一批消息同时失败,这样生产者就可以开始进行整体重试,消息不至于重复

(2)具体流程

Producer中的几个API:

注意:

1.一个TransactionId只会对应一个PID

如果当前一个Producer的事务没有提交,而另一个新的Producer保持相同的TransactionId,这时旧的生产者会立即失效,无法继续发送消息。

2.跨会话事务对齐

如果某个Producer实例异常宕机了,事务没有被正常提交。那么新的TransactionId相同的Producer实例会对旧的事务进行补齐保证旧事务要么提交,要么终止。这样新的Producer实例就可以以一个正常的状态开始工作。

3.客户端流程总结

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2073469.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈【数据结构】链表之单链表

目录 1、什么是数据&#xff1f; 2、什么是结构 3、什么是数据结构&#xff1f; 4、线性结构(线性表&#xff09; 4.1线性表的物理结构的实现 5、链表 5.1无头结点的单链表 5.2新内容、老面孔 5.3数组和链表的优缺点 5.4链表的概念 5.5链表的创建步骤 5.5.1创建过程…

【Linux】自动化构建工具makefile

目录 背景 makefile简单编写 .PHONY makefile中常用选项 makefile的自动推导 背景 会不会写makefile&#xff0c;从一个侧面说明了一个人是否具备完成大型工程的能力 ​ ◉ 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;mak…

MyBatis框架搭建与代码解读

前言: MyBatis是一个灵活的持久层框架&#xff0c;适合与数据库交互&#xff0c;支持自定义SQL和高级映射。搭建MyBatis的基本步骤包括&#xff1a; 环境准备&#xff1a;安装JDK、Maven和IDE。创建项目&#xff1a;建立Maven项目并设置基本信息。添加依赖&#xff1a;在pom.…

「Python数据分析」Pandas进阶,利用concat()函数连接数据(一)

在我们迈向中高级出局数据分析的过程中&#xff0c;数据的合并和连接&#xff0c;是一个非常重要的技能。 现实中&#xff0c;分散在各种数据库&#xff0c;各种数据表格&#xff0c;各种数据存储设备当中的&#xff0c;各式各样的数据&#xff0c;是我们进行数据分析的基础&a…

泛微开发修炼之旅--42Ecology大日志查看软件LogView Pro及教程

我们在项目上经常要看ecology的日志信息&#xff0c;当日志达到几百兆或者时几个G的时候&#xff0c;想要查看日志并且非常方便搜索日志中的信息&#xff0c;并不容易。 今天给大家大日志工具和教程&#xff0c;在附件中&#xff0c;各位自取吧&#xff01; 文章链接&#xff…

老师如何制作分班查询系统?

随着新学期的钟声敲响&#xff0c;老师们又迎来了一年中最忙碌的时期。不仅要处理日常的教学准备工作&#xff0c;还要面对一项重要而繁琐的任务——新生分班。分班完成后&#xff0c;老师们还需要将分班结果及时准确地通知给每一位家长&#xff0c;确保信息的传递无误。这项工…

epoll+线程池模型

&#x1f525;博客主页&#xff1a; 我要成为C领域大神&#x1f3a5;系列专栏&#xff1a;【C核心编程】 【计算机网络】 【Linux编程】 【操作系统】 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 ​ 负载均衡技术 …

AI工具 GPT 学术优化 (GPT Academic) 安装实践

GPT 学术优化 (GPT Academic)是一个综合的AI GPT工具包&#xff0c;可以完成各种gpt辅助的工作&#xff0c;比如代码解读、翻译、读论文等功能。官网&#xff1a;GitHub - binary-husky/gpt_academic: 为GPT/GLM等LLM大语言模型提供实用化交互接口&#xff0c;特别优化论文阅读…

Flutter 自动化测试 -appium-flutter-driver

上篇文章有讲述如何通过FlutterDriver实现集成测试Flutter 应用自动化测试-集成测试篇 不熟悉的小伙伴可以先去看看。 什么是Appium Flutter Driver&#xff1f; 作为Flutter开发&#xff0c;FlutterDriver是足够帮助他们进行测试的&#xff0c;而作为自动化测试工程师最大的困…

FFmpeg的入门实践系列五(编程入门之属性查看)

欢迎诸位来阅读在下的博文~ 在这里&#xff0c;在下会不定期发表一些浅薄的知识和经验&#xff0c;望诸位能与在下多多交流&#xff0c;共同努力 文章目录 前期博客参考书籍一、AVFormatContext结构体1. 结构定义2. 字段说明3.示例1&#xff08;打开与关闭音视频文件&#xff0…

机器人学——机械臂轨迹规划-1

引言 理想轨迹 步骤-1 步骤-2 笛卡尔空间下的轨迹规划 步骤-1 步骤-2 三次多项式 矩阵形式求解 det(T): 行列式&#xff0c;非齐次多项式&#xff0c;结果不为零&#xff0c;有唯一解、行列式为零&#xff08;无穷解/无解&#xff0c;还需查看增广矩阵的秩&#xff09; 速度…

Linux网络编程:多路转接--select

1. 初识select 系统提供select函数来实现多路复用输入/输出模型. select系统调用是用来让我们的程序监视多个文件描述符的状态变化的; 程序会停在select这里等待&#xff0c;直到被监视的文件描述符有一个或多个发生了状态改变 select只负责等待&#xff0c;可以等待多个fd&a…

内容创作者福音,4款文章改写神器轻松提升文章质量

在信息爆炸的时代&#xff0c;内容创作成为了连接世界的重要桥梁。作为一名专业创作者&#xff0c;我深知保持内容原创性和高质量的重要性。然而&#xff0c;灵感有时会枯竭&#xff0c;改写文章成为一项耗时且艰巨的任务。幸运的是&#xff0c;市面上有一些文章改写神器&#…

Flask+LayUI开发手记(四):弹出层实现增删改查功能

在上一节用dataTable实现数据列表时&#xff0c;已经加了表头工具栏和表内工具栏&#xff0c;栏内的按钮功能都是用来完成数据的增删改查了&#xff0c;这又分成两类功能&#xff0c;一类是删除或设置&#xff0c;这类功能简单&#xff0c;只需要选定记录&#xff0c;然后提交到…

Flutter 自动化测试 - 集成测试篇

Flutter集成测试 Flutter官方对Flutter应用测试类型做了三个阶段划分&#xff0c;分别为Unit&#xff08;单元&#xff09;测试、Widget&#xff08;组件&#xff09;测试、Integration&#xff08;集成&#xff09;测试。按照维护成本来看的话从左到右依次增高&#xff0c;按照…

预测癌症免疫治疗反应-TIDE数据库学习及知识整理

TIDE&#xff08;Tumor Immune Dysfunction and Exclusion&#xff09; 是一个用于预测癌症患者对免疫检查点抑制剂&#xff08;如PD-1/PD-L1抑制剂&#xff09;反应的算法。研究者通过检测肿瘤建模队列中每个基因的表达与效应性毒性T淋巴细胞(CTL)浸润水平的相互关系及对生存情…

Open3D 近似点体素滤波(36)

Open3D 近似点体素滤波(36) 一、算法介绍二、算法实现1.代码2.效果一、算法介绍 这个算法也是体素滤波, 它保留的点是近似点,也就是新的点,原始点云中对应位置是不存在这些点的。其他的看着类似,下面是代码,滤波抽稀结果 二、算法实现 1.代码 代码如下(示例): …

学习文件IO,让你从操作系统内核的角度去理解输入和输出(Java实践篇)

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

【在Linux世界中追寻伟大的One Piece】应用层协议HTTP

目录 1 -> HTTP协议 2 -> 认识URL 2.1 -> urlencode和urldecode 3 -> HTTP协议请求与响应格式 3.1 -> HTTP请求 3.2 -> HTTP响应 4 -> HTTP的方法 4.1 -> HTTP常见方法 5 -> HTTP的状态码 6 -> HTTP常见Header 7 -> 最简单的HTTP服…

Linux系统报错“version ‘GLIBC_2.34‘ not found”解决方法

注意&#xff0c;此文章慎用&#xff0c;glibc不可随意升级&#xff0c;可能导致系统崩溃 一、查看版本 ldd --version 二、添加高版本源 sudo vi /etc/apt/sources.list.d/my.list 进入编辑页面 "i"键进入插入模式 输入源 deb http://th.archive.ubuntu.com/…