Kafka常见生产问题详解

news2024/10/5 19:10:04

目录

生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

2、Broker端保存消息不丢失

3、消费者端防止异步处理丢失消息

消息积压如何处理

如何保证消息顺序

​问题一、如何保证Producer发到Partition上的消息是有序的

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的


生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

​ Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。

acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。

acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。

acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

​       对于KafkaProducer,只要将acks设置成1或-1,那么Producer发送消息后都可以拿到Broker的反馈RecordMetadata,里面包含了消息在Broker端的partition,offset等信息。通过这这些信息可以判断消息是否发送成功。如果没有发送成功,Producer就可以根据情况选择重新进行发送。

2、Broker端保存消息不丢失

        首先,合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘。Kafka的消息都是先写入操作系统的PageCache缓存,然后再刷盘写入到硬盘。PageCache缓存中的消息是断电即丢失的。如果消息只在PageCache中,而没有写入硬盘,此时如果服务异常崩溃,这些未写入硬盘的消息就会丢失。Kafka并不支持写一条消息就刷一次盘的同步刷盘机制,只能通过调整刷盘的执行频率,提升消息安全。主要涉及几个参数:

flush.ms : 多长时间进行一次强制刷盘。

log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。

 log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。

​       然后,配置多备份因子,防止单点消息丢失。在Kafka中,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

3、消费者端防止异步处理丢失消息

       消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。这时,消费者端采用手动提交Offset的方式,相比自动提交会更容易控制提交Offset的时机。

       消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。


消息积压如何处理

       

       通常情况下,Kafka本身是能够存储海量消息的,他的消息积压能力是很强的。但是,如果发现消息积压问题已经影响了业务处理进度,这时就需要进行一定的优化。

1、如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。那么可以增加Topic的Partition分区数,将消息拆分到更到的Partition。然后增加消费者个数,最多让消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

​        另外,在发送消息时,还是要尽量保证消息在各个Partition中的分布比较均匀。比如,在原有Topic下,可以调整Producer的分区策略,让Producer将后续的消息更多的发送到新增的Partition里,这样可以让各个Partition上的消息能够趋于平衡。如果你觉得这样太麻烦,那就新增一个Topic,配置更多的Partition以及对应的消费者实例。然后启动一批Consumer,将消息从旧的Topic搬运到新的Topic。这些Consumer不处理业务逻辑,只是做消息搬运,所以他们的性能是很高的。这样就能让新的Topic下的各个Partition数量趋于平衡。

2、如果是消费者的业务问题导致消息阻塞了,从而积压大量消息,并影响了系统正常运行。比如消费者序列化失败,或者业务处理全部异常。这时可以采用一种降级的方案,先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。


如何保证消息顺序

  

问题要分两个方面来考虑:

 1、因为kafka中各个Partition的消息是并发处理的,所以要保证消息顺序,对于Producer,要保证将一组有序的消息发到同一个Partition里。因为Partition的数据是顺序写的,所以自然就能保证消息是按顺序保存的。

​ 2、对于消费者,需要能够按照1,2,3的顺序处理消息。

问题一、如何保证Producer发到Partition上的消息是有序的

       首先,要保证Producer将消息都发送到一个Partition上,其实有两种方法。一种简答粗暴的想法就是给Topic只配一个Partition,没有其他Partition可选了,自然所有消息都到同一个Partition上了。表示从创建Topic时就放弃了多Partition带来的吞吐量便利,是不现实的。另一种是Topic依然配置多个Partition,但是通过定制Producer的Partition分区器,将消息分配到同一个Partition上。这样对于某一些要求局部有序的场景是至少是可行的。例如在电商场景,我可能只是需要保证同一个订单相关的多条消息有序,但是并不要求所有消息有序。这样就可以通过自定义分区路由器,将订单相同的多条消息发送到同一个Partition。

       但是Producer都将消息往同一个Partition,也不能保证消息顺序。因为消息可能发送失败。比如Producer依次发送1,2,3,三条消息。如果消息1因为网络原因发送失败了,2 和3 发送成功了,这样消息顺序就乱了。如果把producer的acks参数设置成1或-1,这样每次发送消息后,可以根据Broker的反馈判断消息是否成功。思路是可行的,但是重试的次数,发送消息的数量等都是需要考虑的问题。

