本文介绍kafka的集群如何部署和安装,1-4章理论知识,第5章详解集群的部署,部署Kafka之前需要先部署好分布式的Zookeeper,不喜欢理论的可以直接看第5章,欢迎大家一起探讨技术!
Zookeeper集群部署参考文章:精通Zookeeper:详解分布式集群部署全程,掌握数据一致性、选举机制与集群容错能力-CSDN博客
关于Kafka的资料:
大数据技术之Kafka(最新版)资料+jar包+安装包+笔记+视频+代码集合,看完学会使用Kafka
关于Kafka进阶高手资料:
Kafka集群调优实战+分布式集群搭建,分布式集群搭建与调优实战,Kafka专家之路!课程内容全程实战,没有拖泥带水
Apache Kafka是一个开源的分布式消息系统,也是一个分布式流式计算平台。尽管它在流式计算方面有着强大功能,但在实际应用中,Kafka更多地被用作分布式消息队列。以下是对Kafka的详细解析:
一、Kafka的主要特点
- 高吞吐量:Kafka能够处理非常高的消息吞吐量,适用于大规模数据处理和实时数据流。
- 低延迟:Kafka具有较低的消息传递延迟,能够提供快速的消息传递服务。
- 可伸缩性:Kafka支持水平扩展,通过增加更多的节点来扩展处理能力和存储容量,保证系统的可靠性和性能。
- 持久性:Kafka使用磁盘存储消息,确保消息的持久性和可靠性,并支持消息的批量处理。
- 高可靠性:通过副本机制保证消息的可靠性,即使某些节点发生故障,也不会丢失消息。
- 分区:Kafka的消息被分成多个分区,每个分区可以在不同的服务器上进行写入和读取,提高了并发性能。
- 支持流处理:Kafka提供了强大的流处理功能,可以进行实时数据处理、转换和分析。
- 社区活跃:Kafka拥有庞大的开源社区支持,持续更新和改进,解决了许多实际场景中的数据处理问题。
二、Kafka的关键概念
- Topic:Kafka把收到的消息按Topic进行分类,可以理解为Topic是一种类别。
- Producer:消息生产者,即向Kafka broker发送消息的客户端。
- Consumer:消息消费者,即向Kafka broker取消息的客户端。消费者通过订阅Topic来消费消息。
- Consumer Group:消费者组,由多个Consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组间互不影响。
- Partition:分区,是实现Kafka高吞吐量的关键。一个Topic可以分成多个Partition,每个Partition是一个有序的队列。
- Replica:副本,为保证集群中某个节点发生故障时,该节点上的Partition数据不丢失,且Kafka仍然可以继续工作,Kafka提供了副本机制。每个Partition都有若干个副本,其中一个作为Leader,其他作为Follower。Leader负责处理所有读写请求,Follower实时从Leader同步数据。
三、Kafka的应用场景
- 日志处理与分析:Kafka可以有效地从各个实例收集日志流,并与其他系统(如ElasticSearch、Kibana)结合,提供日志的索引、搜索和可视化。
- 实时点击流分析:收集系统指标进行监控和故障排除。与日志分析类似,但处理的是结构化数据。
- 更改数据捕获(CDC):将数据库更改流式传输到其他系统以进行复制或缓存/索引更新。
- 构建数据管道:使用Kafka从各种来源获取数据、应用处理规则,并将数据存储在仓库、数据湖或数据网格中。
- 微服务解耦通信:Kafka可以作为微服务之间的消息队列,实现服务的解耦和异步通信。
- 事件溯源:捕获一系列事件中状态的变化,并作为系统中的事实来源,用于重建状态或进行其他处理。
四、Kafka的优缺点
优点:
- 高吞吐量、低延迟。
- 支持水平扩展和分区,提高并发性能。
- 持久性高,数据不丢失。
- 社区活跃,支持丰富。
缺点:
- 由于是批量发送,数据达不到真正的实时。
- 不支持MQTT协议和物联网传感数据直接接入。
- 只能保证一个分区内消息的有序性,无法实现全局消息有序。
- 监控不完善,需要安装插件。
- 可能会重复消费数据,需要额外的处理逻辑来确保数据的一致性。
五、集群部署
1、上传Kafka安装包
关于Kafka的资料和安装包+源码可以在这里下载
大数据技术之Kafka(2019新版)资料+jar包+安装包+笔记+视频+代码集合,看完学会使用Kafka
将Kafka的安装包上传到hadoop102机器下的/opt/software下
2、解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
这个命令是用于在Linux或Unix-like系统中使用tar
工具来解压一个名为kafka_2.11-0.11.0.0.tgz
的tarball(tar归档文件,通常用于打包多个文件和目录)到指定的目录/opt/module/
下。命令的各个部分意义如下:
-
tar
:这是调用tar工具的命令。tar是一个在Unix和Unix-like系统中广泛使用的命令行程序,用于打包和解包文件。 -
-zxvf
:这是传递给tar
命令的选项,每个字母都代表一个不同的操作或行为。z
:表示通过gzip进行压缩或解压缩。这个选项告诉tar
命令,它要处理的归档文件(在这个例子中是kafka_2.11-0.11.0.0.tgz
)是用gzip压缩的。x
:表示解包或解压。这个选项告诉tar
命令要执行的操作是解压归档文件。v
:表示在操作过程中显示详细信息。这个选项会让tar
命令在执行时显示正在被解压的文件列表,便于用户了解解压进度。f
:表示后面跟随的是归档文件的名称。这个选项告诉tar
命令,接下来的参数(即kafka_2.11-0.11.0.0.tgz
)是要处理的归档文件的名称。
-
kafka_2.11-0.11.0.0.tgz
:这是要解压的归档文件的名称。文件名中的kafka
指的是Apache Kafka,一个分布式流处理平台;2.11
指的是这个Kafka版本是编译来兼容Scala 2.11的;0.11.0.0
是Kafka的版本号;.tgz
是.tar.gz
的简写,表示这是一个通过gzip压缩的tar归档文件。 -
-C /opt/module/
:这个选项指定了解压后的文件应该被放置的目录。-C
选项后面跟着的/opt/module/
是目标目录的路径。注意,-C
选项在某些版本的tar
中可能写作--directory=
或者简写为-C
,用于在解压之前改变当前工作目录到指定的目录。但是,在这个上下文中,-C
实际上更准确地被理解为在解压时直接将内容解压到指定的目录,而不是先改变当前工作目录。不过,大多数用户(包括很多文档和教程)都使用-C
来表示这个操作,尽管具体实现可能略有不同。
3、修改解压后的文件名称
mv kafka_2.11-0.11.0.0/ kafka
4、在/opt/module/kafka 目录下创建 logs 文件夹
mkdir logs
5、修改配置文件
进入到Kafka/config目录下
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
配置以下参数,这些参数原本配置文件中是存在是
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
6、配置环境变量
sudo vim /etc/profile.d/my_env.sh
配置以下内容
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
7、分发安装包
[atguigu@hadoop102 module]$ xsync kafka/
8、启动集群
三台服务器都有 QuorumPeerMain表示zookeeper集群已经启动
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
这里需要使用 -daemon 指定刚才设置的配置文件config/server.properties
9、关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
10、kafka 群起脚本
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon
/opt/module/kafka/config/server.properties'
done