Kafka集群调优

news2025/1/10 17:21:12

一、前言

我们需要对4个规格的kafka能力进行探底,即其可以承载的最大吞吐;4个规格对应的单节点的配置如下:

  • 标准版: 2C4G
  • 铂金版: 4C8G
  • 专业版: 8C16G
  • 企业版: 16C32G

另外,一般来讲,在同配置下,kafka的读性能是要优于写性能的,写操作时,数据要从网卡拷贝至堆内存,然后进行一堆数据校验、解析后,会将数据拷贝至堆外内存,然后再拷贝至操作系统的page cache,最后操作系统异步刷盘至设备中。而读操作时,kafka使用了零拷贝技术,数据会从disk或page cache直接拷贝到网卡,节省了大量的内存拷贝。因此我们这次探底将聚焦于链路的短板,即kafka的写操作进行压测

注:本文不是专业的压测报告,而是针对不同集群调优,以获取其最大的吞吐能力

二、磁盘能力探底

在真正开始对kafka压测前,我们首先对磁盘的能力进行一个摸底。因为kafka是典型的数据型应用,是强依赖磁盘性能的,一旦有了这个数据,那么这个就是kafka的性能天花板。如果磁盘是传统的机械磁盘,那么瓶颈毫无悬念一般都会落在磁盘上;但如果磁盘类型是SSD,而且性能很高的话,操作系统会极力压榨cpu,从而获取一个最大刷盘吞吐,因此瓶颈在哪就很难讲了

要测试磁盘吞吐的话,2个变参的影响较大:

  1. 单次写入磁盘的数据量
  2. 写盘的线程数

2.1、单次刷盘大小

现在的硬件厂商,对于磁盘的优化,基本上都是4K对齐的,因此我们的参数一般也要设置为4K的整数倍,例如4K\8K\16K... 如果单次写入量小于4K,例如只写了10byte,那么底层刷盘的时候,也会刷4K的量,这就是臭名昭著的写放大

而具体单次写多少数据量能达到最优呢? 这就需要不断的benchmark了

2.2、刷盘线程数

我们知道kafka的broker通过参数num.io.threads来控制io的线程数量,通常是cpu * 2,不过这个参数并不能真实反应在同一时刻写盘的线程数,因此我们探底的时候,也需要动态修改这个参数,从而获取磁盘真实的吞吐

2.3、探底工具

public class DiskMain2 {
    private static final long EXE_KEEP_TIME = 30 * 1000;

    public static void main(String[] args) throws Exception {
        new DiskMain2().begin(args);
    }

