flink 写入数据到 kafka 后,数据过一段时间自动删除

news2024/11/24 20:27:36

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");

KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic(sinkTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setKafkaProducerConfig(properties)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink-xhaodream-")
                .build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989332.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【SpringMVC]获取参数的六种方式

目录 1.通过ServletAPI获取 2.通过控制器方法的形参获取 3.RequestParam&#xff1a;将请求参数和控制器方法的形参绑定 4.RequestHeader&#xff1a;将请求头信息与控制器方法的形参的值进行绑定 5. CookieValue&#xff1a;将cookie数据和控制器方法的形参绑定 Cookie&…

入门人工智能 —— 学习一门编程语言 python 基础代码编写和运算符介绍(1)

入门人工智能 —— 学习一门编程语言 python&#xff08;1&#xff09; 入门流程1.安装pythonwindowslinux ubuntu 代码编写打印输出结果 基本加减法介绍基本运算符 随着人工智能技术的快速发展&#xff0c;越来越多的年轻人开始关注这个领域。作为入门者&#xff0c;学习人工智…

文心一言 VS 讯飞星火 VS chatgpt (88)-- 算法导论8.3 1题

一、用go语言&#xff0c;参照图 8-3 的方法&#xff0c;说明 RADIX-SORT在下列英文单词上的操作过程:COW&#xff0c;DOG&#xff0c;SEA&#xff0c;RUG&#xff0c;ROW&#xff0c;MOB&#xff0c; BOX&#xff0c; TAB&#xff0c; BAR&#xff0c; EAR&#xff0c;TAR&…

Java基于 SpringBoot 的车辆充电桩系统

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 文章目录 1、效果演示效果图技术栈 2、 前言介绍&#xff08;完整源码请私聊&#xff09;3、主要技术3.4.1 …

AI人工智能Mojo语言:AI的新编程语言

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 Mojo的主要功能包括&#xff1a; 类似Python的语法和动态类型使Python开发人员易于学习Mojo&#xff0c;因为Python是现代AI / ML开发背后的主要编程语言。使用Mojo&#xff0c;您可以导入和使用任何Python库&#xf…

2.8 PE结构:资源表详细解析

在Windows PE中&#xff0c;资源是指可执行文件中存放的一些固定不变的数据集合&#xff0c;例如图标、对话框、字符串、位图、版本信息等。PE文件中每个资源都会被分配对应的唯一资源ID&#xff0c;以便在运行时能够方便地查找和调用它们。PE文件中的资源都被组织成一个树形结…

“链主品牌”竞争战略四大误区

在《财富》500 强企业名单中&#xff0c;2008 年中国企业仅有 37 家&#xff0c;而 2019 年攀升至 119 家&#xff0c;与美国仅相差两家。2008 年&#xff0c;中国关注的问题是其在世界 500 强榜单上的企业数量太少。而如今的讨论则集中在为何中国企业规模大却实力不强。原因在…

有哪些做流程图的软件?分享一些制作方法和注意事项

流程图是一种常用的图表&#xff0c;可以用于表示各种工作流程、系统架构、决策流程等。在现代工作生活中&#xff0c;制作流程图已经成为了必备的技能之一。本文将介绍一些常用的做流程图的工具&#xff0c;并分享一些制作方法和注意事项。 做流程图的工具 1.迅捷画图&#x…

Node.js安装教程图文详解

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 下载Node.js 请下载Node.js并保存至本地&#xff0c;官方网址&#xff1a;https://nodejs.org/zh-cn/ 在此&#xff0c;选择windows系统64位的16.13.1版本进行下载。 下载…

【SLAM】Sophus库的超详细解析

在视觉SLAM中&#xff0c;李群李代数是描述位姿比较常用的一种表达形式。但是&#xff0c;在Eigen中并不提供对它的支持&#xff0c;一个较好的李群和李代数的库是Sophus库&#xff0c;它很好的支持了SO3、so3、SE3、se3。 Sophus简介 代码仓库&#xff1a;https://github.com…

laravel系列(二) Dcat admin框架开发工具使用

