Java技术栈总结:kafka篇

news2025/4/21 21:36:53

一、# 基础知识

1、安装

  • 部署一台ZooKeeper服务器;
  • 安装jdk;
  • 下载kafka安装包;
  • 上传安装包到kafka服务器上:/usr/local/kafka;
  • 解压缩压缩包;
  • 进入到config目录,修改server.properties配置信息:
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0

#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.65.60:9092

#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs

#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
  • 进入到bin目录,使用命令启动kafka服务器(带配置文件)
  • ./kafka-server-start.sh -daemon ../config/server.properties
  • 检查kafka是否启动成功:
  • 进入到zk内查看是否有kafka节点:
    /brokers/ids/0

    2、基本概念

名称

说明

Broker

消息中间件处理节点,一个kafka节点为一个broker,一个或者多个broker组成一个kafka集群

Topic

消息主题。kafka根据topic对消息进行分类,发布到kafka集群的每条消息都需要指定一个topic

Producer

消息生产者。向broker发送消息的客户端。

Consumer

消息消费者。从broker读取消息的客户端。

3、主题创建

  • 通过kafka命令向zk中创建一个主题
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 1 --partitions 1 --topic test
  • 查看当前zk中所有的主题
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181 test

4、发送消息

把消息发送给broker的某个topic,打开一个kafka发送消息的客户端,然后开始用客户端向kafka服务器发送消息。

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test

5、消费消息

打开一个消费消息的客户端,向kafka服务器的某个主题消费消息。

生产者将消息发送给broker,broker会将消息保存到本地的日志文中。/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log;消息的保存是有序的,通过offset偏移量来描述消息的有序性;消费者消费消息时也是通过offset来描述所要消费消息的位置。

  • 方式一:从当前主题中的最后一条消息的offset + 1 开始消费:
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
  • 方式二:从当前主题的第一条消息开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

6、单播&&多播消息

如果多个消费者在同一个消费组,那么只有一个消费者可以订阅到topic中的消息。即,同一个消费组中只能有一个消费者收到一个topic中的消息。

不同的消费组订阅同一个topic,那么不同消费组中各只有一个消费者能收到消息。

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup2 --topic test

7、查看消费组信息

/kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
  • current-offset: 最后被消费的消息的偏移量;
  • Log-end-offset: 消息总量(最后⼀条消息的偏移量);
  • Lag:积压了多少条消息。


二、主题与分区

1、主题 topic

kafka通过topic对消息进行分类,不同的topic会被订阅该topic的消费者消费。

如果一个topic的消息非常多,消息保存在log日志文件中,会占用大量的磁盘空间。为了解决文件过大的问题,kafka提出了Partition分区的概念。

2、分区

通过partition将一个topic中的消息分区来存储。好处:

  • 分区存储,解决了统一存储文件过大的问题;
  • 提升了读写的吞吐量:读和写可以同时在多个分区中进行。

创建多个分区的主题:

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 1 --partitions 2 --topic test1

3、消息日志

  • 00000.log:保存的为消息;
  • __consumer_offset-49:
    • kafka内部创建了主题 “__consumer_offsets” 包含50个分区。这个主题用来存放消费者消费某个主题的偏移量。每个消费者会自己维护消费的主题的偏移量,即每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题consumer_offsets。kafka为了提升这个主题的并发性,默认设置了50个分区。
      • 提交到哪个分区,通过hash函数确定:
        • hash(cunsumerGroupId)% __consumer_offsets 主题的分区数;
        • 提交到该主题的内容:key为 consumerGroupId+topic+分区号,value为当前的offset值。
  • 文件内容默认保存7天,到期后消息自动删除。

三、集群

kafka的服务端由被称为Broker的服务进程构成,即一个kafka集群由多个Broker组成。如果集群中的某一台机器宕机,其他机器上的Broker仍然能够对外提供服务,确保kafka的高可用性。

1、集群搭建

  • 创建多个server.properties文件
# 0 1 2
broker.id=2
// 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
// kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
  • 通过命令分别启动各个broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 检查是否启动成功

进入到zk中查看 /brokers/ids 中是否有对应的znode(0,1,2)。

2、副本

# 副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 3 --partitions 2 --topic my-replicated-topic
# 查看topic情况
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic myreplicated-topic

副本是为了给主题中的分区创建多个备份,多个副本在kafka的集群的多个broker中,会有一个副本作为Leader,其他为Follower。

  • Leader:
    • kafka的写和读操作,都发生在Leader。Leader负责把数据同步给Follower,如果Leader挂了,通过主从选举,从多个Follower中选举产生一个新的Leader。
  • Follower:
    • 接收Leader的数据同步。
  • isr:
    • 可以同步和已经同步的节点会被存入到isr集合中。如果isr中的节点性能较差,会被从isr集合中剔除。

