自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例

news2024/11/25 23:24:17

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、脚本功能概述

二、生产者性能测试

三、消费者性能测试

四、查看可用主题列表

五、脚本使用示例

六、脚本源码

七、总结


        在大数据处理的领域中,Kafka 扮演着极为重要的角色,它作为一个分布式流处理平台,能够高效地处理大规模的实时数据。而 Kafka 提供了一系列的脚本工具,帮助我们更好地管理和测试 Kafka 集群。今天,我们就来深入探讨一个自定义的 Kafka 脚本 kf-use.sh,了解其功能与使用场景。

一、脚本功能概述

        kf-use.sh 脚本主要提供了两个核心功能:生产者性能测试和消费者性能测试,同时还具备查看可用主题列表以及退出脚本等功能。通过这些功能,我们可以对 Kafka 集群的生产和消费能力进行评估,以便优化集群配置和数据处理流程。

二、生产者性能测试

  1. 参数输入
    • 当选择生产者性能测试功能时,脚本首先会提示用户输入主题名称。如果输入的主题不存在,脚本会要求用户重新输入,确保测试的主题是有效的。
    • 接着,用户需要输入要生产的记录数量,并且脚本会验证输入是否为整数,如果不是则提示重新输入。
    • 最后,用户要输入每条记录的大小(单位为字节),同样会进行整数验证。
  2. 性能测试执行
    • 一旦用户输入了正确的参数,脚本会调用 kafka-producer-perf-test.sh 工具,并传入相应的参数,如主题名称、记录数量、记录大小等,同时设置吞吐量为 -1(表示不限制吞吐量),并指定 Kafka 集群的地址 bigdata01:9092。这样就可以开始对生产者的性能进行测试,测试结果将反映出在给定条件下生产者向指定主题发送数据的效率。

三、消费者性能测试

  1. 主题选择
    • 在消费者性能测试功能中,首先会调用 kafka-topics.sh 工具列出当前可用的主题列表,然后提示用户输入要测试的主题名称。如果输入的主题不存在,脚本会要求重新输入。
  2. 性能测试执行
    • 当用户选择了存在的主题后,脚本会调用 kafka-consumer-perf-test.sh 工具,传入 Kafka 集群地址 bigdata01:9092、主题名称以及要消费的消息数量(这里固定为 100000 条),从而对消费者从指定主题消费数据的性能进行测试,测试结果可以帮助我们了解消费者处理数据的速度和效率。

四、查看可用主题列表

        无论是在生产者还是消费者性能测试功能中,都提供了查看可用主题列表的选项。通过调用 kafka-topics.sh 工具并传入集群地址 bigdata01:9092,可以获取当前 Kafka 集群中所有的主题名称,方便用户了解集群中的数据主题情况,以便做出正确的测试选择。

五、脚本使用示例

假设我们要对一个名为 test_topic 的主题进行生产者性能测试,我们可以按照以下步骤操作:

  1. 运行 kf-use.sh 脚本。
  2. 选择生产者性能测试功能(可能是对应的数字选项,如 1)。
  3. 输入主题名称 test_topic
  4. 输入要生产的记录数量,例如 10000。
  5. 输入每条记录的大小,比如 100 字节。
  6. 脚本会自动执行生产者性能测试,并输出测试结果,包括发送的总字节数、每秒发送的记录数、每秒发送的字节数等信息。

同样,如果要进行消费者性能测试,例如对 test_topic 主题:

  1. 运行 kf-use.sh 脚本并选择消费者性能测试功能(可能是数字 7 后再选择 1)。
  2. 查看可用主题列表,确认 test_topic 存在后输入该主题名称。
  3. 脚本会执行消费者性能测试,并给出消费 100000 条消息的相关性能数据,如每秒消费的消息数等。

六、脚本源码

#!/bin/bash

