kafka调优配置

news2025/1/21 5:48:06

Kafka生产者核心参数配置

在这里插入图片描述来源于尚硅谷

参数名称描述
bootstrap.servers生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。
key.serializer和value.serializer指定发送消息的key和value的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

生产者如何提高吞吐量

参数名称描述
buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间
compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

数据可靠性

acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据去重: 检查是否开启幂等性 默认true,表示开启幂等性

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

数据有序 :单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序;

Kafka Broker 核心参数配置

在这里插入图片描述

参数描述
replica.lag.time.max.msISR中,如果Follower长时间未向Leade r发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。
auto.leader.rebalance.enable默认是true。 自动Leader Partition 平衡。建议关闭。
leader.imbalance.per.broker.percentage默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds默认值300秒。检查leader负载是否平衡的间隔时间。
log.segment.bytesKafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。
log.index.interval.bytes默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。
log.retention.hoursKafka中数据保存的时间,默认7天。
log.retention.minutesKafka中数据保存的时间,分钟级别,默认关闭。
log.retention.msKafka中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是5分钟。
log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。
log.cleanup.policy默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。
num.io.threads默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。
num.replica.fetchers默认是1。副本拉取线程数,这个参数占总核数的50%的1/3
num.network.threads默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

自动创建主题:如果broker端配置参数auto.create.topics.enable设置为true(默认值是true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。

Kafka消费者核心参数配置

在这里插入图片描述
在这里插入图片描述

参数描述
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。不建议修改。
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms ,也不应该高于 session.timeout.ms的1/3。不建议修改。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

消费者再平衡

参数名称描述
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

如何提升吞吐量
1)提升生产吞吐量
(1)buffer.memory:发送消息的缓冲区大小,默认值是32m,可以增加到64m。
(2)batch.size:默认是16k。如果batch设置太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(3)linger.ms,这个值默认是0,意思就是消息必须立即被发送。一般设置一个5-100毫秒。如果linger.ms设置的太小,会导致频繁网络请求,吞吐量下降;如果linger.ms太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(4)compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的CPU开销。
2)增加分区
3)消费者提高吞吐量
(1)调整fetch.max.bytes大小,默认是50m。
(2)调整max.poll.records大小,默认是500条。
4)增加下游消费者处理能力

如何保证数据精准发送接收

1)生产者角度

  • acks设置为-1 (acks=-1)
  • 幂等性(enable.idempotence = true) + 事务 。

2)broker服务端角度

  • 分区副本大于等于2 (–replication-factor 2)。
  • ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。

3)消费者

  • 事务 + 手动提交offset (enable.auto.commit = false)。
  • 消费者输出的目的地必须支持事务(MySQL、Kafka)。

合理设置分区数
(1)创建一个只有1个分区的topic。
(2)测试这个topic的producer吞吐量和consumer吞吐量。
(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
分区数一般设置为:3-10个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。

单条日志大于1m

参数名称描述
message.max.bytes默认1m,broker端接收每个批次消息最大值。
max.request.size默认1m,生产者发往broker每个请求消息最大值。针对topic级别设置消息体的大小。
replica.fetch.max.bytes默认1m,副本同步数据,每个批次消息最大值。
fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/943239.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

bpmnjs Properties-panel拓展(ExtensionElements拓展篇)

接上文bpmnjs Properties-panel拓展&#xff08;属性设置篇&#xff09;&#xff0c;继续记录下第三个拓展需求的实现。 需求简述 在ExclusiveGateway标签的extensionElements标签中增加子标签<activiti:executionListener>子标签&#xff0c;可增加复数子标签。子标签…

vLLM 实战

引言 随着人工智能技术的飞速发展&#xff0c;以及今年以来 ChatGPT 的爆火&#xff0c;大语言模型 (Large Language Model, LLM) 受到越来越多的关注。 为了实现 LLM 部署时的推理优化&#xff0c;全球各地有众多团队做出了各种优化框架。本文以加州大学伯克利分校开发的 vLLM…

R语言之基础绘图

文章和代码已经归档至【Github仓库&#xff1a;https://github.com/timerring/dive-into-AI 】或者公众号【AIShareLab】回复 R语言 也可获取。 文章目录 1. 函数 plot( )2.直方图和密度曲线图3.条形图4. 饼图5. 箱线图和小提琴图6. 克利夫兰点图7. 导出图形小结 R 的基础绘图系…

大模型开发05:PDF 翻译工具开发实战

大模型开发实战05:PDF 翻译工具开发实战 PDF-Translator 机器翻译是最广泛和基础的 NLP 任务 PDF-Translator PDF 翻译器是一个使用 AI 大模型技术将英文 PDF 书籍翻译成中文的工具。这个工具使用了大型语言模型 (LLMs),如 ChatGLM 和 OpenAI 的 GPT-3 以及 GPT-3.5 Turbo 来…

【通用消息通知服务】0x4 - 目前进展 阶段复盘

【通用消息通知服务】0x4 - 阶段复盘 达成 基本的API已经写完✍️了(消息查看发送, 模板crud,终端crud,发送渠道crud,计划crud,计划执行查看)拆分server, executor, planner三个入口, 方便针对性水平扩展整体架构初步形成&#xff0c;通过队列实现了事件驱动模型和消息订阅发…

Spring Boot(Vue3+ElementPlus+Axios+MyBatisPlus+Spring Boot 前后端分离)【六】

&#x1f600;前言 本篇博文是关于Spring Boot(Vue3ElementPlusAxiosMyBatisPlusSpring Boot 前后端分离)【六】&#xff0c;希望你能够喜欢 &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章…

Apache RocketMQ 5.0 消息进阶:如何支撑复杂的业务消息场景?

作者&#xff1a;隆基 一致性 首先来看 RocketMQ 的第一个特性-事务消息&#xff0c;事务消息是 RocketMQ 与一致性相关的特性&#xff0c;也是 RocketMQ 有别于其他消息队列的最具区分度的特性。 以大规模电商系统为例&#xff0c;付款成功后会在交易系统中订单数据库将订单…

三---开关稳压器

通过控制系统反馈&#xff0c;当电压上升时通过反馈降低&#xff0c;当电压下降时通过反馈升高&#xff1b;形成一个控制环路&#xff1b;控制电路&#xff1a;PWM&#xff08;脉宽调制&#xff09;&#xff0c;PFM&#xff08;频率控制方式&#xff09;&#xff0c;移相控制方…

2023开学季有哪些电容笔值得买?平价电容笔推荐

在日常生活中&#xff0c;这支电容笔的使用范围很广&#xff0c;不管是和电脑、IPAD、手机一起使用&#xff0c;都可以说是一种很好的办公工具。首先要弄清楚自己的需求&#xff0c;再根据自己的需求来挑选合适的商品。苹果的Pencil有一种独特的重力压感&#xff0c;让人在上面…

【MTK平台】根据kernel log分析wifi scan的时候流程

一 概要: 本文主要讲解根据kernel log分析下 当前路径下(vendor/mediatek/kernel_modules/connectivity/wlan/core/gen4m/)wifi scan的时候代码流程 二. Log分析: 先看Log: 2.1)在Framework层WifiManager.java 方法中,做了一个标记,可以精准的确认时间 这段log可以…

