Kafka集群搭建的两种方式

news2025/1/11 11:01:46

目录

1. 依赖Zookeeper搭建集群

1. 下载Kafka二进制文件

2. 更改kafka配置

3. 启动Zookeeper集群和Kafka集群

4. 验证集群

1.创建主题

2. 检查主题是否存在

3. 创建生产者生产数据

4. 创建消费者消费数据

5. 检查Zookeeper中Kafka集群的元数据 

2. 不依赖Zookeeper搭建Kafka集群

1.下载Kafka二进制文件

2.更改Kafka配置

3. 生成meta.properties文件

 4. 启动kafka集群

5. 验证集群

1. 创建主题,并验证主题存在

2. 生产者生产数据

3. 消费者消费数据

​编辑


1. 依赖Zookeeper搭建集群

在2.8.0版本以前,Kafka集群的搭建依赖于Zookeeper环境,需要将kafka集群节点ip及主题等Kafka相关的元数据存入Zookeeper服务中。所以搭建Kafka集群前,需要先搭建Zookeeper集群。

zookeeper多主机集群搭建icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141052933?spm=1001.2014.3001.5501

1. 下载Kafka二进制文件

搭建完Zookeeper集群后,从Kafka官网下载Kafka二进制文件,这里我们下载最新的3.8.0版本。

然后创建/kafka文件夹,将文件传入该文件夹解压,并将解压后的文件改名为kafka 。

#解压文件
tar -xf ./kafka_2.13-3.8.0.tgz
#更改名字
mv ./kafka_2.13-3.8.0 ./kafka

这里我们搭建三个节点的Kafka集群,所以要再三个主机都进行如上操作。

2. 更改kafka配置

修改kafka/conf/server.properties文件中的如下配置

#集群中kafka节点的唯一id,其他的节点要配置别的id
broker.id=0 

#Kafka启动时会向Zookeeper集群进行服务注册,这个配置项记录所有Zookeeper节点的ip和端口,只记录一个localhost也能成功启动,不过为了集群的高可用性,全部配置更好。
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 

#Kafka监听的IP端口,由于现在的主题通常是多网卡环境,所以需要指定ip,早期版本好像可以使用0.0.0.0监听全部的ip,不过这个版本不可以,必须指定明确的ip。
listeners=PLAINTEXT://ip1:9092

#kafka日志的存储位置,实际上是Kafka的全部数据,包括Kafka内部存储的消息,以及Kafka集群第一次启动时从Zookeeper中获取的集群元数据。
log.dirs=/path/to/kafka/logs

启动一个简单的集群,Kafka不需要太多的配置,不过实际生产环境中Kafka的部署需要考虑大量因素,需要大量的调整配置,运维难度较大。

3. 启动Zookeeper集群和Kafka集群

如果在Zookeeper集群搭建后已经启动了,这里就不需要在启动了,如果没有启动,不仅可以通过Zookeeper提供的zkServer.sh可执行文件启动,也可以通过Kafka的可执行文件启动。

#启动Zookeeper,如果之前没启动
bin/zookeeper-server-start.sh config/zookeeper.properties

#启动kafka
bin/kafka-server-start.sh config/server.properties

#后台启动Kafka,不占用当前终端
nohup bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1 &

4. 验证集群

集群启动成功后,我们可以通过创建生产者,生产数据,并在另一个Kafka服务进行消费来验证集群创建是否成功。

1.创建主题

通过调用Kafka的bin目录下的kafka-topic.sh文件创建主题,命令如下。

./bin/kafka-topics.sh --create --topic test-topic --bootstrap-server 192.168.142.20:9092 --replication-factor 3 --partitions 3
#create代表当前执行的创建主题操作
#topic指定创建主题名称
#bootstrap-server指定从哪个kafka服务实例连接集群,可以指定多个,防止某个节点宕机造成指令执行失败。
#replication-factor指定当前主题要复制几份,分给其他Kafka服务实例,当前是三节点集群,我们创建三份,保证每个节点都有该主题数据。
#当复制份数少于集群节点数时会有部分节点没有该主题数据,不过不会影响kafka集群正常消费数据。