while true; do
    # 命令大全系统界面
    echo "Kafka 命令大全系统:"
    echo "1. 主题操作(topics)"
    echo "2. 生产者操作(producer)"
    echo "3. 消费者操作(consumer)"
    echo "4. 配置操作(configs)"
    echo "5. 消费者组操作(consumer groups)"
    echo "6. 生产者性能测试(producer perf test)"
    echo "7. 消费者性能测试(consumer perf test)"
    echo "0. 退出"

    read -p "请输入功能选项数字:" choice

    case $choice in
    1)
        # 主题操作菜单
        while true; do
            echo "主题操作功能:"
            echo "1. 查看所有主题"
            echo "2. 创建主题"
            echo "3. 查看某主题详细信息"
            echo "4. 修改某主题分区数"
            echo "5. 删除主题"
            echo "0.返回命令大全系统界面"
            read -p "请输入主题操作选项数字:" topic_choice

            case $topic_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                ;;
            2)
                # 创建主题
                read -p "请输入要创建的主题名称:" topic_name
                while true; do
                    read -p "请输入分区数(整数):" partitions
                    if [[ $partitions =~ ^[0-9]+$ ]]; then
                        break
                    else
                        echo "分区数必须是整数,请重新输入。"
                    fi
                done
                while true; do
                    read -p "请输入副本数(整数,且不超过可用 broker 数量):" replication_factor
                    if [[ $replication_factor =~ ^[0-9]+$ ]]; then
                        # 这里可添加检查副本数不超过可用 broker 数量的逻辑,暂时省略
                        break
                    else
                        echo "副本数必须是整数,请重新输入。"
                    fi
                done
                kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions $partitions --replication-factor $replication_factor --topic $topic_name
                ;;
            3)
                # 查看某主题详细信息
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要查看详细信息的主题名称:" topic_to_describe
                kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic $topic_to_describe
                ;;
            4)
                # 修改某主题分区数
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要修改分区数的主题名称:" topic_to_alter
                while true; do
                    read -p "请输入新的分区数(整数且大于当前分区数):" new_partitions
                    if [[ $new_partitions =~ ^[0-9]+$ ]]; then
                        # 这里可添加检查新分区数是否大于当前分区数的逻辑,暂时省略
                        break
                    else
                        echo "新分区数必须是整数,请重新输入。"
                    fi
                done
                kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic $topic_to_alter --partitions $new_partitions
                ;;
            5)
                # 删除主题
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要删除的主题名称:" topic_to_delete
                kafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic $topic_to_delete
                ;;
            0)
                break
                ;;
            *)
                echo "无效的主题操作选择。"
                ;;
            esac
        done
        ;;
    2)
        # 生产者操作菜单
        while true; do
            echo "生产者操作功能:"
            echo "1. 发送消息到指定主题"
            echo "0.返回命令大全系统界面"
            read -p "请输入生产者操作选项数字:" producer_choice

            case $producer_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要发送消息的主题名称:" topic_name
                echo "开始发送消息到主题 $topic_name。输入'EXIT'退出发送。"
                while true; do
                    read -p "请输入消息内容:" message
                    if [ "$message" = "EXIT" ]; then
                        break
                    fi
                    kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic $topic_name <<< "$message"
                done
                ;;
            0)
                break
                ;;
            *)
                echo "无效的生产者操作选择。"
                ;;
            esac
        done
        ;;
    3)
        # 消费者操作菜单
        while true; do
            echo "消费者操作功能:"
            echo "1. 消费指定主题的消息"
            echo "2. 从主题开头消费所有消息"
            echo "0.返回命令大全系统界面"
            read -p "请输入消费者操作选项数字:" consumer_choice

            case $consumer_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要消费消息的主题名称:" topic_name
                kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic $topic_name
                ;;
            2)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要从开头消费消息的主题名称:" topic_name
                kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic $topic_name
                ;;
            0)
                break
                ;;
            *)
                echo "无效的消费者操作选择。"
                ;;
            esac
        done
        ;;
    4)
        # 配置操作菜单
        while true; do
            echo "配置操作功能:"
            echo "1. 查看主题配置"
            echo "2. 修改主题配置"
            echo "0.返回命令大全系统界面"
            read -p "请输入配置操作选项数字:" config_choice

            case $config_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要查看配置的主题名称:" topic_name
                kafka-configs.sh --bootstrap-server bigdata01:9092 --describe --entity-type topics --entity-name $topic_name
                ;;
            2)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要修改配置的主题名称:" topic_name
                read -p "请输入配置项名称:" config_name
                read -p "请输入配置项值:" config_value
                kafka-configs.sh --bootstrap-server bigdata01:9092 --alter --entity-type topics --entity-name $topic_name --add-config $config_name=$config_value
                ;;
            0)
                break
                ;;
            *)
                echo "无效的配置操作选择。"
                ;;
            esac
        done
        ;;
    5)
        # 消费者组操作菜单
        while true; do
            echo "消费者组操作功能:"
            echo "1. 查看消费者组列表"
            echo "2. 查看消费者组详情"
            echo "3. 重置消费者组偏移量"
            echo "0.返回命令大全系统界面"
            read -p "请输入消费者组操作选项数字:" consumer_group_choice

            case $consumer_group_choice in
            1)
                kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --list
                ;;
            2)
                read -p "请输入要查看详情的消费者组名称:" group_name
                kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --describe --group $group_name
                ;;
            3)
                read -p "请输入要重置偏移量的消费者组名称:" group_name
                read -p "请输入要重置偏移量的主题名称:" topic_name
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --reset-offsets --group $group_name --topic $topic_name --to-earliest
                ;;
            0)
                break
                ;;
            *)
                echo "无效的消费者组操作选择。"
                ;;
            esac
        done
        ;;
    6)
        # 生产者性能测试菜单
        while true; do
            echo "生产者性能测试功能:"
            echo "1. 执行生产者性能测试"
            echo "2. 查看可用主题列表"
            echo "0.返回命令大全系统界面"
            read -p "请输入生产者性能测试选项数字:" producer_perf_choice

            case $producer_perf_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要测试的主题名称:" topic_name
                existing_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)
                if echo "$existing_topics" | grep -q "$topic_name"; then
                    while true; do
                        read -p "请输入要发送的记录数量(整数):" num_records
                        if [[ $num_records =~ ^[0-9]+$ ]]; then
                            break
                        else
                            echo "记录数量必须是整数,请重新输入。"
                        fi
                    done
                    while true; do
                        read -p "请输入每条记录的大小(整数,单位字节):" record_size
                        if [[ $record_size =~ ^[0-9]+$ ]]; then
                            break
                        else
                            echo "记录大小必须是整数,请重新输入。"
                        fi
                    done
                    kafka-producer-perf-test.sh --topic $topic_name --num-records $num_records --record-size $record_size --throughput -1 --producer-props bootstrap.servers=bigdata01:9092
                else
                    echo "主题 $topic_name 不存在,请重新输入。"
                fi
                ;;
            2)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                ;;
            0)
                break
                ;;
            *)
                echo "无效的生产者性能测试选择。"
                ;;
            esac
        done
        ;;
    7)
        # 消费者性能测试菜单
        while true; do
            echo "消费者性能测试功能:"
            echo "1. 执行消费者性能测试"
            echo "2. 查看可用主题列表"
            echo "0.返回命令大全系统界面"
            read -p "请输入消费者性能测试选项数字:" consumer_perf_choice

            case $consumer_perf_choice in
            1)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                echo "当前可用主题列表如下:"
                read -p "请输入要测试的主题名称:" topic_name
                existing_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)
                if echo "$existing_topics" | grep -q "$topic_name"; then
                    kafka-consumer-perf-test.sh --broker-list bigdata01:9092 --topic $topic_name --messages 100000
                else
                    echo "主题 $topic_name 不存在,请重新输入。"
                fi
                ;;
            2)
                kafka-topics.sh --bootstrap-server bigdata01:9092 --list
                ;;
            0)
                break
                ;;
            *)
                echo "无效的消费者性能测试选择。"
                ;;
            esac
        done
        ;;
    0)
        echo "退出脚本。"
        break
        ;;
    *)
        echo "无效的选择。"
        ;;
    esac