基于Qt5开发图形界面——WiringPi调用Linux单板电脑IO

Qt5——WiringPi Qt5WiringPi示例教程 Qt5 Qt是一种跨平台的应用程序开发框架。它被广泛应用于图形用户界面&#xff08;GUI&#xff09;开发&#xff0c;可以用于构建桌面应用程序、移动应用程序和嵌入式应用程序。Qt提供了丰富的功能和工具&#xff0c;使开发人员可以快速、高…

企业内部搭建wiki的意义大吗?到底有何价值体现?

内部Wiki也叫做企业Wiki&#xff0c;是员工可以存储、共享和协作创作的地方&#xff0c;将企业内部员工知识共享集中到一个地方&#xff0c;并且相关内容与其他团队成员协作完成&#xff0c;它可以包含企业内部的各种知识&#xff0c;从操作指南到培训手册&#xff0c;再到客户…

ipad有必要用手写笔吗?开学季实惠的电容笔推荐

iPad平板的机型经过了一次又一次的升级&#xff0c;增加了更多的功能&#xff0c;如今已有了与笔记本电脑匹敌的能力。而到了如今&#xff0c;科技的发展&#xff0c;iPad也从一个娱乐工具&#xff0c;变成了一个集学习、画画、办公于一体的强大工具。为了提高生产效率&#xf…

Linux内核学习(十二)—— 页高速缓存和页回写(基于Linux 2.6内核)

目录 一、缓存手段 二、Linux 页高速缓存 三、flusher 线程 Linux 内核实现了一个被叫做页高速缓存&#xff08;page cache&#xff09;的磁盘缓存&#xff0c;它主要用来减少对磁盘的 I/O 操作。它是通过把磁盘中的数据缓存到内存中&#xff0c;把对磁盘的访问变为对物理内…

比较器DATESHEET参数

失调电压的理解 Input Offset Voltage 理想情况下&#xff0c;如果运算放大器的两个输入电压完全相同&#xff0c;则输出应为0V。但运放内部两输入支路无法做到完全平衡&#xff0c;导致输出永远不会是0&#xff0c;具体见图1所示。此时保持放大器负输入端不变&#xff0c;而在…

火绒能一键修复所有dll缺失吗?教你快速修复dll文件

关于dll文件的缺少&#xff0c;其实大家应该都是不陌生的吧&#xff0c;毕竟只要是经常使用电脑的人&#xff0c;那么它就一定碰到过各种各样的dll文件缺失&#xff0c;因为很多程序都是需要dll文件来支撑的&#xff0c;如果dll文件丢失了&#xff0c;那么一些程序就会启动不了…

新版白话空间统计(27):从离散点到密度图

点的密度是点分析中一个很重要的方向&#xff0c;有大量的点数据的空间表达&#xff0c;基本上都是通过密度图来进行体现的&#xff0c;比如百度热力图&#xff1a; 又或者是交通车流量热力图&#xff1a; 空间点密度分析&#xff0c;把海量离散的点&#xff0c;变成高度抽象的…

Tableau可视化进阶实践-1

各类地图 符号地图特点 填充地图特点 多维地图特点 混合地图特点 符号地图 特点 符号化&#xff1a;符号地图以符号的形式表示信息&#xff0c;使用不同的符号来代表不同的地理要素或特定的地理信息。这种符号化的方式可以简化地理信息的表达&#xff0c;使人们更容易理解和识…

基于ssm+vue斗车车辆交易系统源码和论文

基于ssmvue斗车交易系统源码和论文082 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次…

一个程序员眼中的API调用(淘宝/天猫/1688/拼多多API)

在程序员眼中&#xff0c;API调用是一种重要的编程概念&#xff0c;它允许开发人员通过预先定义好的接口和规范&#xff0c;调用其他应用程序或服务的功能。API调用是现代软件开发中不可或缺的一部分&#xff0c;它使得开发人员能够快速构建出复杂的应用程序&#xff0c;同时避…