创建成功后结果如下图

2. 检查主题是否存在

依旧是调用Kafka-topic.sh文件,检查主题是否存在

./bin/kafka-topics.sh --list --bootstrap-server 192.168.142.21:9092
#list指的是当前是要执行查看主题列表操作
#bootstrap-server指定接入集群的kafka服务地址

这里我们通过第二台主机,连接第二个kafka节点获取主题列表,结果如下

3. 创建生产者生产数据

确定主题存在后,我们再用这台主机,和这个kafka节点服务创建生产者生产数据。

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.142.21:9092

 仔细阅读上面的配置解释,这里的配置就不解释了,应该很容易看懂,结果如下

可以看到执行完可执行文件后,会出现特殊的命令行,在这个命令行填写数据并回车,则可以向指定的主题消费数据,这里我生产了三条数据。

4. 创建消费者消费数据

这里我们再用第三台主机和第三个kafka节点服务来消费数据,通过可执行文件kafka-console-consumer.sh执行以下命令

bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.142.22:9092

结果如下

可以看到成功消费到生产者生产的数据

5. 检查Zookeeper中Kafka集群的元数据 

通过zkCli.sh执行如下命令连接zookeeper。

./zkCli.sh -server 192.168.142.20

连接后通过调用ls -R / 查看zookeeper中所有的节点信息,内容过长,这里无法截图,直接复制到代码块中,并且删除掉__consumer_offset节点下的所有子节点信息,方便大家查看。

从如下结果可以看到,除了/zookeeper节点外,其他节点均为kafka集群元数据信息,其中/brokers/ids节点由0,1,2三个子节点,分别记录了Kafka集群中每个节点的ip端口等信息,而/brokers/topics下保存了所有主题信息,其中可以看到除了我们创建的test-topic主题外,还有__consumer_offsets主题,这个主题中记录了所有消费者组和其负责的主题分区的对应关系,内容极多,这里为了方便大家观看结果,将其删除,想了解更多可以看我的另一篇文章

Kafka基本概念及消费流程icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501

/
/admin
/brokers
/cluster
/config
/consumers
/controller
/controller_epoch
/feature
/isr_change_notification
/latest_producer_id_block
/log_dir_event_notification
/zookeeper
/admin/delete_topics
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics/__consumer_offsets
/brokers/topics/test-topic
/brokers/topics/test-topic/partitions
/brokers/topics/test-topic/partitions/0
/brokers/topics/test-topic/partitions/1
/brokers/topics/test-topic/partitions/2
/brokers/topics/test-topic/partitions/0/state
/brokers/topics/test-topic/partitions/1/state
/brokers/topics/test-topic/partitions/2/state
/cluster/id
/config/brokers
/config/changes
/config/clients
/config/ips
/config/topics
/config/users
/config/topics/__consumer_offsets
/config/topics/test-topic
/zookeeper/config
/zookeeper/quota

2. 不依赖Zookeeper搭建Kafka集群

Kafka 从版本 2.8.0 开始,引入了新的 KRaft 模式,这使得 Kafka 可以在不依赖 ZooKeeper 的情况下部署集群。在传统的 Kafka 架构中,ZooKeeper 用于管理和协调 Kafka 集群中的元数据,例如主题、分区、副本状态等。KRaft 模式则将这些管理功能集成到 Kafka 自身的控制器节点中,消除了对 ZooKeeper 的依赖。

1.下载Kafka二进制文件

第一步仍然是下载解压文件,和之前一样

2.更改Kafka配置

Kafka默认配置是基于Zookeeper的,所以我们首先要删除Zookeeper相关的配置,通过vim命令中/zookeeper查找内容,注释或删除掉相关配置,然后删除掉broker.id,排除了zookeeper模式相关配置的影响后在进行如下配置(不删除可能也不影响启动,不过最好删除,防止其他不可预料的行为)。

