第三章 Kafka生产问题总结及性能优化实践

news2025/1/5 11:18:33

第三章 Kafka生产问题总结及性能优化实践

1、线上环境规划

Untitled

JVM参数设置

kafka 是 scala 语言开发,运行在 JVM 上,需要对 JVM 参数合理设置,参看 JVM 调优专题

修改 bin/kafka-start-server.sh 中的 JVM 设置,假设机器是 32G 内存,可以如下设置:

export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"

这种大内存的情况一般都要用 G1 垃圾收集器,因为年轻代内存比较大,用 G1 可以设置 GC 最大停顿时间,不至于一次 minor gc 就花费太长时间,当然,因为像 kafka,rocketmq,es 这些中间件,写数据到磁盘会用到操作系统的 page cache,所以 JVM 内存不宜分配过大,需要给操作系统的缓存留出几个 G。

2、线上问题及优化

消息丢失情况

消息发送端:

1、acks=0: 表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。

2、acks=1: 至少要等待 leader 已经成功将数据写入本地log,但是不需要等待所有 follower 是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower 没有成功备份数据,而此时 leader 又挂掉,则消息会丢失。

3、acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果 min.insync.replicas 配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:

如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

消息重复消费

消息发送端:

发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息

消息消费端:

如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理

一般消费端都是要做消费幂等处理的。

消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。

所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。

kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

消息积压

1、线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。
此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。

2、由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致 broker 积压大量未消费消息。

此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :

1、在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。

2、订单完成 1 小时后通知用户进行评价。

实现思路:

发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些 topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的 topic 中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序 bug 导致的计算错误,当程序 bug 修复后,这时可能需要对之前已消费的消息进行重新消费,可以指定从多久之前的消息回溯消费,这种可以用 consumer 的 offsetsForTimes、seek 等方法指定从某个 offset 偏移的消息开始消费,参见上节课的内容。

分区数越多吞吐量越高吗

可以用 kafka 压测工具自己测试分区数不同,各种情况下的吞吐量:

# 往 test 里发送一百万条消息,每条消息设置 1 KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.65.60:9092 acks=1

Untitled

网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值时吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况下分区数跟集群机器数量相当就差不多了。

当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。

注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。

异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux 系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

消息传递保障

  • at most once(消费者最多收到一次消息,0–1次):acks = 0 可以实现。
  • at least once(消费者至少收到一次消息,1–多次):ack = all 可以实现。
  • exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用 kafka 生产者的幂等性来实现。

kafka 生产者的幂等性:

因为发送端重试导致的消息重复发送问题,kafka 的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是 false 不开启。

具体实现原理是:

kafka 每次发送消息会生成 PID 和 Sequence Number,并将这两个属性一起发送给 broker,broker 会将 PID 和 Sequence Number 跟消息绑定一起存起来,下次如果生产者重发相同消息,broker 会检查 PID 和 Sequence Number,如果相同则不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PIDSequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

kafka的事务

Kafka 的事务不同于 Rocketmq,Rocketmq是保障本地事务(比如数据库)与 mq 消息发送的事务一致性,Kafka 的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在 kafka 的流式计算场景用得多一点,比如,kafka 需要对一个 topic 里的消息做不同的流式计算处理,处理完分别发到不同的 topic 里,这些 topic 分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个 topic 的数据保持事务一致性。Kafka 要实现类似 Rocketmq 的分布式事务需要额外开发功能。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
 //初始化事务
 producer.initTransactions();

 try {
     //开启事务
     producer.beginTransaction();
     for (int i = 0; i < 100; i++){
         //发到不同的主题的不同分区
         producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
         producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
     }
     //提交事务
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     //回滚事务
     producer.abortTransaction();
 }
 producer.close();

3、 kafka高性能的原因

Untitled

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/357867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java有几种文件拷贝方式?哪一种最高效?

