Kafka性能测试初探

news2025/1/21 6:01:25

相信大家对Kafka不会陌生,但首先还是要简单介绍一下。

Kafka是一种高性能的分布式消息系统,由LinkedIn公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka还提供了丰富的API和管理工具,使得用户可以方便地配置和管理Kafka集群。

很多高性能方案都会用到Kafka,今天我来分享如何使用Kafka Client API进行Kafka生产者和消费者压测。

依赖

我用了Gradle创建的项目,依赖配置如下:

compile group: ‘org.apache.kafka’, name: ‘kafka-clients’, version: ‘3.4.0’

kafka服务端

我本地用了Kafka最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafka


import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

@Log4j2
class Produce extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        def topic = "testkafka"
        def test = {
            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))
        }
        new FunQpsConcurrent(test,"Kafka测试").start()

        producer.close();
    }

}

这里用到了动态QPS模型,最后的close()也可以不使用,毕竟main方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向Kafka请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.time.Duration

class Cunsumer extends SourceCode {

    static void main(String[] args) {
        KafkaConsumer<String, String> consumer;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                , "earliest");
        consumer = new KafkaConsumer<>(properties);

        String topic = "testkafka";
//        TopicPartition topicPartition = new TopicPartition(topic, 0);
//        List<TopicPartition> topics = Arrays.asList(topicPartition);
//        consumer.assign(topics);
//        consumer.seekToEnd(topics);
//        long current = consumer.position(topicPartition);
//        consumer.seek(topicPartition, current - 10);//手动设置偏移量
        consumer.subscribe([topic])//订阅模式,不能与assign混用
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1.0)
        }

        def test = {
            consumer.poll(Duration.ofMillis(1000));
        }
        new FunQpsConcurrent(test,"Kafka消费").start()
        consumer.close()

    }
}

由于本地机器原因,需要在服务器上启动一个Kafka服务,用来测试不同参数组合情况下Kafka的性能表现。后续有机会再来分享。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1227807.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Shell判断:流程控制—if(三)

一、调试脚本 1、调试脚本的其他方法&#xff1a; [rootlocalhost ~] # sh -n useradd.sh 仅调试脚本中的语法错误。 [rootlocalhost ~]# sh -vx useradd.sh 以调试的方式执行&#xff0c;查询整个执行过程。 2、示例&#xff1a; [rootlocalhost ~]# sh -n useradd.sh #调…

OpenCV快速入门:窗口交互

文章目录 前言一、鼠标操作1.1 鼠标操作简介1.2 鼠标事件类型&#xff08;event类型&#xff09;1.3 鼠标事件标志&#xff08;flags&#xff09;1.4 代码示例1.4.1 获取鼠标坐标位置1.4.2 监听鼠标滚轮事件1.4.3 在图像中显示鼠标坐标 二、键盘操作2.1 代码示例2.2 waitKey的等…

坐标系下的运动旋量转换

坐标系下的运动旋量转换 文章目录 坐标系下的运动旋量转换前言一、运动旋量物体运动旋量空间运动旋量 二、伴随变换矩阵三、坐标系下运动旋量的转换四、力旋量五、总结参考资料 前言 对于刚体而言&#xff0c;其角速度可以写为 ω ^ θ ˙ \hat {\omega} \dot \theta ω^θ˙&…

Linux|僵死进程

1.僵死进程产生的原因或者条件: 什么是僵死进程? 当子进程先于父进程结束,父进程没有获取子进程的退出码,此时子进程变成僵死进程. 简而言之,就是子进程先结束,并且父进程没有获取它的退出码; 那么僵死进程产生的原因或者条件就是:子进程先于父进程结束,并且父进程没有获取…

从一到无穷大 #19 TagTree,倒排索引入手是否是优化时序数据库查询的通用方案?

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作)&#xff0c;由 李兆龙 确认&#xff0c;转载请注明版权。 文章目录 文章主旨时序数据库查询的一般流程扫描维度聚合时间聚合管控语句 TagTree整体结构索引…

lv11 嵌入式开发 ARM指令集中(伪操作与混合编程) 7

目录 1 伪指令 2 伪操作 3 C和汇编的混合编程 4 ATPCS协议 1 伪指令 本身不是指令&#xff0c;编译器可以将其替换成若干条等效指令 空指令NOP 指令LDR R1, [R2] 将R2指向的内存空间中的数据读取到R1寄存器 伪指令LDR R1, 0x12345678 R1 0x12345678 LDR伪指令可以将任…

本地私域线上线下 线上和线下的小程序

私域商城是一种新型的零售模式&#xff0c;它将传统的线下实体店与线上渠道相结合&#xff0c;通过会员、营销、效率等方式&#xff0c;为消费者提供更加便利和高效的购物体验。私域商城的发展趋势表明&#xff0c;它将成为未来零售业的重要模式&#xff0c;引领零售业的创新和…

各类语言真实性能比较列表

