Python发送带key的kafka消息

news2024/12/19 15:57:08

在Python中发送带有键(key)的Kafka消息,通常会使用`confluent-kafka`或`kafka-python`这样的库。这里我将分别展示如何使用这两个库来实现这个功能。

 

### 使用 `confluent-kafka`

 

首先,确保你已经安装了`confluent-kafka`库。如果没有安装,可以使用pip进行安装:

```bash

pip install confluent-kafka

```

 

然后,你可以使用以下代码来发送带有键的消息:

```python

from confluent_kafka import Producer

 

def delivery_report(err, msg):

    """ Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush(). """

    if err is not None:

        print(f'Message delivery failed: {err}')

    else:

        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

 

# 配置生产者

conf = {'bootstrap.servers': 'localhost:9092'}

 

# 创建生产者实例

producer = Producer(conf)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.produce('my_topic', key=message_key, value=message_value, callback=delivery_report)

 

# 触发所有消息的回调函数

producer.flush()

```

 

### 使用 `kafka-python`

 

同样地,确保你已经安装了`kafka-python`库。如果未安装,可以通过pip安装:

```bash

pip install kafka-python

```

 

接下来,使用以下代码来发送带有键的消息:

```python

from kafka import KafkaProducer

 

# 创建生产者实例

producer = KafkaProducer(bootstrap_servers='localhost:9092',

                         key_serializer=str.encode,

                         value_serializer=str.encode)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.send('my_topic', key=message_key, value=message_value)

 

# 确保所有消息都已发送

producer.flush()

 

# 关闭生产者

producer.close()

```

 

在这两个例子中,我们创建了一个Kafka生产者,并指定了一个本地运行的Kafka服务器地址(`localhost:9092`)。然后,我们定义了要发送的消息的键和值,并调用了相应的方法来发送消息。对于`confluent-kafka`,我们还设置了一个回调函数来处理消息的交付结果。

 

请根据你的实际环境调整配置,例如Kafka服务器的地址等。希望这能帮助到你!如果有任何其他问题,请随时提问。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2262249.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Moretl开箱即用日志采集

永久免费: 至Gitee下载 使用教程: Moretl使用说明 使用咨询: 用途 定时全量或增量采集工控机,电脑文件或日志. 优势 开箱即用: 解压直接运行.不需额外下载.管理设备: 后台统一管理客户端.无人值守: 客户端自启动,自更新.稳定安全: 架构简单,兼容性好,通过授权控制访问. 架…

分享一次接口性能摸底测试过程

接口性能测试是用于验证应用程序中的接口是否可以满足系统的性能要求的一种测试方法。确定应用程序在各种负载条件下的性能指标,例如响应时间、吞吐量、并发性能等,以便提高系统的性能和可靠性。本文主要讲述接口性能测试从前期准备、方案设计到环境搭建…

【机器学习】机器学习的基本分类-无监督学习-t-SNE(t-分布随机邻域嵌入)

t-SNE(t-分布随机邻域嵌入) t-SNE(t-distributed Stochastic Neighbor Embedding)是一种用于降维的非线性技术,常用于高维数据的可视化。它特别适合展示高维数据在二维或三维空间中的分布结构,同时能够很好…

【教学类-83-03】20241218立体书盘旋蛇3.0——圆点蛇1(蚊香形)

