Python消费Kafka与优化

news2025/1/9 2:30:56

一.背景与问题

     之前使用kafka-python库进行消费数据处理业务逻辑,但是没有深入里面的一些细节,导致会遇到一些坑。正常普通我们常见的一个消费者代码:(假设topic的分区数是20个)

from kafka import KafkaConsumer

bootstrap_servers = ['localhost:9092']
group_id = 'python-consumer'
consumer = KafkaConsumer(topic='test', bootstrap_servers=bootstrap_servers,  group_id=group_id)
for msg in consumer:
    s = msg.value.decode(encoding='utf-8')
    print('从kafka获取到的数据: ' + s)

   这个代码本身没什么问题,消费数据都正常。但是消费能力随着数据写入量的增加,消费者消费能力跟不上,导致数据堆积严重。那到此我们先来回顾一下topic和分区相关知识。

   1.topic是一个逻辑概念,数据实际存储是落在分区上的,所有的物理分区属于同一个topic。 每个topic的分区内的队列是有序的,但是不能保证某一条消息在全局是有序的。 一个topic有多个分区,能够解决其中的一个问题就是可以提高数据写入和消费能力。

   2.一个消费组Group有多个消费者consumer,一个consumer可以消费一个topic里面的多个分区,但是一个topic分区只能被一个Group里面的一个消费者进行消费。

二.多进程消费

  通过回顾的知识,我们理解了开篇的实例代码。  这个代码阐述的消费进程数和topic分区关系是:

   1个进程消费了topic(test)的20个分区.  消费得到的msg对象,大家可以打印出所属分区id以及offset。

   那也就很好解释了为什么随着生产者生产能力的增加,消费者为什么消费能力上不去。一个消费者同时消费20个分区的数据,消费能力自然上不来。 那此时我们可以使用多进程进行消费.那又有2个问题需要考虑:

   1. 为什么不开启多进程而是多线程? 

        由于针对消费者线程安全问题, kafka-python库推荐使用多进程而非多线程:

        GitHub - dpkp/kafka-python: Python client for Apache Kafka

 

  2. 开启多少个进程合适呢? 消费进程越多消费能力越强?

      非也.  消费者进程的数量并不是越多越好,也不是越少越好。 此时,我们通过上面的回顾知道,相同Group里面的一个consumer消费者进程可以消费同一个topic的多个分区,但是一个分区只能被一个消费者consumer消费。 针对20个分区,也就意味着我们最多只有20个消费者进程与之对应,一个分区能够被一个消费者消费,多出来的消费者进程是无法被kafka分配到消费"权利"的,那就意味着多出来的进程只能处于空转状态,白白消耗系统CPU和资源。

      所以理想状态下,我们希望消费者consumer进程数量=分区数量,并且是一个消费者消费一个分区,此时消费效果应该较为合适。

        此时我们针对代码修改,首先连接kafka,查询topic的分区数,然后开启和分区数一样多的进程。之后,我们针对每个分区分配一个消费者进程进行消费,每个进程维护自己消费分区的offset.

from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition
from multiprocessing import Process
import sys


def run(topic_name, partition_id, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             max_poll_records=500,  # 每次poll最大的记录数
                             max_poll_interval_ms=30000,  # 两次间隔poll最大间隔时间ms
                             heartbeat_interval_ms=10000,  # 每次心跳间隔ms
                             session_timeout_ms=20000,  # 心跳会话超时时间ms
                             group_id=group_id
                             )
    # 为进程指定消费的topic名称、分区id
    consumer.assign([TopicPartition(topic_name, partition_id)])
    while True:
        messages = consumer.poll(timeout_ms=1000)  # 拉取每次超时时间, 如果1000ms内拉到500条记录则直接返回,如果超过了1000ms拉不到500条也返回
        for tp, messages_list in messages.items():
            for message in messages_list:
                print(message.value.decode(), message.offset)


if __name__ == '__main__':

    # 获取分区数
    bootstrap_servers = ['localhost:9092']
    adminClient = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topic = 'test'
    topics = ['test']
    group_id = 'python-consumer'
    tps = adminClient.describe_topics()
    partitions_len = len(tps[0]['partitions'])

    if partitions_len <= 0:
        print("topic 分区数 <= 0")
        sys.exit(1)

    # 开启多进程, 进程数 = 分区数
    processes = []
    for i in range(partitions_len):
        p = Process(target=run, args=(topic, i, group_id,))
        p.start()
        processes.append(p)

    # 等待所有进程结束
    for p in processes:
        p.join()

     经过我们使用多进程的方式分别消费对应的分区数据,消费者能力一下子比原来消费能力翻了几倍甚至几十倍。   所以针对数据写入量大,同时消费能力也要跟得上的情况下,我们针对topic要设置多分区、启动多个消费者,但是消费者的数量尽量和topic分区数保持一样,这样每个进程就能消费对应一个分区,提高了资源的利用率和消费能力!

    其他编程语言也是一样的原理,这个和实现的编程语言没关系,道理都是相同的。如果使用Golang编写,那么这里的多进程我们可以换成多协程实现都可以,具体情况具体分析即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/530189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vim命令大全,非常详细,强烈建议收藏!