    private void begin(String[] args) throws Exception {
        AtomicLong totalLen = new AtomicLong();
        long begin = System.currentTimeMillis();
        int threadNum = args.length > 0 ? Integer.parseInt(args[0]) : 4;
        int msgK = args.length > 1 ? Integer.parseInt(args[1]) : 16;
        int size = msgK * 1024;
        List<Thread> threadList = new ArrayList<>();
        for (int j = 0; j < threadNum; j++) {
            Thread thread = new Thread(() -> {
                try {
                    File file = new File("/bitnami/kafka/" + UUID.randomUUID() + ".txt");
                    file.createNewFile();
                    FileChannel channel = FileChannel.open(Paths.get(file.getPath()), StandardOpenOption.WRITE);
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
                    for (int i = 0; ; i++) {
                        byteBuffer.clear();
                        byteBuffer.position(size);
                        byteBuffer.flip();
                        channel.write(byteBuffer);
                        if (i % 100 == 0) {
                            long cost = System.currentTimeMillis() - begin;
                            if (cost > EXE_KEEP_TIME) {
                                break;
                            }
                        }
                    }
                    channel.force(false);
                    totalLen.addAndGet(file.length());
                    file.delete();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            thread.start();
            threadList.add(thread);
        }
        for (Thread thread : threadList) {
            thread.join();
        }
        long cost = (System.currentTimeMillis() - begin) / 1000;
        System.out.println((totalLen.get() / 1024 / 1024) + " MB");
        System.out.println(cost + " sec");
        System.out.println(totalLen.get() / cost / 1024 / 1024 + " MB/sec");
    }
}

简单描述下这个工具做的事儿:启动M(可配)个线程,每个线程单次往磁盘中写入N(可配)K的数据,整个过程持续30秒,然后统计所有写入文件的总大小,最后除以时间,从而计算磁盘吞吐

几个注意点:

  • 大块的磁盘写入,一定要使用FileChannel,与kafka中的log写入对齐
  • 尽量减少cpu的使用,将压力下放给磁盘,demo中使用的ByteBuffer,通过修改position的值模拟大块数据
  • 使用DirectByteBuffer,减少一次堆内存 --> 对外内存的拷贝

2C4G

[root@jmc-pod kafka]# java DiskMain 2 4 802 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 8 1016 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 16 1105 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 32 942 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 4 1013 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 8 941 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 16 1062 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 32 1076 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 916 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 993 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 1035 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 982 MB/sec

4C8G

[root@jmc-pod kafka]# java DiskMain 2 16 1320 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 4 2172 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 8 2317 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 16 2580 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 32 2271 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 2150 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 2315 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 2498 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 2536 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 64 2434 MB/sec

8C16G

[root@jmc-pod kafka]# java DiskMain 4 16 2732 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 3440 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 3443 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 3531 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 3561 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 64 3562 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 4 3515 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 8 3573 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 16 3659 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 32 3673 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 64 3674 MB/sec

[root@jmc-pod kafka]# java DiskMain 12 16 3672 MB/sec

16C32G

[root@jmc-pod kafka]# java DiskMain 16 16 3918 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 8 3814 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 32 3885 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 64 3894 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 8 4053 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 16 4039 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 32 4080 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 64 4050 MB/sec

[root@jmc-pod kafka]# java DiskMain 32 16 4042 MB/sec

[root@jmc-pod kafka]# java DiskMain 32 32 4078 MB/sec

通过反复压测,得出如下结论:

规格

最大吞吐量

cpu

参数

2C4G

1105 MB/sec

200%

2线程,16K

4C8G

2580 MB/sec

400%

4线程,16K

8C16G

3674 MB/sec

790%

16线程,64K

16C32G

4080 MB/sec

950%

24线程,32K

在kafka 3副本的经典协议下,上述表格便是其吞吐量的天花板。其中16C32G在算力上虽然比8C16G强了1倍,但其落盘速度却基本持平,在大数据的压力下,瓶颈终究会落在磁盘,因此可以大胆预测,其性能不会比8C16G高出太多

分析上述数据可知:

  • 2C4G:磁盘的吞吐量约1G/s,远没有达到上限,此时的瓶颈在cpu
  • 4C8G:吞吐量虽上升了一倍不止,不过cpu飚满,瓶颈还在cpu
  • 8C16G:吞吐量约为3.5G/s,相比较4C8G并没有翻倍,后端的cpu几乎吃满,光看这组数据不好定位瓶颈
  • 16C32G:终于探到磁盘的底了,在cpu还有大量剩余的前提下,磁盘明显写不动了

磁盘的性能是真好,居然能压出 4 GB/s的速率,叹叹

三、Kafka吞吐量概述

一般描述某个kafka集群的吞吐量时,通常写为 3*n MB/s,例如 3*100 MB/s。 之所以习惯这样描述,是基于kafka自身的3副本协议,即1主2备的模式,leader收到业务流量n后,2个follower还需要从leader将数据同步过来,这样在集群角度看来,是一共处理了3*n流量

某个topic所拥有的副本数,理论上是不能大于整个集群的broker数量的,因为副本本质上是做高可用的,当topic的副本数大于整个集群的broker数量后,那势必某个broker存在2个及以上副本,这样也就丧失了高可用的初衷

3.1、集群横向扩容

所谓集群横向扩容,是指为集群添加同构broker,集群的broker数量初始为3,扩容后可能变为了6,这里的broker数量与topic的replica不是同一个概念,注意区分。

某个topic副本数过多,将带来集群内部大量的数据流转,而副本数过少,例如单副本,又存在一些高可用的风险,因此即便随着broker数量的增多,kafka最佳实践还是建议将topic的副本数设置为3,这样每增加3个broker,集群的能力将会得到横向的扩容

这里的横向扩容出来的能力跟broker数量是严格呈线性关系的,本文不会对横向扩容进行压测对比

3.2、集群纵向扩容

纵向扩容是指集群的broker数量不变,但是提升broker的配置。例如之前集群是3 * 2C4G的规格,进行纵向扩容后,集群将变为3 * 4C8G

broker的配置线性提升了,其提供的吞吐能力也会随着线性提升吗? 答案是否定的;如果磁盘用的是机械磁盘,我们可能很快能够断言瓶颈将卡在disk上,但SSD的高吞吐也是非常吃cpu的,内容比较复杂,内存、磁盘、cpu等都息息相关,这里没有很好的捷径,只能benchmark

纵向压测、对比也是本文的重心

四、发压准备

4.1、客户端准备

4.1.1、发压程序

发压程序使用官方的工具kafka-producer-perf-test.sh,这个工具实际调用的是kafka内核中的类:

org.apache.kafka.tools.ProducerPerformance

当然,单个Producer的pool、开辟内存、与server端的连接等都是有上限的,因此真正发压时,需要启动多个发压进程。发压脚本如下

bash kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
	acks=1  \
  buffer.memory=134217728 \
--producer.config=admin5.conf \
--topic topic_6 \
--throughput=-1  \
--num-records 100000000 \
--record-size 1024000

admin5.conf 配置如下(因为开启了ACL认证,因此需要申明SASL配置)

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-a9asfbx5pl" password="a9asfbxmsn";

关于发压脚本的参数配置做一下说明

  • bootstrap.servers 连接集群的接入点
  • acks 响应方式,这个对性能影响非常大,这里使用默认的1。有3种配置
    • 1 : leader收到消息后便返回成功
    • 0 : 不需要等待任何副本确认
    • -1 : 生产者将等待所有的副本接收到消息并进行确认
  • buffer.memory 这里是设置了producer的128M的缓冲区,默认为32M
  • producer.config 因为目标集群开启了acl,这里需要存放一些相关配置
  • throughput 发送流量的一个上限值,-1表示不设置上限
  • num-records 发送消息的条数,因为要压测,所以这里放一个大值
  • record-size 每条消息的大小。这里配置的1M,因为需要压测集群的极限值,这里直接设置一个相对大的值
    • 注意,如果消息大小配置较小的话,可以通过调整batch.size及linger.ms参数来控制攒批

4.1.2、发压机器

因为kafka实例是被k8s孵化出来的,因此独立开辟了 5台ECS发压,其配置

10.252.128.183

8C 16G

10.252.128.185

8C 16G

10.252.128.136

4C 8G

10.252.128.140

4C 8G

10.252.128.176

4C 8G

因为发压程序不会占用大量cpu及内存,当开启多进程压测时,只要网络带宽不是瓶颈就OK

4.2、服务端准备

4.2.1、常规压测配置

集群新建出来后,有一些关键的配置还是需要留意设置一下的,否则性能会打很大的折扣

配置项

建议值

说明

num.network.threads

与cpu核数相近

broker端的网络线程,负责将数据从网卡搬运至broker堆内存中,这个过程涉及内存的拷贝,是一个典型的吃cpu的操作。设置太小,网卡搬运工作将会成为瓶颈,设置太大,又会造成频繁的线程切换,建议将其设置为cpu+1

num.io.threads

2*cpu + 1

broker端典型的IO线程,所有写log的操作都是该线程触发的

log.retention.bytes

-1

parition目录的最大阈值,当partition目录超过该值时就会触发删除老消息的操作。这个是kafka提供的原生配置,设置为-1,代表不对其做限制

log.flush.interval.messages

Long.MaxValue

接收到指定条数的消息后刷盘,这里建议配置为最大值,即刷盘的行为留给操作系统自己去控制。此值切勿设置的过小,否则将会导致磁盘频繁的sync,对性能影响很大

log.flush.interval.ms

null

达到指定时间后将内存中的消息刷盘。这个配置项与log.flush.interval.messages类似,也是刷盘的策略,这里同样建议不对其设置,交由操作系统来控制已获得最大的吞吐性能

message.max.bytes

10M

这个值限定了单条消息在broker端的上限,同样在producer也有一个参数来配置max.request.size,producer端的这个配置一定是要小于server端的

num.replica.fetchers

cpu+1

这个值没有特定大小,根据不同的场景来设定,如果想保证配置acks=-1的性能较高,那么需要提高此值,建议设置为cpu+1,否则就是默认的1即可

replica.fetch.max.bytes

10M

设置副本单次拉取的最大字节数,这个值需要大于单条消息的最大值,否则可能导致性能偏低

4.2.2、副本同步

前文说过,kafka选择不同的副本同步策略、同步副本数量,对性能影响很大;如果选择单副本的话,那么最大吞吐就是上文使用工具测出来的磁盘性能,而如果选择多副本的话,则整体吞吐的计算公式是是业务流量*副本数,后文我们将针对不同acksnum.replica.fetchers参数以及不同的副本数分别进行压测,最终得出一个多维参考值

4.2.3、服务端监控

服务端监控主要是查看集群整体流量、broker cpu、内存参数。我们通过top命令可以快速获取cpu、memory参数,而集群整体流量,为了快速获取,可通过JMX去拉取kafka原生监控项

public class JMXMain {
    public static void main(String[] args) throws Exception {
        new JMXMain().begin(args);
    }

    private void begin(String[] args) throws Exception {
        String metricName = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
        List<MBeanServerConnection> connectionList = initConnectionList(args);

        while (true) {
            long total = 0L;
            long[] arr = new long[3];
            int index = 0;
            for (MBeanServerConnection connection : connectionList) {
                Optional<ObjectName> first = connection.queryNames(new ObjectName(metricName), null).stream().findFirst();
                if (first.isPresent()) {
                    Object oneMinuteRate = connection.getAttribute(first.get(), "OneMinuteRate");
                    long single = transToM(oneMinuteRate);
                    arr[index++] = single;
                    total += single;
                } else {
                    System.out.println("none");
                }
            }

            System.out.println(Arrays.toString(arr));
            System.out.println("rate is " + total + " MB/sec");
            
            Thread.sleep(10000);
        }
    }

    private List<MBeanServerConnection> initConnectionList(String[] args) throws Exception {
        // 10.0.0.21:9094,10.0.0.22:9094,10.0.0.23:9094
        List<String> connList = new ArrayList<>();
        if (args != null) {
            for (String ip : args) {
                String s = "service:jmx:rmi:///jndi/rmi://" + ip + ":5555/jmxrmi";
                connList.add(s);
            }
        }
        List<MBeanServerConnection> resultList = new ArrayList<>();
//        String[] arr = {"service:jmx:rmi:///jndi/rmi://10.0.0.19:5555/jmxrmi", "service:jmx:rmi:///jndi/rmi://10.0.0.20:5555/jmxrmi", "service:jmx:rmi:///jndi/rmi://10.0.0.21:5555/jmxrmi"};
        for (String jmxUrl : connList) {
            System.out.println(jmxUrl);
            JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl));
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            resultList.add(connection);
        }
        return resultList;
    }
    
    public static long transToM(Object count) {
        if (count == null) {
            return 0;
        } else {
            double v = Double.parseDouble(count.toString()) / 1024 / 1024;
            return (long) v;
        }
    }
}

每隔10秒打印一下集群整体的流量及每个broker各自流量,例如:

[406, 407, 405]
rate is 1218 MB/sec
[409, 409, 407]
rate is 1225 MB/sec
[408, 408, 408]
rate is 1224 MB/sec
[406, 406, 405]
rate is 1217 MB/sec

注意:这里打印的流量,仅包含leader处理的业务流量,不包括follower从leader同步的备份流量,例如,我创建一个单partition,3副本的topic,然后向集群写入100MB/s的流量,因为设置了3副本,因此虽然只会向其中某个broker发送数据,但是另外2个broker中同时也均会有100MB/s的备份流量,但是使用上述工具则只会打印leader的流量: [100, 0, 0]

五、发压

5.1、2C4G

5.1.1、单partition/单副本

最小配置,首先测试一下单partition、单replica 的性能,从而与磁盘极限性能做个对比;这个值体现了kafka单broker的极限能力

创建topic:【1 partition、1 replica、acks=1】

发压命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \ 
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
  acks=0 \
  max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf \
--topic topic_1_1 \
--throughput=-1  --num-records 100000000 --record-size 1924000

当启动了8个producer客户端后,监控集群的吞吐峰值来到了 550MB/s;其实启动了4个客户端后,吞吐量就达到了530,后续通过增加客户端数量带来的收益越来越小,说明broker端能力已达上限

[0, 546, 0]
rate is 546 MB/sec
[0, 545, 0]
rate is 545 MB/sec
[0, 550, 0]
rate is 550 MB/sec

简单做个对比

规格

理论峰值

Kafka吞吐

2C4G

1105 MB/sec

550MB/s

在开始对磁盘用工具进行压测时候,2C4G的规格就因为cpu成为了短板,压测工具自身没有消耗cpu的逻辑,几乎全量的cpu都消耗在了刷盘的操作中

而kafka的构成要比磁盘工具复杂很多,涉及内存的数据拷贝、数据解析、正确性验证、刷盘等,而这些操作无疑会消耗大量cpu,cpu本身就是短板,因此压测出来的kafka吞吐量会比理论值低很多

因此当前2C4G的3节点的极限能力是 3 * 550MB/s

5.1.2、多partition/三副本/all acks

当选项acks设置为all时,代表只有当3个副本的消息都落盘后,才会response,这个设置也是严格的保证了数据的高可用,不会有任何数据的丢失,同时这种配置也是效率最低的,我们创建一个 6 partition,3副本的topic,同时将acks设置为all,再查看此时的性能,做一个对比

创建topic:【6 partition、3 replica、acks=all】

发压命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
  acks=-1 \
  max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf \
--topic topic_6_3 \
--throughput=-1  --num-records 100000000 --record-size 1924000

启动了4个producer客户端后,监控集群的吞吐峰值来到了 3 * 320MB/s

[106, 107, 105]
rate is 318 MB/sec
[107, 107, 106]
rate is 320 MB/sec
[108, 108, 106]
rate is 322 MB/sec
[108, 107, 107]
rate is 322 MB/sec
[108, 107, 107]
rate is 322 MB/sec
[108, 106, 107]
rate is 321 MB/sec

4个客户端的延迟都已经很高,达到了400ms左右,说明数据都积攒到了broker端排队处理,4个客户端数据采样:

terminal-1:
230 records sent, 45.8 records/sec (84.07 MB/sec), 395.2 ms avg latency, 2228.0 ms max latency.
223 records sent, 44.4 records/sec (81.56 MB/sec), 437.7 ms avg latency, 1771.0 ms max latency.

terminal-4:
216 records sent, 43.1 records/sec (79.11 MB/sec), 429.5 ms avg latency, 1628.0 ms max latency.
225 records sent, 45.0 records/sec (82.54 MB/sec), 399.1 ms avg latency, 1279.0 ms max latency.

可见,acks参数的不同,对最终结果的影响甚大

因cpu核数只有2,因此调整num.replica.fetchers参数对最终的压测影响不大,后续等cpu核数增加后可以考虑增加此值

5.1.3、多partition/三副本/leader

然而我们实际生产中,通常既不会将topic的副本数设置为1,也不会将acks设置为all,那么这个时候的最大流量值,体现的便是集群能够处理业务流量的峰值,一旦这个值超过了550MB/s,那么follower一定出现不同程度的落后leader的现象,等流量回落后,follower再逐步进行追赶,因此这个值也是具有相当重要的参考价值

创建topic:【3 partition、3 replica、acks=1】

发压命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 acks=1 max.request.size=2048000   --producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf --topic topic_3_3 --throughput=-1  --num-records 100000000 --record-size 1924000

启动了12个producer客户端后,监控集群的业务流量峰值来到约了 1GB/s

[347, 349, 343]
rate is 1039 MB/sec
[340, 345, 342]
rate is 1027 MB/sec
[339, 344, 341]
rate is 1024 MB/sec
[343, 347, 344]
rate is 1034 MB/sec

客户端的延迟都已经达到了400ms左右,说明瓶颈不在客户端侧,客户端数据采样:

terminal-1:
228 records sent, 45.6 records/sec (83.60 MB/sec), 412.7 ms avg latency, 1173.0 ms max latency.
268 records sent, 53.5 records/sec (98.11 MB/sec), 334.0 ms avg latency, 774.0 ms max latency.

terminal-3:
218 records sent, 43.4 records/sec (79.62 MB/sec), 394.7 ms avg latency, 1422.0 ms max latency.
252 records sent, 50.3 records/sec (92.35 MB/sec), 377.1 ms avg latency, 1226.0 ms max latency.

注意:我这里并没有使用 3 * 1GB/s 的描述,是因为虽然leader确实已经接受了1GB/s的流量,但是其并没有在同一时刻事实同步给follower,事实上,随着时间的推移,follower已经落后的越来越多

5.1.4、整理总结

能力

描述

流量

单broker磁盘能力

当前cpu+磁盘所具备的极限写入能力,作为判断kafka能力的参考

1105 MB/s

严格写入能力

严格高可用地写入数据,即acks=all的方式写入数据,这种模式下,3个broker中的数据齐头并进

3 * 320 MB/s

常规集群能力

用的最多的方式, acks=1,即写leader的模式,也是讨论最多的模式

3 * 550 MB/s

可应对峰值能力

这种模式下,leader的流量将远大于follower的,

1000 MB/s

5.2、4C8G

相关认证配置 admin7.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-acegzesojh" password="acegzesfmj";

5.2.1、单partition/单副本

创建topic:【1 partition、1 replica、acks=1】

当启动了8个producer端时,集群的流量来到580 MB/s左右,这个值与2C4G的基本持平,难道它们两个的性能相当吗?其实不尽然,因为单partition、单副本的case,注定broker将会是同时写入1个文件,此时的瓶颈将落在IO上,因此,单纯的加cpu是不会提升吞吐的

[0, 584, 0]
rate is 584 MB/sec
[0, 589, 0]
rate is 589 MB/sec
[0, 586, 0]
rate is 586 MB/sec

看一下cpu的使用率

cpu基本维持在180%上下,而4C的上限是400%

5.2.2、multi 【单partition/单副本】

创建topic:4 * 【1 partition、1 replica、acks=1】

既然1个topic无法探知broker的上限,那我们就创建多个【单partition/单副本】的topic,使其落在同一个broker上,然后再向这个broker发压即可。(也可以通过手动指定将某个topic的分区都分布在1台broker上实现)

查看topic_1_1的ISR分布情况:

bash /root/kafka_2.12-2.8.2/bin/kafka-topics.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.9:9094,10.0.0.18:9094 \
--command-config  /root/kafka_2.12-2.8.2/bin/admin7.conf  \
--describe --topic topic_1_1

返回结果

Topic: topic_1_1	Partition: 0	Leader: 1000	Replicas: 1000	Isr: 1000

通过这种方式,选取4个topic:topic_1_1、topic_i_1_1、topic_j_1_1、topic_n_1_1,然后每个topic启动4个producer进行发压,也就是一共开启了16个client端发压

首先看一下broker端的流量统计指标,单broker的流量来到了 1GB/s

[0, 1009, 0]
rate is 1009 MB/sec
[0, 1016, 0]
rate is 1016 MB/sec
[0, 1021, 0]
rate is 1021 MB/sec
[0, 1016, 0]
rate is 1016 MB/sec
[0, 1015, 0]
rate is 1015 MB/sec

再观察一下cpu利用率,维持在400%,已经打满

客户端的监控日志采样。延迟也高达500ms,说明压力已经完全给到了broker

196 records sent, 38.8 records/sec (71.24 MB/sec), 457.4 ms avg latency, 560.0 ms max latency.
167 records sent, 33.4 records/sec (61.21 MB/sec), 536.9 ms avg latency, 649.0 ms max latency.
147 records sent, 29.1 records/sec (53.35 MB/sec), 623.0 ms avg latency, 782.0 ms max latency.
184 records sent, 36.6 records/sec (67.19 MB/sec), 489.4 ms avg latency, 588.0 ms max latency.
189 records sent, 37.8 records/sec (69.33 MB/sec), 478.6 ms avg latency, 563.0 ms max latency.

至此,探知当前配置的单broker的处理上限为 1 GB/s,因此集群的最大吞吐为 3 * 1 GB/s

5.2.3、多partition/三副本/all acks

创建topic:【6 partition、3 replica、acks=-1】

发压命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
  acks=-1 max.request.size=2048000 \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin77.conf  \
--topic topic_a_6_3 --throughput=-1  \
--num-records 100000000 --record-size 1924000

吞吐停留在330 MB/s,怎么跟2C4G的相差不大呢?

[112, 111, 111]
rate is 334 MB/sec
[112, 111, 111]
rate is 334 MB/sec
[112, 111, 112]
rate is 335 MB/sec

这里别忘了一个参数num.replica.fetchers,这个参数默认为1,调大这个参数将加快follower从leader拉取数据的速率;我们首先看下当前这个参数的设置:

sh kafka-configs.sh \
--bootstrap-server 10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
--command-config  admin77.conf --all \
--entity-type brokers --describe  |  grep "num.replica.fetchers"

最终返回

num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}