总结:集群中有多个broker,创建主题是可以指明主题有多个分区,可以为分区创建多个副本,不同的副本存放在不同的broker里。

3、集群消费

  • 一个partition(分区)只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个partition的多个消费者的消费顺序的顺序性无法得到保证。
  • partition的数量决定了消费组中消费者的数量,同一个消费组中的消费者的数量最好不要超过partition的数量,否则超出的消费者消费不到消息。
  • 如果消费者挂了,会出发rebalance机制,会让其他消费者来消费该分区。

4、Controller、rebalance、Hw

(1)Controller

【Controller选举】:

启动时,每个broker会向zk创建一个临时序号节点,获得序号最小的那个broker将会作为集群中的Controller,负责:

  • Leader选举:当集群中一个副本的Leader挂掉,需要在集群中选举出一个新的Leader,选举从isr集合中的最左边获得。
  • broker信息同步:当集群中有broker新增或者减少,Controller会同步信息给其他broker。
  • 分区信息同步:当集群中有分区新增或者减少,Controller会同步信息给其他broker。

(2)reblance机制

前提:消费组中的消费者没有指明分区来消费;

触发的条件:消费组中的消费者和分区的关系发生变化;

分区分配的策略:reblance之前,分区有三种分配策略:

  • range:根据公式计算每个消费者消费哪几个分区,分区总数/消费者数量 + 1 (根据余数情况确定,前面几个消费者需要“+1”,后面几个不需要)。
  • 轮询:即依次轮着来。
  • sticky:粘合策略。如果需要reblance,会在之前已经分配的基础上进行调整,不会改变之前分配的情况。如果该策略没有开,那么久需要进行整体的重新分配。

(3)HW和LEO

HW是已经完成同步的位置。

消息在写入broker,且每个broker已经完成该消息的同步后,hw才会发生变化。在此之前消费者是消费不到这条消息的。在完成同步后,HW更新后,消费者才能消费到这条消息,这样的目的是为了防止消息丢失

LEO(log-end-offset)是某个副本最后的消息位置。


四、消息的同步异步发送

1、同步发送消息

如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次。

【生产者的三种ack配置】

  • ack=0,kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息,效率最高。
  • ack=1(默认),多副本之间的Leader已经收到消息,并把消息写入到本地的log中,才返回ack给生产者,性能和安全新较为均衡。
  • ack=-1/all,配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要Leader和一个Follower同步完成后,才返回给ack给生产者(此时集群中有2个broker已经完成数据的接收)。这种方式最安全,但性能最差。

2、异步发送消息

异步发送,生产者发送完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback()回调方法。

3、消息发送的缓冲区

  • kafka生产者默认会创建一个消息缓冲区,用来存放要发送的消息,默认为32m;
    • props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • kafka本地线程会去缓冲区中一次拉取16k的数据,发送到broker;
    • props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • 如果拉取不到16k的数据,间隔10ms也会将已有的数据发送到broker。
    • props.put(ProducerConfig.LINGER_MS_CONFIG, 10);


五、消费者实现

1、消费者自动&&手动提交Offset

(1)提交的内容

“所属的消费组 + 主题 + 分区 + 消费的偏移量”,提交到集群的__consumer_offsets主题里面。

(2)自动提交

消费者poll消息下来后就自动提交offset。

// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注:自动提交会丢消息。因为消费者在消费前提交offset,可能提交完后还没有完成消费,消费者就挂了。

(3)手动提交

需要把自动提交的配置改成false。

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交分为两种:

  • 手动同步提交:在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑。
  • 手动异步提交:在消息消费完后提交,不需要等待集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用。

2、长轮询poll消息

(1)默认情况下,消费者一次会拉取500条消息。

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

(2)可以设置长轮询的时间周期,例如1000ms。

  • 如果⼀次poll到500条,就直接执行for循环。
  • 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s。
  • 如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环。
  • 如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少⼀点。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

3、消费者健康状态检查

消费者每隔1skafka集群发送一次心跳,如果集群发现超过10s没有续约的消费者,会将其踢出消费者,触发消费组的reblance机制,将该分区的交给消费组里的其他消费者进行消费。

//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

六、常见问题处理

1、防止消息丢失

(1)生产者发送消息到Broker的过程丢失

方式一:异步发送

  • 设置异步发送,发送失败的情况使用回调记录或者重发;
  • 失败重试,配置重试次数。

方式二:同步发送

  • 使用同步发送消息的方式。

(2)消息在Broker中存储丢失

  • 把ack设置为1或者all(-1),设置同步的分区数 >= 2,让Follower节点参与保存数据的确认。