#指定 Kafka 节点的角色,可以是broker、controller,或两者兼具。
process.roles=broker,controller

#当前节点的唯一id
node.id=1

#kafka集群中担任controller角色的kafka服务地址
controller.quorum.voters=1@192.168.142.30:9093,2@192.168.142.31:9093,3@192.168.142.32:9093

#当前的节点中由controller角色,则需要配置一个name标识,必须大写
controller.listener.names=CONTROLLER

#为上面设置的controller标识设置一个ip地址,这个地址也是controller.quorum.voters中配置的地址
listeners=PLAINTEXT://192.168.142.30:9092,CONTROLLER://192.168.142.30:9093

#存储之前存储在Zookeeper中的元数据内容
metadata.log.dir=/var/lib/kafka/meta

#配置初始化broker角色的kafka实例的超时时间,默认是30s,这里配置高一点,下面会说原因
initial.broker.registration.timeout.ms=240000

在传统的zookeeper模式下,zookeeper担任着选举leader及各种集群元数据管理的工作,而在新版本的kraft模式下,controller角色的kafka服务实例接替了这一工作,代替zookeeper处理Leader选举以及元数据管理的工作,并将元数据存入metadata.log.dir配置的路径中。

3. 生成meta.properties文件

在传统的zookeeper模式下,kafka集群信息保存在zookeeper中,也就是说只要Kafka实例·将自己的信息注册在zookeeper中,它就是集群的一部分,区分不同集群的根据是不同的zookeeper集群。

而在kraft模式下,kafka并没有像redis或zookeeper那样,将集群中所有节点的ip地址配置到配置文件中,而是通过kafka-storage.sh可执行文件生成meta.properties文件,命令如下

bin/kafka-storage.sh format --config conf/server.properties --cluster-id 123
#config选项指定配置文件路径,获取配置文件中的nodeid
#cluster-id指定一个集群id

实际生产过程中,使用Kafka提供的bin/kafka-storage.sh random-uuid生成这两个id,bin/kafka-storage.sh random-uuid可以生成一个唯一id,用于nodeid或clusterid,这里我们定义简单的nodeid和clusterid用于测试Kafka集群搭建过程。

每个Kafka实例都要执行这个命令,生成一个meta.properties文件,文件内容如下:

#
#Sat Aug 17 02:55:59 PDT 2024
cluster.id=123
version=1
directory.id=AABGMd7o4oHy2t-qzVzhcQ
node.id=1

其中cluster.id就是集群标识,一个集群中的所有Kafka节点实例的meta.properties文件中的cluster.id必须相同,并且node.id必须不同。

Kafka没有像其他中间件那样将节点ip写在配置中,可能的原因是想增加可扩展性。如果将所有ip写在配置中,意味着如果我想新增一个节点则需要修改全部kafka实例的配置文件,而如果像Kafka这样,利用一个文件中的cluster-id选项区分是否是一个集群,这样当想要新增一个Kafka实例时,只需要让新增的Kafka实例生成的meta.properties中的集群id相同即可。

 4. 启动kafka集群

启动kafka集群命令和传统模式一样,都是通过kafka-server-start.sh可执行文件,启动Kafka服务。

bin/kafka-server-start.sh config/server.properties

#后台启动
nohup bin/kafka-server-start.sh /path/to/kraft-server.properties > kafka.out 2>&1 &

不同的时,在传统模式下,集群的元数据的处理是靠已经启动的zookeeper,这是各个Kafka实例可以随时启动。而在kraft模式下,集群的元数据处理依赖于controller节点,所以在启动broker角色的Kafka实例之前,必须将controller全部启动完成,然后broker才会正常启动。

但如果节点既是controller,又是broker呢?那么此时这个节点的controller角色将会正常启动,不过由于controller没有全部启动,broker角色无法初始化成功,在这种情况下,该Kafka实例会不断地循环初始化,直到controller节点全部启动后,broker角色才会初始化成功。一旦brocker角色循环初始化的时间超过了超时时间(默认是30s),Kafka服务就会启动失败。

