【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

news2025/1/23 22:36:59

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class _03CustomConsumerSeek {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //连接kafka setProperty和put都行
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");

        // 是否自动提交 offset  通过这个字段设置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("bigdata");// list可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 执行计划
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }
        
        // 获取分区0的offset =100 以后的数据
       kafkaConsumer.seek(new TopicPartition("bigdata",0),100);
        // 因为消费者是不停的消费,所以是while true
        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
            }
        }
    }
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242329.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【设计模式】行为型模式(四):备忘录模式、中介者模式

《设计模式之行为型模式》系列&#xff0c;共包含以下文章&#xff1a; 行为型模式&#xff08;一&#xff09;&#xff1a;模板方法模式、观察者模式行为型模式&#xff08;二&#xff09;&#xff1a;策略模式、命令模式行为型模式&#xff08;三&#xff09;&#xff1a;责…

GRE做题笔记(零散的个人经验)

locomotive机车By 1813, the Luddite resistance had all but vanished. all but表示“几乎完全”的程度&#xff0c;或者表示排除piston活塞attributed to 归因于how a sportsperson accounted for their own experience of stress 运动员如何解释自己的压力经历 &#xff0c;…

【vmware+ubuntu16.04】vm虚拟机及镜像安装-tools安装包弹不出来问题

学习机器人这门课需要下载虚拟机&#xff0c;做一下记录 首先我下载的是vm虚拟机16&#xff0c; 下载版本可参考该文章课堂上我下载 的镜像是16.04&#xff0c;虚拟机安装教程和镜像添加可参考该博主 按照教程安装成功 安装tools&#xff0c;但是我的弹不出来那个压缩包&…

Redis设计与实现 学习笔记 第十七章 集群

Redis集群是Redis提供的分布式数据库方案&#xff0c;集群通过分片&#xff08;sharding&#xff0c;水平切分&#xff09;来进行数据共享&#xff0c;并提供复制和故障转移功能。 17.1 节点 一个Redis集群通常由多个节点&#xff08;node&#xff09;组成&#xff0c;在刚开…

第03章 文件编程

目标 了解Linux系统文件IO/标准IO基本概念掌握Linux系统文件IO/标准IO常用函数掌握Linux系统文件属性常用函数掌握Linux系统目录文件常用函数 3.1 Linux系统概述 3.1.1 预备知识&#xff08;相关概念&#xff09; &#xff08;1&#xff09;应用程序 和 内核程序 应用程序是…

51c大模型~合集42

我自己的原文哦~ https://blog.51cto.com/whaosoft/11859244 #猎户座 「草莓」即将上线&#xff0c;OpenAI新旗舰大模型曝光&#xff0c;代号「猎户座」 ChatGPT 要进化了&#xff1f; 本月初&#xff0c;OpenAI 创始人、CEO 山姆・奥特曼突然在 X 上发了一张照片&#xff0…

SpringBootTest常见错误解决

1.启动类所在包错误 问题 由于启动类所在包与需要自动注入的类的包不在一个包下&#xff1a; 启动类所在包&#xff1a; com.exmaple.test_02 但是对于需要注入的类却不在com.exmaple.test_02下或者其子包下&#xff0c;就会导致启动类无法扫描到该类&#xff0c;从而无法对…

初始JavaEE篇 —— 网络编程(2):了解套接字,从0到1实现回显服务器

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;JavaEE 目录 TCP 与 UDP Socket套接字 UDP TCP 网络基础知识 在一篇文章中&#xff0c;我们了解了基础的网络知识&#xff0c;网络的出…

机器学习 AdaBoost 算法

AdaBoost 提升学习算法是通过训练多个弱分类算法实现一个强分类算法&#xff0c;做法非常朴素&#xff0c;在训练过程中&#xff0c;提供分类错误的数据权重&#xff0c;降低分类正确的权重&#xff0c;提高分类效果好的弱分类器权重&#xff0c;降低分类效果差的若分类器权重。…

C++编程技巧与规范-类和对象

类和对象 1. 静态对象的探讨与全局对象的构造顺序 静态对象的探讨 类中的静态成员变量(类类型静态成员) 类中静态变量的声明与定义&#xff08;类中声明类外定义&#xff09; #include<iostream> using namespace std;namespace _nmspl {class A{public:A():m_i(5){…