背景需求: 制作儿童简易立体书贺卡 【教学类-83-01】20241215立体书三角嘴1.0——小鸡(正菱形嘴)-CSDN博客文章浏览阅读1k次,点赞24次,收藏18次。【教学类-83-01】20241215立体书三角嘴1.0——小鸡(正菱形…

监控视频汇聚融合云平台一站式解决视频资源管理痛点

随着5G技术的广泛应用,各领域都在通信技术加持下通过海量终端设备收集了大量视频、图像等物联网数据,并通过人工智能、大数据、视频监控等技术方式来让我们的世界更安全、更高效。然而,随着数字化建设和生产经营管理活动的长期开展&#xff0…

JAVA 零拷贝技术和主流中间件零拷贝技术应用

目录 介绍Java代码里面有哪些零拷贝技术java 中文件读写方式主要分为什么是FileChannelmmap实现sendfile实现 文件IO实战需求代码编写实战IOTest.java 文件上传阿里云,测试运行代码看耗时为啥带buffer的IO比普通IO性能高?BufferedInputStream为啥性能高点…

云灾备技术

目录 云灾备分类与定义 云容灾定义与主要应用场景 云容灾定义 应用场景 云备份定义与主要应用场景 云备份定义 应用场景 云容灾参考模型与关键技术 云备份参考模型与关键技术 云灾备分类与定义 云容灾技术是指保护云数据中心业务持续性的灾备技术,它是云灾…

进程通信方式---共享映射区(无血缘关系用的)

5.共享映射区(无血缘关系用的) 文章目录 5.共享映射区(无血缘关系用的)1.概述2.mmap&&munmap函数3.mmap注意事项4.mmap实现进程通信父子进程练习 无血缘关系 5.mmap匿名映射区 1.概述 原理:共享映射区是将文件…

leetcode 面试经典 150 题:长度最小的子数组

链接长度最小的子数组题序号209题型数组解题方法滑动窗口难度中等 题目 给定一个含有 n 个正整数的数组和一个正整数 target 。找出该数组中满足其总和大于等于 target 的长度最小的 子数组 [numsl, numsl1, …, numsr-1, numsr] ,并返回其长度。如果不存在符合条件…

代码随想录day22 | 回溯算法理论基础 leetcode 77.组合 77.组合 加剪枝操作 216.组合总和III 17.电话号码的字母组合

DAY22 回溯算法开始 学到目前最烧脑的一天 回溯算法理论基础 任何回溯算法都可以抽象成一个树结构 理论基础 什么是回溯法 回溯法也可以叫做回溯搜索法,它是一种搜索的方式。 在二叉树系列中,我们已经不止一次,提到了回溯 回溯是递归的副…

画一颗随机数

代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>codePen - Random Tree</title> </head> <body><canvas></canvas><script>const canvas doc…

牛客周赛 Round 72 题解

本次牛客最后一个线段树之前我也没碰到过&#xff0c;等后续复习到线段树再把那个题当例题发出来 小红的01串&#xff08;一&#xff09; 思路&#xff1a;正常模拟&#xff0c;从前往后遍历一遍去统计即可 #include<bits/stdc.h> using namespace std; #define int lo…

[x86 ubuntu22.04]投影模式选择“只使用外部”,外部edp屏幕无背光

1 问题描述 CPU&#xff1a;G6900E OS&#xff1a;ubuntu22.04 Kernel&#xff1a;6.8.0-49-generic 系统下有两个一样的 edp 屏幕&#xff0c;投影模式选择“只使用外部”&#xff0c;内部 edp 屏幕灭&#xff0c;外部 edp 屏幕无背光。DP-1 是外部 edp 屏幕&#xff0c;eDP-1…

清理C盘小记

突然C盘就爆满了&#xff0c;想当初还是给他预留了120G的空间&#xff0c;感觉到现在也不够用了&#xff0c;担心出现死机的情况就赶紧进行了清理。有一说一&#xff0c;清理回收站是真的有用。 参考&#xff1a;C盘清理指南&#xff0c;清理出30G起&#xff0c;超详细总结&am…

Docker:Docker Compose(补充三)

Docker&#xff1a;Docker Compose 1. Docker Compose 批量管理容器的工具 1. Docker Compose 批量管理容器的工具 Docker Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。通过一个 YAML 文件来配置应用服务&#xff0c;它允许用户编排、组合和配置多个容器的部署…

lightRAG 论文阅读笔记

论文原文 https://arxiv.org/pdf/2410.05779v1 这里我先说一下自己的感受&#xff0c;这篇论文整体看下来&#xff0c;没有太多惊艳的地方。核心就是利用知识图谱&#xff0c;通过模型对文档抽取实体和关系。 然后基于此来构建查询。核心问题还是在解决知识之间的连接问题。 论…

Visual studio的AI插件-通义灵码

通义灵码 TONGYI Lingma 兼容 Visual Studio、Visual Studio Code、JetBrains IDEs 等主流 IDE&#xff1b;支持 Java、Python、Go、C/C、C#、JavaScript、TypeScript、PHP、Ruby、Rust、Scala 等主流编程语言。 安装 打开扩展管理器&#xff0c;搜送“TONGYI Lingma”&…

shutil 文件拷贝copy - python 实现

DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&#xff0c;持续增加中。 需要更多数据资源和技术解决方案&#xff0c;知识星球&#xff1a; “DataBall - X 数据球(free)” -------------------------------------------------------------…

attack xv6

思路 被这个实验折磨了两天&#xff0c;可能是2024新出的一个实验内容&#xff0c;网上资料少&#xff0c;参考了一篇仅有的博客&#xff0c;吭哧吭哧分析出来了个大概吧…在此记录一下&#xff0c;以便帮助有需要的人。 attack xv6的ans只有几行代码&#xff0c;根据实验描述…