这个参数是可以调用命令进行直接修改的,我们将其修改为cpu核数

sh kafka-configs.sh \
--bootstrap-server 10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
--command-config  admin77.conf \
--alter --entity-type brokers --entity-default \
--add-config 'num.replica.fetchers=4' 

最终broker的性能提升至了550 MB/s

[181, 182, 182]
rate is 545 MB/sec
[183, 184, 183]
rate is 550 MB/sec
[183, 184, 183]
rate is 550 MB/sec

cpu利用率也相当低,大量的耗时都停留在三副本sync上

5.2.4、多partition/三副本/leader

将参数“num.replica.fetchers”调整为默认值后,同时将acks设置为1,再次发压

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
  acks=1 max.request.size=2048000 \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin77.conf  \
--topic topic_a_6_3 --throughput=-1  \
--num-records 100000000 --record-size 1924000

共启动了16个客户端,流量来到了2200 MB/s

[734, 737, 733]
rate is 2204 MB/sec
[734, 736, 744]
rate is 2214 MB/sec
[737, 745, 741]
rate is 2223 MB/sec

同时cpu被打满

部分发压程序日志采样。随着producer的增多,吞吐量维持在恒定值

432 records sent, 86.2 records/sec (158.12 MB/sec), 198.7 ms avg latency, 1601.0 ms max latency.
378 records sent, 75.4 records/sec (138.33 MB/sec), 238.2 ms avg latency, 1297.0 ms max latency.
413 records sent, 82.4 records/sec (151.17 MB/sec), 222.9 ms avg latency, 1315.0 ms max latency.
353 records sent, 70.5 records/sec (129.39 MB/sec), 268.1 ms avg latency, 1442.0 ms max latency.

