摸鱼大数据——Kafka——kafka tools工具使用

news2025/1/23 17:51:13

可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作

注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!

 连接配置

创建主题

删除主题

主题下的数据查看

数据显示问题说明

修改工具的数据显示类型

发送消息数据到kafka

Kafka的Python API的操作

模块安装

纯Python的方式操作Kafka。

准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装kafka-python 模模块 ,模块中提供了操作kafka的方法

在线安装

在node1上安装就可以,需要保证服务器能够连接网络

 安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

离线安装

将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装

 安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl

模块使用

API使用的参考文档: Usage — kafka-python 2.0.2-dev documentation

模块中封装了两个类,

一个是生成者类KafkaProducer,提供了向kafka写数据的方法

另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法

完成生产者代码

生成者类KafkaProducer,提供了向kafka写数据的方法

 send(topic,valu)方法: 发送消息
 topic参数:指定向哪个主题发送消息
 value参数:指定发送的消息数据 ,数据类型要求是bytes类型

示例:

 # 导包
 from kafka import KafkaProducer
 ​
 # 编写代码
 if __name__ == '__main__':
     # 创建生产者对象并指定对应服务器
     producer = KafkaProducer(bootstrap_servers=['node1:9092'])
     # 发送消息
     for i in range(1,101):
         future = producer.send('kafka', f'hi_kafka_{i}'.encode())
         # 获取元数据
         record_metadata = future.get()
         # 从元数据中获取主题,分区,偏移
         print(record_metadata.topic)
         print(record_metadata.partition)
         print(record_metadata.offset)

完成消费者代码

消费者类KafkaConsumer,提供了读取kafka数据的方法

 KafkaConsumer(topic,bootstrap_servers)
 第一个参数:指定消费者连接的主题,
 第二个参数:指定消费者连接的kafka服务器

示例:

 # 导包
 from kafka import KafkaConsumer
 ​
 # 编写代码
 if __name__ == '__main__':
 ​
     # 创建消费者对象
     consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])
     # 遍历对象
     for message in consumer:
 ​
         # 格式化打印,设置相关参数
         # 因为value是二进制,需要decode解码
         print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"
                % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
 ​

可能遇到的错误:

 原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
 解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
 python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】vector的认识与使用

vector的认识与使用 认识vectorvector的使用Member functions(成员函数)构造函数(constructor)析构函数(destructor)赋值构造函数(operator) Iterators(迭代器)beginendrbeginrend Capacity(容量)sizemax_s…

zephyr设置BLE广播数据实例

