文章目录
- 一、Kafka在哪些场景下有丢消息的可能?
- 二、面试流经典答法
- 三、为什么金融场景没人会用Kafka?
- 总结
kafka如何保证消息不丢失? 这是面试最常问到的问题。但是其实这是一个最体现综合实力的开放性题目。把这问题真正弄明白,面试时绝对值得涨个五毛钱的薪水。但是很可惜,很多人把这种问题当成了八股文来背。我见过最简单的回答是,生产者的ack应答机制+消费者手动提交Offset。一分钟答完收工,然后看着我,大眼瞪小眼。
但是真相往往就被八股文们给覆盖了。先说结论,Kafka为了保证他的高性能,高吞吐,牺牲了他的数据安全性。所以,至少目前看来,Kafka是不能保证消息安全的。所以,Kafka大都是用在日志、大数据采集这样的允许消息少量丢失的场景。
具体为什么,跟我一起来分析分析。
一、Kafka在哪些场景下有丢消息的可能?
在MQ场景下,不管是哪一种MQ产品,都有两个通用的丢消息的元凶,就是网络+缓存。所以,关于MQ消息丢失的问题,都可以从以下这几个方面来讨论。
1、生产者发送消息到MQ,有可能丢消息–跨网络
2、生产者把消息发到MQ后,MQ服务突然崩溃,有可能丢消息–Pagecache缓存
3、MQ中的Server端都会提供主从机制,防止Master节点的单点崩溃。但是往从节点发消息,有可能会丢失。这样如果消息没有同步,而Master节点上的消息又因为缓存的原因丢失,就有可能造成集群丢消息 --跨网络
4、消费者去MQ上拉取消息,有可能丢消息–跨网络
回到Kafka的场景,他的消息模型是这样的:
二、面试流经典答法
那如果面试官问到Kafka如何保证消息不丢失,你会如何回答呢? 及格线的答法是这样的:
1、生产者端,配置ack应答参数。
Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。
- acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。
- acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。
- acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。
有这个acks配置后,Kafka的生产者消息安全性基本就分析到位了。另外还有一些加分项,就是如果你能考虑到发送消息的幂等性,防止生产者重复发送消息。那么可以补充下Kakfa的生产者幂等性配置。这个机制需要acks配置为-1或者all才能生效(当然,还有其他条件)。
2、Broker端,配置多Partition分区。
在Broker端,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。
这样整个集群内的消息不会丢失。
3、消费者端,避免异步丢消息。
消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。
消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。
当然这里也有很多加分项,就是如何防止消息的重复消费。比如消费者端业务层面增加幂等性判断,或者在技术层间使用消费者的LowLevel API,将Offset移到数据库中自行管理,将数据库的业务操作与Offset变更放到同一个数据库事务中去处理,保证消息和业务的一致性。等等。
三、为什么金融场景没人会用Kafka?
大部分的八股文也就是这样来分析这个问题的。上述的那些方法,虽然都会降低Kafka的性能,但是也还是可以保证消息不丢失的啊。如果这样的话,Kafka就是一个挺完美的MQ产品了。高并发的日志,大数据采集场景,Kafka的性能没得说。对数据安全要求高的金融场景,Kafka降低一点性能也可以hold住。但是事实是这样吗?
所有的面试分享都很容易推导出这个结论。但是事实是,不会有任何一个架构师选择用Kafka来传递安全性要求极高的消息。阿里为了适应自己的金融服务,更是重新推出了RocketMQ。那到底是为什么呢?其实原因就在于,Kafka本身为了保证他的超高性能,就没有保证消息的绝对安全。
1、Kafka的文件设计不适合多分区。
Kafka的日志文件是以Partitoin为单位进行落地的。也就是说,每个Partition对应一组log日志文件。虽然Kafka写log文件的性能堪称一流,但是这也造成文件比较零散。当Kafka中的Topic和Partition比较多时,在日志文件也会随之变得更多,这样,寻找文件的性能消耗就会变大。 所以,当Topic和Partition过多时,Kafka的性能下降会非常明显。而在对数据敏感的业务场景中,天生就需要对数据进行更详细的区分,也就需要更多的Topic。
对比之下,RocketMQ用一组统一的CommitLog收集所有Topic的消息,这样就能够很好的避免日志文件碎片化的问题。其实RocketMQ的数据文件读写方式,很大程度上就是借鉴了Kafka,但是做了一些调整之后,就能更贴合阿里庞大的电商场景。
2、Kafka不支持同步刷盘
缓存断电就会丢失,这是大家都能理解的,所以缓存中的数据如果没有及时写入到硬盘,也就是常说的刷盘,那么当服务突然崩溃,就会有丢消息的可能。所以,最安全的方式是写一条数据,就刷一次盘,成为同步刷盘。
但是,这里真正容易产生困惑的,是这里所说的缓存,并不是我们平常开发过程中接触到的缓存,而是操作系统内核态的缓存-pageCache。这是应用程序接触不到的一部分缓存。比如我们用应用程序打开一个文件,实际上文件里的内容,是从内核态的PageCache中读取出来的。因为与磁盘这样的硬件交互,相比于内存,性能是很低的,操作系统为了提升性能,会将磁盘中的文件加载到PageCache缓存中,再向应用程序提供数据。修改文件时也是一样的。用记事本修改一个文件的内容,不管你保存多少次,内容都是写到PageCache里的。然后操作系统会通过脏页机制,在未来的某个时刻将所有的PageCache统一写入磁盘的操作,这个操作就是刷盘。比如在操作系统正常关系的过程中,也会触发刷盘机制。
说这么多,就是告诉你,其实对于缓存断掉,造成数据丢失,这个问题,应用程序其实是没有办法插手的。他并不能够决定自己产生的数据在什么时候刷入到硬盘当中。应用程序唯一能做的时,就是尽量频繁的通知操作系统进行刷盘操作。但是,这必然会降低应用的执行性能,而且,也不是能百分之百保证数据安全的。应用程序在这个问题上,只能取舍,不能解决。
Kafka其实在Broker端设计了一系列的参数,来控制刷盘操作的频率。如果对这些频率进行深度定制,是可以实现同步刷盘效果的。但是,这样的定制显然会大大降低Kafka的执行效率,这与Kafka的设计初衷是不符合的。
相关的参数包括这几个:
- log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
- log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
- log.flush.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的频率。默认也是Long.MAX。
这里可以看到,Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理。
而对比下RocketMQ,RocketMQ实现了同步刷盘的机制,也就是每写入一个消息,就会发起一起刷盘的操作。但是,Kafka并没有直接提供同步刷盘的功能。因为这相当于是破坏了操作系统的优化机制,强行将一列火车拖到乡间小路上去跑。
3、Partition故障恢复机制可能会丢消息
这个问题就隐藏得比较深。
首先需要理解Kafka的LEO和HW机制。
Kafka可以给每个Topic配置replication-factor备份因子,在Broker端就会给每个Partition维护一组Partition。这一组Partition会尽量平均的分配到不同的Broker上。然后,这一组Partition中会选举产生一个Leader Partition,负责与客户端进行交互。其他的Follower Partiton则负责从Leader Partition中同步消息,并辅助完成一部分的读请求。同步消息的过程,则是通过LEO和HW机制来完成的。
LEO 记录的是每个Partiton记录的最后一个Offset。
HW 记录的是一组Partiton中最小的LEO。
Leader Partition负责与客户端交互,他最先写入消息。然后其他的Follower才去同步Leader Partition的消息,同时,推高自己的LEO以及整个Partition的HW。
当一个Follower Partiton发生故障时,不会影响消息的整体写入。这时,Kafka就会忽略这个有问题的Partiton,继续在其他Partiton之间写入以及同步数据,推高LEO和HW。
当这个出故障的Partition出现问题时,并不会立即恢复加入到Partition集合中,而是会根据自己记录的HW值,清空掉HW之后的数据,然后开始拉取消息。直到自己记录的LEO值,赶上了整个Partition集合的HW之后,才正式加入Partiton集合,完全恢复正常工作。
这种情况还好,不会对数据有什么影响。但是如果是Leader Partition发生故障了呢?这时,就需要重新选举产生一个新的Leader Partitoin。但是新的Leader Partition的消息记录与旧的Leader Partition会有差距。这时,Kafka的选择方式就是以新产生的Leader Partition的消息为准。所有其他Partiton中高于新Leader Partition的LEO值的那一部分消息全部清空,保持与Leader Partition同步,后续再继续记录消息。
而旧的Leader Partition服务恢复过来后,也会作为Follower Partition,截取掉比自己以前记录的HW值更新的消息,然后重新去同步消息。
这时,问题就出来了。旧Leader Partition中比较新的那几条数据,就在集群内部被彻底抛弃掉了,也就是说这部分数据丢失了。
那对比RocketMQ是不是也有这样的问题呢?RocketMQ的主从集群,主节点和从节点的地位是不变的,不会有重新选举的过程,所以,自然也不会有这种消息丢失的情况。而RocketMQ的Dledger集群,虽然也会有主节点切换,但是他的Deldger集群采用的是二阶段的文件写入,也就是说原本Kafkak中会丢失的这一部分消息,在RocketMQ中并不会被截取掉,而是会被记录为uncommited状态,等待消息继续同步,同步完成后再标记为commited状态。
总结
关于Kafka保证消息不丢失的问题,就简单总结到这里,但这其实并不是结束。相反,随着你对Kafka理解得越深,你会发现这个问题会有更多的发散空间。像MQ如何保证消息不丢失?如何不重复消费?如何处理消息积压?等等,这都是一系列非常开放的面试题。对于你是否真正理解了每个MQ产品,是非常好的检验标准。所以,这么好的题目,如果只是简简单单背个八股文,那太可惜了。