5.2.5、整理总结

能力

描述

流量

单broker磁盘能力

当前cpu+磁盘所具备的极限写入能力,作为判断kafka能力的参考

2580 MB/s

严格写入能力

严格高可用地写入数据,即acks=all的方式写入数据,这种模式下,3个broker中的数据齐头并进

3 * 550 MB/s

常规集群能力

用的最多的方式, acks=1,即写leader的模式,也是讨论最多的模式

3 * 1000 MB/s

可应对峰值能力

这种模式下,leader的流量将远大于follower的,会产生了流量倾斜

2250 MB/s

5.3、8C16G

用到的配置信息 admin6.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-acntgcoin9" password="acntgcozqb";

5.3.1、multi 【单partition/单副本】

创建topic:【12 partition、1 replica、acks=1】

通过命令创建topic,将12个分区全部都放在第一个broker上

bash /root/kafka_2.12-2.8.2/bin/kafka-topics.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  /root/kafka_2.12-2.8.2/bin/admin6.conf  \
--create --topic topic_1_1 \
--replica-assignment 1000,1000,1000,1000,1000,1000,1000,1000,1000,1000,1000,1000

启动28个producer端发压客户端后,broker流量趋于稳定

[1964, 0, 0]
rate is 1964 MB/sec
[1959, 0, 0]
rate is 1959 MB/sec
[1962, 0, 0]
rate is 1962 MB/sec

