从kafka和zookeeper中获取生产和消费偏移量

news2024/9/24 0:18:33

从kafka和zookeeper中获取生产和消费偏移量

  • 特殊说明

    • 该命令是使用python进行编译,需要使用centos7系统上进行使用。
  • 命令详细

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST]
                       [-m INTERVAL_MINUTES]

Usage of argparse

optional arguments:
  -h, --help            show this help message and exit
  -k KAFKA_HOST, --kafka_host KAFKA_HOST
                        需要输入kafka:端口
  -z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST
                        需要输入zookeeper:端口
  -m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES
                        间隔分钟
  • 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
  • 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartition


def get_zoo_consumer_info(Topology):
    Topology_num = 0
    zk_cli.start()
    path = "/stormOffset/" + Topology + "/partition_0"
    if zk_cli.exists(path):
        str_data, stat = zk_cli.get(path)
        str_data = json.loads(str_data)
        Topology_num =  str_data.get("offset")
        #print("zookeeper now " + path + " offsets: " + str(Topology_num) )
    else:   
        print("Path " + path  + " does not exist.")
    return Topology_num

def get_kafka_consumer_info(server, topic):
    partition = 0
    tp = TopicPartition(topic, partition)
    end_offset = server.end_offsets([tp])[tp]
    #print("kafka topic " + topic + " partition " + str(partition) + " offsets: " + str(end_offset))
    return end_offset


if  __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Usage of argparse')
    parser.add_argument('-k','--kafka_host', type=str, default="10.130.25.77:9092",help='需要输入kafka:端口')
    parser.add_argument('-z','--zookeeper_host', type=str, default="10.130.25.79:2181",help='需要输入zookeeper:端口')
    parser.add_argument('-m','--Interval_minutes', type=int, default="1",help='间隔分钟')
    
    args = parser.parse_args()
    kafka_host= args.kafka_host
    zookeer_host= args.zookeeper_host

    Kafka_production_topics = "agent,record"
    Zoo_consumption_topics= "agentTopology,recordTopology"
    Interval_minutes = args.Interval_minutes

    try:
        zk_cli = KazooClient(hosts=zookeer_host)
        #print("init zookeeper " + zookeer_host + " conn ok")
    except Exception as e:
        print("init zookeeper conn error: "+ str(e))

    try:
        #kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)
        kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)
        #print("init kafka " + kafka_host + "  conn ok")
    except Exception as e:
        print("init kafka conn error: "+ str(e))


    zoo_offset = {}
    kafka_offset = {}
    Kafka_production_topics_list = Kafka_production_topics.split(",")
    Kafka_production_topics_list_2  =  Kafka_production_topics.split(",")
    Zoo_consumption_topics_list = Zoo_consumption_topics.split(",")
    Zoo_consumption_topics_list_2 =   Zoo_consumption_topics.split(",")
    for i in range(0,len(Kafka_production_topics_list)):
        kafka_topics = Kafka_production_topics_list.pop()
        get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)
        kafka_offset[kafka_topics]=get_kafka_offset_num
        zoo_topics = Zoo_consumption_topics_list.pop()
        get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)
        zoo_offset[zoo_topics]= get_zoo_offset_num
    print("Interval " + str(Interval_minutes) + " minutes sleep")
    print("=======================================================================================")
    time.sleep(int(Interval_minutes) * 60)
    
    for i in range(0,len(Kafka_production_topics_list_2)):
        kafka_topics = Kafka_production_topics_list_2.pop()
        get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)
        last_kafka_num = kafka_offset.get(kafka_topics)
        minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_num
        zoo_topics = Zoo_consumption_topics_list_2.pop()
        get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)
        last_zoo_num =  zoo_offset.get(zoo_topics)
        minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_num
        Difference = minutes_kafka_offset_num - minutes_zoo_offset_num
        print("kafka offsets:",kafka_topics,get_kafka_offset_num,last_kafka_num)
        print("zookeeper offsets:",kafka_topics,get_zoo_offset_num,last_zoo_num)
        print(kafka_topics  + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))
        print("=======================================================================================")
    zk_cli.stop()
    # 关闭消费者连接
    kafka_server.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2158860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【重学 MySQL】三十八、group by的使用

