Linux平台Kafka高可用集群部署全攻略

news2025/1/13 3:01:56

🐇明明跟你说过:个人主页

🏅个人专栏:《大数据前沿:技术与应用并进》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、Kafka核心优势

二、环境准备

1、服务器

2、服务器环境初始化

三、安装zookeeper

1、上传tar包

2、编辑配置文件

3、创建数据目录

4、安装JAVA

5、启动zookeeper

四、Kafka集群搭建

1、上传tar包

2、编辑配置文件

3、创建数据目录

4、启动kafka

5、查看端口状态

五、测试 

1、创建Topic

2、生产消息 

3、消费消息 


一、引言

1、Kafka简介

Apache Kafka 是一个开源的分布式流处理平台,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 设计用于处理实时数据流,提供了一种高效、可扩展、持久化的方式来进行数据发布和订阅。它通常被描述为一种分布式发布-订阅消息队列,但它实际上超越了传统消息队列的概念。

2、Kafka核心优势

1. 高吞吐量:

  • Kafka 能够处理海量数据,支持每秒数十万条消息的读写操作,即使在大规模部署中也能保持高性能。
  • 通过高效的文件系统设计和内存管理机制,Kafka 能够在处理大量数据的同时保持低延迟。


2. 持久性和可靠性:

  • Kafka 将数据存储在磁盘上,并支持数据复制(replication),确保即使在节点故障的情况下也能保证数据的可靠性和持久性。
  • 数据以追加的方式写入日志文件,减少了磁盘的随机写操作,提高了写入速度和数据完整性。


3. 可扩展性:

  • Kafka 具有良好的水平扩展能力,可以通过增加更多的节点来提升系统的处理能力和存储容量。
  • 分布式架构使得 Kafka 能够轻松地在多台服务器上部署,并且能够动态扩展和收缩集群大小。


4. 灵活的发布-订阅模型:

  • Kafka 支持发布-订阅模式,允许多个消费者订阅同一个主题,并且消费者可以独立消费消息。
  • 消费者可以控制自己的消费进度,不会影响其他消费者的状态,实现了消息消费的解耦。

二、环境准备

1、服务器

准备3台或者5台Linux服务器,用来组建高可用集群,这里使用3台Centos 7.9来进行搭建,大家也可以使用其他的Linux发行版本

配置如下:

2、服务器环境初始化

3台机器都要执行

关闭Selinux

vi /etc/selinux/config

#修改成如下
SELINUX=disabled

之后重启服务器

reboot

关闭并禁用防火墙

[root@kafka1 ~]# systemctl stop firewalld && systemctl disable firewalld

修改 /etc/hosts

vi /etc/hosts

# 添加以下内容
192.168.40.100  kafka1
192.168.40.101  kafka2
192.168.40.102  kafka3

修改镜像源 

curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo

三、安装zookeeper

为什么安装Kafka时,要先安装zookeeper:

ZooKeeper 是一个分布式的协调服务,它为分布式应用程序提供了一套完整的协调服务功能,包括命名服务、配置管理、集群管理和同步等。Kafka 利用 ZooKeeper 来管理其集群中的多个组件,确保系统的稳定性和一致性。

1、上传tar包

apache-zookeeper-3.8.0-bin.tar.gz

tar包可以去官网进行下载 Apache ZooKeepericon-default.png?t=O83Ahttps://zookeeper.apache.org/releases.html#download

解压tar包至 /opt 下

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt

2、编辑配置文件

vim /opt/apache-zookeeper-3.8.0-bin/conf/zoo.cfg

# 输入如下内容
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
clientPort=2181
server.1=kafka1:2188:3888
server.2=kafka2:2188:3888
server.3=kafka3:2188:3888
4lw.commands.whitelist=*

tickTime=2000:

  • tickTime 定义了 ZooKeeper 服务器之间的心跳间隔时间(毫秒)。它是 ZooKeeper 中最基本的单位时间。默认值通常是 2000 毫秒(即 2 秒)。


initLimit=10:

  • initLimit 定义了初始同步阶段的最大超时时间(心跳次数)。这意味着在初始同步阶段,跟随者(follower)必须在 initLimit * tickTime 毫秒内完成与领导者(leader)的同步。例如,这里设置为 20 秒(10 * 2000 毫秒)。


syncLimit=5:

  • syncLimit 定义了在领导者和跟随者之间发送消息的最大超时时间(心跳次数)。这意味着在同步阶段,跟随者必须在 syncLimit * tickTime 毫秒内响应领导者的请求。例如,这里设置为 10 秒(5 * 2000 毫秒)。


dataDir=/opt/zookeeper/zkData:

  • dataDir 指定 ZooKeeper 服务器用来存储快照(snapshot)的目录。


