kafka的 __consumer_offsets

news2024/12/27 16:35:15
  • Zookeeper不适合大批量的频繁写入操作。
  • Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

1、创建topic “tp_test_01”

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1

2、使用kafka-console-producer.sh脚本生产消息

[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt;done

[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

        由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了100条消息)

3、验证消息生产成功

[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>

[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1

tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#

结果输出表明100条消息全部生产成功!

4、创建一个console consumer group

[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning

5、获取该consumer group的group id(后面需要根据该id查询它的位移信息)

[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

输出: console-consumer-49366 (记住这个id!)

6、查询__consumer_offsets topic所有内容

注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false

[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

        默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

7、计算指定consumer group在__consumer_offsets topic中分区信息

        这时候就用到了第5步获取的group.id(本例中是console-consumer-49366)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

Math.abs(groupID.hashCode()) % numPartitions

        对应的分区=Math.abs("console-consumer-49366".hashCode()) % 50 = 19,即__consumer_offsets的分区19保存了这个consumer group的位移信息。 

8、获取指定consumer group的位移信息

[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

下面是输出结果:

...
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,1]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,2]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
...

        上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/78054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《Linux运维总结:Centos7.6部署二进制mongodb4.4.8三节点副本集群》

一、Mongodb集群模式 1、三种集群介绍 MongoDB有三种集群部署模式&#xff0c;分别为主从复制&#xff08;Master-Slaver&#xff09;、副本集&#xff08;Replica Set&#xff09;和分片&#xff08;Sharding&#xff09;模式。 1、Master-Slaver 是一种主从副本的模式&#x…

自动驾驶之行人轨迹预测数据集

一、 Real Data ETH: Univ. Hotel;750 pedestrians exhibiting complex interactions UCY: Zara01, Zara02 and Uni.780 pedestrians 单应性矩阵&#xff0c;SLAM中的当用多个不同相机拍摄同一个三维平面需要考虑的矩阵&#xff0c;适应场景为平面情况 商场 这个数据集是用双…

Onvif学习

ONVIF onvif&#xff08;Open Network Video Interface Forum&#xff0c;开放型网络视频接口论坛&#xff09;协议. onvif协议涵盖了设备发现、设备配置、事件、PTZ控制、视频分析和实时流媒体直播功能&#xff0c;以及搜索&#xff0c;回放和录像录音管理功能。 先去看许振…

git 常用操作

Git 是一个分布式版本控制系统&#xff0c;用于项目开发中的版本控制。从本质上来讲Git是一个内容寻址(content-addressable)文件系统&#xff0c;并在此之上提供了一个版本控制系统的用户界面。 Git的核心部分是一个简单的键值对数据库(key-value data store)。 你可以向该数据…

XML配置文件、用来约束XML文档:DTD、Schema(类型更多)

上一章properties作为配置文件的内容好像还没讲&#xff1f; properties相对于XML的缺点&#xff1a;如果要运行多个方法&#xff0c;只能在properties配置文件里等号后面加逗号&#xff08;或指定符号&#xff09;隔开&#xff0c;然后再加值&#xff0c;这样累加下去会导致阅…

基于分布式数据库集群的大数据职位信息统计

目录 任务一&#xff1a; MongoDB 分布式集群关键配置信息截图&#xff08;启动参数文件、初始化参数文件、启动命令等&#xff09; ch0的参数文件配置&#xff1a; ​编辑 ch1的参数文件配置&#xff1a; ​编辑chconfig的参数文件配置&#xff1a; router的参数文件配置…

SpringSecurity整合Oauth2.0

SpringSecurity整合Oauth2.0一、概述与原理1.1 、OAuth2.0 是什么&#xff1f;1.2、OAuth2.0中角色解释1.3、OAuth2.0的4中授权模式1.3.1、授权码模式&#xff08;重点&#xff09;1.3.1.1 原理1.3.1.2 代码1.3.2、密码模式&#xff08;重点&#xff09;1.3.2.1 原理1.3.2.2 代…

开发运维(DevOps)自动化运维与持续交付企业级实战

一、网站部署流程 1、传统网站部署流程 传统的网站部署,大家在运维过程中,网站部署是运维的工作之一,网站部署的流程大致分为: 需求分析—原型设计—开发代码—提交测试—内网部署—确认上线—备份数据—外网更新-最终测试,如果发现外网部署的代码有异常,需要及时回滚…

[附源码]JAVA毕业设计心理健康系统(系统+LW)

[附源码]JAVA毕业设计心理健康系统&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&…

网络安全观察报告恶意软件观察

攻击类型分析 2018 年&#xff0c;主要的攻击类型 1 为 SYN Flood&#xff0c;UDP Flood&#xff0c;ACK Flood&#xff0c;HTTP Flood&#xff0c;HTTPS Flood&#xff0c; 这五大类攻击占了总攻击次数的 96&#xff05;&#xff0c;反射类攻击不足 3%。和 2017 年相比&…

使用分页导入的方式把大量数据从mysql导入es

1、首先要有分页功能的代码 如何使用mybatis-plus实现分页&#xff0c;可参考 http://t.csdn.cn/ddnlk 2、要创建feign远程调用模块 可以参考 http://t.csdn.cn/gshFw 3、在feign模块中声明远程调用接口 1.在feign模块中创建一个接口&#xff0c;名字可以是你要调用的服务名&…

指定区域内实现多尺度、多维度2D图形随机填充(如圆、椭圆、多边形)之MATLAB实现

N久之前&#xff0c;咱在公众号中分享了如何用MATLAB实现在指定区域内随机填充圆&#xff0c;并将相关功能封装一个名为randCircle函数里面&#xff0c;其可实现的功能如下&#xff1a; (1) 设定是否允许填充圆相交、相切或独立存在 (2) 指定区域内圆的生成个数 (3) 设定是否允…

[附源码]计算机毕业设计基于vuejs的文创产品销售平台appSpringboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

【面试题】说说 Promise是什么?如何使用

大厂面试题分享 面试题库 前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 前言 本文主要介绍和总结Promise的作用、使用方式和其对应的一些方法,供大家参考学习&#xff0c;如有写的不准确的地方欢迎大家指出&a…

Android 使用 jni Demo示例

Android 使用 jni Demo示例简介1. NDK的介绍1.1 NDK 简介1.2 NDK 特点2. JNI介绍2.1 JNI 简介2.2 为什么要有 JNI&#xff1f;3. NDK 与 JNI 的关系NDK下载及环境配置1. 使用Android studio SDK Manager下载2.配置NDK2.1 配置环境变量2.2 Android studio配置NDK示例Demo流程1.版…

RabbitMQ - 安装和使用

RabbitMQ - 安装和使用一. 安装二. RabbitMQ的简单使用2.1 创建交换机2.1.1 交换机类型2.1.2 持久化方式2.2 创建队列2.3 绑定交换机和队列2.4 SpringBoot整合2.5 另外一种监听写法一. 安装 一键安装&#xff1a; docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 …

rtl8221b+mcu,2.5g光纤收发器的开发备份

1、rtl8221b是一款2.5g的光电转换的phy 系统的构建如下 为了省成本&#xff0c;不用mac来对接其中的gmii接口直接接光模块 2、mdio和mdc由mcu的gpio来模拟&#xff0c;在csdn上有很多的文章来参考 mdio的参数如下 不想看英文可以参考下面的文章 MDIO(clause 22 与 clause 4…

Java基础之《netty(10)—Reactor三种模式》

一、单Reactor单线程模式 1、工作原理图 2、方案说明 &#xff08;1&#xff09;Select是前面I/O复用模型介绍的标准网络编程API&#xff0c;可以实现应用程序通过一个阻塞对象监听多路连接请求。 &#xff08;2&#xff09;Reactor对象通过Select监控客户端请求事件&#xf…

一元钱注册 chatGPT账号

文章目录打开 openai chatgpt 主页注册 chatGPT 账号找境外的电话号码激活账号查看服务价格账号注册充值成功参考视频 打开 openai chatgpt 主页 打开之前首先登录 vpn。但是使用 vpn 有可能还是会被告知 当前国家没有开放服务个人建议&#xff1a; 使用美国的 ip 地址我使用…

PIN TO PIN替代GM8775C|DSI转LVDS转换方案芯片CS5518|CS5518完全替代GM8775C

GM8775C 型 DSI 转双通道 LVDS 发送器产品主要实现将 MIPI DSI 转单/双通道 LVDS 功能&#xff0c;MIPI 支持 1/2/3/4 通道可选&#xff0c;最大支持 4Gbps 速率。LVDS 时钟频率最高 154MHz&#xff0c; 最大支持视频格式为 FULL HD&#xff08;1920 x 1200&#xff09; CS551…