kafka-consumer-offset位移

news2024/11/22 21:18:25

目录

1 offset的默认维护位置

1.1 消费offset案例

2 自动提交offset

3 手动提交offset

3.1 原理

3.2 代码示例

3.2.1 同步提交

3.2.2 异步提交(生产常用)

4 指定offset消费

5 指定时间消费

6 漏消费和重复消费分析

6.1 重复消费

 6.2 漏消费

6.3 消费者事务

 7 数据积压


1 offset的默认维护位置

_consumer_offsets主题里面采用key和 value的方式存储数据。key是 group.id+topic+分区号value 就是当前offset的值。每隔一段时间,kafka 内部会对这个topic进行compact(压缩),也就是每个group.id+topic+分区号就保留最新数据。

Kafka0.9版本之前,consumer黑认将offset保存在Zookeeper中。0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

将offset信息存储在zk中的不足:如果将offset信息存储在zk中,那么所有的consumer都会访问zk,会消耗大量的网络资源,消费速度慢。

1.1 消费offset案例

  1. 思想:_consumer_offsets为Kafka中的 topic,那就可以通过消费者进行消费。
  2. 在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。修改以后执行分发命令:xsync consumer.properties。
  3. 采用命令行方式,创建一个新的topic。
    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2
  4. 启动生产者往atguigu生产数据。
    [atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
  5. 启动消费者消费atguigu数据。
    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test
    注意:指定消费者组名称,更好观察数据存储位置(key是 group.id+topic+分区号)。
  6. 查看消费者消费主题_consumer_offsets。
    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
     

2 自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

消费者配置代码:

//配置是否是自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//提交时间间隔,单位是ms
properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);

3 手动提交offset

3.1 原理

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)﹔而异步提交则没有失败重试机制,故有可能提交失败。

  • commitSync(同步提交)﹔必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了

3.2 代码示例

3.2.1 同步提交

//手动提交属性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消费代码逻辑
XXX
XXX
XXX
//手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
//手动提交offset
kafkaConsumer.commitsync();

3.2.2 异步提交(生产常用)

//手动提交属性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消费代码逻辑
XXX
XXX
XXX
//手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
//手动提交offset
kafkaConsumer.commitAsync();

4 指定offset消费