所以在配置项中我们新增一项initial.broker.registration.timeout.ms=240000,将broker初始化的超时时间设置为240s,让我们由充足的时间启动全部的controller角色的Kafka实例。

5. 验证集群

验证集群和之前一样,通过在不同主机和Kafka实例上操作生产者,消费者在主题中生产或消费信息

1. 创建主题,并验证主题存在

2. 生产者生产数据

3. 消费者消费数据

与之前不同的是,kraft模式下,集群相关元数据存储controller角色,在 metadata.log.dir 配置项配置的文件夹下,内容如下

具体内容是什么我也不知道,也没必要深究。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2053462.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

make/Makefile - ACM 时间及 make 的检查更新

文章目录 一、ACM 时间修改时间改变时间读取时间统一更新时间 二、make 会检查文件的新旧现象gcc 没有检查的功能,这个工作是 make 干的make/Makefile 通过对比时间了解可执行程序是不是最新的新的可执行程序和新的 .o 文件 一、ACM 时间 Linux 下文件属性中有 ACM…

骑行耳机哪个品牌好,精选热门榜单前五名机型实测体验

骨传导耳机凭借不入耳佩戴更健康安全灯特点火遍骑行圈,然而,虽然骨传导耳机如此热门,但我们在选择的时候也要擦亮双眼,避免入手到一些劣质机型,近期也是特意为大家挑选了几款排行榜单前五名骨传导耳机推荐。 那么&…

你见过哪些独特的代码注释

你见过哪些独特的代码注释 代码注释独特的代码注释启发 作为开发者,或者说作为程序员,每天和代码打交道,那么自然也就需要代码注释。从程序员的角度来看,代码注释必不可少。好的代码注释不仅能帮助开发者更好地理解代码&#xff0…

系统架构师计算题(1)——计算机系统基础知识(上)

持续刷题,持续总结,持续更新! 目录 1. 文件系统 题型1:多级索引 2. 存储管理 题型1:页式存储 题型2:段式存储 题型3:段页式存储 3. 磁盘读取 题型1:访问耗时 4. RAID 题型1:计算容量 1. 文件系统 文件在系统中的存储结构有如下几种: (1) 连续结构。 连续结构…

计算机Java项目|基于SpringBoot的物流管理系统设计与实现

作者主页:编程指南针 作者简介:Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参…

2024年骨传导耳机大比拼:看看南卡、韶音、墨觉谁的实力更胜一筹?

作为一名数码产品博主,我能明显感觉到骨传导耳机近年来的快速发展。这类耳机让用户在享受清晰音质的同时,仍能保持对周围环境的感知,非常适合户外活动。所以有很多小伙伴就想入手一款,但是又怕踩雷,没事不用怕&#xf…

武汉流星汇聚:亚马逊高效赋能中国卖家,共绘跨境电商发展新蓝图

在全球电商领域,亚马逊始终以其独特的商业模式、卓越的技术实力和对用户体验的极致追求,引领着行业的潮流与发展。展望未来,亚马逊的发展前景依旧充满无限可能与光明,其将继续在全球跨境电商市场中扮演核心角色,为中国…

AI搜索引擎Perplexica的本地部署(之二)Perplexica的非docker安装

Perplex 是一个开源的AI 驱动的搜索引擎,可以使用 Grok 和 Open AI 等模型在计算机上本地安装和运行。它为学术研究、写作、YouTube 和 Reddit 提供了一系列搜索功能。用户可以通过选择不同的模型、设置本地嵌入模型和探索各种搜索选项来定制他们的体验。该工具演示…

异构数据同步 datax (2)-postgres 写扩展

1、postgres SQL 支持 插入更新操作(与mysql 语法有一定差异) 可参考下面文章 MySQL PostgreSQL批量插入更新insertOrUpdate_mysql insert update-CSDN博客 2、datax中,可通过源码调整来实现 参考来源 https://juejin.cn/post/71248991…

如何使用GPT-SoVITSS生成各种角色的语言

