Kafka极客 - 15 重设消费者位移 Offset

news2025/1/8 0:49:02

文章目录

      • 1. 为什么要重设消费者组位移?
      • 2. 重设位移策略
      • 3. 消费者 API 方式设置
      • 4. 命令行方式设置

1. 为什么要重设消费者组位移?

我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。

像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。

反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。

在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。

2. 重设位移策略

不论是哪种设置方式,重设位移大致可以从两个维度来进行:

① 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。

② 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。

下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。

在这里插入图片描述

Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略

Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。

Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。

表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。

如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。

刚刚讲到的这几种策略都是位移维度的,下面我们来聊聊从时间维度重设位移的 DateTime 和 Duration 策略。

DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。

Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

我会在后面分别给出这 7 种重设策略的实现方式。不过在此之前,我先来说一下重设位移的方法。目前,重设消费者组位移的方式有两种。

① 通过消费者 API 来实现。

② 通过 kafka-consumer-groups 命令行脚本来实现。

3. 消费者 API 方式设置

首先,我们来看看如何通过 API 的方式来重设位移。我主要以 Java API 为例进行演示。如果你使用的是其他语言,方法应该是类似的,不过你要参考具体的 API 文档。

通过 Java API 的方式来重设位移,你需要调用 KafkaConsumer 的 seek 方法,或者是它的变种方法 seekToBeginning 和 seekToEnd。我们来看下它们的方法签名。

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

根据方法的定义,我们可以知道,每次调用 seek 方法只能重设一个分区的位移。OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值。seek 的变种方法 seekToBeginning 和 seekToEnd 则拥有一次重设多个分区的能力。我们在调用它们时,可以一次性传入多个主题分区。

好了,有了这些方法,我们就可以逐一地实现上面提到的 7 种策略了。我们先来看 Earliest 策略的实现方式,代码如下:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

// 要重设位移的 Kafka 主题 
String topic = "test";  
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    consumer.subscribe(Collections.singleton(topic));
    consumer.poll(0);
    consumer.seekToBeginning(
        consumer.partitionsFor(topic).stream().map(partitionInfo ->          
        new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList())
    );
} 

这段代码中有几个比较关键的部分,你需要注意一下。

① 你要创建的消费者程序,要禁止自动提交位移。

② 组 ID 要设置成你要重设的消费者组的组 ID。

③ 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。

④ 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。

虽然社区已经不推荐使用 poll(long) 了,但短期内应该不会移除它,所以你可以放心使用。另外,为了避免重复,在后面的实例中,我只给出最关键的代码。

Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可,如下面的代码所示:

consumer.seekToEnd(
	consumer.partitionsFor(topic).stream().map(partitionInfo ->          
	new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList())
);

实现 Current 策略的方法很简单,我们需要借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移,代码如下:

consumer.partitionsFor(topic).stream()
    .map(info -> new TopicPartition(topic, info.partition()))
	.forEach(tp -> {
        long committedOffset = consumer.committed(tp).offset();
        consumer.seek(tp, committedOffset);
    }
);

这段代码首先调用 partitionsFor 方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移,最后通过 seek 方法重设位移到已提交位移处。

如果要实现 Specified-Offset 策略,直接调用 seek 方法即可,如下所示:

long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
	TopicPartition tp = new TopicPartition(topic, info.partition());
	consumer.seek(tp, targetOffset);
}

接下来我们来实现 Shift-By-N 策略,主体代码逻辑如下:

for (PartitionInfo info : consumer.partitionsFor(topic)) {
    TopicPartition tp = new TopicPartition(topic, info.partition());
    // 假设向前跳 123 条消息
    long targetOffset = consumer.committed(tp).offset() + 123L; 
    consumer.seek(tp, targetOffset);
}

如果要实现 DateTime 策略,我们需要借助另一个方法:KafkaConsumer. offsetsForTimes 方法。假设我们要重设位移到 2019 年 6 月 20 日晚上 8 点,那么具体代码如下:

long ts = LocalDateTime.of(2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
    							.map(info -> new TopicPartition(topic, info.partition()))
    							.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : 
    consumer.offsetsForTimes(timeToSearch).entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue().offset());
}

这段代码构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用 seek,实现了重设位移。