dataLogDir=/opt/zookeeper/zkLog:

  • dataLogDir 指定 ZooKeeper 服务器用来存储事务日志(transaction logs)的目录。这是从 ZooKeeper 3.4.6 开始引入的一个配置项,使得日志和数据可以分开存储。


clientPort=2181:

  • clientPort 指定客户端连接到 ZooKeeper 服务器的端口,默认为 2181。


server.1=ka1:2188:3888:

  • server.N 表示第 N 台服务器的信息,格式为 hostname:peerPort:leaderPort。peerPort 是服务器之间通信的端口,leaderPort 是选举领导者时使用的端口。


4lw.commands.whitelist=*:

  • 4lw.commands.whitelist 指定客户端可以执行的命令白名单。* 表示允许所有命令。 

3、创建数据目录

mkdir -p /opt/zookeeper/zkData 
mkdir -p /opt/zookeeper/zkLog

创建集群ID文件

在3台机器上分别执行

 

[root@kafka1 bin]# echo 1 >  /opt/zookeeper/zkData/myid
[root@kafka2 bin]# echo 2 >  /opt/zookeeper/zkData/myid
[root@kafka3 bin]# echo 3 >  /opt/zookeeper/zkData/myid

4、安装JAVA

yum install -y java-1.8.0-openjdk-devel

5、启动zookeeper

cd /opt/apache-zookeeper-3.8.0-bin/bin/ 
./zkServer.sh start ../conf/zoo.cfg &

查看状态

[root@kafka1 bin]# ps -aux | grep zook

四、Kafka集群搭建

1、上传tar包

资源包大家可以到官网下载

https://kafka.apache.org/

解压至指定目录