Vim是一款常用的文本编辑器&#xff0c;具有强大的功能和高度的可定制性。在本文中&#xff0c;我们将详细介绍Vim的常用命令&#xff0c;并提供相关的示例。如果您是初学者或已经熟练使用Vim&#xff0c;这篇文章都能为您提供帮助。 基本命令 以下是一些基本的Vim命令&#x…

一文足矣:Unity行为树

目录 前言 unity行为树简介 一个简单的敌人AI 正文 个人对行为树的理解 有限状态机与行为树 基本框架 BTNode DataBase 行为树入口 行为树的事件GraphEvent 发送事件 监听事件 脚本发送事件 行为树的管理&操作 一、操作单颗树 二、管理所有树 自定义Task任务 …

python字符串的三种定义方式

之前我们讲过 一些字符串的定义 但当时是说 被双引号包裹的就是字符串 其实并不是特别严谨 这个叫双引号的定义方式 也没错 也只有字符串会被双引号包裹 但还有其他的定义方式 这里 还是先说答案 三种定义方式分别是 单引号定义 双引号定义 三引号定义 参考代码如下 #单引定义…

《点云处理算法》——GROR配准

GROR配准方法&#xff08;实时性挺好&#xff09; 一、 效果展示二、VS运行2.1 github源码下载2.2 编译运行 三、后续集成 一、 效果展示 二、VS运行 最近和小伙伴交流&#xff0c;他发现一个好用的配准方法&#xff0c;放在这里实现一下 2.1 github源码下载 gror 2.2 编译…

hexo,typecho,wordpress,hugo的官网下载及介绍

Typecho Typecho是一个轻量级的PHP博客系统&#xff0c;它的优点在于易于安装、使用和管理。Typecho使用MySQL数据库来存储文章和评论&#xff0c;同时支持主题和插件的自定义。Typecho适用于个人博客、技术博客等&#xff0c;因为它的易用性和可扩展性较高。 WordPress Word…

分析SpringBoot 底层机制【Tomcat 启动分析+Spring 容器初始化+Tomcat 如何关联Spring 容器之手动实现

分析SpringBoot 底层机制【Tomcat 启动分析Spring 容器初始化Tomcat 如何关联Spring 容器之手动实现 目录 分析SpringBoot 底层机制【Tomcat 启动分析Spring 容器初始化Tomcat 如何关联Spring 容器之手动实现 实现任务阶段1- 创建Tomcat, 并启动 说明: 分析代码实现 修改…

Android源码之Application与Activity创建时机分析

前言 我们知道App进程是由SystemServer启动的Android启动流程 那App对应的Application以及第一个Activity又是如何创建的呢&#xff1f; 源码分析(API 30为例) 我们从ActivityThread.main函数入手&#xff1b; public static void main(String[] args) {...ActivityThread t…

第八章结构型模式—装饰者模式

文章目录 装饰者模式解决的问题概念结构 案例使用装配者进行改进 使用场景JDK源码分析 静态代理和装饰者的区别 结构型模式描述如何将类或对象按某种布局组成更大的结构&#xff0c;有以下两种&#xff1a; 类结构型模式&#xff1a;采用继承机制来组织接口和类。对象结构型模式…

【Linux】volatile | SIGCHLD | 多线程概念

文章目录 1. volatile编译器优化 2.SIGCHLD信号验证SIGCHLD的存在 3. 多线程多线程概念理解概念什么是多线程调度成本低局部性原理 什么叫做进程 1. volatile 在vscode中&#xff0c;创建signal.c文件 故意在while中没有写代码块&#xff0c;让编译器认为在main中&#xff0c;…

爬虫+可视化 | 动态展示2020东京奥运会奖牌世界分布

文章目录 前言1. 导入模块2. 数据爬取3. 地图展示 3.1 2020东京奥运会奖牌数世界分布3.2 2020东京奥运会金牌世界分布3.3 2020东京奥运会金、银、铜世界分布 前言 2020东京奥运会已落下帷幕&#xff0c;中国军团共获得88枚奖牌&#xff0c;其中38枚金牌、32枚银牌、18枚铜牌…