查看对应pod的cpu使用率,已经打满

最终得出结论,单台broker的吞吐上限为 1.9 GB/s

5.3.2、多partition/三副本/all acks

创建topic:【12 partition、3 replica、acks=-1】

当前规格配置较高,需要创建更多的partition以压榨更多的cpu资源

查看参数num.replica.fetchers

sh kafka-configs.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  admin6.conf --all \
--entity-type brokers --describe  |  grep "num.replica.fetchers"

返回

  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}

用同样的方法查看参数replica.fetch.max.bytes,返回

replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}
replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}
replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}

将参数num.replica.fetchers修改为cpu核数

sh kafka-configs.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  admin6.conf \
--alter --entity-type brokers --entity-default \
--add-config 'num.replica.fetchers=8' 

发压

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
	acks=-1  \
max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin6.conf \
--topic topic_a_12_3 --throughput=-1  --num-records 100000000 --record-size 1924000

一共启动了24个producer压测,强劲的cpu发挥了作用,写入速率达到了950 M/s

[315, 315, 315]
rate is 945 MB/sec
[315, 317, 318]
rate is 950 MB/sec
[313, 317, 317]
rate is 947 MB/sec

5.3.3、多partition/三副本/leader

创建topic:【12 partition、3 replica、acks=1】

