kafka学习笔记三

news2025/1/21 9:36:11

第二篇 外部系统集成

Flume、Spark、Flink、SpringBoot 这些组件都可以作为kafka的生产者和消费者,在企业中非常常见。

Flume官网:Welcome to Apache Flume — Apache Flume

Flink:Apache Flink_百度百科 

Spark:Apache Spark_百度百科

集成SpringBoot:略。 

第三篇 生产调优手册

第1章 kafka硬件配置选择

第2章 生产者调优

2.1 生产者核心参数配置

发送流程:做到心中有图,回忆。

生产者核心参数:

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer
 
指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认32m
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,eader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.2 生产者如何提高吞吐量

2.3 数据可靠性

2.4 数据去重

略。

2.5 数据有序

单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序。

2.6 数据乱序

第3章 Kafka Broker调优

3.1 Broker核心参数配置

Kafka Broker总体工作流程:心中有图,回忆。

Broker核心参数:

参数名称描述
replica.lag.time.max.msISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。建议关闭。
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G
log.index.interval.bytes默认4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hoursKafka 中数据保存的时间,默认 7 天。生成中一般设置3天
log.retention.minutesKafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.msKafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy默认是 delete,表示所有数据启用删除策略;
如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

3.2 其他

1. 服役/退役新节点

(1)创建一个要均衡的主题。

(2)生成一个负载均衡的计划。

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

(4)执行副本存储计划。

(5)验证副本存储计划。

2. 增加分区

分区只能增加,不能减少。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --alter --topic first --partitions 3

3. 增加副本因子

4. 手动调整分区副本存储

5. Leader Partition负载均衡

6. 自动创建主题

如果 broker 端配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生
产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为
1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者
开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会
自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。
生产环境建议将该参数设置为 false。

第4章 消费者调优

4.1 消费者核心参数配置

消费者组初始化流程:回忆。

消费者组详细消费流程如下:

消费者核心参数:

参数名称描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和 value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? 
earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 
none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 
anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。不建议修改
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。不建议修改。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条

4.2 消费者再平衡

4.3 指定offset消费

4.4 指定时间消费

4.5 消费者事务

4.6 消费者如何提高吞吐量

回忆。

第5章 Kafka总体看调优

5.1 如何提升吞吐量★★★★★

1)提升生产吞吐量

(1)buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。

(2)batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;
如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(3)linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(4)compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。

2)增加分区

3)消费者提高吞吐量

(1)调整 fetch.max.bytes 大小,默认是 50m。

(2)调整 max.poll.records 大小,默认是 500 条。

4)增加下游消费者处理能力

5.2 数据精准一次

1) 生产者角度

  • acks 设置为-1 (acks=-1)。
  • 幂等性(enable.idempotence = true) + 事务 。

2) broker服务端角度

  • 分区副本大于等于2(--replication-factor 2)。
  • ISR 里应答的最小副本数量大于等于2(min.insync.replicas = 2)。

3) 消费者

  • 事务 + 手动提交offset(enable.auto.commit = false)。
  • 消费者输出的目的地必须支持事务(MySQL、Kafka)。

5.3 合理设置分区数

(1)创建一个只有 1 个分区的 topic。

(2)测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。

(3)假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。

(4)然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。

例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;

分区数 = 100 / 20 = 5 分区

分区数一般设置为:3-10 个

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。


5.4 单条日志大于1m

5.5 服务器挂了

在生产环境中,如果某个 Kafka 节点挂掉。

正常处理办法:

(1)先尝试重新启动一下,如果能启动正常,那直接解决。

(2)如果重启不行,考虑增加内存、增加 CPU、网络带宽。

(3)如果将 kafka 整个节点误删除,如果副本数大于等于 2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。

5.6 集群压力测试

1. 生产者压力测试

通过动态调整以下4个参数来控制

  • batch.size=16384
  • linger.ms=0
  • compression.type
  • buffer.memory

