python学习-python对kafka的相关操作

news2024/10/11 18:15:18

python对kafka的操作

【1】kafka简介

如果你对kafka有一定的理解可以忽略以下内容

kafka的基础知识如下:

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写,用于处理实时数据流。它以高吞吐量、低延迟和可扩展性等特点而著称,广泛应用于Web数据抓取、日志收集、消息系统等领域。
​
Kafka的主要特点包括:
​
1. 高吞吐量:Kafka能够处理大规模的数据流,每秒可以处理几十万条消息。
​
2. 低延迟:Kafka能够快速地传输消息,通常延迟在毫秒级别。
​
3. 可扩展性:Kafka能够轻松地扩展到多个节点,以满足不同规模和负载的需求。
​
4. 数据持久化:Kafka将数据持久化到磁盘上,可以在数据丢失或节点故障时进行恢复。
​
5. 多语言支持:Kafka支持多种编程语言,包括Java、Scala、Python等。
​
Kafka的核心概念包括:
​
1. Topic:主题,即消息的类别或主题。
​
2. Producer:生产者,用于向Kafka发送消息。
​
3. Consumer:消费者,用于从Kafka接收消息。
​
4. Broker:代理,Kafka节点的服务器。
​
5. Partition:分区,一个主题可以被分为多个分区,每个分区可以在不同的节点上。
​
6. Offset:偏移量,每个分区中的每个消息都有一个唯一的偏移量。
​
7. ZooKeeper:Kafka使用ZooKeeper来管理集群中的各个节点。
​
Kafka的应用场景包括:
​
1. 实时数据流处理:Kafka能够快速地传输和处理大规模的实时数据流,适用于实时数据分析、实时监控等场景。
​
2. 日志收集:Kafka能够快速地收集和处理大量的日志数据,适用于日志分析、日志管理等场景。
​
3. 消息系统:Kafka能够快速地传输消息,适用于消息推送、消息队列等场景。
​
总之,Kafka是一个高性能、高可靠、可扩展的流处理平台,适用于处理实时数据流、日志收集、消息系统等场景。

开始使用kafka是,我先来描述一下我的配置文件

kafka:
  brokers: 'kafka46:9092,kafka47:9092,kafka48:9092,kafka49:9092'
  zookeeper_hosts: 'kafka46:2181,kafka47:2181,kafka48/kafka'
  topic: srvdbMessage
  group_id: message_to_srvdb-20210810165601
  # 初始kafka消费偏移: smallest - 从最早消费; largest - 从最新消费
  offset_reset: largest

【2】创建消费者消费数据

简单说一下,想消费同一个topic里面的数据两次的话,可以用不同的group_id去消费

    from confluent_kafka import Consumer
    from confluent_kafka import KafkaError
    def get_message(self):
        def print_assignment(consumer, partitions):
​
            logging.info("Assignment: {}".format(partitions))
​
        def print_revoke(consumer, partitions):
            logging.info("Revoke: {}".format(partitions))
        logging.info("Initialize kafka consumer.")
        consumer_conf = {
            'bootstrap.servers': self.server_config['kafka_coach']['brokers'],
            'group.id': self.server_config['kafka_coach']['group_id'],
            'enable.auto.commit': 'true',
            'default.topic.config': {
                'auto.offset.reset': self.server_config['kafka_coach']['offset_reset']
            }
        }
        consumer = Consumer(consumer_conf)
        consumer.subscribe([self.server_config['kafka_coach']['topic']],
                           on_assign=print_assignment,
                           on_revoke=print_revoke)
        logging.info("start analysis")
​
        while True:
            message = consumer.poll(timeout=1.0)
            if message is None:
                if not consumer.assignment():
                    logging.error("Partition is not assignment. ")
                time.sleep(0.1)
                continue
​
            message_partition = message.partition()
            message_offset = message.offset()
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    # logger.info('partition: %d reached end at offset %d.', partition, offset)
                    pass
                else:
                    logging.error("kafka consumer error! {}".format(message.error()))
                continue
            message_value = message.value()
            
            if message_value:
                print("消费到的数据是:{}".format(message_value))
