Python生产、消费Kafka

news2024/9/24 18:34:05

如果想通过docker安装kafka,可参考
Docker安装Kafka
生产者

import json
import time
import traceback

from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errors

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: json.dumps(k).encode(),
    value_serializer=lambda v: json.dumps(v).encode())

while True:
    i = datetime.strftime(datetime.now(), '%Y%m%d %H:%M:%S')
    future = producer.send(
        'kafka_demo',
        key='count_num',  # 同一个key值,会被送至同一个分区
        value=str(i),
        partition=0)  # 向分区1发送消息
    print("send {}".format(str(i)))
    time.sleep(3)
    try:
        future.get(timeout=10)  # 监控是否发送成功
    except kafka_errors:  # 发送失败抛出kafka_errors
        traceback.format_exc()

消费者

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'kafka_demo',
    bootstrap_servers=':9092',
    group_id='test'
)
for message in consumer:
    print("receive, key: {}, value: {}".format(
        json.loads(message.key.decode()),
        json.loads(message.value.decode())
    )
    )

分为两个终端页面运行即可:
生产者打印:
在这里插入图片描述
消费者打印:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1243692.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JSP:Servlet

Servlet处理请求过程 B/S请求响应模型 Servlet介绍 JSP是Servlet的一个成功应用,其子集。 JSP页面负责前台用户界面,JavaBean负责后台数据处理,一般的Web应用采用JSPJavaBean就可以设计得很好了。 JSPServletJavaBean是MVC Servlet的核心…

NOIP2003提高组第二轮T3:加分二叉树

题目链接 [NOIP2003 提高组] 加分二叉树 题目描述 设一个 n n n 个节点的二叉树 tree \text{tree} tree 的中序遍历为 ( 1 , 2 , 3 , … , n ) (1,2,3,\ldots,n) (1,2,3,…,n),其中数字 1 , 2 , 3 , … , n 1,2,3,\ldots,n 1,2,3,…,n 为节点编号。每个节点都…

抖音汽车租赁小程序背后的技术挑战与解决方案

随着共享经济的不断发展,抖音上的汽车租赁小程序也逐渐崭露头角。然而,这背后涉及的技术挑战却不容小觑。本文将深入探讨抖音汽车租赁小程序的技术挑战,并提出相应的解决方案。 一、实时位置追踪 汽车租赁小程序的核心在于用户能够实时追踪…

JS中reduce函数的使用

一:函数解释 reduce()是一个对数组中的每个元素按照顺序依次执行自定义函数的方法。 就是遍历数组,每个元素都执行相同的方法。 二:实际应用 accumulator和currentValue分别用acc和cur表示哈,举例如下: 应用例子①…

赛迪生电源充电模块维修CHR-22005 RCU-202A

通信电源维修品牌:英可瑞,许继,艾默生,通合,动力源,九洲,华隆,合欣,泰坦,赛迪生等 直流屏模块故障和解决办法: 1、针对各类变电站直流屏,若显示交流空开跳闸,但并没有动作。应当检查三处地方是否正确: 接线是否正确…

SWT/Jface(2): 表格的编辑