按照常规,我们压一下三副本写leader的case;此时num.replica.fetchers同样设置为8

启动28个压测客户端后,流量趋于稳定

1034, 1032, 1028]
rate is 3094 MB/sec
[1034, 1038, 1034]
rate is 3106 MB/sec
[1036, 1039, 1040]
rate is 3115 MB/sec

客户端延迟1s+

426 records sent, 85.1 records/sec (156.14 MB/sec), 204.5 ms avg latency, 1791.0 ms max latency.

broker cpu使用率接近饱和

至此,可以得出结论,陡增业务流量可承接 3.1 GB/s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1288735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Excel——多列合并成一列的4种方法

Excel怎么将多列内容合并成一列&#xff1f; 怎么将多个单元格的内容连接起来放在一个单元格里&#xff1f; 比如下图&#xff0c;要将B、C、D列的内容&#xff0c;合并成E列那样&#xff0c;该怎么做呢&#xff1f; △图1 本文中&#xff0c;高潜老师将给大家介绍 4种 将多…

制作一个RISC-V的操作系统五-RISC-V汇编语言编程二

文章目录 RISC-V汇编指令操作对象RISC-V汇编指令编码格式小端序的概念RISC-V汇编指令分类RISC-V汇编伪指令 RISC-V汇编指令操作对象 RV32I&#xff1a;RISC-V32位机器整数指令集 指令集分非特权指令集和特权指令集 XLEN&#xff1a;变量代表当前机器的字长&#xff08;32位 64…

