文章目录
- 前言
- 4 Kafka基准测试
-
- 4.1 基于1个分区1个副本的基准测试
- 4.2 基于3个分区1个副本的基准测试
- 4.3 基于1个分区3个副本的基准测试
- 5 Java编程操作Kafka
-
- 5.1 引入依赖
- 5.2 向Kafka发送消息
- 5.3 从Kafka消费消息
- 5.4 异步使用带有回调函数的生产消息
- 6 幂等性
-
- 6.1 幂等性介绍
- 6.2 Kafka幂等性实现原理
- 7 Kafka事务
-
- 7.1 Kafka事务介绍
- 7.2 事务操作API
- 7.3 Kafka事务编程
-
- 7.3.1 需求
- 7.3.2 创建Topic
- 7.3.3 编写生产者
- 7.3.4 创建消费者
- 7.3.5 消费旧Topic数据并生产到新Topic
- 7.3.6 测试
- 7.3.7 模拟异常测试事务
前言
Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构
4 Kafka基准测试
基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。 我们可以通过基准测试,了解到软件、硬件的性能水平,主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。
4.1 基于1个分区1个副本的基准测试
- 1)创建1个分区1个副本的Topic
- 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1
命令解释:
- bin/kafka-producer-perf-test.sh 性能测试脚本
- –topic Topic的名称
- –num-records 指定生产数据量(默认5000W)
- –throughput 指定吞吐量,即限流(-1不指定)
- –record-size record数据大小(字节)
- –producer-props bootstrap.servers 指定Kafka集群地址
- acks=1 ACK模式
执行以上命令,结果如下:
- 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_1 --fetch-size 1048576 --messages 5000000 --timeout 100000
命令解释:
- bin/kafka-consumer-perf-test.sh 消费消息基准测试脚本
- –broker-list 集群Broker列表
- –topic Topic的名称
- –fetch-size 每次拉取的数据大小
- –messages 总共要消费的消息个数
- –timeout 超时时间
执行以上命令,结果如下:
4.2 基于3个分区1个副本的基准测试
- 1)创建3个分区1个副本的Topic
- 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_3_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1
指标 | 3分区1副本 | 1分区1副本 | 性能(对比1分区1副本) |
---|---|---|---|
吞吐量 | 10900.822140 records/sec | 8994.536718 records/sec | 提升↑ |
吞吐速率 | 10.40 MB/sec | 8.58 MB/sec | 提升↑ |
平均延迟时间 | 2508.37 ms avg latency | 3418.50 ms avg latency | 提升↑ |
最大延迟时间 | 47436.00 ms max latency | 50592.00 ms max latency |
- 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_3_1 --fetch-size 1048576 --messages 5000000 --timeout 100000
指标 | 3分区1副本 | 1分区1副本 | 性能(对比1分区1副本) |
---|---|---|---|
data.consumed.in.MB 共计消费数据量 | 4768.4021 | 4768.3716 | |
MB.sec 每秒消费数据量 | 28.5637 | 21.1589 | 提升↑ |
data.consumed.in.nMsg 共计消费消息数量 | 5000032 | 5000000 | |
nMsg.sec 每秒消费消息数量 | 29951.2517 | 22186.7235 | 提升↑ |
4.3 基于1个分区3个副本的基准测试
- 1)创建1个分区3个副本的Topic
- 2)生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_3 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=1
指标 | 1分区3副本 | 1分区1副本 | 性能(对比1分区1副本) |
---|---|---|---|
吞吐量 | 4323.273652 records/sec | 8994.536718 records/sec | 下降↓ |
吞吐速率 | 4.12 MB/sec | 8.58 MB/sec | 下降↓ |
平均延迟时间 | 7533.70 ms avg latency | 3418.50 ms avg latency | 下降↓ |
最大延迟时间 | 32871.00 ms max latency | 50592.00 ms max latency |
可见,副本越多,生产消息的性能反而下降。
- 3)消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_3 --fetch-size 1048576 --messages 5000000 --timeout 100000
指标 | 1分区3副本 | 1分区1副本 | 性能(对比1分区1副本) |
---|---|---|---|
data.consumed.in.MB 共计消费数据量 | 4768.3716 | 4768.3716 | |
MB.sec 每秒消费数据量 | 46.9504 | 21.1589 | 下降↓ |
data.consumed.in.nMsg 共计消费消息数量 | 5000000 | 5000000 | |
nMsg.sec 每秒消费消息数量 | 49231.0116 | 22186.7235 | 下降↓ |
同样,副本越多,消费消息的性能也下降。
5 Java编程操作Kafka
创建一个Maven项目,测试Java变成操作Kafka。
5.1 引入依赖
<!-- kafka_demo\pom.xml -->
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
5.2 向Kafka发送消息
public class KafkaProducerTest {
public static void main(String[] args) {
// 1.创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.245.130:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2.创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 3.调用send发送1-100消息到`my_topic`主题
for(int i = 0; i < 100; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("my_topic", null, i + ""));
// 调用一个Future.get()方法等待响应
future.get