2. 消费者压力测试

  • max.poll.records(一次拉取条数)默认是500条;
  • fetch.max.bytes(拉取一批数据大小)默认是50M;

通过修改这两个参数的值来进行测试。这两个参数在kafka/config/consumer.properties 文件中进行配置。这两个参数可以提高吞吐量。

=========================源码解析=============================

第四篇 源码分析

第2章 生产者源码

回忆发送流程原理图,结合原理图看源码更容易。

发送流程:

2.1 初始化

更多内容,见文档。

2.2  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1475108.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微服务Springcloud智慧工地APP源码 AI人工智能识别 支持多工地使用

目录 一、现状描述 二、行业难点 APP端功能 一、项目人员 二、视频监控 三、危大工程 四、绿色施工 五、安全隐患 AI智能识别 环境监测 实名制管理 智慧监测 智慧工地全套解决方案 一、现状描述 建筑工程建设具有明显的生产规模大宗性与生产场所固定性的特点。建…

Alist访问主页显示空白解决方法

文章目录 问题记录问题探索和解决网络方案问题探究脚本内容查看 最终解决教程 问题记录 访问Alist主页显示空白,按F12打开开发人员工具 ,选择控制台,报错如下 index.75e31196.js:20 Uncaught TypeError: Cannot assign to read only property __symbo…

科技论文编写思路

科技论文编写思路 1.基本框架2.课题可行性评估1.研究目标和意义2.研究方法和技术3.可行性和可操作性4.风险和不确定性5.经济性和资源投入6.成果预期和评估 3.写作思路4.利用AI读论文5.实验流程 1.基本框架 IntroductionRelated worksMethodExperiment and analysisDiscussionC…

关于电商API接口数据采集,您需要的了解更多!

关于数据采集 随着企业业务数字化转型的推进,非数字原生企业对数据的感知和获取提出了新的要求和挑战,原有信息化平台的数据输出和人工录入能力已经远远满足不了企业内部组织在数字化下的运作需求。企业需要构建数据感知能力,采用现代化手段…

SQLlabs46关

如果我们直接用列去排序 ?sortusername/password username: passward 可以看到顺序是不同的,当然第一列第二列第三列也可以,基本上都是这个原理,那怎么去实现注入呢,我们主要是通过rand()去实现一个盲注或者报错注入…

Day05:反弹SHELL不回显带外正反向连接防火墙出入站文件下载

目录 常规基本渗透命令 文件上传下载-解决无图形化&解决数据传输 反弹Shell命令-解决数据回显&解决数据通讯 防火墙绕过-正向连接&反向连接&内网服务器 防火墙组合数据不回显-ICMP带外查询Dnslog 思维导图 章节知识点: 应用架构:W…

浅谈 Linux 网络编程 - 网络字节序

文章目录 前言核心知识关于 小端法关于 大端法网络字节序的转换 函数 前言 在进行 socket 网络编程时,会用到字节流的转换函数、例如 inet_pton、htons 等,那么为什么要用到这些函数呢,本篇主要就是对这部分进行介绍。 核心知识 重点需要记…

HTML5 CSS3 提高

一,HTML5的新特性 这些新特性都有兼容性问题,基本是IE9以上版本的浏览器才支持,如果不考虑兼容性问题,可以大量使用这些新特性。 1.1新增语义化标签 注意: 1这种语义化标签主要是针对搜索引擎的 2这些新标签在页面…

36.云原生之SpringCloud+k8s实践

云原生专栏大纲 文章目录 SpringCloudk8s介绍spring-cloud-kubernetes服务发现配置管理负载均衡选主 spring-cloud-bookinfo案例构建项目环境配置namespace部署与验证productpagegatewaybookinfo-admindetailsratingsreviewsreviews-v1reviews-v2 总结 SpringCloudk8s介绍 ht…