第12讲 | Java有几种文件拷贝方式&#xff1f;哪一种最高效&#xff1f; 我在专栏上一讲提到&#xff0c;NIO 不止是多路复用&#xff0c;NIO 2 也不只是异步 IO&#xff0c;今天我们来看看 Java IO 体系中&#xff0c;其他不可忽略的部分。 今天我要问你的问题是&#xff0c;…

一个巨型的ESP8266模块,围观围观

作者&#xff1a;晓宇&#xff0c;排版&#xff1a;晓宇微信公众号&#xff1a;芯片之家&#xff08;ID&#xff1a;chiphome-dy&#xff09;01 巨型ESP8266ESP8266几乎无人不知&#xff0c;无人不晓了吧&#xff0c;相当一部分朋友接触物联网都是从ESP8266开始的&#xff0c;…

软考中级-嵌入式系统设计师(二)

1、逻辑电路&#xff1a;组合逻辑单路、时序逻辑电路。根据电路是否有存储功能判断。 2、组合逻辑电路 指该电路在任一时刻的输出&#xff0c;仅取决于该时刻的输入信号&#xff0c;而与输入信号作用前电路的状态无关。一般由门电路组成&#xff0c;不含记忆元器件&#xff0…

XD文件转换为sketch的三种方法

XD文件如何转化为Sketch文件&#xff0c;作为竞品的两个产品&#xff0c;如果要互通到可以彼此转换为彼此的文件格式&#xff0c;还是有点难的。所以&#xff0c;今天我总结了 3 个方法&#xff0c;其中最后一个方法是最好用的&#xff01; XD 和 Sketch 算是竞品&#xff0c;想…

论文笔记:TIMESNET: TEMPORAL 2D-VARIATION MODELINGFOR GENERAL TIME SERIES ANALYSIS

ICLR 2023 1 intro 时间序列一般是连续记录的&#xff0c;每个时刻只会记录一些标量 之前的很多工作着眼于时间维度的变化&#xff0c;以捕捉时间依赖关系 ——>可以反映出、提取出时间序列的很多内在特征&#xff0c;比如连续性、趋势、周期性等但是现实时间序列数据中的…

linux环境搭建私有gitlab仓库以及启动gitlab后出现卡顿处理办法

搭建之前&#xff0c;需要安装相应的依赖包&#xff0c;并且要启动sshd服务(1).安装policycoreutils-python openssh-server openssh-clients [rootVM-0-2-centos ~]# sudo yum install -y curl policycoreutils-python openssh-server openssh-clients [rootVM-0-2-centos ~]…

C++【类与对象】

文章目录类与对象&#xff08;1&#xff09;类与对象一1.0.面向过程和面向对象初步认识1.1.类的引入1.2.类的定义1.3.类的访问限定符及封装1.4.类的作用域1.5.类的实例化1.6.类的对象大小的计算1.8.类成员函数的this指针&#xff08;2&#xff09;类与对象二2.0类的6个默认成员…

LeetCode——51. N 皇后

一、题目 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的 n 皇后问题 的解决方案…

CS144-Lab4

概述 在实验0中&#xff0c;你实现了流量控制的字节流(ByteStream)的抽象概念。 在实验1、2和3中&#xff0c;你实现了该抽象概念与互联网提供的抽象概念之间的转换工具&#xff1a;不可靠的数据报(IP或UDP)。 现在&#xff0c;你已经接近顶峰&#xff1a;一个可以工作的TCP…

Word处理控件Aspose.Words功能演示:使用 C++ 在 Word 文档中查找和替换文本

Aspose.Words 是一种高级Word文档处理API&#xff0c;用于执行各种文档管理和操作任务。API支持生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现和打印文档&#xff0c;而无需在跨平台应用程序中直接使用Microsoft Word Aspose API支持流行文件格式处理&#xff0c;并允…

如何使用MidJourney和ChatGPT制作动画短片?