done

七、总结

        kf-use.sh 脚本为我们提供了一个便捷的方式来测试 Kafka 集群的生产者和消费者性能,同时方便地查看可用主题列表。通过合理使用这个脚本,我们可以更好地了解 Kafka 集群在数据生产和消费方面的能力,及时发现潜在的性能瓶颈并进行优化,从而提高整个大数据处理流程的效率。无论是对于 Kafka 初学者还是有一定经验的开发者,这个脚本都是一个非常实用的工具,可以帮助我们更好地管理和优化 Kafka 集群的运行。

        在实际应用中,我们可以根据不同的业务需求和数据处理场景,灵活调整测试参数,深入分析测试结果,以确保 Kafka 集群能够稳定、高效地运行,满足日益增长的大数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2246797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

day03(单片机高级)RTOS

目录 RTOS(实时操作系统) 裸机开发模式 轮询方式 前后台&#xff08;中断方式&#xff09; 改进&#xff08;前后台&#xff08;中断&#xff09;&#xff09;定时器 裸机进一步优化 裸机的其他问题 RTOS的概念 什么是RTOS 为什么要使用 RTOS RTOS的应用场景 RTOS的…

cookie反爬----普通服务器,阿里系

目录 一.常见COOKIE反爬 普通&#xff1a; 1. 简介 2. 加密原理 二.实战案例 1. 服务器响应cookie信息 1. 逆向目标 2. 逆向分析 2. 阿里系cookie逆向 1. 逆向目标 2. 逆向分析 实战&#xff1a; 无限debugger原理 1. Function("debugger").call() 2. …