【3】kafka的数据生产
    from confluent_kafka import Producer
    def save_result_to_kafka(self):
        """将数据保存到kafka"""
        def delivery_callback(err, msg):
            if err:
                logging.error('Message failed delivery: %s' % err)
        producer_conf = {
            'bootstrap.servers': self.config['kafka']['brokers'])
        }
        while 1:
            try:
                producer = Producer(**producer_conf)
                while 1:
                    result = {"key":123}
                    try:
                        producer.produce(
                            self.config['kafka']['topic_in'], json.dumps(result),
                            callback=delivery_callback)
                    except Exception as e:
                        logging.exception('保存kafka时错误: %s', str(e))
                        self.result_queen.put(result)
                        time.sleep(0.1)
                        producer.poll(0)
            except Exception as e:
                logging.exception('连接kafka时错误: %s', str(e))
                time.sleep(10)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2205427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

7. BBP 控制板首次运行步骤说明

7.1. 概述 BBP 控制板在硬件焊接安装完成之后, 就可以通过烧写器将控制程序烧写到控制板的 MCU 上, 此步骤与通常的stm32单片机程序烧写完全一致。 烧写完成在程序能够正常运行后, 还需要执行至少一次机载参数初始化工作. 在后续调试及开发过程中, 如果参数系统内容未发生改变…

让你一看就懂,Javascript的原型和原型链

自己的话: 你是否看过很多关于javascript的原型,和原型链的技术文章?但是看完后仍然是一头雾水? 没关系,希望我的这一篇文章,能让你一看就懂。 原型和原型链是JavaScript实现继承和共享属性的重要机制。…

零样本主题驱动图像生成新方法!EZIGen:在保持灵活性的同时保留主题身份!

今天给大家介绍一个零样本主题驱动图像生成方法EZIGen,它会从你提供的主体图像中提取出重要特征,就像是给图像做一个“身份识别”,确保生成的新图像能够保留主体的独特特征。接下来,EZIGen会根据你输入的文本提示,生成…

怎么高效恢复硬盘和u盘里的数据:全面指南

在数字时代,数据已成为我们生活与工作中不可或缺的一部分。无论是个人珍贵的照片、视频,还是企业至关重要的文档、项目资料,一旦硬盘或U盘中的数据丢失,都可能带来不可估量的损失。幸运的是,随着技术的发展&#xff0c…

.Net基础1

.NET框架 项目结构 Connected Services是第三方服务MVC框架appsettings.json配置文件Program.cs控制台应用程序Properties里的json文件是配置启动方式 1. 基本开发 出现这个bug是因为防火墙没有把浏览器加入白名单,可以暂时先用http启动代替 第一步创建控制器&am…

希亦超声波清洗机是智商税吗?百元级超声波清洗机旗舰机皇真相大揭秘!

在深入探讨这个问题之前,我们先来了解一下超声波清洗机的工作原理。超声波清洗机利用高频振动波,通过液体介质传递能量,产生无数微小的气泡。这些气泡在压力作用下迅速闭合,形成强大的冲击波,能够深入清洁物品表面难以…

苹果秋季盛典:iPhone 16系列引领未来科技潮流

9月10日,苹果公司在众人瞩目中举办了2024年的秋季特别活动,发布了备受期待的iPhone 16系列。 尽管网络发布会已经持续了一整年,但熬夜观看的果粉们仍然热情不减,因为每一次苹果的新品发布都代表着科技界的一次重大飞跃。 iPhone …

汽车销量预测系统

项目介绍 此项目服务于汽车经销商、汽车生产商,旨在成为用于使企业充分了解消费者诉求,预见市场未来的需求量和可能存在的销售变化趋势,合理规划产能,正确制定生产计划,实施以销定产的生产策略的交流平台,…

应对专利过期的有效方法与补救措施