最后,我来给出实现 Duration 策略的代码。假设我们要将位移调回 30 分钟前,那么代码如下:

Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
    .map(info -> new TopicPartition(topic, info.partition()))
    .collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30*1000*60));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : 
     consumer.offsetsForTimes(timeToSearch).entrySet()) {
    consumer.seek(entry.getKey(), entry.getValue().offset());
}

总之,使用 Java API 的方式来实现重设策略的主要入口方法,就是 seek 方法

4. 命令行方式设置

位移重设还有另一个重要的途径:通过 kafka-consumer-groups 脚本。需要注意的是,这个功能是在 Kafka 0.11 版本中新引入的。这就是说,如果你使用的 Kafka 是 0.11 版本之前的,那么你只能使用 API 的方式来重设位移。

比起 API 的方式,用命令行重设位移要简单得多。针对我们刚刚讲过的 7 种策略,有 7 个对应的参数。下面我来一一给出实例。

Earliest 策略直接指定**–to-earliest**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略直接指定**–to-latest**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略直接指定**–to-current**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略直接指定**–to-offset**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略直接指定**–shift-by N**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略直接指定**–to-datetime**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

最后是实现 Duration 策略,我们直接指定**–by-duration**。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/87318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么看xray发了那些数据包

怎么看xray发了那些数据包。版本说明&#xff1a;Xray 下载地址&#xff1a;https://github.com/chaitin/xray/releases 使用环境&#xff1a;windows、linux、macos皆可 工具说明&#xff1a;Xray扫描器是一款功能强大的安全评估工具。支持主动、被动多种扫描方式&#xff…

UNIAPP实战项目笔记51 登录用户名和密码输入框的数据验证功能

UNIAPP实战项目笔记51 登录账号用户名和密码输入框的数据验证功能 实际案例图片 账号验证 密码验证 登录成功跳转 显示登录和注册页面布局 账号密码的验证功能和登录验证提交 具体内容图片自己替换哈&#xff0c;随便找了个图片的做示例 具体位置见目录结构 完善布局页面和样式…

基于PHP和MySQL的新闻发布系统

关于世界杯⚽️ 国际足联世界杯&#xff08;FIFA World Cup&#xff09;&#xff0c;简称“世界杯”&#xff0c;是由全世界国家级别球队参与&#xff0c;象征足球界最高荣誉&#xff0c;并具有最大知名度和影响力的足球赛事。世界杯全球电视转播观众超过35亿 。世界杯每四年举…

【设计模式】简单工厂模式描述总结

简单工厂模式 定义&#xff1a;定义一个创建对象的接口&#xff0c;让子类决定实例化哪一个类。 类型&#xff1a;创建型模式 介绍&#xff1a; 在简单工厂模式中定义一个抽象产品类&#xff0c;抽象产品类声明公共的特性及属性&#xff0c;具体产品类继承抽象产品类后去实…

Educational Codeforces Round 121 (Rated for Div. 2) C. Monsters And Spells

翻译&#xff1a; Monocarp又在玩电脑游戏了。他是个巫师学徒&#xff0c;只会一个咒语。幸运的是&#xff0c;这个法术可以伤害怪物。 他目前所在的关卡包含&#x1d45b;个怪物。他们中的&#x1d456;-th在关卡开始后&#x1d458;&#x1d456;秒出现&#xff0c;并拥有ℎ…

Java石头剪刀布

✅作者简介&#xff1a;热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏&#xff1a;JAVA开发者…

【iMessage苹果源码家庭推】tils扩大软件安装大概释放事变是由程序员筑造的,很轻易发生MemoryLeak控制

推荐内容IMESSGAE相关 作者推荐内容iMessage苹果推软件 *** 点击即可查看作者要求内容信息作者推荐内容1.家庭推内容 *** 点击即可查看作者要求内容信息作者推荐内容2.相册推 *** 点击即可查看作者要求内容信息作者推荐内容3.日历推 *** 点击即可查看作者要求内容信息作者推荐…

[附源码]计算机毕业设计的小区宠物管理系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

小侃设计模式(十七)-中介者模式

1.概述 中介者模式&#xff08;Mediator Pattern&#xff09;是用来降低多个对象和类之间的通信复杂性&#xff0c;这种模式提供了一个中介类&#xff0c;来封装一组对象之间的交互&#xff0c;它将对象之间的交互委派给中介对象交互&#xff0c;避免了对象之间的直接交互。中…

Vue2基础总结

