【shell-10】shell实现的各种kafka脚本

news2025/1/18 4:48:28

kafka-shell工具

    • 背景
    • 日志 log
    • 一.启动kafka->(start-kafka)
    • 二.停止kafka->(stop-kafka)
    • 三.创建topic->(create-topic)
    • 四.删除topic->(delete-topic)
    • 五.获取topic列表->(list-topic)
    • 六. 将文件数据 录入到kafka->(file-to-kafka)
    • 七.将kafka数据 下载到文件->(kafka-to-file)
    • 八. 查看topic的groupID消费情况->(list-group)

背景

注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。

日志 log

此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用

#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2

#调试日志
function log_debug(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"
  [ $LOG_LEVEL -le 1  ] && echo -e "\033[32m"  ${content}  "\033[0m"
}
#信息日志
function log_info(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"
  [ $LOG_LEVEL -le 2  ] && echo -e "\033[32m"  ${content} "\033[0m"
}
#警告日志
function log_warn(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"
  [ $LOG_LEVEL -le 3  ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){
  content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"
  [ $LOG_LEVEL -le 4  ] && echo -e "\033[31m" ${content} "\033[0m"
}
~  

一.启动kafka->(start-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log

pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; then
   log_info "The kafka process does not exist, startting.........................................................................................."
else
   log_warn "The kafka process exists and does not need to be started"
   exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log

二.停止kafka->(stop-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; then
   log_warn "The kafka process does not exist and does not need to be stopped"
   exit 1
else
   log_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"

三.创建topic->(create-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; then
  log_err "错误:请传入topic名称"
  exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$TOPIC_NAME$";then
    log_warn "$TOPIC_NAME 已经存在,放弃创建"
else
    # 默认1副本,3分区
    kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAME
    log_info "请执行topic-list检查创建是否成功"
fi
~     

在这里插入图片描述

四.删除topic->(delete-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash

source /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_topic_name$";then
        log_info "$local_topic_name存在->true"
        return 0  # 返回true  
    else
        log_warn "$local_topic_name 不存在->false"
        return 1  # return false
    fi
}

# 逐个删除topic
for topic in "$@"
do
  if ! check_kafka_topic $topic; then
     log_info "tpoic->$topic 不存在,跳过删除行为"
     continue
  else
     log_info "topic->$topic 执行删除"
     kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topic
     log_info "topic->$topic 删除成功"
  fi
done

在这里插入图片描述

五.获取topic列表->(list-topic)

#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092  
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; then
    log_info "目标$1 详情如下"
    kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
else
    log_info "所有topic 列表如下:"
    kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

在这里插入图片描述

六. 将文件数据 录入到kafka->(file-to-kafka)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; then
  log_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"
  exit 1
fi

if ! [ -f $1 ]; then
  log_err "$1不是一个有效的数据文件"
  exit 1
fi

FILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092  


#检查topic是否存在
function check_kafka_topic() {
    local local_KAFKA_BROKER=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_KAFKA_BROKER$";then
        return 0  # 返回true  
    else
        return 1  # return false
    fi
}

#将文件数据推送到kafka
function send_to_kafka(){
    local local_path=$1
    local count=0
    while IFS= read -r line; do  
      kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line"  
      count=$((count+1))
    done < "$local_path"
    echo $count
}        

if ! check_kafka_topic $TOPIC_NAME;then
  log_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"
  exit 1
fi


log_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)

log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)

end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds))  

log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."


在这里插入图片描述

七.将kafka数据 下载到文件->(kafka-to-file)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录  
KAFKA_BIN_DIR=/path/to/kafka/bin

#kafka 地址  
KAFKA_SERVER=ip:9092 
 
# Kafka的配置文件目录  
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config

# Kafka消费者配置文件  
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties

# 指定要消费的主题  
TOPIC_NAME=your_topic_name

# 指定要写入的文件 
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3


log_info "执行检察............................................................................................................................"

function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $KAFKA_SERVER  --list | grep -q "^$local_topic_name$";then
        return 0  # 返回true  
    else
        return 1  # return false
    fi
}

 
if ! check_kafka_topic $TOPIC_NAME;then
  log_err "topic->$TOPIC_NAME 未找到"
  exit 1
fi
log_info "检查通过............................................................................................................................"

log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; do
   if [[ $line == *"PARTITION"* ]]; then
         content="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"
         echo -e "\033[45m" ${content} "\033[0m"
   else  
       log_info "$line"
   fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe  --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")