【重学 MySQL】三十八、group by的使用 基本语法示例示例 1: 计算每个部门的员工数示例 2: 计算每个部门的平均工资示例 3: 结合 WHERE 子句 WITH ROLLUP基本用法示例注意事项 注意事项 GROUP BY 是 SQL 中一个非常重要的子句,它通常与聚合函数(如 COUNT…

MySQL和SQL的区别简单了解和分析使用以及个人总结

MySQL的基本了解 运行环境,这是一种后台运行的服务,想要使用必须打开后台服务,这个后台服务启动的名字是在安装中定义的如下图(个人定义MySQL88)区分大小写图片来源 可以使用命令net start/stop 服务名,开…

实验十八:IIC-EEPROM实验

这个实验比较复杂,是目前第一个多文件项目 KEY1-4:P3^0-P3^3 IIC_SCL=P2^1; IIC_SDA=P2^0; //定义数码管位选信号控制脚 LSA=P2^2; LSB=P2^3; LSC=P2^4; 代码 main.c #include "public.h" #in

常见汽车零部件ASIL等级示例

ASIL(Automotive Safety Integrity Level,汽车安全完整性等级)评级系统是ISO 26262标准中定义的一套风险分类体系,用于评估道路车辆中电子电气系统(E/E系统)功能安全的风险程度,并确保这些系统在…

Linux相关概念和重要知识点(6)(make、makefile、gdb)

1.make、makefile (1)什么是make、makefile? 在我们写完代码后,要编译运行,如果有多个.c文件就需要每次都自己用gcc -o来处理,这十分麻烦。当我们想要自定义多个文件的处理时,我们会浪费很多时…

MatrixOne助力一道创新打造高性能智能制造AIOT系统

客户简介 深圳一道创新(ETAO Innovation)成立于2012年,是一家创新型软件及信息技术服务商,致力于制造戏份行业—电子制造业的数字转型服务,构建万物互联的智能工程。一道创新致力于把先进的软件系统、数字平台、人工智…

拯救者Legion R9000X 2021R(82K8)原厂Win10与Windows11系统恢复镜像下载

LENOVO联想拯救者R9000X锐龙版2021款【82K8】预装OEM系统WIN11/10安装包,恢复原装出厂时开箱状态一模一样 链接:https://pan.baidu.com/s/15dGwacsEG0G8pOiZAHyXaQ?pwd0xgk 提取码:0xgk 联想原装出厂系统自带所有驱动、出厂主题壁纸、系统…

得物App荣获新奖项,科技创新助力高质量发展

近日,备受瞩目的2024中国国际服务贸易交易会(简称“服贸会”)在北京盛大开幕,这一全球唯一的国家级、国际性、综合型服务贸易盛会再次汇聚了全球服务贸易领域的精英与前沿成果。服贸会由商务部和北京市政府携手打造,并…

大数据系统调优:从DAG到单机

目标:优化T10的时效性全局DAG调度层优化:提前任务开始时间: 1. 优化慢结点:T10依赖了T4,T7,T8, 其中T8为瓶颈,如果T8能提前点完成,T10可以早点开始,就能早点完成 2. 快结点做更多预计算…

Android Studio 真机USB调试运行频繁掉线问题

一、遇到问题 Android Studio使用手机运行项目时,总是频繁掉线,连接很不稳定,动不动就消失,基本上无法使用 二、问题出现原因 1、硬件问题:数据线 换条数据线试试,如果可以,那就是数据线的…

如何登录通义灵码,快速开启AI编码之旅?

通义灵码个人版开发者可以使用阿里云账号登录通义灵码 IDE 端插件,本文介绍个人版开发者登录 IDE 端插件的操作指南。 登录通义灵码 步骤 1:准备工作 已成功注册阿里云账号,具体操作可参考:账号注册(PC端)…

通信工程学习:什么是SDN软件定义网络

SDN:软件定义网络 SDN(Software Defined Network),即软件定义网络,是一种新兴的网络架构和技术,它实现了网络控制平面与数据转发平面的分离,并通过软件平台进行集中控制和管理。以下是SDN的详细…

02 BlockChain-- ETH

以太坊与比特币有什么不同? 以太坊立足比特币创新之上,于 2015 年启动,两者之间有一些显著不同。 从宏观的方面: 比特币就仅仅是比特币;以太坊(Ethereum)包括以太币(Ether&#x…

ubuntu中如何查看类型(函数)定义的头文件

问题: 1.该如何查找函数,或者数据类型的头文件? 方法: 1.使用vim搭配ctags 2.使用vscode 使用vscode查看头文件位置的步骤: 1.例如下图,我想添加包含file_operations的头文件 2.双击选中数据类型&#xf…

network request to https://registry.npmjs.org/xxx failed, reason: connect ETIM

目录: 1、问题描述2、解决方案3、npm镜像仓库替换 1、问题描述 npm install 时,报错:npm ERR! network request to https://registry.npmjs.org/postcss-pxtorem failed, reason: connect ETIMEDOU npm ERR! code ETIMEDOUT npm ERR! errno…

DSP学习00-F28379D学习准备(了解一个工程的构成)

叠甲 我也算初学F28379D,不对之处请大家斧正。不同型号的DSP在外设配置的函数上有一些区别,但是掌握一种对其他型号的来说则难度不大。对于我们而言学习DSP最终还是要用于算法验证,而DSP资源的最大化利用、代码效率提升等则是后话。 软件准…

【ASE】第一课_双面着色器

今天我们一起来学习ASE插件,希望各位点个关注,一起跟随我的步伐 今天我们来学习双面着色器,对颜色和贴图进行差值,双面显示不同的效果 最终效果: 思路: 1.先确定前后面的贴图和颜色 贴图(Alb…

华为高级交换技术笔记 2024-2025

2024-2025 一、9/31.通信模型和封装2.以太网3.MAC地址4.以太网帧5.MAC地址表的建立 二、9/61.交换机的数据的处理2.以太网帧的分类3.广播域4.vlan技术开发背景 一、9/3 1.通信模型和封装 2.以太网 3.MAC地址 4.以太网帧 5.MAC地址表的建立 二、9/6 1.交换机的数据的处理 2.以…

[SAP ABAP] 数据字典外键关联

SE11创建自定义数据库表 学校表(ZDBT_SCH_437) 表有3个组成字段: ① MANDT (参考数据元素为MANDT,主键) ② SCHID 学校ID (参考新建数据元素ZDE_SCHID_437,主键,NUMC4) ③ SCHNAME 学校名称 (CHAR20) 学生表(ZDBT_STU_437) 表有7个…

codeforces round974 div3 分层图 树形dp

A Robin Helps 问题&#xff1a; 思路&#xff1a;模拟 代码&#xff1a; #include <bits/stdc.h> using namespace std;const int N 2e5 10;void solve() {int n, k;cin >> n >> k;vector<int> a(n 1);for(int i 1; i < n; i ) cin >&…