Kafka日志及常见问题

news2024/12/26 3:50:23

目录

1.Topic下的消息是如何存储的

1.1log文件追加记录所有消息

1.2index和timeindex加速读取日志信息

2.文件清理机制

2.1如何判断哪些日志文件过期了

2.2日志清理策略

3.Kafka的文件高效读写机制

3.1Kafka的文件结构

3.2顺序写磁盘

3.3零拷贝

3.3.1传统IO

3.3.2mmap文件映射机制

3.3.3sendfile文件传输机制

4.合理配置刷盘频率

5.客户端消费进度管理

6.常见问题

6.1消费者防止消息重新消费

6.2消息零丢失方案

6.2.1产生原因

6.2.2解决方案

6.3消息积压问题

6.4如何保证消息顺序

6.4.1如何保证Producer发到Partition上的消息是有序的

6.4.2Partition中的消息有序后,如何保证Consumer的消费顺序是有序的


本章主要讲解Kafka中每个Broker如何高效地处理以及保存消息。

1.Topic下的消息是如何存储的

在搭建Kafka服务时,我们在server.properties配置文件中通过log.dir属性指定了Kafka的日志存储目录。 实际上,Kafka的所有消息就全都存储在这个目录下。

在这些核心数据文件中,.log结尾的就是实际存储消息的日志文件。他的大小固定为1G,写满后就会新增一个新的文件。一个文件也成为一个segment文件名,名字以当前日志文件记录的第一条消息的偏移量命名。

.index和.timeindex是日志文件对应的索引文件。不过.index是以偏移量为索引来记录对应的.log日志文件中的消息偏移量。而.timeindex则是以时间戳为索引。

另外的两个文件,partition.metadata简单记录当前Partition所属的cluster和Topic。leader-epoch-checkpoint文件参见上面的epoch机制

1.1log文件追加记录所有消息

在每个文件内部,Kafka都会以追加的方式写入新的消息日志。position就是消息记录的起点,size就是消息序列化后的长度。Kafka中的消息日志,只允许追加,不支持删除和修改

每个Log文件都保持固定的大小。如果当前文件记录不下了,就会重新创建一个log文件,并以这个log文件写入的第一条消息的偏移量命名。这种设计其实是为了更方便进行文件映射,加快读消息的效率。

1.2index和timeindex加速读取日志信息

index和timeindex都是以相对偏移量的方式建立log消息日志的数据索引。比如说 0000.index和0550.index中记录的索引数字,都是从0开始的。表示相对日志文件起点的消息偏移量。而绝对的消息偏移量可以通过日志文件名 + 相对偏移量得到

index文件的作用类似于数据结构中的跳表,他的作用是用来加速查询log文件的效率。而timeindex文件的作用则是用来进行一些跟时间相关的消息处理,比如文件清理。

2.文件清理机制

2.1如何判断哪些日志文件过期了

  • log.retention.check.interval.ms:定时检测文件是否过期。默认是 300000毫秒,也就是五分钟。
  • log.retention.hours , log.retention.minutes, log.retention.ms 。 这一组参数表示文件保留多长时间。默认生效的是log.retention.hours,默认值是168小时,也就是7天。如果设置了更高的时间精度,以时间精度最高的配置为准。
  • 在检查文件是否超时时,是以每个.timeindex中最大的那一条记录为准

2.2日志清理策略

日志清理策略(log.cleanup.policy):

(1)delete表示删除日志文件;

(2)compact表示压缩日志文件

当log.cleanup.policy选择delete时,还有一个参数可以选择。log.retention.bytes:表示所有日志文件的大小。当总的日志文件大小超过这个阈值后,就会删除最早的日志文件。默认是-1,表示无限大。

压缩日志文件虽然不会直接删除日志文件,但是会造成消息丢失。压缩的过程中会将key相同的日志进行压缩,只保留最后一条

3.Kafka的文件高效读写机制

3.1Kafka的文件结构

Kafka的数据文件结构设计可以加速日志文件的读取。比如同一个Topic下的多个Partition单独记录日志文件,并行进行读取,这样可以加快Topic下的数据读取速度。然后index的稀疏索引结构,可以加快log日志检索的速度。

3.2顺序写磁盘

对每个Log文件,Kafka会提前规划固定的大小,这样在申请文件时,可以提前占据一块连续的磁盘空间。然后,Kafka的log文件只能以追加的方式往文件的末端添加(这种写入方式称为顺序写),这样,新的数据写入时,就可以直接往直前申请的磁盘空间中写入,而不用再去磁盘其他地方寻找空闲的空间(普通的读写文件需要先寻找空闲的磁盘空间,再写入。这种写入方式称为随机写)。由于磁盘的空闲空间有可能并不是连续的,也就是说有很多文件碎片,所以磁盘写的效率会很低。

