kafka3.X集群安装(不使用zookeeper)

news2025/1/11 2:39:47

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本

一、kafka集群实例角色规划

在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。

上图中黑色代表broker(消息代理服务),褐色/蓝色代表Controller(集群控制器服务)

  • 左图(kafka2.0):一个集群所有节点都是broker角色,kafka从三个broker中选举出来一个Controller控制器,控制器将集群元数据信息(比如主题分类、消费进度等)保存到zookeeper,用于集群各节点之间分布式交互。
  • 右图(kafka3.0):假设一个集群有四个broker,指定三个作为Conreoller角色(蓝色),从三个Controller中选举出来一个Controller作为主控制器(褐色),其他的2个备用。zookeeper不再被需要!相关的元数据信息以kafka日志的形式存在(即:以消息队列消息的形式存在)。
  • controller通信端口:9093, 作用与zk的2181端口类似 。

在搭建kafka3.0集群之前, 我们需要先做好kafka实例角色规划。(四个broker, 需要通过主动配置指定三个作为Controller, Controller需要奇数个, 这一点和zk是一样的)

主机名称ip角色node.id
kafka-vm1192.168.1.111broker,controller1
kafka-vm2192.168.1.112broker,controller2
kafka-vm3192.168.1.113broker,controller3
kafka-vm4192.168.1.114broker4

二、准备工作

  • kafka3.x不再支持JDK8,建议安装JDK11或JDK17。
  • 新建kafka持久化日志数据mkdir -p /data/kafka;并保证安装kafka的用户具有该目录的读写权限。

各个机器节点执行:


  
  
  1. # 安装jdk(kafka3.x不再支持JDK8,建议安装JDK11或JDK17, 这里安装jdk11)
  2. # 下载安装jdk11, 参考: https://blog.csdn.net/justlpf/article/details/127268046
  3. # 下载kafka
  4. adduser kafka
  5. cd /opt
  6. wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
  7. tar -xf kafka_2.12-3.3.1.tgz
  8. chown -R kafka:kafka kafka_2.12-3.3.1*
  9. mkdir -p /data/kafka
  10. chown -R kafka:kafka /data/kafka
vi /etc/hosts,各个节点,添加如下内容:

   
   
  1. 192.168.1.111 data-vm1
  2. 192.168.1.112 data-vm2
  3. 192.168.1.113 data-vm3
  4. 192.168.1.114 data-vm4

三、修改Kraft协议配置文件

在kafka3.x版本中,使用Kraft协议代替zookeeper进行集群的Controller选举,所以要针对它进行配置。

vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties
   
   

具体配置参数如下:


   
   
  1. # data-vm1节点
  2. node.id=1
  3. process.roles=broker,controller
  4. listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
  5. advertised.listeners=PLAINTEXT://:9092
  6. controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
  7. log.dirs=/data/kafka/
  8. # data-vm2节点
  9. node.id=2
  10. process.roles=broker,controller
  11. listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
  12. advertised.listeners=PLAINTEXT://:9092
  13. controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
  14. log.dirs=/data/kafka/
  15. # data-vm3节点
  16. node.id=3
  17. process.roles=broker,controller
  18. listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
  19. advertised.listeners=PLAINTEXT://:9092
  20. controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
  21. log.dirs=/data/kafka/
  • node.id:这将作为集群中的节点 ID,唯一标识,按照我们事先规划好的(上文),在不同的服务器上这个值不同。其实就是kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色,也有可能是controller角色,所以改名叫做node节点。
  • process.roles:一个节点可以充当broker或controller或两者兼而有之。按照我们事先规划好的(上文),在不同的服务器上这个值不同。多个角色用逗号分开。
  • listeners: broker将使用9092端口,而kraft controller控制器将使用9093端口。
  • advertised.listeners: 这里指定kafka通过代理暴漏的地址,如果都是局域网使用,就配置PLAINTEXT://:9092即可。
  • controller.quorum.voters:这个配置用于指定controller主控选举的投票节点,所有process.roles包含controller角色的规划节点都要参与,即:zimug1、zimug2、zimug3。其配置格式为:node.id1@host1:9093,node.id2@host2:9093
  • log.dirs:kafka 将存储数据的日志目录,在准备工作中创建好的目录。

