Python连接Kafka收发数据等操作

news2024/9/25 8:16:32

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# 发送消息的函数
def send_message(topic, message):
    # 将消息转换为字节
    producer.send(topic, json.dumps(message).encode('utf-8'))
    producer.flush()

if __name__ == '__main__':
    # 创建'test'主题
    topic = 'test'
    # 发送消息
    i = 1
    while True:
        message = {'num': i, 'msg': f'Hello Kafka {i}'}
        send_message(topic, message)
        i += 1
        time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaConsumer实例
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',  # 从最新的消息开始消费
    # auto_offset_reset='earliest',  # 从最早的offset开始消费
    enable_auto_commit=True,  # 自动提交offset
    group_id='my-group'  # 消费者组ID
)

# 消费消息
for message in consumer:
    # 将接收到的消息解码并转换为字典
    message = json.loads(message.value.decode('utf-8'))
    print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic

# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)


# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)

# 删除主题
admin_client.delete_topics(topics=['test-topic'])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2163011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在Java中,关于final、static关键字与方法的重写和继承【易错点】

在Java中,关于final、static关键字与方法的重写和继承【易错点】 1.final方法不能被重写2.static方法不是重写,而是遮蔽3.final与static的组合4.final与继承5.static与继承 1.final方法不能被重写 如果父类中的方法被声明为final,那么这个方法…

开源音频处理项目推荐【持续更新】

Audacity 介绍:Audacity是一款功能强大的开源音频编辑软件,适用于多种操作系统,包括Windows、macOS和Linux。它支持多轨音频编辑、录制,并且提供了丰富的音频处理功能,如剪切、复制、粘贴、混音、降噪等 。Audacity的…

基于Python+flask+MySQL+HTML的全国范围水质分析预测系统,可视化用echarts,预测算法随机森林

1绪论 近年来,水质监测系统的进步显著,这在全球环保意识不断提升的背景下尤为明显。大量资源被投入到水质监测技术的研发和应用中,以不断优化监测效能。水资源的保护及健康环境的维护,这种趋势旨在提升人们生活质量,确…

微软宣称其新工具可纠正人工智能幻觉 但专家依然对此表示怀疑

人工智能经常胡言乱语,微软现在说它有办法解决这个问题,但我们有理由对此持怀疑态度。微软今天发布了一项名为"更正"(Correction)的服务,它可以自动修改人工智能生成的与事实不符的文本。Correction 首先会标…

华为认证HCIA篇--网络通信基础

大家好呀!我是reload。今天来带大家学习一下华为认证ia篇的网络通信基础部分,偏重一些基础的认识和概念性的东西。如果对网络通信熟悉的小伙伴可以选择跳过,如果是新手或小白的话建议还是看一看,先有个印象,好为后续的…

8.隐私与安全 - 使用ChatGPT时的注意事项【8/10】

引言 在数字时代,隐私和安全已成为全球关注的焦点。随着技术的发展,个人信息和数据的收集、存储、处理和传输变得越来越普遍,这既带来了便利,也带来了风险。保护个人隐私和数据安全不仅是法律的要求,也是维护公众信任…

solidwork中查看装配体螺丝或零件

假设我的PETG打印件到了,想知道这个螺丝的型号,怎么办 解决办法: 第一步先看看有没有固定的字样 如果固定的话是不行的。需要这样做: 把这里给关了 接下来第二步,点击你想查看的螺丝 然后就会跳到零件图 可以看到直径…

Cloudflare为网站添加AI审计 可检查AI爬虫何时抓取和抓取频次以及直接屏蔽爬虫

网络服务提供商 Cloudflare 宣布即日起为所有网站 (包括免费托管的网站) 带来 AI 审计功能,该功能目前处于测试阶段,可以分析 AI 公司的爬虫和抓爬数据。新的 AI 审计工具 (Cloudflare AI Audit) 主要提供 AI 公司的爬虫何时到网站来抓取数据、抓取的数据…

Unity 热更新(HybridCLR+Addressable)-资源更新

七、资源更新 创建一个叫Aot的文件夹,用来存放不会热更新的资源 这个修改为第三个 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/b8be5e6465184ad5ad6173c6870bfa06.png 这个是更新 在更新或者打包时遇到端口被占的报错,不用理会&#xf…