目录 实例1:静态开启广播数据实例2:动态更改广播数据实例3:创建可连接的广播 实例1:静态开启广播数据 新建一个hello world的工程模板。 在prj.conf中开启蓝牙 CONFIG_BTy这个宏,默认会开启广播支持 ( BT_BROADCAS…

1448.统计二叉树中的好节点数目

给你一棵根为 root 的二叉树,请你返回二叉树中好节点的数目。 「好节点」 X 定义为:从根到该节点 X 所经过的节点中,没有任何节点的值大于 X 的值。 示例 1: 输入:root [3,1,4,3,null,1,5] 输出:4 解释&am…

【算法】LRU缓存

难度:中等 题目: 请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类: LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中,…

【人工智能新纪元】机器学习算法:探索智能背后的奥秘与常见利器

在这个日新月异的科技时代,人工智能(AI)如同一股不可阻挡的洪流,正深刻地改变着我们的世界。作为AI领域的核心驱动力之一,机器学习算法以其独特的魅力,引领着智能技术的飞速发展。今天,就让我们…

【学习】美国虚拟信用卡申请流程

WildCard 官方网址:https://bewildcard.com/i/PEACEFUL (使用邀请码“PEACEFUL”可以享受开卡88 折优惠,注册时提示填写邀请码就可以填写)

Vue3 组件向下通信 祖孙组件的通信 provide与inject

介绍 当父子间通信可以使用props,祖孙使用provide(传递)或inject(接收), 这时不管组件套的多深都可以向下传递。 例子 现在有一个需求,把App.vue的数据传递到MusciPlay.vue里。 App.vue …

跟李沐学AI:模型选择、过拟合和欠拟合

目录 训练误差和泛化误差 验证数据集和测试数据集 K-则交叉验证 模型总结 过拟合和欠拟合 模型容量 模型容量的影响 估计模型容量 数据复杂度 拟合总结 训练误差和泛化误差 训练误差:模型在训练数据上的误差 泛化误差:模型在新数据上的误差 …

移掉 K 位数字

题目链接 移掉 K 位数字 题目描述 注意点 1 < k < num.length < 10^5num 仅由若干位数字&#xff08;0 - 9&#xff09;组成除了 0 本身之外&#xff0c;num 不含任何前导零 解答思路 关键是怎样移掉K位数字保证移除后的数字是最小的。观察规律可得&#xff0c;为…

基于LSTM及其变体的回归预测

1 所用模型 代码中用到了以下模型&#xff1a; 1. LSTM&#xff08;Long Short-Term Memory&#xff09;&#xff1a;长短时记忆网络&#xff0c;是一种特殊的RNN&#xff08;循环神经网络&#xff09;&#xff0c;能够解决传统RNN在处理长序列时出现的梯度消失或爆炸的问题。L…

GB35114国密算法-GMSSL

C有个三方库-GMSSL是可以进行GB35114所需要的SM2、SM3、SM4等加解密算法的&#xff0c;但是使用国密算法是需要申请报备的 GmSSL是由北京大学自主开发的国产商用密码开源库&#xff0c;实现了对国密算法、标准和安全通信协议的全面功能覆盖&#xff0c;支持包括移动端在内的主流…

SpringBoot整合Swagger报错:Failed to start bean ‘documentationPluginsBootstrapper

文章目录 1 问题背景2 问题原因3 修改SpringBoot配置文件 application.properties参考 1 问题背景 Swagger是SpringBoot中常用的API文档工具&#xff0c;在刚接触使用的时候&#xff0c;按照通用的代码进行配置&#xff0c;发现报错了 [main] ERROR org.springframework.boot…

【ARM AMBA AXI 入门 5.1 - QoS是什么?QoS是怎么工作的? 】

请阅读【ARM AMBA AXI 总线 文章专栏导读】 转自&#xff1a;揭秘数通知识&#xff1a;QoS是什么&#xff1f;QoS是怎么工作的&#xff1f;&#xff08;一&#xff09; 文章目录 QoS 概述综合服务和差分服务 QoS 工具报文分类报文标记流量监管和整形工具拥塞管理工具拥塞避免工…

JuiceFS缓存特性

缓存 对于一个由对象存储和数据库组合驱动的文件系统&#xff0c;缓存是本地客户端与远端服务之间高效交互的重要纽带。读写的数据可以提前或者异步载入缓存&#xff0c;再由客户端在后台与远端服务交互执行异步上传或预取数据。相比直接与远端服务交互&#xff0c;采用缓存技…

Linux 线程初步解析

1.线程概念 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;。更准确的定义是&#xff1a;线程是“一个进程内部的控制序列。在linux中&#xff0c;由于线程和进程都具有id,都需要调度等等相似性&#xff0c;因此都可以用PCB来描述和控制,线程含有PCB&am…

[RuoYi-Vue] - 5. 部分代码分析

文章目录 &#x1f374;1. 后端部分代码分析1.1 BaseController1.2 TableDataInfo1.3 AjaxResult1.4 BaseEntity &#x1f378;2. 权限注解&#x1f37a;3. 前后端交互3.1 跨域问题 &#x1f374;1. 后端部分代码分析 1.1 BaseController 1.2 TableDataInfo 分页查询统一返回对…

如何用个人电脑搭建一台本地服务器,并部署云原生开发工具TitanIDE到服务器详细教程

服务器是一种高性能计算机&#xff0c;作为网络的节点&#xff0c;它存储、处理网络上80%的数据、信息&#xff0c;因此也被称为网络的灵魂。与普通计算机相比&#xff0c;服务器具有高速CPU运算能力、长时间可靠运行、强大I/O外部数据吞吐能力以及更好的扩展性。 服务器的主要…

Day07-ES集群加密,kibana的RBAC实战,zookeeper集群搭建,zookeeper基本管理及kafka单点部署实战

Day07-ES集群加密&#xff0c;kibana的RBAC实战&#xff0c;zookeeper集群搭建&#xff0c;zookeeper基本管理及kafka单点部署实战 0、昨日内容回顾:1、基于nginx的反向代理控制访问kibana2、配置ES集群TSL认证:3、配置kibana连接ES集群4、配置filebeat连接ES集群5、配置logsta…

自建Web网站部署——案例分析

作者主页: 知孤云出岫 目录 作者主页:如何自建一个Web网站一、引言二、需求分析三、技术选型四、开发步骤1. 项目初始化初始化前端初始化后端 2. 前端开发目录结构示例代码App.jsHome.js 3. 后端开发目录结构示例代码app.jsproductRoutes.jsProduct.js 4. 前后端连接安装axio…

ospf复习综合小实验

实验要求&#xff1a; 1&#xff0c;R4为ISP&#xff0c;其上只能配置IP地址&#xff1b;R4与其他所有直连设备间均使用公有IP 2&#xff0c;R3-R5/6/7为MGRE环境&#xff0c;R3为中心站点&#xff1b; 3&#xff0c;整个OSPF环境IP基于172.16.0.0/16划分&#xff1b; 4&#…