step0: Environment setup
1. Install JDK 1.8 or later
yum -y install java-1.8.0-openjdk.x86_64
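# Verify the JDK is installed and on the PATH (exact version output varies by build):
java -version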
2. Install and configure ZooKeeper
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz --no-check-certificate
tar -zxf apache-zookeeper-3.8.2-bin.tar.gz
# Create the data and log directories; adjust the paths as needed
mkdir -p /path/to/your/data
mkdir -p /path/to/your/logs
cd apache-zookeeper-3.8.2-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# Set the following entries (dataDir already exists in the sample config; dataLogDir must be added):
# dataDir=/path/to/your/data
# dataLogDir=/path/to/your/logs
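# For reference, a minimal zoo.cfg after these edits; everything except
# dataDir/dataLogDir keeps the zoo_sample.cfg defaults:
#   tickTime=2000
#   initLimit=10
#   syncLimit=5
#   clientPort=2181
#   dataDir=/path/to/your/data
#   dataLogDir=/path/to/your/logs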
cd ../bin
./zkServer.sh start
#./zkServer.sh stop
#./zkServer.sh restart
#./zkServer.sh status
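# On a healthy single-node install, `./zkServer.sh status` output ends with:
#   Mode: standalone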
# If startup fails with "Starting zookeeper ... FAILED TO START", the embedded AdminServer
# port (8080 by default) may already be in use; set a different one in zoo.cfg:
# vim zoo.cfg
# admin.serverPort=9091
# Connect with the CLI client to verify the server is reachable
./zkCli.sh
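# Once connected, a quick sanity check inside the zkCli shell (a fresh install
# lists only the built-in znode):
#   ls /
#   [zookeeper]
#   quit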
# Stop this standalone instance before step1: the ZooKeeper bundled with Kafka
# also listens on port 2181, and the two would conflict
./zkServer.sh stop
step1: Download Kafka
wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz --no-check-certificate
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
step2: Start the ZooKeeper and Kafka services
# Run each service in its own terminal session, e.g. with tmux
# yum install tmux -y
tmux
# Start the ZooKeeper service
bin/zookeeper-server-start.sh config/zookeeper.properties
# In a second session, start the Kafka broker service
bin/kafka-server-start.sh config/server.properties
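# Before running the clients in step3, create the `test` topic they use (brokers may
# auto-create topics depending on auto.create.topics.enable, but creating it explicitly
# is cleaner); this assumes the broker listens on localhost:9092:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092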
step3: Verify the installation
KafkaProducer
pip install kafka-python
import logging
import time

from kafka import KafkaProducer

def on_send_success(record_metadata):
    # Called when the broker acknowledges the record
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    # Called when the send fails; handle the exception here
    logging.error('I am an errback', exc_info=excp)

# Replace yoursIp with the IP of the host running the Kafka broker
producer = KafkaProducer(bootstrap_servers="yoursIp:9092")

i = 0
while True:
    ts = int(time.time() * 1000)
    future = producer.send(topic="test", value=bytes(str(i), encoding="utf-8"),
                           key=bytes(str(i), encoding="utf-8"),
                           timestamp_ms=ts).add_callback(on_send_success).add_errback(on_send_error)
    # Block until the send completes (or raise on failure)
    record_metadata = future.get(timeout=60)
    producer.flush()
    print(i)
    print(record_metadata)
    i += 1
    time.sleep(1)
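While the producer loop runs, you can watch the messages arrive from another session with the console consumer that ships with Kafka (same yoursIp placeholder as above):
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server yoursIp:9092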
KafkaConsumer
import time

from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition

tp = TopicPartition("test", 0)
# Replace yoursIp with the IP of the host running the Kafka broker
consumer = KafkaConsumer(bootstrap_servers=["yoursIp:9092"], auto_offset_reset='earliest',
                         enable_auto_commit=False, group_id="my-group")
# Assign the partition explicitly instead of subscribing to the topic
consumer.assign([tp])
print("starting offset is {}".format(consumer.position(tp)))
for message in consumer:
    print(message, message.offset)
    # Commit offset + 1 (the next message to read); the metadata field is an
    # optional string, so leave it empty
    consumer.commit({tp: OffsetAndMetadata(message.offset + 1, "")})
    time.sleep(0.15)
After a restart, the consumer resumes from the last committed offset: every time the consumer is restarted, it picks up consuming exactly where it previously stopped.
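To double-check the committed offsets from the broker side, describe the consumer group with the tool that ships with Kafka (run from the Kafka directory, same broker address as above):
bin/kafka-consumer-groups.sh --bootstrap-server yoursIp:9092 --describe --group my-group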
References
- https://kafka.apache.org/quickstart
- https://blog.csdn.net/qq_54780911/article/details/124293670
- https://cloud.tencent.com/developer/article/1700375
- https://juejin.cn/post/6924584866327560199
- https://kafka-python.readthedocs.io/