前置知识
Kafka基本概念https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501Kafka运行机制(一):Kafka集群启动,controller选举,生产消费流程https://blog.csdn.net/dxh9231028/article/details/141307210?spm=1001.2014.3001.5501
1. 消息确认
生产者端,客户端在通过生产者生产消息时,需要知道消息是否发送成功,防止消息丢失或进行其他操作。消费者端,消费者也需要确认自己在消费数据后,提交偏移量是否成功,防止重复消费。
生产者
生产者的消息确认机制通常只有在JavaApi中,或生产者配置了重试机制时才有意义。在JavaApi中Kafka驱动提供了同步提交和异步提交两种提交方式(因为生产者批处理的机制,Kafka可以配置缓冲区大小,消息可以暂存到缓冲区中,等到时间或消息大小达到指定值时发送),同步提交会阻塞当前线程,等待kafka返回确认信号,并更具生产者配置选择是否重试。而异步提交则不会阻塞代码,Kafka返回确认信号后执行指定的回调函数。命令行配置重试机制,也可以在Kafka返回失败的信号时,触发重新提交。而Kafka何时或者说何种条件下才会返回确认信息,则是由acks配置项控制。
生产者消息确认策略是由acks配置项控制具体如下:
acks=0
:生产者发送消息后不会等待任何确认。这种模式下,消息的丢失不可避免,适用于对消息丢失容忍的场景。acks=1
:生产者发送消息后,会等待 Leader 副本的确认。Leader 确认后,生产者会认为消息已成功写入。这种模式下,消息丢失的风险较低,但如果 Leader 宕机,消息可能会丢失。acks=all
或acks=-1
:生产者发送消息后,会等待所有 ISR(In-Sync Replicas)副本的确认。只有当所有 ISR 副本都确认了消息,生产者才会认为消息写入成功。这种模式下,消息的可靠性最高,但写入延迟可能增加。
实际生产中并非所有情况都不允许消息丢失,在视频相关的功能中,丢失几帧数据并不影响视频整体流畅度,反而是服务的响应速度对流畅度的影响更大,在这种情况下可以完全不在乎是否丢包,将ack设为0,以达到极致的性能。
其中,0和1都比较好理解,当设置为all或-1时所有的ISR副本是什么呢?
ISR是集群元数据的一部分,其中记录了各个Leader分区以及和其日志偏移量差距不差,上次同步数据时间据当前时间不长的Follower副本,和其日志偏移量和上次同步数据的时间的映射关系。
在上文中我们讲述了主题副本分区中的Leader和Follower的角色和相关功能,其中Leader 副本负责处理所有的生产请求,并将数据写入自己的日志中。Follower 副本则从 Leader 副本同步数据,将其写入到自己的日志中。当Follower在同步Ledaer的数据时,Leader会讲这次同步的Follower,其同步数据的偏移量以及同步的时间的映射关系通过controlelr节点,再由controller节点存入元数据中的ISR数据列表。当Leader将新的偏移量和时间存入ISR中时,ISR不仅会更新数据,还会检查当前内部保存的Follower的同步数据偏移量和时间是否和Leader日志中的偏移量和当前时间相差太多(配置中的replica.lag.max.messages和
replica.lag.time.max.ms
),如果超过预期值则会将该Follwer从ISR踢出。
踢出ISR并不会影响改分区副本的正常功能,不过当选举新的Leader会从ISR中选取,并且当设置acks为all时,生产消息的确认,也是通过当前生产消息的偏移量与ISR中的值进行比对,当ISR中记录的所有的Follwer偏移量都超过了这个消息对应的偏移量,则认为所有分区都已经成功,返回客户端成功响应,不会确认未再ISR中的分区副本。整体流程如下图
消费者
消费者的确认机制是依赖于提交偏移量的。不同于生产者生产消息,如果生产成功时不返回任何确认信号,客户端则无法知道自己是否生产成功。而消费者消费成功会获取最终数据,所以只要获得了数据,就是消费成功,不需要什么确认信息。不过消费者在消费成功时需要向__consumer-offsets主题中提交一个自己消费分区当前的偏移量,所以只有当成功向向__consumer-offsets主题提交了偏移量后,才叫消费成功,如果没有成功提交偏移量,后续仍然会重复消费。
消费者提交偏移量分为手动提交和自动提交,手动提交时会马上提交偏移量,即使没有消费成功,这种模式下,当消费者由于某些意外消费失败,偏移量也会加一,就会造成消息丢失。
自动提交会在消费成功时自动提交,在这种模式下,当消费者由于某些意外消费成功,但意外宕机导致没有提交偏移量,则会造成消息重复消费。
2. Kafka消息日志的存储和回收
1. 日志存储机制
Kafka 中的每个主题包含若干分区,每个分区在物理上对应一个日志文件夹,该文件夹包含一系列的日志段文件。
日志段文件
- 日志段:每个分区的日志被分成多个日志段文件。日志段是一个物理文件,存储在 Kafka 的存储目录下。每个日志段文件以偏移量范围命名。
- 日志文件命名:日志段文件的命名格式通常是
log-start-offset.log
,其中log-start-offset
是该日志段中第一条消息的偏移量。 - 索引文件:除了日志段文件外,每个分区还包含两个索引文件:时间索引文件和偏移量索引文件,用于快速查找消息。
写入机制
- 顺序写入:Kafka 中的消息是顺序写入日志段的,每条消息都有一个唯一的偏移量,表示该消息在分区中的位置。
- 追加模式:消息总是被追加到当前的日志段文件中。当日志段的大小达到配置的阈值时,Kafka 会滚动创建新的日志段。
2. 日志存储配置
Kafka 提供了一系列配置参数,控制日志的存储行为:
-
log.segment.bytes
:每个日志段文件的最大大小,默认为 1GB。当日志段大小达到这个限制时,Kafka 会创建一个新的日志段。log.segment.bytes=1073741824 # 每个日志段大小限制为1GB
-
log.roll.ms
:日志段滚动的时间限制。即使日志段未达到log.segment.bytes
限制,Kafka 也会根据时间限制滚动日志段,默认为 7 天。log.roll.ms=604800000 # 每7天滚动一次日志段
3. 日志回收机制
Kafka 的日志回收是通过删除策略和压缩策略实现的,用于管理存储空间并保持系统的高效运行。
删除策略(Log Cleanup Policy)
Kafka 提供两种主要的日志回收策略:
-
基于时间的保留策略:
-
log.retention.ms
:定义了消息在日志中的最长保留时间。超过这个时间的消息会被标记为可删除。 -
log.retention.minutes
和log.retention.hours
:这是log.retention.ms
的简写形式,以分钟和小时为单位配置。log.retention.hours=168 # 保留7天的消息
-
-
基于大小的保留策略:
-
log.retention.bytes
:定义每个分区日志的最大存储大小。当日志文件的大小超过这个值时,最旧的消息会被删除。log.retention.bytes=1073741824 # 每个分区的日志大小限制为1GB
-
日志清理策略(Log Cleanup Policy)
Kafka 允许用户为每个主题配置日志清理策略:
-
delete
策略:这是默认策略。当消息超过保留时间或日志大小限制时,Kafka 会自动删除旧消息,以释放存储空间。log.cleanup.policy=delete
-
compact
策略:此策略不删除旧消息,而是保留每个键的最新版本。log.cleanup.policy=compact
compact策略下,通过生产者生产消息时给该条消息设置一个key,当多条消息都有一个key值时,compact会删除旧的key对应的消息,保存最新的key消息。
compact可以和delete结合使用,也就是同时配置两种策略,在这种策略下会对有key的消息保存最新版本,而对于没有key的消息依据删除配置删除。
Kafka 使用后台线程定期检查日志段,并根据删除策略和保留策略执行清理操作。这些线程负责删除旧的日志段或压缩日志段,以保证 Kafka 的存储空间得到合理管理。