基于RV1126平台检测模型全流程部署(附工程)

基于RV1126平台检测模型全流程部署 模型训练ONNX导出ONNX模型简化Python部署C部署 本工程地址&#xff1a;https://github.com/liuyuan000/Rv1126_YOLOv5-Lite 模型训练 这次选用的是方便部署的YOLOv5 Lite模型&#xff0c;是一种更轻更快易于部署的YOLOv5&#xff0c;主要摘…

嵌入式通信协议【Modbus】modbus RTU的帧格式

modbus的帧格式 设备地址功能代码数据格式CRC校验LCRC校验H8bit8bitN*8bit8bit8bit 1 主机对从机单个寄存器写数据操作&#xff08;0x06&#xff09; 从机地址功能代码数据格式&#xff08;数据地址&#xff09;数据格式&#xff08;数据&#xff09;CRC校验LCRC校验H010600…

动态规划:万变不离其宗,带你吃透股票系列问题

前言&#xff1a; 对于买卖股票问题而言&#xff0c;最关键的是我们对问题的处理方式&#xff08;对于每一天而言&#xff0c;我们应该描述当天买入卖出还是只描述每天股票的只有或者不持有的状态呢&#xff1f;&#xff09;我们应该描述每天股票是否持有的状态&#xff0c;因…

中科院发布多模态 ChatGPT,图片、语言、视频都可以 Chat ?中文多模态大模型力作

作者 | 小戏、ZenMoore 在 GPT-4 的发布报道上&#xff0c;GPT-4 的多模态能力让人印象深刻&#xff0c;它可以理解图片内容给出图片描述&#xff0c;甚至能在图片内容的基础上理解其中的隐喻或推断下一时刻的发展。无疑&#xff0c;面向所谓的 AGI&#xff08;通用人工智能&am…

数据结构初阶(1)(一些学习数据结构所需掌握的先导知识:包装类、装箱与拆箱、泛型、List简介)

包装类 基本数据类型和包装类是Java中处理数据的两种不同方式。 基本数据类型&#xff08;Primitive Types&#xff09;&#xff1a; Java的基本数据类型是直接存储数据的原始类型&#xff0c;包括以下8种类型&#xff1a; byte&#xff1a;1字节&#xff0c;用于表示整数 …

IEEE编写LaTeX时在作者后添加ORCID标志及链接(简单方案,一行代码)

IEEE的一些论文&#xff0c;如Trans系列惯例是要在作者后添加ORCID标志及链接&#xff0c;但是其How to里面没有相关latex代码案例。 1. 可以用但复杂的方案 CSDN中不少博主也给出了挺漂亮但是比较复杂的方案&#xff0c;如这个的一大串&#xff1a; \documentclass[letters…

Linux文本之awk编译器

一、awk介绍 1&#xff09;awk概述 AWK 是一种用于处理文本的编程语言工具。AWK 在很多方面类似于 shell 编程语言&#xff0c;尽管 AWK 具有完全属于其本身的语法。它的设计思想来源于 SNOBOL4 、sed 、Marc Rochkind设计的有效性语言、语言工具 yacc 和 lex &#xff0c;当…

尚硅谷大数据技术NiFi教程-笔记02【NiFi(使用案例,同步文件、离线同步mysql数据到hdfs、实时监控kafka数据到hdfs)】

尚硅谷大数据技术-教程学习路线-笔记汇总表【课程资料下载】 视频地址&#xff1a;尚硅谷大数据NiFi教程&#xff08;从部署到开发&#xff09;_哔哩哔哩_bilibili 尚硅谷大数据技术NiFi教程-笔记01【NiFi&#xff08;基本概念、安装、使用&#xff09;】尚硅谷大数据技术NiFi教…

探索古文明,玛雅文明衰落的原因

说起玛雅文明&#xff0c;大家在各种小说或者电影中或多或少的都有听说过&#xff0c;那么这个文明到底是怎么一回事呢&#xff1f;今天老铁就带大家好好的了解下。 玛雅文明存在的时间大致是在公元前2000年至公元1500年之间&#xff0c;这个文明见证了中美洲地区的一段辉煌的…

Cefsharp109.1.110(winfrom)最新支持H264-MP3-MP4功能体验,导出pdf和下载方法有变调整

最新版的支持H264版本(109.1.11,109.1.18)5154分支,也是win7/8/8.1最后一个支持版本 此贴仅分项版本变化和注意事项,本篇文章不提供dll编译文件,有需要单独联系,仅供学习参考 109版本体验测试(音频和视频功能),版本较100.0.230变化提醒及注意变更的内容。 上视频支…