C++中的erase()函数用法总结

在 C 中&#xff0c;erase() 是 std::string 和 std::vector 等容器中的成员函数&#xff0c;用于删除容器中的元素。erase可以删去容器中指定位置的元素&#xff0c;容器的size&#xff08;大小&#xff09;会改变&#xff0c;但是容器的容量不变。 常用用法&#xff1a; 1.…

全面解析:HTML页面的加载全过程(四)--浏览器渲染之样式计算

主线程遍历得到的 DOM 树&#xff0c;依次为树中的每个节点计算出它最终的样式&#xff0c;称之为 Computed Style。 通过前面生成的DOM 树和 CSSOM 树&#xff0c;遍历 DOM 树&#xff0c;为每一个 DOM 节点&#xff0c;计算它的所有 CSS 属性&#xff0c;最后会得到一棵带有…

Linux|内存级文件原理

目录 进程与文件 Linux下的文件系统 文件操作&#xff0c;及文件流 C语言函数 文件流 文件描述符 系统调用操作 系统调用参数 重定向与文件描述符 输出重定向 输入重定向 文件内容属性 Linux下一切皆文件 进程与文件 当我们对文件进行操作时&#xff0c;文件必须…

40分钟学 Go 语言高并发:Context包与并发控制

Context包与并发控制 学习目标 知识点掌握程度应用场景context原理深入理解实现机制并发控制和请求链路追踪超时控制掌握超时设置和处理API请求超时、任务限时控制取消信号传播理解取消机制和传播链优雅退出、资源释放context最佳实践掌握使用规范和技巧工程实践中的常见场景…

【SpringMVC - 1】基本介绍+快速入门+图文解析SpringMVC执行流程

目录 1.Spring MVC的基本介绍 2.大致分析SpringMVC工作流程 3.SpringMVC的快速入门 首先大家先自行配置一个Tomcat 文件的配置 配置 WEB-INF/web.xml 创建web/login.jsp 创建com.ygd.web.UserServlet控制类 创建src下的applicationContext.xml文件 重点的注意事项和说明…

neo4j图数据库community-5.50创建多个数据库————————————————

1.找到neo4J中的conf文件&#xff0c;我的路径是&#xff1a;D:\Program Files\neo4j-community-5.5.0-windows\neo4j-community-5.5.0\conf 这里找自己的安装路径&#xff0c; 2.用管理员模式打开conf文件&#xff0c;右键管理员&#xff0c;记事本或者not 3.选中的一行新建一…

如何最简单、通俗地理解Python的迭代器?

我们知道迭代器&#xff08;iterator&#xff09;可以用for循环去取数&#xff0c;这和列表取数有什么区别呢&#xff1f; 想理解Python迭起器的差异&#xff0c;有个很简单的例子 打个比方&#xff0c;你去玩街头投篮机&#xff0c;可以投5个球&#xff0c;这里有两种方式&a…

JavaEE 【知识改变命运】02 多线程(1)

文章目录 线程是什么&#xff1f;1.1概念1.1.1 线程是什么&#xff1f;1.1.2 为什么要有线程1.1.3 进程和线程的区别1.1.4 思考&#xff1a;执行一个任务&#xff0c;是不是创建的线程或者越多是不是越好&#xff1f;&#xff08;比如吃包子比赛&#xff09;1.1.5 ) Java 的线程…