专利作为创新成果的重要保护手段,在一定期限内为所有者提供了独家的权利。然而,当专利过期时,情况会变得较为复杂,需要采取不同的应对方法,以下将分别针对忘记缴纳年费以及保护期限届满这两种常见情况进行要点解析。 一…

100V调光芯片SL8701 支持PWM/模拟调光 无频闪 多路共阳 高辉度65536级

一、产品概述 SL8701是一款内置100V MOS的降压型高调光比LED恒流驱动芯片,专为智能调光调色照明研发设计。它支持多种调光方式,包括PWM调光、模拟调光等,能够实现高调光比,满足不同场景的照明需求。 二、主要特点 高效降压&…

银行流水获取方式(二)

银行流水获取方式 摘要: 本文探讨了银行流水在企业财务管理中的重要性及其获取方式。银行流水是企业财务活动的关键记录,涵盖了所有资金流动情况,对日常运营、财务管理、税务申报和审计至关重要。企业通过核对银行流水确保账务准确性&#…

软测实验:熟悉功能测试工具

实验背景:理解自动化测试原理和方法,熟悉功能测试工具的使用。 实验目的: 熟悉功能测试工具的基本使用方法熟悉功能测试的基本流程能够根据测试结果撰写测试报告 一、测试需求 自动化测试原理是通过使用自动化测试工具和脚本来模拟人工测…

python 共享内存(注册、写入、读取)

import sys from PyQt5.QtWidgets import * from PyQt5.QtCore import * from UI.ui_shareMmap import Ui_ShareMServiceDlg # 导入UI类 import mmapclass QMainDialog(QDialog, Ui_ShareMServiceDlg): # 修改点(UI类)def __init__(self, parentNone):…

数据库文档编写流程

在一个系统中新增一个模块,通常不是一个人能够独立完成的。这需要多个团队甚至两个组的共同合作。例如,如果我们想在设备管理系统中添加一个IT资产管理模块,领导不会简单地说:“喂,你给我加一个IT资产管理模块。”直接…

如何做独立站将产品卖到国外?从零开始打造你的全球电商帝国

近年来,跨境电商发展迅猛,为卖家提供了广阔的市场空间。相比于传统跨境电商平台模式,独立站模式拥有更大的自主权和灵活性,卖家可以打造专属的品牌形象,并根据自身需求定制营销策略。 如果你也想通过独立站将产品卖到…

在培训考试小程序页面弹出半屏的弹窗交互实践

如果在页面内进行复杂的界面设计(如在页面内弹出半屏的弹窗、在页面内加载一个全屏的子页面等),用户进行返回操作会直接离开当前页面,不符合用户预期,预期应为关闭当前弹出的组件。 为此提供“假页”容器组件page-con…

python爬虫 - 深入正则表达式

🌈个人主页:https://blog.csdn.net/2401_86688088?typeblog 🔥 系列专栏:https://blog.csdn.net/2401_86688088/category_12797772.html 目录 前言 一、匹配多个字符 (一)匹配任意多个字符 &#xff0…

Java学习-JVM

目录 1. 基本常识 1.1 JVM是什么 1.2 JVM架构图 1.3 Java技术体系 1.4 Java与JVM的关系 2. 类加载系统 2.1 类加载器种类 2.2 执行顺序 2.3 类加载四个时机 2.4 生命周期 2.5 类加载途径 2.6 双亲委派模型 3. 运行时数据区 3.1 运行时数据区构成 3.2 堆 3.3 栈…

adaptor lora基础

https://www.zhihu.com/question/508658141/answer/3340979311 adaptor和PEFT的区别:前者在模型子层后加一个小型的dense;后者直接稀疏化模型本身; Loading Pre-Trained Adapters — AdapterHub documentation CVPR 2024 | SD-DiT&#xff…

五分钟带你零基础入门跨境电商独立站,干货速递!

对于跨境电商卖家来说,多平台、多站点的布局是非常重要的战略。这样做可以规避”鸡蛋放在同一个篮子里”的风险也能够追求更高的销售额和利润。同时,市场的变化也带来了新的发展机会,因此很多出海企业都希望抓住独立站的新机遇,抢…