这篇文章是我所做或将要做的所有真实世界性能比较的索引。如果你对想要看到的其他真实世界案例有建议&#xff0c;请在评论中添加。 用例 1 — JWT 验证 & MySQL 查询 该用例包括&#xff1a; 从授权头部获取 JWT验证 JWT 并从声明中获取电子邮件使用电子邮件执行 MySQL…

数据结构:红黑树的插入实现(C++)

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》《Linux》 文章目录 一、红黑树二、红黑树的插入三、代码实现总结 一、红黑树 红黑树的概念&#xff1a; 红黑树是一颗二叉搜索树&#xff0c;但在每个节点上增加一个存储位表示节点的颜色&…

Android图片涂鸦,Kotlin(1)

Android图片涂鸦&#xff0c;Kotlin&#xff08;1&#xff09; import android.content.Context import android.graphics.Canvas import android.graphics.Color import android.graphics.Paint import android.graphics.Path import android.graphics.PointF import android.…

改进YOLOv8:结合ODConv构成C2f_ODConv:即插即用的动态卷积/可轻量化

🗝️YOLOv8实战宝典--星级指南:从入门到精通,您不可错过的技巧   -- 聚焦于YOLO的 最新版本, 对颈部网络改进、添加局部注意力、增加检测头部,实测涨点 💡 深入浅出YOLOv8:我的专业笔记与技术总结   -- YOLOv8轻松上手, 适用技术小白,文章代码齐全,仅需 …

常见树种(贵州省):003柏类

摘要&#xff1a;本专栏树种介绍图片来源于PPBC中国植物图像库&#xff08;下附网址&#xff09;&#xff0c;本文整理仅做交流学习使用&#xff0c;同时便于查找&#xff0c;如有侵权请联系删除。 图片网址&#xff1a;PPBC中国植物图像库——最大的植物分类图片库 一、柏木 …

创新案例|云服务平台HashiCorp是如何构建开源社区实现B2B增长飞轮

社区文化是HashiCorp企业文化的重要组成部分。虽然众多公司声称自己是社区驱动&#xff0c;但实际付诸行动的很少。与众不同的是&#xff0c;HashiCorp从一开始就将社区视为战略方针的核心&#xff0c;这也影响和塑造了公司今天的发展方向。社区不仅是执行策略之一&#xff0c;…

卷积神经网络(CNN)天气识别

文章目录 前期工作1. 设置GPU&#xff08;如果使用的是CPU可以忽略这步&#xff09;我的环境&#xff1a; 2. 导入数据3. 查看数据 二、数据预处理1. 加载数据2. 可视化数据3. 再次检查数据4. 配置数据集 三、构建CNN网络四、编译五、训练模型六、模型评估 前期工作 1. 设置GP…

SDUT OJ《算法分析与设计》贪心算法

A - 汽车加油问题 Description 一辆汽车加满油后可行驶n公里。旅途中有若干个加油站。设计一个有效算法&#xff0c;指出应在哪些加油站停靠加油&#xff0c;使沿途加油次数最少。并证明算法能产生一个最优解。 对于给定的n和k个加油站位置&#xff0c;计算最少加油次数。 I…

uniapp 微信小程序登录 新手专用 引入即可

预览 第一步导入插件 在引入的页面的登录按钮下拷贝一下代码 <template><view class"content"><button type"primary" click"login">微信登录</button></view><TC-WXlogin :wxloginwxlogin /> </templ…

【Java 进阶篇】Ajax 入门:打开前端异步交互的大门

欢迎来到前端异步交互的世界&#xff01;在这篇博客中&#xff0c;我们将深入探讨 Ajax&#xff08;Asynchronous JavaScript and XML&#xff09;&#xff0c;这是一项能够让你的网页在不刷新的情况下与服务器进行数据交互的技术。无论你是刚刚踏入前端开发的小白&#xff0c;…

python-opencv 培训课程作业

python-opencv 培训课程作业 作业一&#xff1a; 第一步&#xff1a;读取 res 下面的 flower.jpg&#xff0c;读取彩图&#xff0c;并用 opencv 展示 第二步&#xff1a;彩图 -> 灰度图 第三步&#xff1a;反转图像&#xff1a;最大图像灰度值减去原图像&#xff0c;即可得…

C语言——2.安装并使用VS

文章目录 1.编译器是什么2.编译器的选择2.1.VS2019/2022 的初步了解2.2.为什么不选择其他编译器呢&#xff1f; 3.编译器的安装过程&#xff08;保姆级别教学&#xff09;3.1.检查电脑版本3.2.下载安装包3.3.选择安装选项3.4.重启电脑3.5.创建账户登录3.6.颜色配置3.7.VS&#…

「Verilog学习笔记」根据状态转移图实现时序电路

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 这是一个典型的米利型状态机。三段式即可解决。 米利型状态机&#xff1a;即输出不仅和当前状态有关&#xff0c;也和输入有关。 其中ST0&#xff0c;ST1&#xff0c;ST3的…