文章目录
- Kafka概述
- 定义
- 应用场景
- 缓冲/削峰
- 解耦
- 异步通信
- 应用模式
- 点对点模式
- 发布/订阅模式
- 基础架构
- Kafka集群部署
- 集群规划
- 下载解压
- 修改配置文件
- 分发安装包
- hadoop103、hadoop104修改配置文件
- 配置环境变量
- 启动集群
- 先启动Zookeeper集群
- 然后启动Kafka
- 关闭集群
- 集群启停脚本
- 脚本编写
- 添加执行权限
- 启动集群脚本命令
- 停止集群脚本命令
- Docker启动Kafka集群
- docker-compose.yml编写
- 启动compose
- 命令行验证
- Python验证
Kafka概述
定义
kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。
Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处理数据)
应用场景
缓冲/削峰
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
异步通信
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
应用模式
点对点模式
消费者主动拉去数据,消息收到后清除消息
发布/订阅模式
• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据
基础架构
(1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
(7)Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
Kafka集群部署
集群规划
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
下载解压
官网地址;https://kafka.apache.org/downloads
# 下载
cd /opt/module
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
# 解压
tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/
# 改名
mv kafka_2.12-3.4.0/ kafka
修改配置文件
路径:config/server.properties
- broker.id=0
- log.dirs=/opt/module/kafka/datas
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
分发安装包
这一步的作用是让另外几台服务器也都有 kafka/ 这套文件
- 命令:xsync kafka/
- 下面是xsync.sh脚本的内容
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4. 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
hadoop103、hadoop104修改配置文件
# broker.id不得重复,整个集群中唯一
# hadoop103对应的
broker.id=1
# hadoop104对应的
broker.id=2
配置环境变量
路径:vim /etc/profile.d/my_env.sh
注意:集群内的机子都需要配一遍
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新一下环境变量
source /etc/profile
启动集群
先启动Zookeeper集群
kafka/zk.sh start
vim kafka/zk.sh
#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo ------------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo ------------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo ------------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
}
;;
esac
然后启动Kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
关闭集群
bin/kafka-server-stop.sh
集群启停脚本
脚本编写
vim kf.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
添加执行权限
chmod +x kf.sh
启动集群脚本命令
kf.sh start
停止集群脚本命令
kf.sh stop
注意: 停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
Docker启动Kafka集群
docker-compose.yml编写
version: '3'
services:
li-zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "7181:2181"
networks:
- li-kafka-net
li-kafka-1:
image: wurstmeister/kafka
environment:
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7091
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
ports:
- "7091:9092"
networks:
- li-kafka-net
li-kafka-2:
image: wurstmeister/kafka
environment:
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7092
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
ports:
- "7092:9092"
networks:
- li-kafka-net
li-kafka-3:
image: wurstmeister/kafka
environment:
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7093
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
ports:
- "7093:9092"
networks:
- li-kafka-net
li-kafka-map:
image: dushixiang/kafka-map:latest
environment:
KAFKA_MAP_KAFKA_SERVERS: li-kafka-1:9092,li-kafka-2:9092,li-kafka-3:9092
KAFKA_MAP_USERNAME: admin
KAFKA_MAP_PASSWORD: admin
ports:
- "8080:8080"
networks:
- li-kafka-net
networks:
li-kafka-net:
driver: bridge
启动compose
docker-compose up -d
命令行验证
docker exec -it kafka-server bash
cd /opt/bitnami/kafka
列出所有topics
kafka-topics.sh --bootstrap-server 139.196.169.148:7091 --list
创建一个topic
kafka-topics.sh --bootstrap-server 139.196.169.148:7092 --create --partitions 1 --replication-factor 3 --topic first
生产者
kafka-console-producer.sh --bootstrap-server 139.196.169.148:7091 --topic first
消费者
kafka-console-consumer.sh --bootstrap-server 139.196.169.148:7092 --from-beginning --topic first
Python验证
-
安装包
pip install kafka-python
-
生产者
from kafka import KafkaProducer from kafka.errors import KafkaError # Kafka 集群地址 bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093'] # Kafka 主题名称 topic = 'first' # 创建 Kafka 生产者 producer = KafkaProducer(bootstrap_servers=bootstrap_servers) # 发送消息到 Kafka 主题 def send_message(message): try: producer.send(topic, message.encode('utf-8')) producer.flush() print('Message sent successfully:', message) except KafkaError as e: print('Failed to send message:', e) # 测试发送消息 send_message('Hello, Kafka!')
-
消费者
from kafka import KafkaConsumer # Kafka 集群地址 bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093'] # Kafka 主题名称 topic = 'first' # 创建 Kafka 消费者 consumer = KafkaConsumer( topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', # 从最早的消息开始消费 enable_auto_commit=True, # 自动提交消费位移 group_id='my-group') # 消费者组名称 # 从 Kafka 主题消费消息 def consume_message(): for message in consumer: print('Message received:', message.value.decode('utf-8')) # 测试消费消息 consume_message()