Flink消费kafka出现空指针异常

news2025/1/11 20:42:07

文章目录

      • 出现场景:
      • 表现:
      • 问题:
      • 解决:

tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("")
                .setTopics("test")
                .setGroupId("gid")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new MySimpleStringSchema())
                .setProperty("auto.offset.commit", "false")
                .build();
        DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");
        kfkDs.print();

        env.execute();
    }

    // 自定义反序列化器
    static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{

        @Override
        public String deserialize(byte[] message) {
            if (message != null) return new String(message, StandardCharsets.UTF_8);
            else{
                return deserialize(new byte[1]); // 返回空 不是Null
            }
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/705511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

echarts tooltip自定义线条样式及数据提示框内容

option {......tooltip: {trigger: axis,borderWidth: 0, // 去除数据提示框默认的边框axisPointer: {lineStyle: { // 设置hover时竖线样式color: {type: linear,x: 0,y: 0,x2: 0,y2: 1,colorStops: [{offset: 0,color: rgba(128,200,244,0) // 0% 处的颜色},{offset: 1,colo…

【40000字】!最适合新手的Springboot+Vue项目

更多文章&#xff1a;https://mp.weixin.qq.com/mp/appmsgalbum?__bizMzg2NDY3NjY5NA&actiongetalbum&album_id2053253027934863360#wechat_redirect hello我是索奇&#xff0c;本套项目对应bilibili视频&#xff0c;大家可以结合视频看哈&#xff0c;有些基础的只看…

2022年系统架构师论文(回忆版)

2022年11月6日&#xff0c;全国计算机等级下半年考试&#xff0c;在疫情压力下如期举行。 北京市软件架构师考试地点在北京市工贸技师学院&#xff08;机电分院&#xff09;&#xff0c;地址&#xff1a;海淀区北四环北路132号&#xff08;金泰海博大酒店北侧&#xff09; 查看…

网络安全(黑客)自学笔记

建议一&#xff1a;黑客七个等级 黑客&#xff0c;对很多人来说充满诱惑力。很多人可以发现这门领域如同任何一门领域&#xff0c;越深入越敬畏&#xff0c;知识如海洋&#xff0c;黑客也存在一些等级&#xff0c;参考知道创宇 CEO ic&#xff08;世界顶级黑客团队 0x557 成员…

chatgpt赋能python:吐血推荐的Python编程好玩的代码

吐血推荐的Python编程好玩的代码 近年来&#xff0c;Python 成为了全球最受欢迎的编程语言之一。Python 的简洁明了&#xff0c;易学易用&#xff0c;使得越来越多的开发者选择了 Python。Python 的生态系统非常丰富&#xff0c;有很多丰富有趣的库和代码可以供我们玩耍。在本…

使用MySQL根据原型字段创建表结构

⭐️ 不爱生姜不吃醋&#xff0c;原创不易&#xff0c;转载请注明原链接 ❗️ 注&#xff1a;本文写的是基于MySQL对数据库表结构进行的操作(DDL) 文章目录 一、数据库1.基本概念2.关系型数据库&#xff08;RDBMS&#xff09;3.数据模型4.SQL通用语法5.SQL分类 二.创建表结构1.…

关于nlohmann::json的简单使用

nlohmann::json的使用非常简单&#xff0c;只需要包含.hpp文件即可&#xff0c;这是它的官网https://github.com/nlohmann/json 简单使用&#xff1a; #include "json.hpp" #include <iostream>using Info nlohmann::json;int main() {Info info;std::cout &…

Java面试Day11

1. MySQL 事务有哪些隔离级别、分别有什么特点&#xff0c;以及 MySQL 的默认隔离级别是什么&#xff1f; 在MySQL中事务的隔离级别是为了解决常见的并发问题&#xff0c;在保证数据库性能的同时保持事务的隔离性&#xff0c;常见的并发问题有&#xff1a; 脏读&#xff1a;如果…

利用nginx/apache代理wss 实现 小程序 端口 反向代理

除了用Workerman自身的SSL&#xff0c;也可以利用nginx/apache作为wss代理转发给workerman 我就是栽在这大坑里&#xff08;nginx/apache代理wss&#xff0c;workerman部分就不要设置ssl&#xff0c;否则将无法连接&#xff0c;两个方法2选1&#xff09;官方推荐用nginx/apach…

基于matlab基于预训练的膨胀双流卷积神经网络的视频分类器执行活动识别(附源码)

一、前言 此示例首先展示了如何使用基于预训练的膨胀 3-D &#xff08;I3D&#xff09; 双流卷积神经网络的视频分类器执行活动识别&#xff0c;然后展示了如何使用迁移学习来训练此类视频分类器使用 RGB 和来自视频的光流数据 [1]。 基于视觉的活动识别涉及使用一组视频帧预…

数据结构07:查找[C++][红黑二叉排序树RBT]

图源&#xff1a;文心一言 | 提词&#xff1a;动漫风格 红黑树 少女#创意图# 考研笔记整理1.7w字&#xff0c;但是删除操作的代码是有一点问题的{无法正确处理红色结点的删除}&#xff0c;其它功能可正常使用&#xff0c;请小伙伴注意~~&#x1f95d;&#x1f95d; 第1版&…

【线程池】线程池的ctl属性详解

目录 一、ctl介绍 二、线程池ctl源码 三、线程池ctl分析 1、private static int ctlOf(int rs, int wc) { return rs | wc; } 2、private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0)); 3、private static int runStateOf(int c) { return c &am…

TensorFlow框架

TensorFlow框架 本文目录&#xff1a; 一、通过代码了解TensorFlow结构 1.1、TensorFlow实现一个加法运算代码 1.1.1、原生python加法运算 1.1.2、TensorFlow实现加法运算 1.1.3、TensorFlow实现加法运算 1.2、TensorFlow的Hello World 二、TensorFlow架构图 三、Tenso…

WebRTC Docker容器部署方案

文章目录 WebRTC简介WebRTC Docker容器部署优势方案&#xff08;mpromonet/webrtc-streamer&#xff09;步骤 WebRTC简介 WebRTC&#xff08;Web Real-Time Communication&#xff09;是一种开放的实时通信技术&#xff0c;它允许浏览器之间进行音频、视频和数据的实时传输。W…

第九十二天学习记录:C++核心:类和对象Ⅰ(五星重要)

C面向对象的三大特性为&#xff1a;封装、继承、多态 C认为万事万物都皆为对象&#xff0c;对象上有其属性和行为 封装 封装的意义 封装是C面向对象三大特性之一 封装的意义&#xff1a; 1、将属性和行为作为一个整体&#xff0c;表现生活中的事物 2、将属性和行为加以权限…

js判断文件类型详解

js判断文件类型详解 通过file的type属性判断 <input type"file" onchange"onchangecb(this)" /> <script> function onchangecb(e) {const file e.files[0];console.log(file.type); } </script>像html中input标签&#xff0c;就是根…

地下水数值模拟-Visual modflow Flex软件应用

地下水&#xff08;ground water&#xff09;&#xff0c;是指赋存于地面以下岩石空隙中的水&#xff0c;狭义上是指地下水面以下饱和含水层中的水。在国家标准《水文地质术语》&#xff08;GB/T 14157-93&#xff09;中&#xff0c;地下水是指埋藏在地表以下各种形式的重力水。…

基于协议判断目标机器是否出网

内网渗透中时常会碰到一些不出网的主机&#xff0c;不出网的原因有很多&#xff0c;如常见的有&#xff1a;没有设置网关、系统防火墙或者其他设备设置了出入站限制&#xff0c;只允许特定协议或端口出网等&#xff0c;遇到这种情况时可以用以下命令测试目标主机允许哪些协议出…

桥接模式的学习与使用

1、桥接模式的学习 当你需要将抽象部分与实现部分解耦&#xff0c;使它们可以独立地变化&#xff0c;而又能够灵活地组合在一起时&#xff0c;可以使用桥接模式。桥接模式通过将抽象和实现部分分离&#xff0c;使它们可以独立地进行扩展和变化&#xff0c;同时又能够在运行时动…

代码随想录二战day2

977有序数组的平方 力扣 思路&#xff1a; 第一&#xff1a; 和之前一样的&#xff0c;看见数组我们的第一想法就是使用双指针去解。这道题需要额外开辟一个答案数组去储存结果。 第二&#xff1a; 头指针指向第一个元素&#xff0c;尾指针指向最后一个元素。然后对两个元素分…