flink写入到kafka 大坑解析。

news2024/9/30 15:24:03

1.kafka能不能发送null消息?

   能!

2 flink能不能发送null消息到kafka?

不能!

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
    FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>( "cc_test",new SimpleStringSchema(),properties);
    env.fromCollection(Lists.newArrayList("111", "222", "333"))
        .map(s->{return s.equals("222")?null:s;})
            .addSink(flinkKafkaProducer);
    env.execute("ContractLabelJob");
}

 

 

这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。

这种问题会造成什么后果?

flink直接挂掉。

如果我们采取了失败重试机制会怎样?

env.setRestartStrategy(  RestartStrategies.fixedDelayRestart(3, Time.seconds(5))  );

数据重复或者丢失。

还有此时kafka的offset由flink在管理, 消费的offset 一直没有被commit,所以一直重复消费。

来个demo

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        env.enableCheckpointing(6000);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>( "cc_test2",new SimpleStringSchema(),properties);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setTopics("cc_test")
                .setGroupId("cc_test1234")
                .setBootstrapServers("9.135.68.201:9092")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
//        stringDataStreamSource.print("kafka msg");
        stringDataStreamSource
                .addSink(sink);
        env.execute("test");

 从topic cc_test消费 然后写到cc_test2里面去

cc_test里的数据

 cc_test_2里写入的数据

 可以看到一个null 报错了,然后它分区的333就会一直被提交。

总之大家小心这个问题。

不加检查点 flink报错后就会直接停掉。。

加了检查点env.enableCheckpointing(6000); flink失败后会一直重试

加了重试机制 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5000, TimeUnit.SECONDS),Time.of(5000,TimeUnit.SECONDS))); 失败的任务只会重试几次。

还是得熟悉源码呀。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/788907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql 第九章

目录 1.mha 搭建 2.总结 1.mha 搭建 主从同步&#xff1a; 安装 mha 软件&#xff1a; mha 模拟 vip 飘移、master 切换&#xff1a; 2.总结 mha 是一套优秀的 mysql 高可用环境下故障切换和主从复制的软件。mha 解决 mysql 单点的问题。mysql 故障切换过程中&#xff0c;mh…

Linux 学习记录57(ARM篇)

Linux 学习记录57(ARM篇) 本文目录 Linux 学习记录57(ARM篇)一、外部中断1. 概念2. 流程图框 二、相关寄存器1. GIC CPU Interface (GICC)2. GIC distributor (GICD)3. EXTI registers 三、EXTI 寄存器1. 概述2. 内部框图3. 寄存器功能描述4. EXTI选择框图5. EXTI_EXTICR1 &…

Qt Creator mainwindow.obj:-1: error: LNK2019

构建的时候报错&#xff1a; mainwindow.obj:-1: error: LNK2019: 无法解析的外部符号 "public: __thiscall mynotedig::mynotedig(class QWidget *)" (??0mynotedigQAEPAVQWidgetZ)&#xff0c;该符号在函数 "public: void __thiscall MainWindow::mynoteab…

C语言宏替换的注意事项

先思考一个问题&#xff1a; #include <string> #include <Windows.h>namespace ui { int MessageBox(HWND hwnd, const std::wstring &text, const std::wstring &caption,UINT flags) {UINT actual_flags flags;const wchar_t *text_ptr text.c_str();…

推荐 3 个实用的 GitHub 项目

本期推荐开源项目目录&#xff1a; 1. 开源知识库 2. 去中心化的社交平台 3. h2oGPT 01 开源知识库 AFFINE 是 Notion、Miro 等知识库产品的开源替代品&#xff0c;目前已经获得了近 20k 的 Stark。通过 AFFINE 你可以进行写作、绘画、计划管理。 类似于 Notion 的 Block &…

STC12C5A系列单片机片内看门狗的应用

wdt.c #include "wdt.h"void wdt_init(void) {WDT_CONTR 0x24; // 0010 0100 - 1.1377s }void wdt_feed(void) {WDT_CONTR | 0x10; // 喂狗 }wdt.h #ifndef _WDT_H_ #define _WDT_H_#include "stc12c5a60s2.h"// 函数声明 extern void wdt_init(void); …

Ajax 黑马学习

Ajax 资源 数据是服务器对外提供的资源,通过 请求 - 处理 - 响应方式获取 请求服务器数据, 用到 XMLHttpRequest 对象 XMLHttpRequest 是浏览器提供的js成员, 通过它可以请求服务器上的数据资源 let xmlHttpRequest new XMLHttpRequest(); 请求方式 : get向服务器获取数据…

requests---jsonpath在接口自动化中的应用

