解决Flink读取kafka主题数据无报错无数据打印的重大发现(问题已解决)

news2024/11/27 13:50:51



        亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序的姿势有问题,然后我重新执行了一次还是不行,我就一直等待,发现等了好久都没有数据来到,我就开始察觉不对了。

        下面是我排查的思路:

        1.kafka broker有没有数据:因为我是读取kafka主题数据,所以我屁颠屁颠的去kakfa查看我的消费主题是否有数据,查看没有问题!

        2.读取的主题是否出现问题:经过切换其他主题读取数据,发现也是没有数据出现在操作台,所以不是主题的问题

        3.查看flink与kafka 连接器的配置是否有问题:我就回去查看构建kafka连接器的builder是否问题,我尝试把偏移量改为从最早的偏移量开始读取,也是无动于衷呀!

        通过以上思路之后,我就彻底无语了,那到底是什么问题?

因为我是从flink连接kafka读取数据的,所以我觉得直接连接kafka读取主题数据试一试,这样就可以排除是不是flink有问题了,所以我就写了以下代码进行测试:

 // 配置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  "hadoop101:10092,hadoop102:10092,hadoop103:10092"); // Kafka 集群地址
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Key 反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value 反序列化器
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的偏移量开始读取

        // 创建消费者、
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("AllData_topic_ods"));
        //拉取超时时间
        ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(6000));
        for (ConsumerRecord<String, String> record : poll) {
            System.out.printf("offset = %d,key= %s.value=%s%n",record.offset(),record.key(),record.value());
        }
        consumer.close();

         一开始当拉取超时时间为100ms的时候,我也是消费不到数据的,但是我就想是不是我拉取超时时间太短了,因为我网络io和电脑性能匹配不上的话,它拉取时间是需要进行网络io的。所以我尝试修改拉取时间为6000ms,就是6s啦!
        然后突然就消费到数据了,我了个豆,搞定了我感觉我已经!

        100ms

        6000ms 

        于是我就回去把我那个builder的参数也修改了,一执行flink程序,这次不负众望,成功消费到数据了!!!

return KafkaSource.<String>builder()
                .setProperty("max.poll.interval.ms","10000") // 设置拉取超时时间为10s
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperty("isolation.level", "read_committed")//read_committed 只会读取事务型成功提交事务写入的消息;  read_uncommitted 默认值,能够读取到 Kafka 写入的任何消息
                .setBootstrapServers(bootstrapServers)
                .setTopics(topicName)
                .setGroupId(groupId)
                .setClientIdPrefix(clientIdPrefix)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));

        亦菲、彦祖们,搞定了!如果不是这个问题的话,也参照我上面的排查思路看看是哪里出现了问题!我能解决也是一个一个排查到,给点耐心。 

        如果帮到你,恭喜呀!如果解决不了,那当我没说,你去看别人的文章吧! 



 感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!! 

关注我下一篇不迷路哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

零基础3分钟快速掌握 ——Linux【终端操作】及【常用指令】Ubuntu

1.为啥使用Linux做嵌入式开发 能广泛支持硬件 内核比较高效稳定 原码开放、软件丰富 能够完善网络通信与文件管理机制 优秀的开发工具 2.什么是Ubuntu 是一个以桌面应用为主的Linux的操作系统&#xff0c; 内核是Linux操作系统&#xff0c; 具有Ubuntu特色的可视…

xiaolin coding 图解网络笔记——TCP篇

1. TCP 头格式有哪些&#xff1f; 序列号&#xff1a;在建立连接时由计算机生成的随机数作为其初始值&#xff0c;通过 SYN 包传给接收端主机&#xff0c;每发送一次数据&#xff0c;就【累加】一次该【数据字节数】的大小。用来解决网络包乱序问题。 确认应答号&#xff1a;指…

使用Go 语言连接并操作 MySQL 数据库

新建项目&#xff0c;我这里使用的vscode&#xff1a; 1.新建项目初始化&#xff1a; 手动创建工程文件夹go安装目录->src->projectName 在项目下创建 main.go文件&#xff1a; 在vscode中点击文件->打开文件夹&#xff0c;选择刚刚新建的文件夹。打开后&#xff0…

Jmeter中的断言

7&#xff09;断言 1--响应断言 功能特点 数据验证&#xff1a;验证响应数据是否包含或不包含特定的字符串、模式或值。多种匹配类型&#xff1a;支持多种匹配类型&#xff0c;如文本、正则表达式、文档等。灵活配置&#xff1a;可以设置多个断言条件&#xff0c;满足复杂的测…

springboot实战(19)(条件分页查询、PageHelper、MYBATIS动态SQL、mapper映射配置文件、自定义类封装分页查询数据集)

引言&#xff1a; 该类博客的学习是基于b站黑马视频springbootvue视频学习&#xff01;具体围绕项目——"大事件"进行实战学习。 目录 一、功能介绍&#xff08;需求&#xff09;。 1、文章列表功能基本介绍。 2、条件分页查询功能与注意。 3、前端页面效果。&#x…

goframe框架bug-记录

implement not found for interface ICompany, forgot register? 错误解决检查&#xff1a; 1.有没有init 2. 注入问题 3. 注入问题

零基础学安全--云技术基础

目录 学习连接 前言 云技术历史 云服务 公有云服务商 云分类 基础设施即服务&#xff08;IaaS&#xff09; 平台即服务&#xff08;PaaS&#xff09; 软件即服务&#xff08;SaaS&#xff09; 云架构 虚拟化 容器 云架构设计 组件选择 基础设施即代码 集成部署…

数据结构 (11)串的基本概念

一、串的定义 1.串是由一个或者多个字符组成的有限序列&#xff0c;一般记为&#xff1a;sa1a2…an&#xff08;n≥0&#xff09;。其中&#xff0c;s是串的名称&#xff0c;用单括号括起来的字符序列是串的值&#xff1b;ai&#xff08;1≤i≤n&#xff09;可以是字母、数字或…

【Java】二叉树:数据海洋中灯塔式结构探秘(上)

个人主页 &#x1f339;&#xff1a;喜欢做梦 二叉树中有一个树&#xff0c;我们可以猜到他和树有关&#xff0c;那我们先了解一下什么是树&#xff0c;在来了解一下二叉树 一&#x1f35d;、树型结构 1&#x1f368;.什么是树型结构&#xff1f; 树是一种非线性的数据结构&…

在 macOS 上安装 MongoDB Community Edition

https://www.mongodb.com/zh-cn/docs/manual/tutorial/install-mongodb-on-os-x/

docker部署单机版doris

文章目录 前言一、系统环境简介二、部署要求三、部署安装1、基础设置2、下载镜像3、下载安装包4、启动镜像环境5、配置fe6、配置be 总结 前言 应项目测试需求&#xff0c;需使用docker部署单机版doris。 一、系统环境简介 #1 系统信息 [roottest][~] $cat /etc/redhat-relea…

c++编程玩转物联网:使用芯片控制8个LED实现流水灯技术分享

在嵌入式系统中&#xff0c;有限的GPIO引脚往往限制了硬件扩展能力。74HC595N芯片是一种常用的移位寄存器&#xff0c;通过串行输入和并行输出扩展GPIO数量。本项目利用树莓派Pico开发板与74HC595N芯片&#xff0c;驱动8个LED实现流水灯效果。本文详细解析项目硬件连接、代码实…

uni-app运行 安卓模拟器 MuMu模拟器

最近公司开发移动端系统&#xff0c;使用真机时每次调试的时候换来换去的麻烦&#xff0c;所以使用模拟器来调试方便。记录一下安装和连接的过程 一、安装MuMu模拟器 百度搜索MuMu模拟器并打开官网或者点这里MuMu模拟器官网 点击下载模拟器 安装模拟器&#xff0c;如果系统…

C语言解析命令行参数

原文地址&#xff1a;C语言解析命令行参数 – 无敌牛 欢迎参观我的个人博客&#xff1a;无敌牛 – 技术/著作/典籍/分享等 C语言有一个 getopt 函数&#xff0c;可以对命令行进行解析&#xff0c;下面给出一个示例&#xff0c;用的时候可以直接copy过去修改&#xff0c;很方便…

传奇996_36——背包图标,物品位置问题

绑定位置不对位 CTRLF9背包物品文件&#xff0c;也就是bag_item文件夹的bag_item.lua文件&#xff0c;这个小框和大框的相对位置会影响那个绑定图标,就是背包物品组合的标签和下面子标签的相对位置 背包物品偏移到看不见 原因&#xff1a;CTRLF9背包物品文件&#xff0c;也就…

使用Python和Pybind11调用C++程序(CMake编译)

目录 一、前言二、安装 pybind11三、编写C示例代码四、结合Pybind11和CMake编译C工程五、Python调用动态库六、参考 一、前言 跨语言调用能对不同计算机语言进行互补&#xff0c;本博客主要介绍如何实现Python调用C语言编写的函数。 实验环境&#xff1a; Linux gnuPython3.10…

如何选择黑白相机和彩色相机

我们在选择成像解决方案时黑白相机很容易被忽略&#xff0c;因为许多新相机提供鲜艳的颜色&#xff0c;鲜明的对比度和改进的弱光性能。然而&#xff0c;有许多应用&#xff0c;选择黑白相机将是更好的选择&#xff0c;因为他们产生更清晰的图像&#xff0c;更好的分辨率&#…

代码美学:MATLAB制作渐变色

输入颜色个数n&#xff0c;颜色类型&#xff1a; n 2; % 输入颜色个数 colors {[1, 0, 0], [0, 0, 1]}; createGradientHeatmap(n, colors); 调用函数&#xff1a; function createGradientHeatmap(n, colors)% 输入检查if length(colors) ~ nerror(输入的颜色数量与n不一…

SAP 零售方案 CAR 系统的介绍与研究

前言 当今时代&#xff0c;零售业务是充满活力和活力的业务领域之一。每天&#xff0c;由于销售运营和客户行为&#xff0c;它都会生成大量数据。因此&#xff0c;公司迫切需要管理数据并从中检索见解。它将帮助公司朝着正确的方向发展他们的业务。 这就是为什么公司用来处理…

【leetcode】动态规划

31. 873. 最长的斐波那契子序列的长度 题目&#xff1a; 如果序列 X_1, X_2, ..., X_n 满足下列条件&#xff0c;就说它是 斐波那契式 的&#xff1a; n > 3对于所有 i 2 < n&#xff0c;都有 X_i X_{i1} X_{i2} 给定一个严格递增的正整数数组形成序列 arr &#xff0…