可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作
注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!
连接配置
创建主题
删除主题
主题下的数据查看
数据显示问题说明
修改工具的数据显示类型
发送消息数据到kafka
Kafka的Python API的操作
模块安装
纯Python的方式操作Kafka。
准备工作:在node1的节点上安装一个python用于操作Kafka的库
安装kafka-python 模模块 ,模块中提供了操作kafka的方法
在线安装
在node1上安装就可以,需要保证服务器能够连接网络
安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
离线安装
将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装
安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
模块使用
API使用的参考文档: Usage — kafka-python 2.0.2-dev documentation
模块中封装了两个类,
一个是生成者类KafkaProducer,提供了向kafka写数据的方法
另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法
完成生产者代码
生成者类KafkaProducer,提供了向kafka写数据的方法
send(topic,valu)方法: 发送消息 topic参数:指定向哪个主题发送消息 value参数:指定发送的消息数据 ,数据类型要求是bytes类型
示例:
# 导包 from kafka import KafkaProducer # 编写代码 if __name__ == '__main__': # 创建生产者对象并指定对应服务器 producer = KafkaProducer(bootstrap_servers=['node1:9092']) # 发送消息 for i in range(1,101): future = producer.send('kafka', f'hi_kafka_{i}'.encode()) # 获取元数据 record_metadata = future.get() # 从元数据中获取主题,分区,偏移 print(record_metadata.topic) print(record_metadata.partition) print(record_metadata.offset)
完成消费者代码
消费者类KafkaConsumer,提供了读取kafka数据的方法
KafkaConsumer(topic,bootstrap_servers) 第一个参数:指定消费者连接的主题, 第二个参数:指定消费者连接的kafka服务器
示例:
# 导包 from kafka import KafkaConsumer # 编写代码 if __name__ == '__main__': # 创建消费者对象 consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092']) # 遍历对象 for message in consumer: # 格式化打印,设置相关参数 # 因为value是二进制,需要decode解码 print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
可能遇到的错误:
原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定 解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令 python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple