Python Kafka客户端性能测试比较

news2025/1/16 7:47:49

前言

    由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。

第三方库性能测试

1.第三方库

    此次测试的是三个主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具体介绍见官网:

  • pykafka:pykafka · PyPI
  • kafka-python:kafka-python · PyPI
  • confluent_kafka:confluent-kafka · PyPI

测试环境

    此次测试使用的 Python 版本是2.7,第三方库的版本为:

  • pykafka:2.8.0
  • kafka-python:2.0.2
  • confluent-kafka:1.5.0

    使用的数据总量有50万,每条数据大小为2KB,总共为966MB。

测试过程

(1)Kafka Producer 测试

  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Producer 对象,然后调用相应的 produce 方法将数据推送给 Kafka,数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以推送的数据条数和大小,比较得出性能最优的。

  代码示例(以 pykafka 为例):

import sys
from datetime import datetime
from pykafka import KafkaClient


class KafkaProducerTool():
    def __init__(self, broker, topic):
        client = KafkaClient(hosts=broker)
        self.topic = client.topics[topic]
        self.producer = self.topic.get_producer()

    def send_msg(self, msg):
        self.producer.produce(msg)


if __name__ == '__main__':
    producer = KafkaProducerTool(broker, topic)
    print(datetime.now())
    for line in sys.stdin:
        producer.send_msg(line.strip())
    producer.producer.stop()
    print(datetime.now())

(2)Kafka Consumer 测试

  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Consumer 对象,然后调用相应的 consume 方法从 Kafka 中消费数据,要消费下来的数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以消费的数据条数和大小,比较得出性能最优的。

  代码示例(以 pykafka 为例):

from datetime import datetime
from pykafka import KafkaClient


class KafkaConsumerTool():
    def __init__(self, broker, topic):
        client = KafkaClient(hosts=broker)
        self.topic = client.topics[topic]
        self.consumer = self.topic.get_simple_consumer()

    def receive_msg(self):
        count = 0
        print(datetime.now())
        while True:
            msg = self.consumer.consume()
            if msg:
                count += 1
            if count == 500000:
                print(datetime.now())
                return


if __name__ == '__main__':
    consumer = KafkaConsumerTool(broker, topic)
    consumer.receive_msg()
    consumer.consumer.stop()

测试结果

  • Kafka Producer 测试结果:
总耗时/秒每秒数据量/MB每秒数据条数
confluent_kafka3527.9014285.71
pykafka5019.5310000
kafka-python5321.83939.85
  • Kafka Consumer 测试结果:
总耗时/秒每秒数据量/MB每秒数据条数
confluent_kafka3925.0412820.51
kafka-python5218.789615.38
pykafka3352.921492.54

测试结论

  经过测试,在此次测试的三个库中,生产消息的效率排名是:confluent-kafka > pykafka > kafka-python,消费消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可见 confluent-kafka 的性能是其中最优的,因而选用这个库进行后续开发。

多线程模型性能测试

编程模型

  经过前面的测试已经知道 confluent-kafka 这个库的性能是很优秀的了,但如果还需要更高的效率,应该怎么办呢?当单线程(或者单进程)不能满足需求时,我们很容易想到使用多线程(或者多进程)来增加并发提高效率,考虑到线程的资源消耗比进程少,所以打算选用多线程来进行开发。那么多线程消费 Kafka 有什么实现方式呢?我想到的有两种:

1. 一个线程实现一个 Kafka Consumer,最多可以有 n 个线程同时消费 Topic(其中 n 是该 Topic 下的分区数量);

2. 多个线程共用一个 Kafka Consumer,此时也可以实例化多个 Consumer 同时消费。

对比这两种多线程模型:

  • 模型1实现方便,可以保证每个分区有序消费,但 Partition 数量会限制消费能力;
  • 模型2并发度高,可扩展能力强,消费能力不受 Partition 限制。

测试过程

(1)多线程模型1

  测试代码:

import time
from threading import Thread
from datetime import datetime
from confluent_kafka import Consumer


class ChildThread(Thread):
    def __init__(self, name, broker, topic):
        Thread.__init__(self, name=name)
        self.con = KafkaConsumerTool(broker, topic)

    def run(self):
        self.con.receive_msg()


class KafkaConsumerTool:
    def __init__(self, broker, topic):
        config = {
            'bootstrap.servers': broker,
            'session.timeout.ms': 30000,
            'auto.offset.reset': 'earliest',
            'api.version.request': False,
            'broker.version.fallback': '2.6.0',
            'group.id': 'test'
        }
        self.consumer = Consumer(config)
        self.topic = topic

    def receive_msg(self):
        self.consumer.subscribe([self.topic])
        print(datetime.now())
        while True:
            msg = self.consumer.poll(timeout=30.0)
            print(msg)