[root@kafka1 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka2 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka3 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/

2、编辑配置文件

在kafka1上执行

[root@kafka1 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=0
listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 在kafka2上执行

[root@kafka2 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=1
listeners=PLAINTEXT://kafka2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 在kafka3上执行

[root@kafka3 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=2
listeners=PLAINTEXT://kafka3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 

broker.id=0

  • 这是 Kafka broker 的唯一标识符。每个 broker 必须有唯一的 ID。这里的值为 0,意味着这是一个集群中的单个 broker 或者是第一个 broker。

listeners=PLAINTEXT://kafka1:9092

  • 定义了 broker 监听的网络接口和端口。此处使用 PLAINTEXT 协议,意味着没有加密。kafka1:9092 表示监听名为 kafka1 的主机上的 9092 端口。

num.network.threads=3

  • 指定了用于网络请求处理的线程数。网络请求包括接收来自生产者的消息、发送消息给消费者等操作。这里设置为 3 个线程。

num.io.threads=8

  • 指定了用于处理 I/O 请求的线程数。I/O 请求包括磁盘上的读写操作。这里设置为 8 个线程。

socket.send.buffer.bytes=102400

  • 设置了发送套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的发送速度。此处设置为 102400 字节。

socket.receive.buffer.bytes=102400

  • 设置了接收套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的接收速度。此处设置为 102400 字节。

socket.request.max.bytes=104857600

  • 定义了从客户端接收的最大请求大小(单位:字节)。这有助于防止因过大请求而导致的内存溢出。此处设置为 104857600 字节,即约 100MB。

log.dirs=/opt/kafka-logs

  • 指定了日志文件存储的位置。日志文件包含了 Kafka topic 的数据。这里设置的日志目录为 /opt/kafka-logs。

num.partitions=5

  • 指定了默认主题分区的数量。分区越多,通常意味着更高的并发度。这里设置的主题默认分区数为 5。

default.replication.factor=2

  • 指定了创建新主题时的默认复制因子。复制因子决定了每个分区的副本数量。这里设置的复制因子为 2,意味着每个分区有 2 份副本。

num.recovery.threads.per.data.dir=1

  • 指定了用于恢复日志段的线程数。每个数据目录可以有不同的线程数。这里设置为 1 个线程。

offsets.topic.replication.factor=1

  • 指定了 _consumer_offsets 主题的复制因子。此主题用于存储消费者的偏移量信息。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.replication.factor=1

  • 指定了 _transactions 主题的复制因子。此主题用于记录事务状态。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.min.isr=1

  • 指定了 _transactions 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 同步的副本集合。这里设置的最小 ISR 数量为 1。

log.retention.hours=168

  • 指定了日志数据保留的时间长度(单位:小时)。这里设置的日志保留时间为 168 小时,即 7 天。

log.segment.bytes=1073741824

  • 指定了日志段的最大大小(单位:字节)。一旦达到这个大小,Kafka 就会创建一个新的日志段。这里设置的日志段大小为 1073741824 字节,即 1GB。

log.retention.check.interval.ms=300000

  • 指定了检查日志清理的间隔时间(单位:毫秒)。这里设置的检查间隔为 300000 毫秒,即 5 分钟。

zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka

  • 指定了 ZooKeeper 服务器列表。ZooKeeper 用于协调 Kafka 集群。这里设置的 ZooKeeper 服务器为 kafka1:2181, kafka2:2181, kafka3:2181,路径为 /kafka。

zookeeper.connection.timeout.ms=18000

  • 指定了 Kafka 与 ZooKeeper 之间的连接超时时间(单位:毫秒)。这里设置的超时时间为 18000 毫秒,即 18 秒。请注意,zookeeper.connection.timeout.ms 在配置中出现了两次,应该是误写,只需要保留一次即可。

group.initial.rebalance.delay.ms=0

  • 指定了消费者组初始重新平衡的延迟时间(单位:毫秒)。这里设置的延迟时间为 0,即立即开始重新平衡。

3、创建数据目录

在3台机器上分别执行

mkdir /opt/kafka-logs

4、启动kafka

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka2 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka3 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

5、查看端口状态

[root@kafka1 bin]# netstat -antupl

五、测试 

1、创建Topic

前面我们已经将kafka集群搭建起来了,接下来创建一个Topic进行写入测试,如果不清楚Topic是什么,可以翻看作者之前的文章。

在kafka1上执行

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin
[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --topic test --create --partitions=3 --replication-factor=2
  • --bootstrap-server:指定 Kafka broker 的地址和端口号。这里的 192.168.40.100:9092 指定了 broker 的 IP 地址为 192.168.40.100,端口号为 9092。
  • --topic:指定要操作的主题名称。在这个例子中,主题名为 test。
  • --create:告诉 Kafka 创建一个新主题。如果主题已经存在,这条命令将会失败,除非你配置了允许创建已存在的主题。
  • --partitions:指定主题的分区数。分区数决定了主题能够并行处理消息的能力。在这个例子中,主题 test 将会有 3 个分区。
  • --replication-factor:指定主题的复制因子。复制因子决定了每个分区的副本数量,这对于数据的冗余和可靠性非常重要。在这个例子中,主题 test 的每个分区将会有 2 个副本。

查看Topic

[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --list

 

2、生产消息 

[root@kafka1 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.40.100:9092 --topic test

向我们刚刚创建的test Topic写入几条消息

3、消费消息 

[root@kafka1 bin]# ./kafka-console-consumer.sh --bootstrap-server=192.168.40.100:9092 --topic test --from-beginning

如果能看到之前生产的消息,则证明集群搭建成功

 💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2201651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android SELinux——安全策略(三)

SELinux 通过严格的访问控制机制增强了 Linux 系统的安全性。它通过标签和安全策略来控制进程和文件的访问权限,从而保护系统免受未经授权的访问和攻击。 一、策略介绍 1、主要组件 安全标签(Security Labels):每个文件、目录、…

麒麟操作系统:解决sudo命令报错“Account or password is expired”问题

麒麟操作系统:解决sudo命令报错“Account or password is expired”问题 1、问题描述2、问题分析3、问题解决方法步骤1:检查账户和密码状态步骤2:处理账户过期问题步骤3:处理密码过期问题 💐The Begin💐点点…

电脑端视频通过PCIE到FPGA端图像缩放转UDP网络视频输出,基于XDMA+PHY芯片架构,提供3套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的PCIE方案我这里已有的以太网方案本博已有的FPGA图像缩放方案 3、PCIE基础知识扫描4、工程详细设计方案工程设计原理框图电脑端视频PCIE视频采集QT上位机XDMA配置及使用XDMA中断模块FDMA图像缓存纯Verilog图像缩放模块详解…

Python - Modbus测试

python -Modbus测试 前言 本实例中,python版本是python 3.10, 使用的pymodbus安装包进行实现,如没有按照此包,需要先执行如下指令: pip install pymodbusModbus客户端的python实现 Modbus Client的客户端程序,通过…

【Java 问题】集合——List

List 1.说说有哪些常见集合?2.ArrayList和LinkedList有什么区别?3.ArrayList的扩容机制了解吗?4.ArrayList怎么序列化的知道吗? 为什么用transient修饰数组?5.快速失败(fail-fast)和安全失败(fail-safe)了解吗&#xf…

XILINX MIG驱动

简介 框架图 本章节主要针对MIG读写做详细介绍,首先创建BLOCK DESIGN,工程连接如下图所示: MIG IP介绍 DATAMOVER的配置这里不再做介绍,结合上篇文章讲到DATAMOVER对BRAM进行读写操作,这里通过AXI桥再加一个MIG模块,MIG模块的配置和说明如下: 1、Clock Period:…

记录一个Ajax发送JSON数据的坑,后端RequestBody接收参数小细节?JSON对象和JSON字符串的区别?

上半部分主要介绍我实际出现的问题,最终下面会有总结。 起因:我想发送post请求的data,但是在浏览器中竟然被搞成了地址栏编码 如图前端发送的ajax请求数据 如图发送的请求体: 很明显是keyvalue这种形式,根本就不是…

浏览器动态移动的小球源码分享

浏览器动态移动的小球源码分享 <script>(function(a){var width100,height100,borderRadius100,circlefunction(){};circle.prototype{color:function(){let colour "#"Math.floor(Math.random()*255).toString(16)Math.floor(Math.random()*255).toString…

基于SpringBoot剧本杀管理系统 【附源码】

基于SpringBoot剧本杀管理系统 效果如下&#xff1a; 系统首页界面 系统注册页面 剧本信息详细页面 后台登录界面 管理员主界面 剧本信息界面 剧本预约界面 作者主界面 研究背景 随着现代社会生活节奏的加快&#xff0c;人们越来越渴望通过各种娱乐活动来释放压力和增进社交…

开源 Three.js 案例及入门教程

Three.js入门教程目录 章节1认识three.js与开发环境搭建,rar 章节2 Three.js开发入门与调试设置,rar 章节3 Geometry进阶详解.rar 章节4 详解材质与纹理.rar 章节5 纹理材质高级操作,rar 章节6 详解灯光与阴影.rar 章节7 精通粒子特效.rar 章节8 详解光线投射与物体交互…

内容营销:基于大模型的内容再利用

在这篇文章中&#xff0c;我将带你了解一个完整的自动化系统&#xff0c;该系统可帮助你将任何内容重新用于你想要的其他帖子。 输入的内容可以是任何东西&#xff1a;YouTube 视频链接、博客文章链接、大纲、推文、pdf 等…… 我将在本文末尾为你提供所有代码。 一步一步地…

基于SpringBoot+Vue+Uniapp的仓库点单小程序的详细设计和实现

2. 详细视频演示 文章底部名片&#xff0c;联系我获取更详细的演示视频 3. 论文参考 4. 项目运行截图 代码运行效果图 代码运行效果图 代码运行效果图 代码运行效果图代码运行效果图 代码运行效果图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发…

毕业设计 | Ruff开发板 + 华为云IoT物联网平台,实现家中温度、湿度、二氧化碳、PM2.5、甲醛监控分析...

基于温湿度、空气质量传感器实现温度、湿度、二氧化碳、PM2.5、甲醛环境数据实时监测。 硬件清单 我们采用 Ruff 开发板&#xff0c;串口连接温湿度传感器 DHT11 和空气质量传感器 SDS011&#xff0c;每5分钟采集一次数据&#xff0c;通过MQTT协议发送到华为云 IoT 物联网平台&…

QRTCN区间预测 | Matlab实现QRTCN时间卷积神经网络分位数回归区间预测

区间预测 | Matlab实现QRTCN时间卷积神经网络分位数回归区间预测 目录 区间预测 | Matlab实现QRTCN时间卷积神经网络分位数回归区间预测预测效果基本介绍模型特性程序设计参考资料预测效果 基本介绍 Matlab实现QRTCN时间卷积神经网络分位数回归区间预测 QRTCN(Quantile Regres…

Java数据类型常量

目录 一、数据类型 1.1分类 1.2关键字&内存占用&范围 1.3包装类 1.4说明 1.5类型转换 1.6类型提升 二、常量 2.1java中的常量 2.2定义常量 2.3分类 一、数据类型 1.1分类 1.2关键字&内存占用&范围 数据类型关键字内存占用范围字节型byte1字节-128…

【学术会议征稿】2024年信号处理与神经网络应用国际学术会议(SPNNA 2024)

2024年信号处理与神经网络应用国际学术会议&#xff08;SPNNA 2024&#xff09; 2024 International Conference on Signal Processing and Neural Network Applications 2024年信号处理与神经网络应用国际学术会议&#xff08;SPNNA 2024&#xff09;将于2024年12月13日至15…

scratch星际穿越 2024年9月中国电子学会图形化编程 少儿编程 scratch编程等级考试一级真题和答案解析

目录 scratch星际穿越 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、 推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…

DevExpress WinForms中文教程:Data Grid - 如何完成数据输入验证?

本教程介绍DevExpress WinForm的Data Grid控件是如何利用网格组件完成数据输入验证的。 P.S&#xff1a;DevExpress WinForms拥有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序…

Python酷库之旅-第三方库Pandas(140)

目录 一、用法精讲 631、pandas.Timestamp类 631-1、语法 631-2、参数 631-3、功能 631-4、返回值 631-5、说明 631-6、用法 631-6-1、数据准备 631-6-2、代码示例 631-6-3、结果输出 632、pandas.Timestamp.asm8属性 632-1、语法 632-2、参数 632-3、功能 632…