前言 我们在做接口测试时&#xff0c;大多数返回的都是json属性&#xff0c;我们需要通过接口返回的json提取出来对应的值&#xff0c;然后进行做断言或者提取想要的值供下一个接口进行使用&#xff0c;但是如果返回的json数据嵌套了很多层&#xff0c;通过查找需要的词&#x…

软件进行用户体验测试有哪些要点?

无论是移动应用还是网页应用&#xff0c;优秀的用户体验是软件成功的关键所在。软件用户体验测试是一种关键的测试方法&#xff0c;旨在评估用户在使用软件过程中的感受和体验。通过深入了解用户需求&#xff0c;模拟真实场景&#xff0c;以及综合用户反馈&#xff0c;软件开发…

【C++】C++11——右值引用和移动语义|可变参数模板

文章目录 一、左值引用和右值引用左值引用和右值引用的定义左值引用和右值引用的比较 二、右值引用的使用场景和意义左值引用的短板移动构造和移动赋值万能引用和完美转发 三、新的类功能类成员变量初始化default 和 delete 四、可变参数模板 一、左值引用和右值引用 传统的C语…

操作系统资源限制问题(Memory Analyzer使用OutOfMemoryError)

java8使用Memory Analyzer大概是10左右的版本&#xff0c;用最新版本要java17 MemoryAnalyzer-1.10.0.20200225-win32.win32.x86_64 MAT(Memory Analyzer Tool)下载 https://www.cnblogs.com/zwh0910/p/15774590.html Eclipse downloads - Select a mirror | The Eclipse F…

"科技与狠活"企业级无代码开发MES系统,一周实现数字化

随着科技的不断发展&#xff0c;企业级无代码开发平台成为了一种新型的解决方案&#xff0c;能够有效降低软件开发门槛&#xff0c;提升开发效率。在制造业领域&#xff0c;MES系统&#xff08;Manufacturing Execution System&#xff09;作为一种关键的生产管理工具&#xff…

小程序开发收费多少

一、小程序开发费用一览表 ① 域名费用 域名的价格取决于选择的域名类型&#xff0c;常见的有如.com、.cn等这些&#xff0c;以及注册商的定价策略。比如&#xff0c;一个.com 域名一年的的注册费用大约在 50-200 元人民币之间。 ②服务器费用 服务器费用是根据服务器的配置…

day37-Pokedex(神奇宝贝图鉴卡牌展示)

50 天学习 50 个项目 - HTMLCSS and JavaScript day37-Pokedex&#xff08;神奇宝贝图鉴卡牌展示&#xff09; 效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport&qu…

vscode默认gbk编码格式打开

目录 1. 问题描述2. 解决方案 1. 问题描述 每次打开vscode都是utf-8格式打开文件&#xff0c;然后满屏的中文乱码&#xff0c;自己手动换成gbk编码 后中文显示正常&#xff0c;但是换多了很烦。 2. 解决方案 ctrlshiftP 点首选项&#xff1a;打开用户设置 加上这行在最后&…

文件包含漏洞利用思路

简介 通过PHP函数引入文件时&#xff0c;传入的文件名没有经过合理的验证&#xff0c;从而操作了预想之外的文件&#xff0c;导致意外的文件泄漏甚至恶意代码注入。 常见的文件包含函数 php中常见的文件包含函数有以下四种&#xff1a; include()require()include_once()re…

教你合法安全匿名访问网站的10个技巧

大家常说互联网大数据使得每个人上网更透明&#xff0c;确实隐私性和安全性已成为人们在进行互联网浏览时所关注的焦点。无论您想要对匿名访问目标网站进行调查研究&#xff0c;又或者是爬虫业务、跨境业务&#xff0c;完全保持匿名都可能成为关键。 为了帮助您保护更多隐私和安…

【OC总结-weak的底层原理】

文章目录 1. SideTables1.1 StripedMap1.2 SideTable1.3 引用计数refcnts 存储结构RefcountMap1.4 weak_table_t结构体1.4.1 .weak_entry_t结构体 2.1 weak的实现及其调用的相关函数2.1 初始化时&#xff1a;2.1.1 objc_initWeak方法2.1.2 storeWeak方法 2.2 添加引用时&#x…

【C++从0到王者】第十站:手撕string

文章目录 一、String的基本结构二、String的构造函数1.string(const char* str)2.string()3.string(const char* str " ")4.string(const string& s) 二、String的析构函数三、获取String中的字符串四、获取有效元素个数五、operator[]运算符重载六、增删查改之…

Fastify系列-从0到1超详细手把手教你使用Fastify构建快速的API

什么是Fastify&#xff1f; Fastify是一个web框架&#xff0c;高度专注于以最少的开销和强大的插件架构提供最佳的开发体验。它的灵感来自于Hapi和Express&#xff0c;据我们所知&#xff0c;它是运行在Node.js上的最快的Web框架之一。 为什么使用Fastify&#xff1f; 这些是…