if __name__ == '__main__':
    thread_num = 10
    threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in ge(thread_num)]

    for i in range(thread_num):
        threads[i].setDaemon(True)
    for i in range(thread_num):
        threads[i].start()

  因为我使用的 Topic 共有8个分区,所以我分别测试了线程数在5个、8个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

(2)多线程模型2

  测试代码:

import time
from datetime import datetime
from confluent_kafka import Consumer
from threadpool import ThreadPool, makeRequests


class KafkaConsumerTool:
    def __init__(self, broker, topic):
        config = {
            'bootstrap.servers': broker,
            'session.timeout.ms': 30000,
            'auto.offset.reset': 'earliest',
            'api.version.request': False,
            'broker.version.fallback': '2.6.0',
            'group.id': 'mini-spider'
        }
        self.consumer = Consumer(config)
        self.topic = topic

    def receive_msg(self, x):
        self.consumer.subscribe([self.topic])
        print(datetime.now())
        while True:
            msg = self.consumer.poll(timeout=30.0)
            print(msg)


if __name__ == '__main__':
    thread_num = 10
    consumer = KafkaConsumerTool(broker, topic)
    pool = ThreadPool(thread_num)
    for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
        pool.putRequest(r)
    pool.wait()

  主要使用 threadpool 这个第三方库来实现线程池,此处当然也可以使用其他库来实现,这里我分别测试了线程数量在5个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

测试结果

  • 多线程模型1
 总数据量/万线程数量总耗时/秒每秒数据条数
5052718518.51
5082420833.33
50102619230.76
  • 多线程模型2
  总数据量/万线程数量总耗时/秒每秒数据条数
5051729411.76
50101338461.53

测试结论

  使用多线程可以有效提高 Kafka 的 Consumer 消费数据的效率,而选用线程池共用一个 KafkaConsumer 的消费方式的消费效率更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/171290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

umi4+antd5兼容360安全浏览器