所有kafka节点都要按照上文中的节点规划进行配置,完成config/kraft/server.properties配置文件的修改。

四、格式化存储目录

生成一个唯一的集群ID(在一台kafka服务器上执行一次即可),这一个步骤是在安装kafka2.0版本的时候不存在的。


   
   
  1. $ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid
  2. SzIhECn-QbCLzIuNxk1A2A

使用生成的集群ID+配置文件格式化存储目录log.dirs,

所以这一步确认配置及路径确实存在,

并且kafka用户有访问权限(检查准备工作是否做对)。

每一台主机服务器都要执行命令:


   
   
  1. /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \
  2. -t SzIhECn-QbCLzIuNxk1A2A \
  3. -c /opt/kafka_2.12-3.3.1/config/kraft/server.properties

格式化操作完成之后,log.dirs​目录下多出一个Meta.properties文件​,存储了当前的kafka节点的id(node.id),当前节点属于哪个集群(cluster.id)


  
  
  1. [root@data-vm2 ~] # ll /data/kafka/
  2. 总用量 8
  3. -rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint
  4. -rw-r--r--. 1 root root 86 10月 11 18:23 meta.properties
  5. $ cat /data/kafka/meta.properties
  6. #
  7. #Tue Apr 12 07:39:07 CST 2022
  8. node.id=1
  9. version=1
  10. cluster.id=SzIhECn-QbCLzIuNxk1A2A

五、 启动集群,完成基础测试

zimug1 zimug2 zimug3是三台应用服务器的主机名称(参考上文中的角色规划),实现方式已经在本专栏《linux主机与ip解析》中进行了说明。将下面的命令集合保存为一个shell脚本,并赋予执行权限。执行该脚本即可启动kafka集群所有的节点,前提是:你已经按照本专栏的《集群各节点之间的ssh免密登录》安装方式做了集群各节点之间的ssh免密登录。

启动命令:


  
  
  1. bin/kafka-server-start.sh \
  2. /opt/kafka_2.12-3.3.1/config/kraft/server.properties
  3. # 后台运行
  4. nohup bin/kafka-server-start.sh \
  5. /opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &

脚本: 


  
  
  1. #!/bin/bash
  2. kafkaServers= 'data-vm1 data-vm2 data-vm3'
  3. #启动所有的kafka
  4. for kafka in $kafkaServers
  5. do
  6. ssh -T $kafka << EOF
  7. nohup /opt/kafka_2.12-3.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-3.3.1/config/kraft/server.properties 1>/dev/null 2>&1 &
  8. EOF
  9. echo 从节点 $kafka 启动kafka3.0...[ done ]
  10. sleep 5
  11. done

六、一键停止集群脚本

一键停止kafka集群各节点的脚本,与启动脚本的使用方式及原理是一样的。

停止命令:

/opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh
  
  

执行脚本:


  
  
  1. #!/bin/bash
  2. kafkaServers= 'data-vm1 data-vm2 data-vm3'
  3. #停止所有的kafka
  4. for kafka in $kafkaServers
  5. do
  6. ssh -T $kafka << EOF
  7. cd /opt/kafka_2.12-3.3.1
  8. bin/kafka-server-stop.sh
  9. EOF
  10. echo 从节点 $kafka 停止kafka...[ done ]
  11. sleep 5
  12. done

七、测试Kafka集群

7.1 创建topic


  
  
  1. [root@data-vm1 kafka_2.12-3.3.1] # bin/kafka-topics.sh \
  2. --create \
  3. --topic quickstart-events \
  4. --bootstrap-server data-vm4:9092
  5. Created topic quickstart-events.
  6. [root@data-vm1 kafka_2.12-3.3.1] #
  7. #
  8. [root@data-vm1 kafka_2.12-3.3.1] # bin/kafka-topics.sh \
  9. --create \
  10. --topic quickstart-events \
  11. --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092