kafka的官网有测试数据,表明了同样的磁盘,顺序写速度能达到600M/s,基本与写内存的速度相当。而随机写的速度就只有100K/s,差距比加大。

(Kakfa顺序读写的实现方式不太需要SSD这样高性能的磁盘。同等容量SSD硬盘的成本比机械硬盘要高出非常多,没有必要。将SSD的成本投入到MySQL这类的服务更合适。)

3.3零拷贝

零拷贝是Linux操作系统提供的一种IO优化机制,而Kafka大量的运用了零拷贝机制来加速文件读写

3.3.1传统IO

内核态的内容复制是在内核层面进行的,而零拷贝的技术,重点是要配合内核态的复制机制,减少用户态与内核态之间的内容拷贝

具体有两种实现方式:

3.3.2mmap文件映射机制

这种方式是在用户态不再缓存整个IO的内容,改为只持有文件的一些映射信息通过这些映射,"遥控"内核态的文件读写。这样就减少了内核态与用户态之间的拷贝数据大小,提升了IO效率。(其实用户态和内核态使用的都是内存,不过操作系统对内存进行了具体的划分)

这种mmap文件映射方式,适合于操作不是很大的文件,通常映射的文件不建议超过2G。所以kafka将.log日志文件设计成1G大小,超过1G就会另外再新写一个日志文件。这就是为了便于对文件进行映射,从而加快对.log文件等本地文件的写入效率。(Kafka利用mmap+顺序写快速的将producer发送到broker的数据持久化到磁盘)

3.3.3sendfile文件传输机制

这种机制可以理解为用户态,也就是应用程序不再关注数据的内容,只是向内核态发一个sendfile指令,让他去复制文件就行了。这样数据就完全不用复制到用户态,从而实现了零拷贝。

相比mmap,连索引都不读了,直接通知操作系统去拷贝就是了。

例如在Kafka中,当Consumer要从Broker上poll消息时,Broker需要读取自己本地的数据文件,然后通过网卡发送给Consumer。这个过程当中,Broker只负责传递消息,而不对消息进行任何的加工。所以Broker只需要将数据从磁盘读取出来,复制到网卡的Socket缓冲区(仅仅是传递了FD文件描述符到Socket缓冲区,相比于mmap,减少了一次内存拷贝),然后通过网络发送出去。这个过程当中,用户态就只需要往内核态发一个sendfile指令,而不需要有任何的数据拷贝过程。Kafka大量的使用了sendfile机制,用来加速对本地数据文件的读取过程。

补充:JDK中8中java.nio.channels.FileChannel类提供了transferTo和transferFrom方法,底层就是使用了操作系统的sendfile机制。

4.合理配置刷盘频率

缓存数据断电就会丢失,所以缓存中的数据如果没有及时写入到硬盘(刷盘),那么当服务突然崩溃,就会有丢消息的可能。

同步刷盘:每写一条数据,就刷一次盘;

Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理

下面是Kafka刷盘的一些参数:

  • flush.ms : 多长时间进行一次强制刷盘。
  • log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
  • log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个时间值时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
  • log.flush.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的频率。默认也是Long.MAX。

补充:

(1)Kafka并没有实现写一个消息就进行一次刷盘的“同步刷盘”操作。但是在RocketMQ中却支持了这种同步刷盘机制

(2)刷盘操作在Linux系统中对应了一个fsync的系统调用。

这里真正容易产生困惑的,是这里所提到的in-core state(内核态)这并不是我们平常开发过程中接触到的缓存,而是操作系统内核态的缓存-pageCache。这是应用程序接触不到的一部分缓存。比如我们用应用程序打开一个文件,实际上文件里的内容,是从内核态的PageCache中读取出来的。

因为与磁盘这样的硬件交互,相比于内存,效率是很低的。操作系统为了提升性能,会将磁盘中的文件加载到PageCache缓存中,再向应用程序提供数据。修改文件时也是一样的。用记事本修改一个文件的内容,不管你保存多少次,内容都是写到PageCache里的。然后操作系统会通过他自己的缓存管理机制,在未来的某个时刻将所有的PageCache统一写入磁盘。这个操作就是刷盘。比如在操作系统正常关机的过程中,就会触发一次完整的刷盘机制。