golang中的init函数

程序的初始化和执行都起始于 main 包。如果 main 包还导入了其它的包&#xff0c;那么就会在编译时将它们依次 导入。有时一个包会被多个包同时导入&#xff0c;那么它只会被导入一次&#xff08;例如很多包可能都会用到 fmt 包&#xff0c;但 它只会被导入一次&#x…

【大数据学习 | flume】flume之常见的sink组件

Flume Sink取出Channel中的数据&#xff0c;进行相应的存储文件系统&#xff0c;数据库&#xff0c;或者提交到远程服务器。Flume也提供了各种sink的实现&#xff0c;包括HDFS sink、Logger sink、Avro sink、File Roll sink、HBase sink&#xff0c;。 ​ Flume Sink在设置存…

数学分组求偶数和

问题描述 小M面对一组从 1 到 9 的数字&#xff0c;这些数字被分成多个小组&#xff0c;并从每个小组中选择一个数字组成一个新的数。目标是使得这个新数的各位数字之和为偶数。任务是计算出有多少种不同的分组和选择方法可以达到这一目标。 numbers: 一个由多个整数字符串组…

构建安全护盾:HarmonyOS 应用的数据安全审计与日志管理实战

文章目录 前言数据安全审计与日志管理的重要性什么是数据安全审计&#xff1f;为什么需要日志管理&#xff1f; 数据安全审计与日志管理的基本原则实现数据安全审计与日志管理的技术方案1. 数据安全审计的实现2. 日志管理的实现 ArkUI 与 ArkTS 的代码示例1. 审计日志记录2. 实…

ReactPress与WordPress:两大开源发布平台的对比与选择

ReactPress与WordPress&#xff1a;两大开源发布平台的对比与选择 在当今数字化时代&#xff0c;内容管理系统&#xff08;CMS&#xff09;已成为各类网站和应用的核心组成部分。两款备受欢迎的开源发布平台——ReactPress和WordPress&#xff0c;各自拥有独特的优势和特点&am…

HarmonyOS 开发环境搭建

HarmonyOS&#xff08;鸿蒙操作系统&#xff09;作为一种面向全场景多设备的智能操作系统&#xff0c;正逐渐在市场上崭露头角。为了进入HarmonyOS生态&#xff0c;开发者需要搭建一个高效的开发环境。本文将详细介绍如何搭建HarmonyOS开发环境&#xff0c;特别是如何安装和配置…

基于VUE实现语音通话:边录边转发送语言消息、 播放pcm 音频

文章目录 引言I 音频协议音频格式:音频协议:II 实现协议创建ws对象初始化边录边转发送语言消息 setupPCM按下通话按钮时开始讲话,松开后停止讲话播放pcm 音频III 第三库recorderplayer调试引言 需求:电台通讯网(电台远程遥控软件-超短波)该系统通过网络、超短波终端等无线…

【提高篇】3.3 GPIO(三,工作模式详解 上)

目录 一,工作模式介绍 二,输入浮空 2.1 输入浮空简介 2.2 输入浮空特点 2.3 按键检测示例 2.4 高阻态 三,输入上拉 3.1 输入上拉简介 3.2 输入上拉的特点 3.3 按键检测示例 四,输入下拉 4.1 输入下拉简介 4.2 输入下拉特点 4.3 按键检测示例 一,工作模式介绍…

微服务瞎写

1.微服务解决的问题 1、如何发现新节点以及检查各节点的运行状态&#xff1f; 2、如何发现服务及负载均衡如何实现&#xff1f; 3、服务间如何进行消息通信&#xff1f; 4、如何对使用者暴露服务API&#xff1f; 5、如何集中管理各节点配置文件&#xff1f; 6、如何收集各…

Python Tornado框架教程:高性能Web框架的全面解析

Python Tornado框架教程&#xff1a;高性能Web框架的全面解析 引言 在现代Web开发中&#xff0c;选择合适的框架至关重要。Python的Tornado框架因其高性能和非阻塞I/O特性而备受青睐。它特别适合处理大量并发连接的应用&#xff0c;比如聊天应用、实时数据处理和WebSocket服务…