知识点学了太多还是需要总结复习&#xff0c;否则后面会因为零碎的知识点而感到繁杂&#xff0c;那么今天我来总结一下vue相关的知识点&#xff0c;新学习vue的朋友也可以把这当做一个细致总结&#xff1a; 1.Vue是什么&#xff08;重点&#xff09;&#xff1a; 对于Vue的总…

创建 Vue3.0 工程

1.使用 vue-cli 创建 官方文档 : https://cli.vuejs.org/zh/guide/creating-a-project.html#vue-create // 查看vue/cli版本&#xff0c;确保vue/cli版本在4.5.以上 vue --version vue -V// 安装或者升级你的vue/cli、 覆盖安装最新版本; npm install -g vue/cli//1.创建…

C++初阶 stack和queue的模拟实现

作者&#xff1a;小萌新 专栏&#xff1a;C初阶 作者简介&#xff1a;大二学生 希望能和大家一起进步&#xff01; 本篇博客简介&#xff1a;模拟实现STL库中的stack和queue 考试周结束咯 狠狠的学&#xff01; stack和queue的模拟实现容器适配器Stack模拟实现接口函数一览代码…

艾美捷西妥昔单抗Cetuximab化学性质和文献参考

西妥昔单抗&#xff08;抗EGFR&#xff09;是表皮生长因子受体&#xff08;EGFR&#xff09;的抑制剂。 艾美捷西妥昔单抗Cetuximab 品名&#xff1a;西妥昔单抗&#xff0c;抑制剂 完整名称&#xff1a;西妥昔单抗&#xff08;抗EGFR&#xff09; 同义词名称&#xff1a;C2…

2022年电动车与车辆工程国际会议(CEVVE 2022)

2022年电动车与车辆工程国际会议&#xff08;CEVVE 2022&#xff09; 重要信息 会议网址&#xff1a;www.cevve.org 会议时间&#xff1a;2022年12月19-21日 召开地点&#xff1a;中国北海 截稿时间&#xff1a;2022年12月15日 录用通知&#xff1a;投稿后2周内 收录检索…

CPU、内存占用率高排查

CPU高占用 排查思路 top 命令查看CPU占用率高的进程top -H -p ${pid} 命令查看具体是进程的哪个线程占用CPUprintf ‘%x\n’ ${pid} 将线程的pid转为16进制jstack ${十六进制pid} | grep -A 20 查看线程的基本信息与方法调用栈 模拟排查 [rootVM-24-5-centos www]# top top…

vue可视化管理工具创建项目报错解决errno: -4058;连接超时

vue可视化管理工具创建项目报错解决errno: -4058 简介&#xff1a;vue创建项目时&#xff0c;errno&#xff1a;-4058问题解决&#xff0c;使用vue ui指令时会报连接超时问题解决。 基础材料&#xff1a; 使用的node.js版本&#xff1a;18.12.1 vue版本&#xff1a;4.5.15…

【shell脚本】监控磁盘/内存使用率·检测域名是否正常·一键部署LMNP·拉黑攻击服务器的异常ip

文章目录1、监控2台服务器硬盘利用率脚本实战2、批量检查 5个网站域名是否正常3、统计磁盘使用率&#xff0c;磁盘大于%5 就打印mail 小于 硬盘正常 内存也是一样4、有人攻击我服务器 就拉黑异常ip5、使用for循环安装 批量安装3台服务器 php环境 使用&#xff08;LAMP&#xff…

Web前端开发技术课程大作业:简单的网页制作期末作业——狐妖小红娘(6页)

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 ⚽精彩专栏推荐&#x1…

PM说 | 如何精准的获取用户需求?需求分析到底分析什么?

如何精准获取用户需求&#xff1f;怎么做好需求分析? 文章目录如何精准获取用户需求&#xff1f;怎么做好需求分析?前言一、用户的正在需求是什么二、如何精准的获取用户需求三、实操项目分析四、需求分析的方法总结前言 不知你是否曾遇到这样的处境&#xff0c;听到需求&am…

多线程~实现多线程

实现多线程 进程&#xff1a;是正在运行的程序 是系统进行资源分配和调用的独立单位每一个进程都有它自己的内存空间和系统资源 线程&#xff1a;是进程中的单个顺序控制流&#xff0c;是一条执行路径 单线程&#xff1a;一个进程如果只有一条执行路径&#xff0c;则称为单…