开发工具可以非常好的帮助我们去快速的开发CURD等操作,但也是有部分框架有些不是太便捷操作,这篇博客主要为大家介绍Dcat admin的开发工具详细使用. 如何创建页面: 在联表我们首先要去.env文件中去找连接数据库方法: APP_NAMELaravel APP_ENVlocal APP_KEYbase64:thO0lOVlzj0…

机器学习入门教学——人工智能、机器学习、深度学习

1、人工智能 人工智能相当于人类的代理人&#xff0c;我们现在所接触到的人工智能基本上都是弱AI&#xff0c;主要作用是正确解释从外部获得的数据&#xff0c;并对这些数据加以学习和利用&#xff0c;以便灵活的实现特定目标和任务。例如&#xff1a; 阿尔法狗、智能汽车简单…

苹果跨入“迷你超级周期”:iPhone15Pro占比提升75%,性价比高?

据美国投行韦德布什证券的分析师Daniel Ives最近发布的投资者报告&#xff0c;苹果公司今年推出的iPhone 15机型比例已经从往年的3&#xff1a;2调整为了3&#xff1a;1&#xff0c;这标志着苹果公司跨入了“迷你超级周期”。 分析师郭明錤曾预测&#xff0c;今年iPhone 15系列…

【深度学习】 Python 和 NumPy 系列教程(二):Python基本数据类型:3、字符串(索引、切片、运算、格式化)

目录 一、前言 二、实验环境 三、Python基本数据类型 3. 字符串&#xff08;Strings&#xff09; 1. 初始化 2. 索引 3. 切片 4. 运算 a. 拼接运算 b. 复制运算 c. 子串判断 d. 取长度 5. 格式化 a. 使用位置参数 b. 使用关键字参数 c. 使用属性访问 f-string…

在家也能轻松使用用友畅捷通T3管理财务,实现高效率远程办公!

文章目录 前言1. 用友畅捷通T3借助cpolar实现远程办公1.1 在被控端电脑上&#xff0c;点击开始菜单栏&#xff0c;打开设置——系统1.2 找到远程桌面1.3 启用远程桌面 2. 安装cpolar内网穿透2.1 注册cpolar账号2.2 下载cpolar客户端 3. 获取远程桌面公网地址3.1 登录cpolar web…

三.listview或tableviw显示

一.使用qt creator 转变类型 变形为listview或tableviw 二.导出ui文件为py文件 # from123.py 为导出 py文件 form.ui 为 qt creator创造的 ui 文件 pyuic5 -o x:\xxx\from123.py form.uifrom123.py listview # -*- coding: utf-8 -*-# Form implementation generated fro…

GDAL Python 过滤Shape Polygon中的面积小于某个阈值的小图斑

# -*- coding: utf-8 -*- # !/usr/bin/mgdal_env # Time : 2023/9/6 9:36 # Author : Hexk # 过滤矢量文件中的面积小于某个阈值的小图斑from osgeo import ogr, gdal, osr import osdef ShapeFiltratePitch(_input_path, _output_path, _area_threshold):"""过…

logback/log4j基本配置和标签详解

什么是logback logback 继承自 log4j&#xff0c;它建立在有十年工业经验的日志系统之上。它比其它所有的日志系统更快并且更小&#xff0c;包含了许多独特并且有用的特性。 logback.xml 首先直接上配置&#xff0c;我在项目过程中发现一些同时遇到需要logback文件的时候就去…

2023-9-8 求组合数(一)

题目链接&#xff1a;求组合数 I #include <iostream> #include <algorithm>using namespace std;const int mod 1e9 7;int n; const int N 2010; int c[N][N];void init() {for(int i 0; i < N; i )for(int j 0; j < i; j)if(!j) c[i][j] 1;else c[i]…

学习SpringMvc第三战-利用SpringMvc实现CRUD

前言&#xff1a; 小编讲述了参数传递&#xff0c;返回值以及页面跳转&#xff01;为我们的CRUD提供了理论基础&#xff0c;接下来小编会通过SpringMvc实现CRUD来讲述在企业开发中必须要学会的CRUD 一.前期环境搭建 1.替换pom.xml的内容 <properties><project.buil…