Day04:APP架构小程序H5+Vue语言Web封装原生开发Flutter

目录 常见APP开发架构 APP-开发架构-原生态-IDEA APP-开发架构-Web封装-平台 APP-开发架构-H5&Vue-HBuilderX WX小程序-开发架构-Web封装-平台 WX小程序-开发架构-H5&Vue-HBuilderX 思维导图 章节知识点: 应用架构:Web/APP/云应用/三方服…

Oracle中序列

1. Sequence 定义 在Oracle中可以用SEQUENCE生成自增字段。Sequence序列是Oracle中用于生成数字序列的对象,可以创建一个唯一的数字作为主键。 2. 为什么要用 Sequence 你可能有疑问为什么要使用序列? 不能使用一个存储主键的表并每次递增吗&#xf…

使用 kubeadm 部署k8s集群

一、所有节点系统初始化 1、常规初始化 2、内核版本升级以及内核限制文件参数修改 还可以考虑将旧版本的内核卸载 二、准备nginx负载均衡器和keepalived nginx四层代理: keepalived配置: nginx检测脚本: 三、所有节点部署docker&#xff0c…

Linux-Uboot命令

help命令 进入 uboot 的命令行模式后输入“help”或者“?”,然后按下回车即可查看当前 uboot 所支持的命令。 查看某一个命令的帮助信息:?命令名称 或 help命令名称 信息查询命令 常用的和信息查询有关的命令有 3 个…

3D工业相机及品牌集合

3D相机可以获取物理世界的空间信息,即立体三维的物理信息,不仅可以拍摄到场景的二维图像,而且能获取物体之间的位置关系,再经过进一步深化处理,还能完成三维建模等应用。 3D相机三种方案 1、结构光 通常采用特定波长…

如何使用ETLCloud拉通金蝶云

一、ETLCloud集成组件 ETLCloud采用了一种创新的基于平台底座的理念。它通过将组件和平台进行分离,用户可以在平台上自行下载和安装所需的组件,而无需升级整个底座版本。这样用户就可以通过不断升级组件来增强数据集成平台的处理能力。同时,…

论文阅读:SOLOv2: Dynamic, Faster and Stronger

目录 概要 Motivation 整体架构流程 技术细节 小结 论文地址:[2003.10152] SOLOv2: Dynamic and Fast Instance Segmentation (arxiv.org) 代码地址:GitHub - WXinlong/SOLO: SOLO and SOLOv2 for instance segmentation, ECCV 2020 & NeurIPS…

阿里云ECS服务器vCPU是什么意思?

阿里云ECS服务器vCPU和CPU是什么意思?CPU和vCPU有什么区别?一台云服务器ECS实例的CPU选项由CPU物理核心数和每核线程数决定,CPU是中央处理器,一个CPU可以包含若干个物理核,通过超线程HT(Hyper-Threading&am…

一文读懂什么是 OCR 识别

在数字化时代,信息处理和数据管理是企业运营的重要环节。然而,手工输入信息存在效率低和准确性低的问题,严重影响了企业的工作流程和决策过程。因此,OCR(Optical Character Recognition)识别技术的应用变得…

必看——HTTP怎么升级成HTTPS

将HTTP升级为HTTPS主要涉及获取SSL/TLS证书并在您的服务器上配置它。这个过程可以增强网站的安全性,通过加密客户端和服务器之间的通信来保护数据。下面是一个基本的步骤指南: 1.购买SSL/TLS证书:您可以从许多证书颁发机构(CA&…

期货程序化软件 日内抄单软件期货交易程序

按钮&#xff1a;锁定合约、设置、合约、<(折叠按钮) 锁定合约&#xff1a;点击锁定合约后&#xff0c;合约列表不再接受点击事件。再次点击锁定合约按钮可以进行解锁。 设置按钮&#xff1a;点击设置按键后&#xff0c;打开设置窗口&#xff0c;进行交易相关的设置。 合…