7.2 查看topic列表


  
  
  1. bin/kafka-topics.sh \
  2. --list \
  3. --bootstrap-server data-vm4:9092
  4. #
  5. bin/kafka-topics.sh \
  6. --list \
  7. --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092

7.3 查看消息详情


  
  
  1. [root@data-vm1 kafka_2.12-3.3.1] # bin/kafka-topics.sh \
  2. --describe \
  3. --topic quickstart-events \
  4. --bootstrap-server data-vm3:9092
  5. Topic: quickstart-events TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
  6. Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  7. [root@data-vm1 kafka_2.12-3.3.1] #

7.4 生产消息


  
  
  1. [root@data-vm1 kafka_2.12-3.3.1] # bin/kafka-console-producer.sh \
  2. --topic quickstart-events \
  3. --bootstrap-server data-vm1:9092
  4. # 参考: 创建并配置topic
  5. bin/kafka-topics.sh \
  6. --bootstrap-server localhost:9092 \
  7. --create \
  8. --topic my-topic \
  9. --partitions 1 \
  10. --replication-factor 1 \
  11. --config max.message.bytes=64000 \
  12. --config flush.messages=1
  13. # ------------------------- 参考 ------------------------ #
  14. # 1: 修改已创建topic配置
  15. # (Overrides can also be changed or set later using the alter configs command.)
  16. bin/kafka-configs.sh \
  17. --bootstrap-server localhost:9092 \
  18. --entity-type topics \
  19. --entity-name my-topic \
  20. --alter \
  21. --add-config max.message.bytes=128000
  22. # 2: 检查已修改的topic配置是否生效
  23. # (To check overrides set on the topic you can do)
  24. bin/kafka-configs.sh \
  25. --bootstrap-server localhost:9092 \
  26. --entity-type topics \
  27. --entity-name my-topic \
  28. --describe
  29. # 3. 恢复到原来的配置
  30. # (To remove an override you can do)
  31. bin/kafka-configs.sh \
  32. --bootstrap-server localhost:9092 \
  33. --entity-type topics \
  34. --entity-name my-topic \
  35. --alter \
  36. --delete-config max.message.bytes
  37. # 4. 增加分区数
  38. # (To add partitions you can do)
  39. bin/kafka-topics.sh \
  40. --bootstrap-server broker_host:port \
  41. --alter \
  42. --topic my_topic_name \
  43. --partitions 40
  44. # 5. 添加配置
  45. # (To add configs:)
  46. bin/kafka-configs.sh \
  47. --bootstrap-server broker_host:port \
  48. --entity-type topics \
  49. --entity-name my_topic_name \
  50. --alter \
  51. --add-config x=y
  52. # 6. 移除配置
  53. # (To remove a config:)
  54. bin/kafka-configs.sh \
  55. --bootstrap-server broker_host:port \
  56. --entity-type topics \
  57. --entity-name my_topic_name \
  58. --alter \
  59. --delete-config x
  60. # 7. 删除topic
  61. # (And finally deleting a topic:)
  62. bin/kafka-topics.sh \
  63. --bootstrap-server broker_host:port \
  64. --delete \
  65. --topic my_topic_name

7.5 消费消息


  
  
  1. bin/kafka-console-consumer.sh \
  2. --topic quickstart-events \
  3. --from-beginning \
  4. --bootstrap-server data-vm4:9092

7.6 查看消费者组


  
  
  1. # 检查消费者postition
  2. # Checking consumer position
  3. bin/kafka-consumer-groups.sh \
  4. --bootstrap-server localhost:9092 \
  5. --describe \
  6. --group my-group
  7. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  8. my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
  9. my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
  10. my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2

7.7 查看消费者组列表


  
  
  1. # list all consumer groups across all topics
  2. bin/kafka-consumer-groups.sh \
  3. --bootstrap-server localhost:9092 \
  4. --list
  5. test-consumer-group
  6. # To view offsets, as mentioned earlier,
  7. # we "describe" the consumer group like this:
  8. bin/kafka-consumer-groups.sh \
  9. --bootstrap-server localhost:9092 \
  10. --describe \
  11. --group my-group
  12. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  13. topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
  14. topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
  15. topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
  16. topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
  17. topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
  18. topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
  19. # 更多配置参考:
  20. # https://kafka.apache.org/32/documentation.html#uses

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1132679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DLT645转modbus协议网关采集电表的数据方法

