confluent-kafka

news2025/1/23 6:12:29

confluent-kafka

pip3 install confluent-kafka

Producer.py

from confluent_kafka import Producer

# Kafka 配置
config = {
    'bootstrap.servers': '10.10.x.x:3082',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'kafka-974a3a34-bpxuser1',
    'sasl.password': 'zjavIj4OPZNV2vALc2F>zesn8izaHEYP(ZK0IETrtKrMR5w+gUNpL60xkGhX3ca9'  # 请替换为您的实际密码
}

# 创建生产者实例
producer = Producer(**config)

# 异步发送消息
def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# 发送消息
for _ in range(10):
    producer.produce('bpx', 'Hello, Kafka!', callback=delivery_report) # 替换Topic

# 等待消息被发送
producer.flush()

Consumer.py

from confluent_kafka import Consumer

# Kafka 配置
config = {
    'bootstrap.servers': '10.10.x.x:3082',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'kafka-974a3a34-bpxuser1',
    'sasl.password': 'zjavIj4OPZNV2vALc2F>zesn8izaHEYP(ZK0IETrtKrMR5w+gUNpL60xkGhX3ca9',
    'group.id': 'my-python-group'  # 添加 group.id 配置项
}

# 创建消费者实例
consumer = Consumer(**config)
consumer.subscribe(['bpx']) # 替换Topic

try:
    while True:
        msg = consumer.poll(1.0)  # 等待消息,超时为1秒

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    # 关闭消费者连接
    consumer.close()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2098666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP物料帐结账操作

1. CKMLCP 实际物料帐运行 备注:执行最后一步过帐结帐时,MMPV允许记帐结帐期间和结帐的下一期间,同时OB52会计期间要允许记帐到这两个期间。 2. FAGLB03 查询材料成本差异余额 物料分类帐运行前总帐余额 物料分类帐运行后总帐余额 备注&…

模具配件加工精度的重要性及如何实现高精度加工?

在现代工业生产的舞台上,模具配件加工如同一位技艺精湛的工匠,以精度至上为准则,为各类模具赋予了优越的品质保障。时利和将详细阐述模具配件加工精度的重要性以及如何实现高精度加工! 一、精度:模具配件的灵魂 模具配件的精度是其…

语音测试(一)视频转音频

视频转音频 下载ffmpeg工具进入bin目录cmd进入控制台输入命令 ffmpeg.exe -i ./视频.mp4 ./音频.wav

92. UE5 GAS RPG 使用C++创建GE实现灼烧的负面效果

在正常游戏里,有些伤害技能会携带一些负面效果,比如火焰伤害的技能会携带燃烧效果,敌人在受到伤害后,会接受一个燃烧的效果,燃烧效果会在敌人身上持续一段时间,并且持续受到火焰灼烧。 我们将在这一篇文章里…

【最新华为OD机试E卷】猜字迷(100分)-多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-E/D卷的三语言AC题解 💻 ACM金牌🏅️团队| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍿 最新华为OD机试D卷目录,全、新、准,题目覆盖率达 95% 以上,…

计算机网络 数据链路层1

数据链路层: 服务:将来自网络层的数据传输到相邻节点的网络层 作用:加强物理层传输原始比特流的功能 封装成帧---组帧:将来自网络层的数据在首尾添加特定信息(帧定界:帧的起始,结束) 差错控制 CRC循环冗余…

Springboot快速创建的两种方法(简单易学)

方式一:使用网站https://start.spring.io/快速创建 直接在浏览器中输入以上网址,进入创建Springboot项目页面,根据需要勾选一些选项,然后下载到本地即可。 方式二:在IDEA中创建 步骤 创建Maven项目 导入spring-bo…

Spring boot整合接入Redis

Spring boot简单接入Redis 1.pom文件中引入redis <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency> 2.yml文件redis属性配置 spring:redis:host: 120.4…

小北使用Python和pyecharts对我校通信学院2024大数据专业就业情况进行中国地图可视化

引言 在数据分析领域&#xff0c;地图可视化是一种直观展示数据分布和趋势的有效方式。然而&#xff0c;当我们面对如“2020-2024届近5年通信就业数据”这样的数据集时&#xff0c;挑战也随之而来。这些数据通常包含就业单位名称和对应的学生信息&#xff0c;但缺乏直接的地理位…

MyBatis-SQL-语句执行流程