所以,对于刷盘,应用程序其实是没有办法插手的。他并不能够决定自己产生的数据在什么时候刷入到硬盘当中。应用程序唯一能做的,就是尽量频繁的通知操作系统进行刷盘操作。但是,这必然会降低应用的执行性能,而且,也不是能百分之百保证数据安全的。应用程序在这个问题上,只能取舍,不能解决。

5.客户端消费进度管理

kafka为了实现分组消费的消息转发机制,需要在Broker端保持每个消费者组的消费进度。而这些消费进度,就被Kafka管理在自己的一个内置Topic中。这个Topic就是__consumer__offsets。这是Kafka内置的一个系统Topic,在日志文件可以看到这个Topic的相关目录。Kafka默认会将这个Topic划分为50个分区。

同时,Kafka也会将这些消费进度的状态信息记录到Zookeeper中。

 这个系统Topic中记录了所有ConsumerGroup的消费进度。

这个Topic是Kafka内置的一个系统Topic,可以启动一个消费者订阅这个Topic中的消息。

从这里可以看到,Kafka也是像普通数据一样,以Key-Value的方式来维护消费进度。key是groupid+topic+partition,value则是表示当前的offset。这些Offset数据,可以被消费者修改(从指定位置消费)。

6.常见问题

6.1消费者防止消息重新消费

产生原因:在一些大型项目中,消费者的业务处理流程会很长,这时就会带来一些问题。比如,一个消费者在正常处理这一批消息,但是时间需要很长。Broker就有可能认为消息消费失败了,从而让同组的其他消费者开始重试这一批消息。这就给消费者端带来不必要的幂等性问题。

解决:

(1)消费者端的幂等性问题,当然可以交给消费者自己进行处理,比如对于订单消息,消费者根据订单ID去确认一下这个订单消息有没有处理过。这种方式当然是可以的,大部分的业务场景下也都是这样处理的。但是这样会给消费者端带来更大的业务复杂性。

(2)在大型项目中有一种比较好的处理方式就是将Offset放到Redis中自行进行管理。通过Redis中的offset来判断消息之前是否处理过。

伪代码:

  1. 拉取消息
  2. 从Redis获取partition的偏移量
  3. 如果redis获取的偏移量>=kafka实际的偏移量,表示已经消费过了,则丢弃
  4. 业务端调用doMessage()方法,处理业务即可,不用再处理幂等性问题
  5. 处理完成后立即保存redis偏移量
  6. 异步提交

将这段代码封装成一个抽象类,具体的业务消费者端只要继承这个抽象类,然后就可以专注于实现doMessage方法,处理业务逻辑即可,不用再过多关心幂等性的问题。

6.2消息零丢失方案

6.2.1产生原因

6.2.2解决方案

1.生产者发送消息到Broker不丢失

Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。

  • acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。
  • acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。
  • acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

对于KafkaProducer,只要将acks设置成1或-1,那么Producer发送消息后都可以拿到Broker的反馈RecordMetadata,里面包含了消息在Broker端的partition,offset等信息。通过这这些信息可以判断消息是否发送成功。如果没有发送成功,Producer就可以根据情况选择重新进行发送。

2.Broker端保存消息不丢失

(1)配置多备份因子,防止单点消息丢失。(同步信息,涉及故障恢复,存在消息不安全的可能)

(2)合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘。(刷盘)

3.消费者端防止异步处理丢失消息

消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。

这时,消费者端采用手动提交Offset的方式,相比 自动提交 会更容易控制提交Offset的时机

消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。

6.3消息积压问题

  1. 如果务运行正常,只是因为消费者处理消息过慢,造成消息加压。那么可以增加Topic的Partition分区数,将消息拆分到更到的Partition。然后增加消费者个数,最多让消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。
  2. 在发送消息时,还是要尽量保证消息在各个Partition中的分布比较均匀。比如,在原有Topic下,可以调整Producer的分区策略,让Producer将后续的消息更多的发送到新增的Partition里,这样可以让各个Partition上的消息能够趋于平衡。如果你觉得这样太麻烦,那就新增一个Topic,配置更多的Partition以及对应的消费者实例。然后启动一批Consumer,将消息从旧的Topic搬运到新的Topic这些Consumer不处理业务逻辑,只是做消息搬运,所以他们的性能是很高的。这样就能让新的Topic下的各个Partition数量趋于平衡。
  3. 如果是消费者的业务问题导致消息阻塞,从而积压大量消息,并影响了系统正常运行。比如消费者序列化失败,或者业务处理全部异常。这时可以采用一种降级的方案先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式