DLT645有两个版本分别是DLT645-97和DLT645-07&#xff0c;该协议主要用于电表抄表&#xff0c;采用为主-从结构的半双工通讯模式&#xff0c;硬件接口使用RS-485今天我们来看下&#xff0c;用远创智控YC-645-TCP网关如何采集电表的数据 1&#xff0c;首先&#xff0c;我们需要…

淘宝京东抖音1688苏宁等关键词搜索商品API接口(关键词搜索商品API接口,关键词搜索商品列表接口)

淘宝京东抖音1688苏宁等关键词搜索商品API接口&#xff08;关键词搜索商品API接口&#xff0c;关键词搜索商品列表接口&#xff09;代码对接如下&#xff1a; item_search-按关键字搜索淘宝商品 taobao.item_search 1.公共参数 名称类型必须描述keyString是调用key&#xf…

UG\NX二次开发 获取面的面面积、周长

文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 感谢粉丝订阅 感谢 weixin_38891498 订阅本专栏,非常感谢。 简介 UG\NX二次开发 获取面的面面积、周长 效果 代码 #include "me.hpp" #include <NXOpen/Session…

docker部署prometheus+grafana服务器监控(二) - 安装数据收集器 node-exporter

在目标服务器安装数据收集器 node-exporter 1. 安装数据收集器 node-exporter wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gztar xvf node_exporter-1.6.1.linux-amd64.tar.gzmv node_exporter-1.6.1…

【蓝桥】铺地板

1、题目 问题描述 小蓝家要装修了&#xff0c;小蓝爸爸买了很多块&#xff08;可理解为数量无限&#xff09; 2 3 2 \times 3 23 规格的地砖&#xff0c;小蓝家的地板是 n m n \times m nm 规格的&#xff0c;小蓝想问你&#xff0c;能否用这些 2 3 2 \times 3 23 的地砖…

【QT开发(14)】QT P2P chat 聊天

在【P2P学习&#xff08;2&#xff09;】P2P 通信&#xff0c;主要存在四种不同的网络模型的第一阶段&#xff1a;集中式P2P 模式 最简单的路由方式就是集中式&#xff0c;即存在一个中心节点保存了其他所有节点的索引信息&#xff0c;索引信息一般包括节点 IP 地址、端口、节…

YOLOv8训练自己的数据集+改进方法复现

yolov8已经出来好几个月了&#xff0c;并且yolov8从刚开始出来之后的小版本也升级好几次&#xff0c;总体变化不大&#xff0c;个别文件存放位置发生了变化&#xff0c;以下以最新版本的YOLOv8来详细学习和使用YOLOv8完成一次目标检测。 一、环境按照 深度学习环境搭建就不再…

Cesium 展示——实现鼠标移动到实体上动态高亮显示

文章目录 需求分析需求 在开发中,遇到这样一个需求:在绘制完实体后,要求鼠标移动到上边后有高亮的效果,看的清除一点,因此,经过尝试,做出了如下解决方案 在这里,我们以线为例,实现其动态高亮显示 分析 在这里我们首先需要有一个鼠标监听事件,在合适的位置注册鼠标监听…

uni.showModal的用法