auto.offset.reset = earliest | latest | none 默认是latest
当Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

  1. earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
  2. latest(默认值):自动将偏移量重置为最新偏移量。
  3. none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
  4. 任意指定offset位移开始消费。
    //1创建消费者
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2订阅主题
    ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
    
    //指定位置进行消费
    set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
    //保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
    while (assignment.size() == 0){
        //促使获取的分区数量不为0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
    
    //遍历所有分区,指定消费的offset
    for (TopicPartition topicPartition : assignment) {
        kafkaConsumer.seek(topicPartition, 100);
    }
    
    // 3消费数据
    while (true){
    

5 指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

//1创建消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2订阅主题
ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);

//指定位置进行消费
set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
//保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
while (assignment.size() == 0){
    //促使获取的分区数量不为0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
//希望把时间转换为对应的offset
HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();
//封装对应集合
for (TopicPartition topicPartition : assignment) {
    //希望获取当前系统时间一天前的数据。
    topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);


//遍历所有分区,指定消费的offset
//指定消费的offset
for (TopicPartition topicPartition : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}

// 3消费数据
while (true){

6 漏消费和重复消费分析

6.1 重复消费

场景1:重复消费。自动提交offset引起。

 6.2 漏消费

场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

6.3 消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。这部分知识会在后续项目部分涉及。

 7 数据积压

方案1:如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

方案2:如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/58812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高通开发系列 - ALSA声卡驱动中tinymix返回时间过慢

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 问题背景问题分析验证第一个猜测验证第二个猜测问题原因解决方案问题背景 我们一个高通平台上出现一个问题: tingmix命令需要几秒钟…

一文带你深入理解Linux端口重用这一特性

【好文推荐】 需要多久才能看完linux内核源码&#xff1f; 概述Linux内核驱动之GPIO子系统API接口 一篇长文叙述Linux内核虚拟地址空间的基本概括 轻松学会Linux下查看内存频率,内核函数,cpu频率 大家好&#xff0c;我是Linux吴彦祖&#xff01; 开篇我先考大家一个小问题&…

Golang Map 基本原理

Go 语言中的 map 即哈希表。哈希表把元素分到多个桶里&#xff0c;每个桶里最多放8个元素。在访问元素时&#xff0c;首先用哈希算法根据 key 和哈希表种子获得哈希值(暂将其命名为 h)&#xff0c;然后利用 h 的低 bbb 位得到桶的序号。其中桶的个数为 2b2^b2b 个&#xff0c;是…

乐趣国学—品读“富润屋,德润身。”中的智慧

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 ✨当前专栏&#xff1a;国学周更-心性养成之路 …

java 基于 SpringMVC+Mybaties+ easyUI 快递公司管理系统 的 设计与实现

一.项目介绍 本系统 角色 权限 动态配置 默认配置了三种 第一种&#xff1a; 超级管理员 第二种&#xff1a; 运输公司 第三种&#xff1a; 订单跟踪人员 超级管理员拥有所有权限&#xff0c;包括车子、路线、订单、是否送达以及交易的统计报表 运输公司&#xff1a;车辆管理权…

使用 Python 和 Streamlit 创建一个很棒的 Web 应用程序

“我们如何制作一个机器学习脚本并将其转换为一个尽可能简单的应用程序,让它基本上感觉像是一个脚本练习?” — Adrien Treuille(Streamlit 的发明者) Web 应用程序是显示数据科学或机器学习项目结果的好方法。从头开始开发 Web 应用程序需要大量时间、精力和技术技能。另一…

世界杯海信再出圈,三星:“谈不上愉悦”

作者 | 曾响铃 文 | 响铃说 本届世界杯作为第一次在北半球冬季举行的世界杯&#xff0c;给全世界球迷带去了一次全新体验。且随着赛程的推进&#xff0c;更多的“惊喜”也一一浮现。 其一便是超多的爆冷&#xff0c;虽然没有具体统计&#xff0c;但此次应该是近几届爆冷最多…

[激光原理与应用-32]:典型激光器 -4- 半导体泵浦固体激光器

目录 第1章 概述 1.1 什么是半导体泵浦固体激光器 1.2 优势 1.3 典型的波长 第2章 半导体泵浦固体激光器的种类 2.1 端面泵浦固体激光器 2.2 侧面泵浦固体激光器 第1章 概述 1.1 什么是半导体泵浦固体激光器 半导体泵浦固体激光器&#xff08;Diode Pump Solid State …

Python函数

一、函数介绍 函数&#xff1a;是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能的代码段。 使用函数的好处是&#xff1a; 将功能封装在函数内&#xff0c;可供随时随地重复利用提高代码的复用性&#xff0c;减少重复代码&#xff0c;提高开发效率二、函数…

学习python第一天

关于Python的数据类型 Python数据类型包括&#xff1a; 数字类型&#xff0c;字符类型&#xff0c;布尔类型&#xff0c;空类型&#xff0c;列表类型&#xff0c;元组类型&#xff0c;字典类型 1、数字类型 包括&#xff1a;整型int 浮点型float(有小数位的都是是浮点型) 注…

代码随想录刷题|LeetCode 1143.最长公共子序列 1035.不相交的线 53. 最大子序和 动态规划

目录 1143.最长公共子序列 思路 1、确定dp数组 2、确定递推公式 3、dp数组初始化 4、遍历顺序 5、推导dp数组 最长公共子序列 1035.不相交的线 思路 不相交的线 53. 最大子序和 思路 最大子序列 动态规划 贪心算法 1143.最长公共子序列 题目链接&#xff1a;力扣 思路 不知道…

你在终端启动的进程,最后都是什么下场?(下)

你在终端启动的进程&#xff0c;最后都是什么下场&#xff1f;&#xff08;下&#xff09; 在上期文章你在终端启动的进程&#xff0c;最后都是什么下场&#xff1f;&#xff08;上&#xff09;当中我们介绍了前台进程最终结束的几种情况&#xff0c;在本篇文章当中主要给大家…

好书分享丨区块链的骨骼——密码技术

开放隐私计算 开放隐私计算 开放隐私计算OpenMPC是国内第一个且影响力最大的隐私计算开放社区。社区秉承开放共享的精神&#xff0c;专注于隐私计算行业的研究与布道。社区致力于隐私计算技术的传播&#xff0c;愿成为中国 “隐私计算最后一公里的服务区”。 180篇原创内容 …

darknet框架GPU编译安装

Darknet: Open Source Neural Networks in C 1、darknet下载 git clone https://github.com/pjreddie/darknet.git cd darknet设置makefile gpu1 cudnn1 opencv1【1】GPU1;需要设置显卡驱动、cuda 使用nvidia-smi 查看显卡型号和支持的cuda版本号 nvidia官网下载cuda,以及…

计算机网络学习笔记(Ⅱ):物理层

目录 1 物理层概念 1.1 物理层基本概念 1.定义 2.主要任务 3.特性 1.2 数据通信基础 1.典型模型 2.相关术语 3.三种通信方式 4.数据传输方式 1.3 物理层内容 1.码元 2.速率 3.带宽 1.4 奈氏准则与香农定理 1.失真 2.码间串扰 3.奈氏准则 4.香农定理 1.5 …

蓝桥杯C/C++VIP试题每日一练之Huffman树

💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客 🧡博主的个人笔记本:前端面试题 个人笔记本只记录前端领域的面试题目,项目总结,面试技…

基于JSP的某餐厅点餐系统

目 录 第一章 绪论 1 1.1系统研究背景和意义 1 1.2研究现状 1 1.3研究主要内容 2 第二章 相关技术说明 3 2.1 JSP(Java Server Page)简介 3 2.2 Spring框架简介 4 2.3 Spring MVC框架简介 5 2.4 MyBatis 框架简介 5 2.4 MySql数据库简介 6 2.6 Tomcat简介 7 2.7 jQuery简介 8 …

Hadoop原理与技术——Hbase的基本操作

点击链接查看文档 一、实验目的 上机实操&#xff0c;熟悉指令操作Hbase和java代码操作Hbase 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 1&#xff1a;指令操作Hbase (1)&#xff1a;start-all.sh&#xff0c;启动所有进程 (2)…

Ansys(Maxwell、Simplorer)与Simulink联合仿真(二)直线电机

Ansys&#xff08;Maxwell、Simplorer&#xff09;与Simulink联合仿真&#xff08;二&#xff09;直线电机 在仿真过程中&#xff0c;遇到了一个问题&#xff0c;卡了好久得到了解决。 关于 motion setup 提示 moving 找不到面 cannot find the sarface 所有的动态部件要隔开…

【pen200-lab】10.11.1.21(实际获得22权限)

pen200-lab 学习笔记 【pen200-lab】10.11.1.21 &#x1f525;系列专栏&#xff1a;pen200-lab &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4c6;首发时间&#xff1a;&#x1f334;2022年11月27日&#x1f334; &#x1f36d;作…