已查询为例 首先我们可以看到&#xff0c;在查询的时候Mapper对象已经是被代理过后的&#xff1a; 所以会执行invoke方法&#xff0c;其底层实现就是JDK的动态代理&#xff1a; 如下图所示&#xff0c;如果MethodCache里面存在方法&#xff0c;则判断这个方法是否为default方…

STM32:TIM中断配置应用(1)呼吸灯:库函数讲解笔记+代码讲解笔记

声明&#xff1a;本博客为哔哩哔哩up主江协科技 “STM32入门教程”的听课笔记&#xff0c;仅供学习、参考使用&#xff0c;不得用作其他用途&#xff0c;违者必究。如有版权问题&#xff0c;请联系作者修改。 目录 一、综述 二、TIM库&#xff08;有关输出比较的函数&#x…

【如何下载Landsat数据】

下载Landsat数据可以通过多种途径实现&#xff0c;主要包括使用官方网站、第三方平台和专门的软件库等。以下是一些常用的方法&#xff1a; 1. 使用USGS官方网站 EarthExplorer&#xff08;earthexplorer.usgs.gov&#xff09; 注册账号&#xff1a;首先&#xff0c;需要在…

10-python格式化字符串的四种方法(%,format,f-string,string template)

3 f-string (格式化字符串) in Python 自 Python 3.6 引入以来,f-string 提供了一种更加简洁和直观的方式来进行字符串格式化。其语法简单明了:只需在字符串前加上字母 f 或 F,并在字符串中使用 {} 来包裹需要插入的内容。 它相比于之前的%格式化和字符串format方法写起来更…

【R语言】基于Biomod2集成平台探究物种分布区的构建流程(SDMs)(持续更新中。。。。。。)

Species Distribution Models 1.写在前面2.物种分布模型介绍3.输入数据准备及预处理3.1.如何从GBIF网站上获取分布点数据&#xff08;基于rgbif包&#xff09;3.2.分布点稀疏处理&#xff08;基于spThin函数&#xff09;3.3.如何获取环境变量数据&#xff08;基于getData函数&a…

兴业月报|八月法拍房市场套均成交折扣降至6.9折

导读 8月北京法拍房成交房源233套&#xff0c;成交总金额18.2891亿元&#xff0c;套均成交价784.94万元&#xff0c;总参拍人数890人&#xff0c;套均参拍人数3.81人&#xff0c;套均成交折扣6.9折。 ——兴业数据中心 2024年八月北京法拍房市场详细数据报告 2024.03-2024.0…

计算机网络 第1章 概述

文章目录 计算机网络概念计算机网络的组成计算机网络的功能三种数据交换技术电路交换&#xff08;Circuit Switching&#xff09;报文交换&#xff08;message&#xff09;分组交换 三种交换方式性能对比计算机网络的分类计算机网络的性能指标性能指标1&#xff1a;速率性能指标…

【mysql】mysql查询机制 调优不止是索引调优

前言&#xff1a;说到mysql调优 我们第一反应都是想到索引调优 应该这是最基本的 也是至关重要的&#xff1b;一般工作个两年 索引调优都可以掌握的八九不离十&#xff0c;相关数据结构特点也都能说个一二出来&#xff0c;所以本文重点是讲述其它机制 整体架构 连接器&#xff…

【C++】手动实现String类的封装(分文件编译)

实现了String类的大部分封装&#xff0c;采用分文件编译 //mystring.h #ifndef MYSTRING_H #define MYSTRING_H#include <iostream> #include <cstring> using namespace std;class myString { private:char *str; //定义一个字符串int size; //记录字符串…

比亚迪方程豹携手华为乾崑智驾,加速中国智驾技术向前

近日&#xff0c;比亚迪方程豹与华为乾崑智驾在深圳签署合作协议&#xff0c;中国两大科技巨头强强联合&#xff0c;共同合作开发全球首个硬派专属智能驾驶方案&#xff0c;实现整车智驾深度融合&#xff0c;首发搭载在即将上市的方程豹豹8车型。 比亚迪智驾以自主研发和开放合…

MySQL之数据库基础

目录 一、数据库 1、基本概念 2、常见的数据库 3、MySQL数据库 连接MySQL服务器 数据逻辑存储 二、数据库和表的本质 三、SQL语句 四、服务器&#xff0c;数据库&#xff0c;表的关系 五、存储引擎 查看存储引擎 一、数据库 1、基本概念 一般来说&#xff0c;数据库…