Linux内核USB2.0驱动框架分析--USB包

一&#xff0c; 包的组成 每个包都由SOP&#xff08;包起始域&#xff09;、SYNC&#xff08;同步域&#xff09;、Packet Content&#xff08;包内容&#xff09;、EOP&#xff08;包结束域&#xff09;四部分组成&#xff0c;其中SOP、SYNC、EOP为所有包共有的域&#xff0c…

云轴科技ZStack亮相2024 IDC中国生态峰会,共塑AI时代IT生态新格局

11月21日&#xff0c;2024 IDC中国生态峰会在北京举办&#xff0c;吸引了超过300位生态伙伴齐聚一堂&#xff0c;聚焦行业内最前沿的热点话题。本届峰会以“创见先机&#xff0c;智领风云”为主题&#xff0c;深入探讨宏观经济趋势、技术革新以及如何融合AI与数据技术&#xff…

C0029.在Clion中解决Debug时,提示Process finished with exit code -1的错误

1.错误提示 Process finished with exit code -12.解决办法 如上在使用Debug进行代码调试时&#xff0c;直接出现如上报错&#xff0c;解决办法就是直接点击运行程序&#xff0c;即可查出报错编号&#xff0c;然后根据报错编号来查找问题&#xff1b; 然后在网上就可以根据该…

07-Making a Bar Chart with D3.js and SVG

课程链接 Curran的课程&#xff0c;通过 D3.js 的 scaleLinear, max, scaleBand, axisLeft, axisBottom&#xff0c;根据 .csv 文件生成一个横向柱状图。 【注】如果想造csv数据&#xff0c;可以使用通义千问&#xff0c;关于LinearScale与BandScale不懂的地方也可以在通义千…

读取各种来源格式单细胞数据集构建seurat分析对象,代做生信分析

参考资料和分析注意事项 全流程的分析指导视频 演示数据集网盘文件 分析参数文件路径格式的特别提示 大家给要分析用到的文件路径或目录路径的时候&#xff0c;以D:/omics_tools/demo_data/scrnaseq/GSE189125/GSE189125_5prime_scRNAseq_seqbatchA_counts.txt.gz 这个文件为…

SQL-多表操作

前文所介绍的sql操作都是基于单表进行的&#xff0c;接下来我们来学习多表操作。 多表设计 在实际的项目开发中&#xff0c;会根据业务需求和业务模块之间的关系进行数据库表结构设计&#xff0c;由于业务之间相互关联&#xff0c;所以各个表结构之间也存在着各种联系&#xf…

c++ STL线程安全使用

c STL不是线程安全的&#xff0c;因此在多线程中使用的时候&#xff0c;操作同一个容器&#xff0c;会崩溃&#xff0c;因此需要解决线程安全的问题&#xff1a; 使用实例类似于以下&#xff1a; #include <thread> #include <vector> #include "thread_safe…

Swift 实现判断链表是否存在环:快慢指针法

文章目录 前言摘要描述题解答案题解代码题解代码分析示例测试及结果时间复杂度空间复杂度总结关于我们 前言 本题由于没有合适答案为以往遗留问题&#xff0c;最近有时间将以往遗留问题一一完善。 LeetCode - #141 环形链表 不积跬步&#xff0c;无以至千里&#xff1b;不积小流…

SpringCloud实用-OpenFeign 调用三方接口

文章目录 前言正文一、项目环境二、项目结构2.1 包的含义2.2 代理的场景 三、完整代码示例3.1 定义FeignClient3.2 定义拦截器3.3 配置类3.4 okhttp配置3.5 响应体3.5.1 天行基础响应3.5.2 热点新闻响应 3.6 代理类3.6.1 代理工厂3.6.2 代理客户端3.6.3 FeignClient的建造器 四…

C++设计模式行为模式———中介者模式

文章目录 一、引言二、中介者模式三、总结 一、引言 中介者模式是一种行为设计模式&#xff0c; 能让你减少对象之间混乱无序的依赖关系。 该模式会限制对象之间的直接交互&#xff0c; 迫使它们通过一个中介者对象进行合作。 中介者模式可以减少对象之间混乱无序的依赖关系&…