6.4如何保证消息顺序

  1. 因为kafka中各个Partition的消息是并发处理的,所以要保证消息顺序,对于Producer,要保证将一组有序的消息发到同一个Partition里。因为Partition的数据是顺序写的,所以自然就能保证消息是按顺序保存的。
  2. 接下来对于消费者,需要能够按照1,2,3的顺序处理消息

6.4.1如何保证Producer发到Partition上的消息是有序的

(1)Topic只配一个Partition,没有其他Partition可选了,自然所有消息都到同一个Partition上了。

(2)Topic依然配置多个Partition,但是通过定制Producer的Partition分区器,将消息分配到同一个Partition上。例如在电商场景,我可能只是需要保证同一个订单相关的多条消息有序,但是并不要求所有消息有序。这样就可以通过自定义分区路由器,将订单相同的多条消息发送到同一个Partition。消息幂等性中所用的序列号保证了消息是有序的,也保证了消息不丢失

6.4.2Partition中的消息有序后,如何保证Consumer的消费顺序是有序的

Kafka的ConsumerConfig中明确提到Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。也就是说Consumer拉取过来的多批消息并不是串行消费的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。其实这也比较好理解,因为Kafka设计的重点是高吞吐量,所以他的设计是让Consumer尽最大的能力去消费消息。而只要对消费的顺序做处理,就必然会影响Consumer拉取消息的性能。

所以这时候,我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

那么针对消费者顺序消费的问题,有没有其他的处理思路呢?在RocketMQ中提供了一个比较好的方式。RocketMQ中提供了顺序消息的实现。他的实现原理是先锁定一个队列(在RocketMQ中称为MessageQueue,类似于Kafka中的Partition,都是实际存储消息的队列结果),消费完这一个队列后,才开始锁定下一个队列,并消费队列中的消息。再结合MessageQueue中的消息有序性,就能保证整体消息的消费顺序是有序的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2076623.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

应用层与传输层

1.应用层 很多时候这一层的协议是程序员自定义的应用层协议(相当于一种约定,约定数据如何进行传输)。 eg: 实现登录的场景: 此时前端就需要与后端约定请求(假设约定使用ajax请求)中的一些参…

接口自动化测试面试题目详解

1、get和post区别是什么? 答:POST和GET都是向服务器提交数据,并且都会从服务器获取数据。 区别: (1)传送方式:get通过地址栏传输,post通过报文传输 (2)传…

c语言 自定义类型--枚举 、联合 #枚举类型的定义 #枚举的优点 #枚举的使用 #联合类型的定义 #联合的特点 #联合大小的计算

文章目录 前言 一、枚举 (一)、枚举类型的定义 (二)、枚举的优点 (三)、枚举的使用 二、联合 (一)、联合类型的定义 (二)、联合的特点 (三)、联合大小的计算 总结 前言 路漫漫其修远兮,吾将上下而求索。 枚举、联合跟结构体很像,想要细致地了…

基于SpringBoot+Vue+uniapp的“村游网”系统的微信小程序开发的详细设计和实现(源码+lw+部署文档+讲解等)

文章目录 前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus 系统测试系统测试目的系统功能测试系统测试结论 为什么选择我代码参考数据库参考源码获取源码获取 前言 🌞博主介绍 :✌全网粉丝15W,CSDN特邀作者、21…

基于状态机实现WIFI模组物联网

1.0 状态机框架原理 如果成功的话就连接热点,如果失败就返回AT通信检查,如果AT通信检查还是失败就放回硬件复位这个状态,如果热点链接成功,就连接MQTT指令,如果失败就返回AT通信检查,如果成功就连接云平台通…

跟着B站前端面试总结回顾前端基础知识(一)

组件划分标准 组件划分_哔哩哔哩_bilibili 在前端Vue开发中,组件的划分是构建高效、可维护应用的关键步骤。Vue组件的划分标准通常基于多个方面的考虑,包括但不限于功能独立性、复用性、可维护性和可扩展性。以下是一些Vue组件划分的标准: …

破解历史合同“旧题” 答好集体经济“新篇”

——汕头市龙湖区:全面推进乡村振兴战略 实现农村集体经济新飞跃 农村集体资产资源是乡村赖以生存的家底,也是村集体经济发展壮大、更好推动乡村振兴战略加力提速的承载。自2023年10月开始,在汕头市龙湖区的广袤乡村上,一场关于村…

图解搜索算法(BFS、DFS、Dijstra算法、KSP算法、A*算法)