Ammaar Reshi当我制作这部使用生成式人工智能制作的蝙蝠侠动画短片时——我不知道它会在不到一周的时间内获得 700 万次观看。想学&#xff01;给我们讲解下是整体的制作流程吧&#xff01;&#xff01;opusAmmaar Reshi我不是电影制作人&#xff0c;也从未写过剧本。我只是有还…

高频面试题|JVM虚拟机的体系结构是什么样的?

一. 前言最近有很多小伙伴都在找工作&#xff0c;他们在面试时经常被面试官问到一个问题&#xff1a;请说说JVM虚拟机的体系结构是什么样的?很多小伙伴都能说出堆、栈等相关内容&#xff0c;但面试官紧接着又问&#xff0c;你还知道其他内容吗&#xff1f;这时不少小伙伴就语塞…

STM32模拟SPI协议获取24位模数转换(24bit ADC)芯片AD7791电压采样数据

STM32模拟SPI协议获取24位模数转换&#xff08;24bit ADC&#xff09;芯片AD7791电压采样数据 STM32大部分芯片只有12位的ADC采样性能&#xff0c;如果要实现更高精度的模数转换如24位ADC采样&#xff0c;则需要连接外部ADC实现。AD7791是亚德诺(ADI)半导体一款用于低功耗、24…

C语言--回调函数

1. 什么是回调函数&#xff1f; 回调函数&#xff0c;光听名字就比普通函数要高大上一些&#xff0c;那到底什么是回调函数呢&#xff1f;恕我读得书少&#xff0c;没有在那本书上看到关于回调函数的定义。我在百度上搜了一下&#xff0c;发现众说纷纭&#xff0c;有很大一部分…

力扣-部门工资前三高的所有员工

大家好&#xff0c;我是空空star&#xff0c;本篇带大家了解一道稍微复杂的力扣sql练习题。 文章目录前言一、题目&#xff1a;185. 部门工资前三高的所有员工二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.其他总结前言 上一篇带大家练习了部门工资最高的…

CUDA硬件实现

CUDA硬件实现 文章目录CUDA硬件实现4.1 SIMT 架构4.2 硬件多线程NVIDIA GPU 架构围绕可扩展的多线程流式多处理器 (SM: Streaming Multiprocessors) 阵列构建。当主机 CPU 上的 CUDA 程序调用内核网格时&#xff0c;网格的块被枚举并分发到具有可用执行能力的多处理器。一个线程…

【C++】1.C++基础

1.命名空间 使用命名空间的目的是对标识符的名称进行本地化&#xff0c;以避免命名冲突或名字污染&#xff0c;namespace关键字的出现就是针对这种问题的。 1定义 定义命名空间&#xff0c;需要使用到namespace关键字&#xff0c;后面跟命名空间的名字&#xff0c;然后接一对…

DepGraph:适用任何结构的剪枝

文章目录摘要1、简介2、相关工作3、方法3.1、神经网络中的依赖关系3.2、依赖关系图3.3、使用依赖图剪枝4、实验4.1、设置。4.2、CIFAR的结果4.3、消融实验4.4、适用任何结构剪枝5、结论摘要 论文链接&#xff1a;https://arxiv.org/abs/2301.12900 源码&#xff1a;https://gi…

软考高级-信息系统管理师之质量管理(最新版)

质量管理目录 项目质量管理质量管理基础质量与项目质量质量管理质量管理标准体系1、IS09000系列,8项基本原则如下。2、全面质量管理(TQM)3、六西格码意为“六倍标准差”,4、软件过程改迸与能力成熟度模型项目质量管理过程规划质量管理1、规划质量管理2、规划质量管理:输入3、…

【java】Spring Cloud --Spring Cloud 的核心组件

文章目录前言一、Eureka&#xff08;注册中心&#xff09;二、Zuul&#xff08;服务网关&#xff09;三、 Ribbon&#xff08;负载均衡&#xff09;四、Hystrix&#xff08;熔断保护器&#xff09;五、 Feign&#xff08;REST转换器&#xff09;六、 Config&#xff08;分布式配…