(3)消费者从Broker接收消息丢失

  • 关闭自动提交偏移量,开启手动提交偏移量;
  • 提交方式,把自动提交改成手动提交(最好使用 同步 + 异步 提交)。

2、防止重复消费

如果生产者发送消息后,由于网络抖动等问题,没有收到ack,但是实际上broker已经收到了消息。此时,生产者会进行重试,于是broker就会收到多条相同的消息,从而造成重复消费。

解决:

  • 生产者关闭重试。这种方式会造成消息丢失(不推荐);
  • 消费者关闭自动提交偏移量,开启手动提交偏移量;
  • 消费者解决非幂等性消费问题:
    • 在数据库中创建联合主键,防止相同的主键创建出多条记录。
    • 使用分布式锁,以业务id为锁。保证只有一条记录能够创建成功。

3、保证顺序性消费

问题原因:一个topic的数据可能存储在不同的分区中,每个分区都有一个按照顺序存储的偏移量。如果消费者关联了多个分区,则不能保证顺序性。

解决该问题,只需要保证需要顺序消费的消息出现在同一个分区。

解决方法:

  • 方式一:
    • 发送消息时,指定分区号;
    • 发送消息时,按照相同的业务设置相同的key(默认情况下,分区是通过key的hashcode值来确定分区的。因此,key一样的话,分区也是一样的);
  • 方式二(不推荐):
    • 生产者:使用同步发送,ack设置成非0的值(1或者-1(all))。
    • 消费者:主题只设置一个分区,消费组只设置一个消费者。

主:实际kafka顺序消费的场景不多,因为会牺牲掉性能。

4、消息积压

(1)出现的原因

消费者的消费速度赶不上生产者的生产速度,导致kafka中大量的数据没有被消费。

随着积压消息的增多,消费者的寻址性能会下降,最终导致整个kafka对外提供服务的性能很差,从而造成其他服务访问速度变慢,造成服务雪崩。

(2)解决方案

  • 消费者中,使用多线程,充分利用机器的性能进行消费消息。
  • 通过业务的架构设计,提升业务层面消费的性能。
  • 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度。
  • 创建一个消费者,该消费者kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将消息poll下来,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。(不常用)


参考:

https://www.bilibili.com/video/BV1Xy4y1G7zA?p=1;

https://www.bilibili.com/video/BV1yT411H7YK

https://www.jianshu.com/p/d3e963ff8b70;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1901974.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

绝区叁--如何在移动设备上本地运行LLM

随着大型语言模型 (LLM)(例如Llama 2和Llama 3)不断突破人工智能的界限,它们正在改变我们与周围技术的互动方式。这些模型早已集成到我们的手机中,但到目前为止,它们理解和处理请求的能力还非常有限。然而,…

2024年7月6日 (周六) 叶子游戏新闻

自动电脑内部录音器AutoAudioRecorder: 是一款免费的自动音频录制软件,可直接将电脑内部所有的声音录制成 mp3/wav 文件,包括音乐、游戏直播、网络会议、聊天通话等音频源。 卸载工具 HiBitUninstaller: Windows上的软件卸载工具 《不羁联盟》制作人&…

数据库测试|Elasticsearch和ClickHouse的对决

前言 数据库作为产品架构的重要组成部分,一直是技术人员做产品选型的考虑因素之一。 ClkLog会经常遇到小伙伴问支持兼容哪几种数据库?为什么是选择ClickHouse而不是这个或那个。 由于目前市场上主流的数据库有许多,这次我们选择其中一个比较典…

【密码学】密码学体系

密码学体系是信息安全领域的基石,它主要分为两大类:对称密码体制和非对称密码体制。 一、对称密码体制(Symmetric Cryptography) 在对称密码体制中,加密和解密使用相同的密钥。这意味着发送方和接收方都必须事先拥有这…

医院产科信息化管理系统源码,智慧产科管理系统,涵盖了从孕妇到医院初次建档、历次产检、住院分娩、统计上报到产后42天全部医院服务的信息化管理。

医院产科信息化管理系统源码,智慧产科管理系统,产科专科电子病历系统 技术架构:前后端分离Java,Vue,ElementUIMySQL8.0.36 医院产科信息化管理系统,通过构建专科病例系统实现临床保健一体化,涵…

线程池理解及7个参数

定义理解 线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁带来的性能开销。线程池可以管理一堆线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。 …

【pytorch18】Logistic Regression

回忆线性回归 for continuous:y xwbfor probability output:yσ(xwb) σ:sigmoid or logistic 线性回归是简单的线性模型,输入是x,网络参数是w和b,输出是连续的y的值 如何把它转化为分类问题?加了sigmoid函数,输出的值不再是…

springboot服务启动读取不到application.yml中的nacos.config信息