百度网盘 请输入提取码 项目来自b站UP主花儿不哭 一,先除去背景声音————人生伴奏出去背景声音 1.下载后,按下面路径打开,打开文件beta,打开go-webui程序 回车,然后稍等一下,等待网页打开 2.勾选如下…

解决部分软件在 Linux 下截屏黑屏,远程控制黑屏的问题

解决部分软件在 Linux 下截屏黑屏,远程控制黑屏的问题 1.黑屏问题 某些 Linux 发行版本默认使用的是Wayland显示协议,比如 ubuntu 22.04 以上版本、fedora、manjaro 等版本。某些 Linux 软件在使用 Wayland 显示服务器协议时,截屏时屏幕是黑…

嵌入式面经篇九——网络编程

文章目录 前言一、网络编程1、列举一下 OSI 协议的各种分层。说说你最熟悉的一层协议的功能。2、TCP/IP协议包括?3、TCP通信建立和释放的过程?端口的作用?4、IP地址转换成物理地址的协议?反之?5、IP 地址的编码分为哪两…

海外媒体发稿的投稿策略:如何撰写高质量的新闻稿?

发布国外新闻稿件是一个涉及多步骤的过程,旨在确保您的新闻稿能够有效覆盖目标受众。以下是一些关键步骤和实用的技巧,帮助你实现海外媒体发稿。 1. 明确目标和受众 首先,明确您发布新闻稿的目标,是为了增加品牌曝光、推出新产品…

AIGC:text2img - 文生图

当前手头上的定制化项目,可用训练数据较少,训练的模型效果不佳。所以通过 clip-interrogator 获取图片获取描述后,批量进行 文生图 以增加样本量。 在批量生成前,先简单评测一下当前的主流 文生图 模型。直接上效果: …

SQL非技术快速入门39题

※食用指南:文章内容为牛客网《非技术快速入门》39道题重点笔记,用于重复思考错题,加深印象。 练习传送门:SQL非技术快速入门39题 目录: SQL13 Where in 和Not in SQL19 分组过滤练习题 SQL20 分组排序练习题 SQL2…

DDPM | 扩散模型代码详解【较为详细细致!!!】

文章目录 1、UNet网络结构1.1 residual网络和attention网络的细节1.2 t 的作用1.3 DDPM 中的 Positional Embedding 的使用1.4 DDPM 中的 Positional Embedding 代码1.5 residual block1.6 attention block1.7 UNet结构 2、命令行参数解析3、数据的获取与预处理4、模型的训练框…

stm32的UART重定向printf()

1配置好uart 2打开usart.c文件 3在此文件前面添加头文件 4在末尾添加重定向代码 添加的代码 /* USER CODE BEGIN 1 *///加入以下代码,支持printf函数,而不需要选择use MicroLIB //#define PUTCHAR_PROTOTYPE int fputc(int ch, FILE *f) #if 1 //#pragma import(__use_n…

microsoft edge怎么关闭安全搜索

microsoft edge浏览器为用户提供了安全搜索功能,旨在帮助用户过滤掉搜索结果中出现的不当信息。然而,有些用户可能觉得安全搜索功能限制了他们的浏览体验或工作需求。下面就给大家带来关闭microsoft edge安全搜索的相关内容,一起来看看吧。&a…

java 函数接口Consumer简介与示例【函数式编程】【Stream】

Java 8 中的 消费者接口Consumer 是一个函数接口,它可以接受一个泛型 类型参数,它属于java.util.function包。 accept(T) 方法:是 Consumer 函数式接口的方法,传入单个输入参数,无返回值,可以用于 Lambda 表…

日本央行还会加息?机构与市场唱反调!

最近,关于日本央行是否会继续加息的话题引发了市场热议。一边是市场对加息预期大幅下降,另一边却有像先锋、M&G这样的国际知名资产管理公司坚定地看好日本央行的进一步紧缩。 这究竟是怎么回事呢? 市场与机构的观点分歧 市场看跌加息&am…