「Verilog学习笔记」根据状态转移写状态机-三段式

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 状态机可以分为Moore状态机和Mealy状态机。 Moore状态机&#xff1a;输出只由当前状态决定Mealy状态机&#xff1a;输出由当前状态和当前的输入共同决定。 三段式状态机是指…

项目实战之RabbitMQ重试机制进行消息补偿通知

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 文章目录 &#x1f31f;架构图&#x…

软件设计之装饰模式

装饰模式把每个要装饰的功能放在单独的类中&#xff0c;并让这个类包装它所要装饰的对象&#xff0c;因此&#xff0c;当需要执行特殊行为时&#xff0c;客户代码就可以在运行时根据需要有选择地、按顺序地使用装饰功能包装对象。 案例&#xff1a;穿搭。衣柜有帽子、眼镜、…

Zabbix自动发现机制

Zabbix的自动发现机制 Zabbix客户端主动的和服务端联系&#xff0c;将自己的地址和端口发送服务端&#xff0c;实现自动添加监控主机&#xff0c;客户端是主动的一方缺点自定义网段中主机数量太多&#xff0c;等级耗时会很久&#xff0c;而且这个自动发现机制不是很稳定 Zabb…

二叉树的前序中序后序遍历

二叉树的前序中序后序遍历-含递归和迭代代码 前序(中左右)中序(左中右)后序(左右中) 前序(中左右) 对于二叉树中的任意一个节点&#xff0c;先打印该节点&#xff0c;然后是它的左子树&#xff0c;最后右子树 A-B-D-E-C-F //递归 const preorderTraversal (root) > {const…

【荣誉】科东软件荣获广州市软件行业协会双料大奖!

软件产业在数字经济中扮演着基础支撑的角色&#xff0c;对于优化产业结构、提高自主可控、赋能整体经济高质量发展具有关键作用。 近日&#xff0c;广州市软件行业第七届会员大会第三次会议成功召开&#xff01;此次会议旨在回顾过去一年的行业发展&#xff0c;展望未来的趋势和…

Linux文件结构与文件权限