前言 上节说到, 创建和渲染表格需要如下几个步骤: 接收源数据数组(也可以是单个对象或者其他集合类型): TableViewer.setInput(Object)渲染接收的数据 渲染表头: TableViewer.setLabelProvider(IBaseLabelProvider)渲染内容: TableViewer.setContentProvider(IContentProvide…

k8s-pod生命周期 4

容器环境初始化 pod 由pod 镜像来提供,在pod 生命周期里容器主要分为两种:初始化容器和主容器 初始化容器一定要成功运行并退出,当初始化容器运行退出完了之后主容器开始和运行 主容器开始运行的时候,有两个探针:存…

【C++初阶】STL详解(七)Stack与Queue的模拟实现

本专栏内容为:C学习专栏,分为初阶和进阶两部分。 通过本专栏的深入学习,你可以了解并掌握C。 💓博主csdn个人主页:小小unicorn ⏩专栏分类:C 🚚代码仓库:小小unicorn的代码仓库&…

git-3

1.如何让工作区的文件恢复为和暂存区一样? 工作区所作的变更还不及暂存区的变更好,想从暂存区拷贝到工作区,变更工作区(恢复成和暂存区一样的状态),想到用git checkout -- 文件名 2.怎样取消暂存区部分文件的更改? 如…

C语言函数练习(超基础超详细)

ps:题目来源于pta平台。 1. int sum(int m, int n) {int sum0;for(int im; i<n; i){sumi;}return sum; } 2. int max(int a, int b) {if(a>b)return a;else return b; } 3. double dist( double x1, double y1, double x2, double y2 ) {return sqrt((x1-x2)*(x1…

STM32笔记---RTC

目录 一、RTC简介 二、主要特性 三、功能描述 3.1 读RTC寄存器 3.2 配置RTC寄存器 四、BKP简介 五、RTC_Init() 1. 函数BKP_ReadBackupRegister 2.RCC_LSEConfig设置外部低速晶振&#xff08;LSE&#xff09; 3.RTC基本结构 5.RTC_Init()实现 6.time.h 一、R…

靠这份求职指南找工作,稳了!

大家好&#xff0c;我是鱼皮。为了帮助朋友们更好的准备秋招&#xff0c;我们精心汇总整理了 编程导航星球 内鱼友反馈的 200 多个高频求职问题和 150 多篇面经、以及最新秋招企业投递信息表&#xff0c;解答大家的求职困惑。 一、最新秋招投递信息表 目前已汇总整理了 600 多家…

spring 是如何开启事务的, 核心原理是什么

文章目录 spring 是如何开启事务的核心原理1 基于注解开启事务2 基于代码来开启事务 spring 是如何开启事务的 核心原理 Spring事务管理的实现有许多细节&#xff0c;如果对整个接口框架有个大体了解会非常有利于我们理解事务&#xff0c;下面通过讲解Spring的事务接口来了解…

人工智能今天能为你做什么?生成式人工智能如何改变技术文档领域

▲ 搜索“大龙谈智能内容”关注GongZongHao▲ 作者 | Fabrice Lacroix 大型语言模型&#xff08;LLM&#xff09;和生成式人工智能&#xff08;GenAI&#xff09;&#xff0c;尤其是ChatGPT&#xff0c;这些是引领科技革新的新兴技术。它们不仅在科技界引起了轩然大波&#x…

项目经理超能力图鉴

大家好&#xff0c;我是老原。 在项目管理中&#xff0c;项目经理远没有大家想象中的那么光鲜亮丽&#xff0c;反而成为了背锅的代名词。 项目顺利的时候&#xff0c;老板和颜悦色、团队融洽顺利&#xff1b; 项目出问题时&#xff0c;项目经理不得不承担起责任&#xff0c;…

在.bashrc文件修改环境变量的做法

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> ~/.bashrc文件是linux下保存环境变量的系统文件。原以为使用sed命令修改.bashrc文件&#xff0c;实际上不行&#xff0c;需要使用echo命令。具体示例如下…

[shader] 光照入门(未完结。。。

反射 漫反射&#xff1a;而当物体表面粗糙时&#xff0c;我们把物体表面看作无数不同方向的微小镜面&#xff0c;则这些镜面反射出的光方向均不相同&#xff0c;这就是漫反射。 高光反射&#xff1a;我们假定物体表面光滑&#xff0c;只有一个镜面&#xff0c;那么所有的光都…

【软件工程师从0到1】- 多态 (知识汇总)

前言 介绍&#xff1a;大家好啊&#xff0c;我是hitzaki辰。 社区&#xff1a;&#xff08;完全免费、欢迎加入&#xff09;日常打卡、学习交流、资源共享的知识星球。 自媒体&#xff1a;我会在b站/抖音更新视频讲解 或 一些纯技术外的分享&#xff0c;账号同名&#xff1a;hi…

使用XHProf查找PHP性能瓶颈

使用XHProf查找PHP性能瓶颈 XHProf是facebook 开发的一个测试php性能的扩展&#xff0c;本文记录了在PHP应用中使用XHProf对PHP进行性能优化&#xff0c;查找性能瓶颈的方法。 下载 网上很多是编译安装xhprof-0.9.4版本&#xff0c;应该是用php5&#xff0c;在php8.0下编译x…

【spring(三)】AOP总结

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 一、AOP相关概念 ① AOP核心思想思想&#xff1a; ② AOP专业术语&#xff1a; 二、AOP快速如入门 三、AOP工作流程 四、切入点表达式 ① 语法格式 ②支持通配符 ③书写技巧 五、通知类型 ①⭐环绕通知…