log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"

 
log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件  
if [ $# -eq 2 ]; then
  kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; then
  kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

在这里插入图片描述

八. 查看topic的groupID消费情况->(list-group)

#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {
    local local_topic_name=$1
    if kafka-topics.sh --bootstrap-server $kafka_broker  --list | grep -q "^$local_topic_name$";then
        log_info "$local_topic_name存在->true"
        return 0  # 返回true  
    else
        log_warn "$local_topic_name 不存在->false"
        return 1  # return false
    fi
}

if [ $# -eq 1 ]; then
  if ! check_kafka_topic $1; then
    #topic 不存在则直接退出程序
    log_warn "topic=$1, 不存在"
    exit 1
  fi
  log_info "topic_name=$1 的gruoupID信息如下:"
  kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
else
  log_info "所有groupID信息如下:"
  kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Oracle RAC集群日志

文章目录 一、DB日志1、日志所在位置介绍2、知识介绍 二、ASM日志1、日志所在位置介绍2、知识介绍 三、CRS日志1、日志所在位置介绍2、知识介绍 四、RAC相关日志详细总结 一、DB日志 DB日志也就是数据库日志&#xff0c;全称Oracle Database Logs 1、日志所在位置介绍 日志位…

【计算机图形学】实验五 一个简单的交互式绘图系统(实验报告分析+截图+源码)

可以先看一看这篇呀~【计算机图形学】专栏前言-CSDN博客https://blog.csdn.net/m0_55931547/article/details/135863062 目录 一、实验目的 二、实验内容

Transformer and Pretrain Language Models3-6

Pretrain Language Models预训练语言模型 content&#xff1a; language modeling&#xff08;语言模型知识&#xff09; pre-trained langue models(PLMs&#xff09;&#xff08;预训练的模型整体的一个分类&#xff09; fine-tuning approaches GPT and BERT&#xff08;…

银行数据仓库体系实践(3)--数据架构

狭义的数据仓库数据架构用来特指数据分布&#xff0c;广义的数据仓库数据架构还包括数据模型、数据标准和数据治理。即包含相对静态部分如元数据、业务对象数据模型、主数据、共享数据&#xff0c;也包含相对动态部分如数据流转、ETL、整合、访问应用和数据全生命周期管控治理。…

Angular组件(一) 分割面板ShrinkSplitter

Angular组件(一) 分割面板ShrinkSplitter 前言 分割面板在日常开发中经常使用&#xff0c;可将一片区域&#xff0c;分割为可以拖拽整宽度或高度的两部分区域。模仿iview的分割面板组件&#xff0c;用angular实现该功能&#xff0c;支持拖拽和[(ngModel)]双向绑定的方式控制区…

为什么 FPGA 比 CPU 和 GPU 快?

FPGA、GPU 与 CPU——AI 应用的硬件选择 现场可编程门阵列 (FPGA) 为人工智能 (AI) 应用带来许多优势。图形处理单元 (GPU) 和传统中央处理单元 (CPU) 相比如何&#xff1f; 人工智能&#xff08;AI&#xff09;一词是指能够以类似于人类的方式做出决策的非人类机器智能。这包…

Excel 2019 for Mac/Win:商务数据分析与处理的终极工具

在当今快节奏的商业环境中&#xff0c;数据分析已经成为一项至关重要的技能。从市场趋势预测到财务报告&#xff0c;再到项目管理&#xff0c;数据无处不在。而作为数据分析的基石&#xff0c;Microsoft Excel 2019 for Mac/Win正是一个强大的工具&#xff0c;帮助用户高效地处…

77 C++对象模型探索。虚函数- 从静态联编,动态联编出发,分析 虚函数调用问题探究

什么叫做单纯的类&#xff1a; 比较简单的类&#xff0c;尤其不包括 虚函数 和虚基类。 什么叫不单纯的类&#xff1a; 从上一章的学习我们知道&#xff0c;在某些情况下&#xff0c;编译器会往类内部增加一些我们看不见但是真实存在的成员变量&#xff0c;例如vptr&#xff…

matlab appdesigner系列-图窗工具2-工具栏

工具栏&#xff0c;就是一般在任意软件界面上方的工具菜单栏 示例&#xff1a;工具菜单绘制正弦函数 操作步骤如下&#xff1a; 1&#xff09;将坐标区和工具栏拖拽到画布上 2)点击工具栏的号&#xff0c;可以看到可以添加2种工具&#xff0c;按钮工具和切换工具&#xff0c…

【JavaScript权威指南第七版】读书笔记速度

JavaScript权威指南第七版 序正文前言&#xff1a;图中笔记重点知识第1章 JavaScript简介第一章总结 第2章 词法结构注释字面量标识符和保留字Unicode可选的分号第二章总结 第3章 类型、值和变量【重要】原始类型特殊类型第三章总结 第4章 表达式与操作符表达式操作符条件式调用…

【量化交易】股市舞者:小明的撮合交易之旅

马西森AES撮合交易系统 在繁华的都市中&#xff0c;小明&#xff0c;一个普通的青年&#xff0c;刚刚赚到了人生的第一桶金——20万。这笔意外的财富&#xff0c;点燃了他对股市的强烈兴趣。他开始如饥似渴地学习金融知识&#xff0c;钻研各种交易策略。 一天&#xff0c;小…

基于 java+springboot+mybatis电影售票网站管理系统前台+后台设计和实现

基于 javaspringbootmybatis电影售票网站管理系统前台后台设计和实现 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承…

微软 Power Apps Canvas App 画布应用将上传的附件转化为base64编码操作

微软 Power Apps Canvas App 画布应用将上传的附件结合Power Automate转化为base64编码操作 在使用canvas app的过程中&#xff0c;我们有时需要将上传的文件转换为base64存入数据库或者&#xff0c;调用外部接口传参&#xff0c;那么看下如何将文件转化为base64编码格式。 首先…

金智易表通构建学生缴费数据查询+帆软构建缴费大数据报表并整合到微服务

使用金智易表通挂接外部数据,快速建设查询类服务,本次构建学生欠费数据查询,共有3块设计,规划如下: 1、欠费明细查询:学校领导和财务处等部门可查询全校欠费学生明细数据;各二级学院教职工可查询本二级学院欠费学生明细数据。 2、大数据统计报表:从应收总额、欠费总额…

C语言编程中的陷阱与规避策略

一、引言 C语言作为一门历史悠久且广泛应用的编程语言&#xff0c;其强大的功能和灵活性深受开发者喜爱。然而&#xff0c;这种灵活性也带来了许多潜在的陷阱和难点&#xff0c;特别是对于新手来说&#xff0c;可能会在编程过程中遇到各种预料之外的问题。本文将深入探讨C语言…

自动验证码解析器:CapSolver的Chrome扩展程序自动解析器

自动验证码解析器&#xff1a;CapSolver的Chrome扩展程序自动解析器 验证码是网站实施的一种安全措施&#xff0c;通常对用户构成挑战。然而&#xff0c;随着技术的进步&#xff0c;验证码解析器已经出现&#xff0c;以简化这一过程。在本文中&#xff0c;我们将探讨专为Googl…

【华为 ICT HCIA eNSP 习题汇总】——题目集9

1、缺省情况下&#xff0c;广播网络上 OSPF 协议 Hello 报文发送的周期和无效周期分别为&#xff08;&#xff09;。 A、10s&#xff0c;40s B、40s&#xff0c;10s C、30s&#xff0c;20s D、20s&#xff0c;30s 考点&#xff1a;①路由技术原理 ②OSPF 解析&#xff1a;&…

【Unity3D日常开发】Unity3D中UGUI的Text、Dropdown输入特殊符号

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 在开发中会遇到需要显示特殊符号的情况&#xff0c;比如上标、…

机房及设备安全智慧监管AI+视频方案的设计和应用

一、背景分析 随着互联网的迅猛发展&#xff0c;机房及其配套设施的数量持续攀升&#xff0c;它们的运行状况对于企业运营效率和服务质量的影响日益显著。作为企业信息化的基石&#xff0c;机房的安全监测与管理的重要性不容忽视。它不仅关乎企业的稳定运营&#xff0c;同时也…

[docker] Docker的私有仓库部署——Harbor

一、Docker原生私有仓库—— Registry 1.1 Registry的简单了解 关于Docker的仓库分为私有库和公有仓库&#xff0c;共有仓库只要在官方注册用户&#xff0c;登录即可使用。但对于仓库的使用&#xff0c;企业还是会有自己的专属镜像&#xff0c;所以私有库的搭建也是很有必要的…