项目场景: umi4创建的大屏项目,部分模块使用了antd5进行开发 问题描述 开发完成后,得知客户是360安全浏览器,内核为86,测试过程中出现了样式混乱。 混乱样式有下拉内容的组件(如select、dataPicker&#…

Microsoft 365中的智能应用—翻译、朗读、听写

Microsoft 365 是一种订阅式的跨平台办公软件,基于云平台提供多种服务,通过将Word、Excel、PowerPoint和Outlook、OneNote等应用与OneDrive 和 Microsoft Teams等强大的云服务相结合,让任何人使用任何设备随时随地创建和共享内容。 Microsof…

【JavaScript】仿青柠搜索界面

点击搜索栏&#xff0c;背景模糊&#xff0c;出现图标。点击界面任意处&#xff0c;失去焦点&#xff0c;恢复原样 代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X…

Elasticsearch高级查询—— 查询所有文档

目录一、初始化文档数据二、查询所有文档示例一、初始化文档数据 在 Postman 中&#xff0c;向 ES 服务器发 POST 请求 &#xff1a;http://localhost:9200/user/_doc/1&#xff0c;请求体内容为&#xff1a; {"name":"张三","age":22,"sex…

Qt的开源库TabToolbar

开源地址&#xff1a;https://github.com/SeriousAlexej/TabToolbar 该库的使用方式有两种&#xff1a; 使用json配置文件配置TabToolBar使用代码构建TabToolBar 编译 项目是使用Qt和CMake管理的&#xff0c;并且在开发的时候使用的是Qt6&#xff0c;我实测通过更改CMake的…

设计模式——代理模式

文章目录引入案例提出问题解决思路遇到的困难代理模式概念生活中的代理相关术语静态代理动态代理织入的概念基于JDK的动态代理基于Cglib的动态代理JDK 和 CGLIB 的区别引入案例 计数器的接口 public interface Calculator {int add(int i, int j);int sub(int i, int j);int …

pandas案例——预处理部分地区数据

数据清洗的任务是过滤那些不符合要求的数据&#xff0c;将过滤的结果交给业务主管部门&#xff0c;确认是否过滤掉还是由业务单位修正之后再进行抽取。不符合要求的数据主要是有不完整的数据、错误的数据、重复的数据三大类。数据清洗是与问卷审核不同&#xff0c;录入后的数据…

使用反射和泛型简化Golang查询数据库代码的方案

大纲Postgresql数组案例常规写法定义结构体查询数据问题反射泛型写法结构体定义接口Tag实现逻辑泛型设计实例化模型结构体获取表名过滤字段组装SQL语句查询遍历读取结果实例化模型结构体组装Scan方法的参数调用Scan方法并保存结果完整代码小结Postgresql数组 Postgresql有个很…

7、操作DOM对象(重点)

核心&#xff1a;浏览器网页就是一个DOM树形结构 更新&#xff1a;更新该DOM节点的内容&#xff0c;相当于更新了该DOM节点表示的HTML的内容&#xff1b; 遍历&#xff1a;遍历该DOM节点下的子节点&#xff0c;以便进行进一步操作&#xff1b; 添加&#xff1a;在该DOM节点下…

Matlab中的dsp.AudioFileReader函数的认识和学习

在Matlab中的dsp.AudioFileReader函数的认识和学习1.描述2.语法2.1 语法描述2.2 属性Properties2.3 举例Stream from audio file 来自音频文件的流 1.描述 dsp.AudioFileReader系统对象™ 从音频文件读取音频样本。 要从音频文件读取音频样本&#xff0c;请执行以下操作&…

小方制药冲刺A股上市:毛利率走低,方之光、鲁爱萍夫妇为实控人

近日&#xff0c;上海小方制药股份有限公司&#xff08;下称“小方制药”&#xff09;公开预披露更新招股书&#xff0c;准备在上海证券交易所主板上市。据贝多财经了解&#xff0c;小方制药于2022年7月1日递交招股书&#xff0c;国信证券为其保荐机构。 本次冲刺上市&#xff…

扫码器:壹码通(EMT 6621)二维码带多个回车换行处理

摘要&#xff1a;二维码运用越来越广泛了&#xff0c;目前在医院中一个二维码可以串联多个系统&#xff0c;二维码的内容也可以设置一些特殊字符去达成系统便捷性。本次遇到为二维码中开头内置了回车和空格&#xff0c;在程序判断为回车(KEY_ENTER)时就会触发业务逻辑&#xff…

mybatis之一级缓存和二级缓存

缓存&#xff1a; 查询需要连接数据库&#xff0c;非常的耗费资源&#xff0c;将一次查询的结果&#xff0c;暂存在一个可以直接取到的地方&#xff0c;我们将其称之为缓存&#xff0c;当我们需要再次查询相同的数据时&#xff0c;直接走缓存这个过程&#xff0c;就不用走数据…

【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)

RabbitMQ工作队列模式为什么要有工作队列模式如何使用工作队列模式轮询消息确认验证消息确认消息持久化公平调度验证公平调度**现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。**现在将消费者1中的Thread.sleep(1000)改为Thread.…

BC即将登录Coinbase Institutional,2023年以全新姿态出发

以支付为最初定位的加密资产&#xff0c;在支付领域的发展始终停滞不前&#xff0c;尤其是在2022年&#xff0c;加密行业经历了几次“至暗时刻”&#xff0c;导致加密市场资金不断出逃市场全面转熊&#xff0c;越来越多的人对加密资产市场的发展前景失去信心。 而在2021年年底开…

【GD32F427开发板试用】移植CoreMark验证0等待区Flash大小

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;Doravmon 引言 非常荣幸能够参与到此次GD32F427开发板试用的活动中来&#xff0c;在拿到开发板之前就翻了翻手册&#xff0c;一直有个疑问困惑…

APM/STM32F072RB基于HAL库配置USB CDC虚拟串口功能

APM/STM32F072RB基于HAL库配置USB CDC虚拟串口功能&#x1f4e2;采用的自制开发板&#xff0c;开源PCB工程详情放在《极海APM32F072RB开发环境测试》✨本案例基于STM32CubeMX工具配置。&#x1f4fa;使用STM32CubeMX工具配置工程改为APMF072RB型号过程如下&#xff1a; ⛳注意…

性能测试实战 | 电商业务的性能测试(一): 必备基础知识

本文为霍格沃兹测试学院优秀学员课程学习系列笔记&#xff0c;想一起系统进阶的同学文末加群交流。 1.1 测试步骤总览 需求分析与测试设计&#xff08;性能需求目标业务模型拆解&#xff09; 测试数据准备和构造(基于模型的数据准备) 性能指标预期(性能需求目标) 发压工具配…

vue2 使用@vue/composition-api依赖包 编译、打包各种报错

vue2 使用vue/composition-api依赖包 编译、打包各种报错问题来源解决办法最近在维护以前&#xff08;大概一年前&#xff09;的项目时&#xff0c;遇到个这种问题&#xff1a; 项目本身是用 vue-cli 创建的 vue 2.x.xx 版本的项目&#xff0c;然后引入 vue/composition-api 依…

MIT6.830-2022-lab5实验思路详细讲解

文章目录前言一、实验背景二、实验正文Exercise 1 &#xff1a;SearchExercise 2 &#xff1a;Insert - Splitting PagesExercise 3 &#xff1a;Delete - Redistributing pagesExercise 4&#xff1a;Delete - Redistributing pages总结前言 Datebase中很重要的一部分就是ind…