基于centos了解Linux文件结构 了解一下文件类型 Linux采用的一切皆文件的思想&#xff0c;将硬件设备、软件等所有数据信息都以文件的形式呈现在用户面前&#xff0c;这就使得我们对计算机的管理更加方便。所以本篇文章会对Linux操作系统的文件结构和文件权限进行讲解。 首先…

新项目决定用 JDK 17了

大家好&#xff0c;我是风筝。 最近在调研 JDK 17&#xff0c;并且试着将之前的一个小项目升级了一下&#xff0c;在测试环境跑了一段时间。最终&#xff0c;决定了&#xff0c;新项目要采用 JDK 17 了。 JDK 1.8&#xff1a;“不是说好了&#xff0c;他发任他发&#xff0c;…

uni-app 设置tabBar的setTabBarBadge购物车/消息等角标

目录 一、效果二、代码实现二、全部代码1.index.vue2.cart.vue 三、真实案例参考最后 一、效果 二、代码实现 只要使用uni.setTabBarBadge和uni.removeTabBarBadge来进行对红点的设置和移除。 主要代码&#xff1a; //设置红点 uni.setTabBarBadge({index: 1, // 底部菜单栏…

urllib 的 get 请求和 post 请求(二)

目录 一、爬取网页、图片视频 二、请求对象的定制 三、get请求的urlencode方法 四、post 请求英文翻译 一、爬取网页、图片视频 目标&#xff1a;下载数据 知识点&#xff1a;urllib.request.urlretrieve()下载 使用urllib下载网页、图片和视频 下载网页&#xff1a; #…

【动态规划】LeetCode2111:使数组 K 递增的最少操作次数

作者推荐 [二分查找]LeetCode2040:两个有序数组的第 K 小乘积 本文涉及的基础知识点 二分查找算法合集 分组 动态规划 题目 给你一个下标从 0 开始包含 n 个正整数的数组 arr &#xff0c;和一个正整数 k 。 如果对于每个满足 k < i < n-1 的下标 i &#xff0c;都有…

项目进度已经落后了,项目经理该怎么办?

进度管理是项目管理的核心工作之一&#xff0c;通过可续的进度计划与控制管理&#xff0c;最终实现项目按照目标交付。 进度管理的两大核心工作&#xff1a;计划制定、过程管控。 项目管理过程中难免会遇到工作进度和计划不一致的情况&#xff0c;有效管理项目进度&#xff…

数字工厂时代,如何实现3D数据访问与发布、WEB大模型可视化?

Tech Soft 3D的HOOPS 3D CAD SDK为现代工厂工作流程奠定了基础&#xff0c;通过最快、最准确的CAD数据访问和动态3D可视化支持数字孪生、机器人仿真、设计、流程和规划、IIoT和操作辅助应用程序。 本文将和您详细探讨。如何利用HOOPS技术来增强您的应用程序。 HOOPS_HOOPS试…

INFINI Console 与华为鲲鹏完成产品兼容互认证

何为华为鲲鹏认证 华为鲲鹏认证是华为云围绕鲲鹏云服务&#xff08;含公有云、私有云、混合云、桌面云&#xff09;推出的一项合作伙伴计划&#xff0c;旨在为构建持续发展、合作共赢的鲲鹏生态圈&#xff0c;通过整合华为的技术、品牌资源&#xff0c;与合作伙伴共享商机和利…

【Verilog】 FPGA程序设计---Verilog基础知识

目录 Verilog 和 VHDL 区别 Verilog 和 C 的区别 Verilog 基础知识 1 Verilog 的逻辑值 2 Verilog 的标识符 3 Verilog 的数字进制格式 4 Verilog 的数据类型 1) 寄存器类型 2) 线网类型 3) 参数类型 5 Verilog 的运算符 1) 算术运算符 2) 关系运算…

GOLAND搭建GIN框架以及基础框架搭建

创建GO环境文件夹 终端输入安装GIN go get -u github.com/gin-gonic/gin如果遇到超时错误 package golang.org/x/net/html: unrecognized import path "golang.org/x/net/html": https fetch: Get "https://golang.org/x/net/html?go-get1": dial tcp …

设计模式之GoF23介绍

深入探讨设计模式&#xff1a;构建可维护、可扩展的软件架构 一、设计模式的背景1.1 什么是设计模式1.2 设计模式的历史 二、设计模式的分类2.1 创建型模式2.2 结构型模式2.3 行为型模式 三、七大设计原则四、设计模式关系结论 :rocket: :rocket: :rocket: 在软件开发领域&…

关于大模型在文本分类上的尝试

文章目录 前言所做的尝试总结前言 总共25个类别,在BERT上的效果是48%,数据存在不平衡的情况,训练数据分布如下: 训练数据不多,4000左右 所做的尝试 1、基于 Qwen-14b-base 做Lora SFT,Loss忘记记录 准确率在68%左右 Lora配置 class LoraArguments:lora_r: int = 64…