回顾一下之前对于生产者消息幂等性的设计:

  

         Kafka的这个sequenceNumber是单调递增的。如果只是为了消息幂等性考虑,那么只要保证sequenceNumber唯一就行了,为什么要设计成单调递增呢?其实Kafka这样设计的原因就是可以通过sequenceNumber来判断消息的顺序。也就是说,在Producer发送消息之前就可以通过sequenceNumber定制好消息的顺序,然后Broker端就可以按照顺序来保存消息。与此同时, SequenceNumber单调递增的特性不光保证了消息是有顺序的,同时还保证了每一条消息不会丢失。一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。

在生产者的配置类ProducerConfig中很快能找到很多和消息顺序ordering的描述:

public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                            + " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
                                                                            + " message reordering after a failed send due to retries (i.e., if retries are enabled); "
                                                                            + " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
                                                                            + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
                                                                            + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的
    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
    private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
            "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +
            "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
            "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
            "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
    public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

        这里明确提到Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。也就是说Consumer拉取过来的多批消息并不是串行消费的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。其实这也比较好理解,因为Kafka设计的重点是高吞吐量,所以他的设计是让Consumer尽最大的能力去消费消息。而只要对消费的顺序做处理,就必然会影响Consumer拉取消息的性能。

​        所以这时候,我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1426857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA 配置和缓存目录 设置

IDEA系列产品&#xff0c;一般会在用户目录创建 配置 和 缓存 目录&#xff1a; %APPDATA%\JetBrains%LOCALAPPDATA%\JetBrains 一般会展示为&#xff1a; C:\Users\<username>\AppData\Roaming\JetBrainsC:\Users\<username>\AppData\Local\JetBrains 一般占用…

为啥监管层要打击量化交易?

&#xff08;1&#xff09;李鬼量化交易&#xff1a;程序化交易 我先讲讲李鬼。它本来不属于量化交易&#xff0c;但是人们说它是量化交易&#xff0c;好吧&#xff0c;三人成虎众口铄金&#xff0c;既然大家说鹿就是马&#xff0c;那鹿就是马&#xff0c;至于鹿是不是马&#…

中国文化之光:微博数据的探索与可视化分析

大家好&#xff0c;我是八块腹肌的小胖 下面我们针对主题“中国文化”相关的微博数据进行爬取 使用LDA、情感分析、情感演化、词云等可视化操作进行相关的展示 1、导包 第一步我们开始导包工作 下面这段代码&#xff0c;首先&#xff0c;pandas被请来了&#xff0c;因为它是…

睿尔曼超轻量仿人机械臂——外置按钮一键启停程序配置

在睿尔曼超轻量仿人机械臂—外置按钮盒使用说明一文中&#xff0c;介绍了外置按钮盒的安装及使用。它能够使机械臂的使用变得更加编辑&#xff0c;仅需按钮即可完成运动程序的启停等控制&#xff0c;而无需进入示教界面操作。 在示教界面中&#xff0c;我们可以完成运动程序的…

从用户行为到数据:数据采集全景解析【主流电商平台API接口数据采集方式】

电商数据采集是数据体系建设的最上游&#xff0c;是非常重要的一个环节&#xff0c;除了专业的数据人员&#xff0c;人们普遍对数据采集的认知度不高。如果你提起埋点&#xff0c;应该很多人都熟悉它。它应该也是绝大部分人对数据采集的认知了。数据上报其实是一个系统性工程&a…

vmware网络配置,VMware的三种网络模式详解与配置

vmware为我们提供了三种网络工作模式 vmware为我们提供了三种网络工作模式, 它们分别是: Bridged&#xff08;桥接模式&#xff09;、NAT&#xff08;网络地址转换模式&#xff09;、Host-Only&#xff08;仅主机模式&#xff09;。 VMware虚拟机的三种网络类型的适用场景如下…

设备的层次结构 - 驱动程序的垂直层次结构

Windows操作系统是分层调用。其实在驱动程序中也可以是分层调用的。 驱动程序的垂直层次结构 不仅是WDM驱动&#xff0c;NT式驱动也可以分层&#xff0c;这主要是通过一个设备附加在另一个设备之上。因此&#xff0c;可以将WDM驱动模型看成是NT驱动模型的延伸。 设备的创建顺序…

一文掌握SpringBoot注解之@Configuration知识文集(2)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

Flink 1.18.1的基本使用

系统示例应用 /usr/local/flink-1.18.1/bin/flink run /usr/local/flies/streaming/SocketWindowWordCount.jar --port 9010nc -l 9010 asd asd sdfsf sdf sdfsdagd sdf单次统计示例工程 cd C:\Dev\IdeaProjectsmvn archetype:generate -DarchetypeGroupIdorg.apache.flink -…