uni.showModal({title: 提示,//标题content: "内容",//提示内容可以加入\r\n进行换行showCancel: true,//是否显示取消按钮&#xff0c;默认为truecancelText: 取消,//取消按钮的文字confirmText: 确定,//确认按钮的文字confirmColor: #ff0000,//确认按钮文字颜色can…

chrony时间服务

目录 1.1.重要性 1.2. Linux的两个时钟 1.3. NTP 1.4. Chrony介绍 2.安装与配置 2.1.安装: 2.2. Chrony配置文件分析 3.实验 3.1实验1 3.2实验2 3.常见时区 1.1.重要性 ●由于IT系统中&#xff0c;准确的计时非常重要&#xff0c;有很多种原因需要准确计时: 。在网络…

Netty核心源码剖析

Netty 线程模型 Netty高并发高性能架构设计精髓 主从Reactor线程模型NIO多路复用非阻塞无锁串行化设计思想支持高性能序列化协议零拷贝(直接内存的使用)ByteBuf内存池设计灵活的TCP参数配置能力并发优化 无锁串行化设计思想 在大多数场景下&#xff0c;并行多线程处理可以提…

网络原理之TCP协议(超详细 干货满满)

文章目录 前言TCP 协议的段格式TCP 协议的相关特性什么叫做可靠传输TCP 采用了哪些主要机制保证了可靠传输和优化传输效率1. 确认应答2. 超时重传3. 连接管理&#xff08;三次握手、四次挥手&#xff09;三次握手&#xff08;建立连接&#xff09;四次挥手&#xff08;断开连接…

【方法】如何给PDF文件添加“打开密码”?

PDF文件可以在线浏览&#xff0c;但如果想要给文件添加“打开密码”&#xff0c;就需要用到软件工具&#xff0c;下面小编分享两种常用的工具&#xff0c;小伙伴们可以根据需要选择。 工具一&#xff1a;PDF编辑器 PDF阅读器一般是没有设置密码的功能模块&#xff0c;PDF编辑器…

全志A523(显示篇一)

全志使用de架构&#xff0c;兼容drm架构 返回目录

全面的‘由于找不到mfc110u.dll,无法继续执行代码’的解决方法分享,3分钟教你快速修复

在我们使用电脑的过程中&#xff0c;有时候可能会遇到某个应用程序启动失败&#xff0c;提示“由于找不到mfc110u.dll,无法继续执行代码”的问题。本文将详细介绍如何针对这类问题进行处理&#xff0c;以及mfc110u.dll文件的相关知识。 一.mfc110u.dll文件盘点 首先&#xff0…

用豆瓣电影和掌桥科研练习网页解析的三种方式——正则、Xpath和bs4

网页解析 豆瓣电影解析方式正则表达式Xpathbs4 翻页 掌桥科研正则表达式Xpathbs4 豆瓣电影 解析方式 先爬取数据&#xff1a; # -- coding: utf-8 --** import requests import json import time import pandas as pdurlhttps://movie.douban.com/top250?start0&filter…

【带头学C++】----- 1.基础知识 ---- 1.21.23.9 位运算符的综合应用

最近做任务&#xff0c;公司项目比较重&#xff0c;赶上1024的活动流量券任务&#xff0c;内容治疗略微有一些杂乱&#xff0c;后期会把专栏目录重新搞一下&#xff0c;内容我是融合了很多课程和书籍包含ai的一些理解&#xff0c;我整理和增加了自己的见解和代码贴图&#xff0…

【java学习—八】关键字static(4)

文章目录 1. 前言2. 关键字static3. 代码理解3.1. 类变量3.2. 类方法3.3. 工具类3.4. 总结 4. 注意事项 1. 前言 当我们编写一个类时&#xff0c;其实就是在描述其对象的属性和行为&#xff0c;而并没有产生实质上的对象&#xff0c;只有通过 new 关键字才会产生出对象&#xf…

10-16/10-17 JavaWeb入门/servlet

JavaWeb 现实生活中的互联网项目都是javaWeb项目, 包含网络, 多线程, 展示: HTML等其他的前端技术, 界面窗体展示(Swing包,AWT包 窗体), C#, JAVAWeb架构:(面试重点&#xff1a;要求记忆) B/S: 浏览器/服务器 优点: 以浏览器作为客户端, 使用这个软件, 用户不需要下载客户端,…

Spring Cloud之服务熔断与降级(Hystrix)

目录 Hystrix 概念 作用 服务降级 简介 使用场景 接口降级 服务端服务降级 1.添加依赖 2.定义接口 3.实现接口 4.Controller类使用 5.启动类添加注释 6.浏览器访问 客户端服务降级 1.添加依赖 2.application.yml 中添加配置 3.定义接口 4.Controller类使用 …