我的版本: 可以添加bootstrap.yml文件,在里面添加nacos.config的配置信息 也可以添加VM参数 -Dspring.cloud.nacos.discovery.server-addr -Dspring.cloud.nacos.config.server-addr -Dspring.cloud.nacos.config.namespace -Dspring.cloud.nacos.discov…

Java实现登录验证 -- JWT令牌实现

目录 1.实现登录验证的引出原因 2.JWT令牌2.1 使用JWT令牌时2.2 令牌的组成 3. JWT令牌(token)生成和校验3.1 引入JWT令牌的依赖3.2 使用Jar包中提供的API来实现JWT令牌的生成和校验3.3 使用JWT令牌验证登录3.4 令牌的优缺点 1.实现登录验证的引出 传统…

LeetCode刷题之搜索二维矩阵

2024 7/5 一如既往的晴天,分享几张拍的照片嘿嘿,好几天没做题了,在徘徊、踌躇、踱步。蝉鸣的有些聒噪了,栀子花花苞也都掉落啦,今天给他剪了枝,接回一楼来了。ok,做题啦! 图1、宿舍…

EDA 2023 年世界国家suicide rate排名

文章目录 前言:关于数据集列 导入模块导入数据数据预处理探索性数据分析按性别划分的自杀率 [箱线图]相关矩阵热图自杀率最高的 15 个国家变化百分比最高的 15 个国家/地区2023 年世界地图上自杀率的国家 结尾: 前言: 随着社会的不断发展和变迁,人们对于各种社会问…

154. 寻找旋转排序数组中的最小值 II(困难)

154. 寻找旋转排序数组中的最小值 II 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java 1. 题目描述 题目中转:154. 寻找旋转排序数组中的最小值 II 2.详细题解 该题是153. 寻找旋转排序数组中的最小值的进阶题,在153. 寻找旋转排序数组中的最小值…

2024 年第十四届亚太数学建模竞赛(中文赛项)浅析

需要完整B题资料,请关注:“小何数模”! 本次亚太(中文赛)数学建模的赛题已正式出炉,无论是赛题难度还是认可度,该比赛都是仅次于数模国赛的独一档,可以用于国赛前的练手训练。考虑到大家解题实属不易&…

品牌推广的核心价值:作用解析与意义探讨!

在激烈的市场竞争环境之下,品牌推广已经成为企业不可缺少的一部分。不仅关乎企业的知名度,对市场份额更是起到了决定性的作用。 作为一名手工酸奶品牌的创始人,目前全国也复制了100多家门店,这篇文章,我将和大家分享品…

web学习笔记(八十)

目录 1.小程序实现微信一键登录 2. 小程序的授权流程 3.小程序配置vant库 4.小程序配置分包 5.小程序配置独立分包 6.小程序分包预下载 1.小程序实现微信一键登录 要先实现小程序一键登录首先我们需要给按钮设置一个绑定事件,然后在绑定事件内部通过wx.login…

ETAS工具导入Com Arxml修改步骤

文章目录 前言Confgen之前的更改Confgen之后的修改CANCanIfComComMEcuM修改CanNmCanSMDCMCanTp生成RTE过程报错修改DEXT-诊断文件修改Extract问题总结前言 通讯协议栈开发一般通过导入DBC实现,ETAS工具本身导入DBC也是生成arxml后执行cfggen,本文介绍直接导入客户提供的arxml…

FastAPI+vue3+Primeflex教学20240706,渲染阶乘案例

子绝父相 相对定位是相对于自己原本的位置定位。 绝对定位,如果父元素设置了相对定位,则相对于父元素进行绝对定位,否则相对于最近的设置了相对定位的元素进行绝对定位,或者相对于根元素进行绝对定位。 定位有四个方向&#xff0…

白嫖A100-interLM大模型部署试用活动,亲测有效-2.Git

申明 以下部分内容来源于活动教学文档: Docs git 安装 是一个开源的分布式版本控制系统,被广泛用于软件协同开发。程序员的必备基础工具。 常用的 Git 操作 git init 初始化一个新的 Git 仓库,在当前目录创建一个 .git 隐藏文件夹来跟踪…

Ubuntu基本环境配置

#Jdk 安装 #--查看 已安装 的jdk软件 java -version # 安装jdk软件(如果有选择请选 y) sudo apt install openjdk-11-jdk # 自行学习 vi 或 vim 学习网址如下: # https://www.runoob.com/linux/linux-vim.html #-- 修改系统级 path : /etc/profile 文件 (注意要…

每周算法:无向图的双连通分量

题目链接 冗余路径, Redundant Paths G 题目描述 为了从 F F F 个草场中的一个走到另一个,奶牛们有时不得不路过一些她们讨厌的可怕的树。 奶牛们已经厌倦了被迫走某一条路,所以她们想建一些新路,使每一对草场之间都会至少有两条相互分离…