【Qt】—— 项⽬⽂件解析

目录 &#xff08;一&#xff09;.pro⽂件解析 &#xff08;二&#xff09;widget.h⽂件解析 &#xff08;三&#xff09;main.cpp⽂件解析 &#xff08;四&#xff09;widget.cpp⽂件解析 &#xff08;五&#xff09;widget.ui⽂件解析 &#xff08;一&#xff09;.pro⽂…

HiveSQL题——array_contains函数

目录 一、原创文章被引用次数 0 问题描述 1 数据准备 2 数据分析 ​编辑 3 小结 二、学生退费人数 0 问题描述 1 数据准备 2 数据分析 3 小结 一、原创文章被引用次数 0 问题描述 求原创文章被引用的次数&#xff0c;注意本题不能用关联的形式求解。 1 数据准备 i…

物联网可视化平台:赋能企业数字化转型

在数字化转型的大潮中&#xff0c;企业面临着如何更好地理解和利用海量数据的挑战。物联网技术的快速发展&#xff0c;为企业提供了一个全新的视角和解决方案。通过物联网可视化平台&#xff0c;企业能够实时监控、分析和展示物联网数据&#xff0c;从而加速数字化转型的进程。…

深度揭秘:代理IP的工作原理及其在网络安全中的关键角色

代理IP的工作原理及其在网络安全中的关键角色是一个相对复杂但非常重要的主题。以下是对这一内容的深度揭秘&#xff1a; 代理IP的工作原理 1. 请求转发 当一个客户端&#xff08;如浏览器或爬虫程序&#xff09;使用代理IP时&#xff0c;它不是直接与目标网站通信&#xff0c…

【无刷电机学习】电流采样电路硬件方案

【仅作自学记录&#xff0c;不出于任何商业目的】 目录 AD8210 INA282 INA240 INA199 AD8210 【AD8210数据手册】 在典型应用中&#xff0c;AD8210放大由负载电流通过分流电阻产生的小差分输入电压。AD8210抑制高共模电压(高达65V)&#xff0c;并提供接地参考缓冲输出&…

从0搭建react+ts+redux+axios+antd项目

文章目录 一、安装及初始化二、TypeScript配置三、Webpack配置四、Prettier统一编码风格五、使用less六、Antd 安装及使用七、添加Router及配置八、安装axios九、添加redux及使用 本文介绍了如何用creat-react-app脚手架搭建一个react项目的基本结构&#xff0c;同时配置webpac…

书客、米家、柏曼大路灯哪款好?多维度实测对比推荐!

每到寒暑假&#xff0c;各个论坛上出现“大路灯怎么选”的类似话题非常频繁&#xff0c;因为现在的孩子出来上学期间需要读写之外&#xff0c;在寒暑假时也在不断的学习&#xff0c;许多家长关注到孩子学习时的光线问题&#xff0c;担心影响到孩子的视力状况&#xff0c;都纷纷…

Java的Mysql使用

Java的Mysql使用 说明 通过Java的方式连接Mysql中的数据库&#xff0c;并对数据库中的数据进行增加 查询操作 ​ 使用Mysql所提供的第三方库中的类(Mysql的API) 对其进行操作 ​ 将Mysql Jar包添加到lib目录后&#xff0c;就可以使用其中的类对其Mysql数据库进行操作 Mysq…

代码随想录算法训练营第五十九天|503.下一个更大元素II 、42. 接雨水

代码随想录算法训练营第五十九天|503.下一个更大元素II 、42. 接雨水 下一个更大元素II 503.下一个更大元素II 文章讲解&#xff1a;https://programmercarl.com/0503.%E4%B8%8B%E4%B8%80%E4%B8%AA%E6%9B%B4%E5%A4%A7%E5%85%83%E7%B4%A0II.html 题目链接&#xff1a;https://…

2024年美赛 (C题MCM)| 温网积分 |数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 让我们来看看美赛的C题&#xff01; 完整内容可以在文章末尾领…

2024美赛数学建模F题思路分析 - 减少非法野生动物贸易

1 赛题 问题F&#xff1a;减少非法野生动物贸易 非法的野生动物贸易会对我们的环境产生负面影响&#xff0c;并威胁到全球的生物多样性。据估计&#xff0c;它每年涉及高达265亿美元&#xff0c;被认为是全球第四大非法交易。[1]你将开发一个由数据驱动的5年项目&#xff0c;…