文章目录 深度优先搜索算法广度优先搜索算法Dijkstra算法KSP算法A*算法 由于在工作中用到了BFS算法、DFS算法、Dijkstra算法、KSP算法,因此将上述算法的工作原理记录一下,同时用图解的方式解释相应的算法。A*算法由于本文在工作中,还没用过&a…

深度学习语义分割篇——LR-ASPP原理详解+源码实战

🍊作者简介:秃头小苏,致力于用最通俗的语言描述问题 🍊专栏推荐:深度学习网络原理与实战 🍊近期目标:写好专栏的每一篇文章 🍊支持小苏:点赞👍🏼、…

简单使用富有创造力的DALL·E 3 图像生成器——OpenAI Images Generations API

OpenAI Images Generations API 申请及使用 DALL-E 3 是 OpenAI 开发的两个版本的图像生成模型,它们能够根据文本描述生成高质量的图像。 本文档主要介绍 OpenAI Images Generations API 操作的使用流程,利用它我们可以轻松使用官方 OpenAI DALL-E 的图…

类和构造函数之间的继承

类之间构造函数的继承是面向对象编程中的一个重要概念,它允许一个类(子类)继承另一个类(父类)的属性和方法。通过这种方式,子类可以复用父类的代码,从而避免重复,提高代码的可维护性…

Swagger的增强knife4j

效果图 导入依赖 <!--不是导入swagger原因就是&#xff1a;knife4j对swagger增强--><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.2</version><…

考研数学| 张宇线代9讲换李永乐线代讲义,强化来得及吗?

张宇线代9讲&#xff1a; 张宇老师的线代九讲和张宇老师的线代课程搭配使用&#xff0c;相对于更适合线代底子足够强的同学。整体书的题目难度较大&#xff0c;所以开始刷了之后发现不适合自己可以选择换一本习题册&#xff0c;当然如果担心时间不够还是想尽力刷下去的话可以去…

8款每天都能用到的网站和APP,值得收藏

5个网站3个APP&#xff0c;都是自己每天常用的一些工具&#xff0c;强大实用&#xff0c;能给自己省不少事&#xff0c;分享给大家~ 1、奶牛快传 https://cowtransfer.com/ 一个让你轻松上传和分享大文件的网站。跟那些需要下载app、操作复杂的服务不同&#xff0c;奶牛快传…

随机森林与线性回归

集成学习方法之随机森林 集成学习&#xff08;Ensemble Learning&#xff09;是一种通过组合多个分类器来提高预测性能的方法。主要分为三种类型:Bagging、Boosting和Stacking。以下是集成学习的基本步骤和概念&#xff1a; 1数据采样&#xff1a;从训练集中有放回地随机抽取…

前端手写源码系列(一)—— 手写防抖和节流

目录 1.实现防抖函数&#xff08;debounce&#xff09;2.实现节流函数&#xff08;throttle&#xff09;时间戳的方式&#xff1a;定时器方式&#xff1a; 3.总结 1.实现防抖函数&#xff08;debounce&#xff09; 防抖函数原理&#xff1a;把触发非常频繁的事件合并成一次去执…

修复照片,2024年好用的图片修复工具推荐

在日常中&#xff0c;我们总爱用镜头捕捉生活的每一个瞬间&#xff0c;但随着时间的推移&#xff0c;那些珍贵的记忆可能会因像素不高、保存不当等原因变得模糊不堪。这时需要将模糊的照片修复清晰成为我们迫切的需求。下面给大家分享4种2024年好用值得推荐的修复工具&#xff…

【自动化】一共获取6600多公司信息【逆向】一页15还加密。

一、【逆向】一页15还加密。 二、【自动化】一共获取6600多公司信息 三、对于两种方式我喜欢第二种自动化 from DrissionPage import ChromiumPage, ChromiumOptions import time # chrome:version co = ChromiumOptions().set_paths(browser_path=r"C:\Users\lenovo\A…

qt-19 QMainWindow窗口组件-菜单栏-工具栏

QMainWindow窗口组件-菜单栏-工具栏 showwidgetshowwidget.hshowwidget.cpp processorprocessor.hprocessor.cpp main.cpp运行图 showwidget showwidget.h #ifndef SHOWWIDGET_H #define SHOWWIDGET_H#include <QWidget> #include <QLabel> #include <QTextEd…

Sparse Kernel Canonical Correlation Analysis

论文链接&#xff1a;https://arxiv.org/pdf/1701.04207 看这篇论文终于看懂核函数了。。谢谢作者