二、认识大模型

认识大模型 什么是大模型?发展趋势AGI是不是泡沫大模型对比【时效】大模型特点大模型技术原理向量化除了向量化,大模型还具有特征提取特点 总结结语 什么是大模型? 大模型是大规模语言模型(Large Language Model)的简…

mysql如何替换数据库所有表中某些字段含有的特定值

目录 背景查询所有表名查询表的所有字段过虑特征字段替换字段中含有的特定值 背景 公司的测试域名更换了,导致存放在数据库中的域名也要跟着替换,当然把域名存放在数据库表中是不科学的,不建议这样做,但公司的同事就这样做了&…

由动静压之比求马赫数的MATLAB函数

函数介绍 输入:动静压之比 p r e pre pre 输出:马赫数 M a c h Mach Mach 【注】仅适合亚音速的情况,如果动静压之比过大或过小,会有相应的提示 函数源代码 function [m] pre2mach(pre) m(5*(pre1).^0.2857-5).^0.5; if pre&l…

Leetcode 螺旋矩阵

算法思想: 这个算法的目标是按照顺时针螺旋的顺序从矩阵中取出元素。为了做到这一点,整个思路可以分成几个关键步骤: 定义边界:首先需要定义四个边界变量: left:当前左边界的索引。right:当前右…

uniapp 实现3d轮播图,也就是中间的放大两边的缩小 用swiper和swiper-item就能实现

话不多说&#xff0c;直接上代码&#xff0c;无需引入外部资源&#xff0c; 用swiper和swiper-item就能实现 先上结构代码 <swiper class"header" circular previous-margin"80rpx" next-margin"60rpx" :current"current"change&…

点亮城市安全:高科技助力精准定位路灯漏电‘隐形杀手

在城市的每一个角落&#xff0c;路灯如同守夜人&#xff0c;默默照亮归家的路。然而&#xff0c;当这些守护者出现“漏电”隐患时&#xff0c;不仅威胁着行人的安全&#xff0c;还可能引发一系列电气故障。那么&#xff0c;如何精准快速地找出这些隐藏的漏电点&#xff0c;并有…

二叉树进阶oj题【二叉树相关10道oj题的解析和代码实现】

目录 二叉树进阶oj题1.根据二叉树创建字符串2.二叉树的层序遍历3.二叉树的层序遍历 II4.二叉树的最近公共祖先5.二叉搜索树和双向链表6.从前序与中序遍历序列构造二叉树7.从中序和后序遍历序列来构造二叉树8.二叉树的前序遍历&#xff0c;非递归迭代实现9.二叉树中序遍历 &…

防止电脑电池老化,禁止usb或者ac接口调试时充电

控制android系统&#xff0c;开发者模式&#xff0c;开启和禁止充电 连接 Android 手机到电脑的 USB 端口。 下载并安装 Android Debug Bridge (ADB) 工具[1]。 USB&#xff1a; 在命令行中输入 adb shell dumpsys battery set usb 0&#xff0c;以禁止 USB 充电。 在命令…

【AI创作组】Matlab中进行符号计算

提示:代码一定要自己运行过才算数…… 1. 符号计算工具箱介绍 1.1 工具箱功能 MATLAB的符号计算工具箱,即Symbolic Math Toolbox,是一套强大的数学软件工具,它使得MATLAB具备了符号运算的能力。该工具箱提供了一系列函数,用于求解、绘制和操作符号数学方程。用户可以直接…

[Linux]从零开始的Minecraft服务器搭建教程

一、前言 学习Linux有一段时间了&#xff0c;当然&#xff0c;我们要把学习的知识运用到实际生活中去。最近朋友们都在玩我的世界&#xff0c;网易版的我的世界联机非常不稳定&#xff0c;用起来也算是非常难受了。所以还是准备转战JAVA版。为了联机&#xff0c;可以考虑一个人…

ARM单片机的内存分布(重要)

ARM单片机的内存分布&#xff08;重要&#xff09; 一、S32K344的内存布局 MEMORY {int_pflash : ORIGIN 0x00400000, LENGTH 0x003D4000 /* 4096KB - 176KB (sBAF HSE)*/int_dflash : ORIGIN 0x10000000, LENGTH 0x00020000 /* 128KB …