Flink SQL 基于Update流出现空值无法过滤问题

news2024/11/18 15:22:37

问题背景

  • 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
  • 问题报错
	Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint  Call getNextException to see other errors in the batch.
		at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]
		... 1 more
	Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

在这里插入图片描述

定位

定位思路

1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值

定位过程

  • 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
  • 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
 select  select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as 
select a.* ,b.*  from a join b on a.id = b.id

    select * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
    在这里插入图片描述

原因

  • 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
    在这里插入图片描述

解决

通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1550103.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络——28自治系统内部的路由选择

自治系统内部的路由选择 RIP 在1982年发布的BSD-UNIX中实现Distance vector算法 距离矢量:每条链路cost 1,# of hops(max 15 hops)跳数DV每隔30秒和邻居交换DV,通告每个通告包括:最多25个目标子网 RIP通告 DV:在…

虚机无法进入系统一直轮转在内核启动页面处理

【问题现象】 在日常处理虚机过程中会出现,虚机无法进入系统,一直轮转在内核启动页面的情况 【处理办法】 【步骤一】登录原先内核系统,设置默认新内核启动 【步骤二】进入系统后设置 # grubby --set-default /boot/vmlinuz-3.10.0-957.ax…

2023自适应霍夫曼编码High-performance RDHEI with adaptive Huffman code

RRBE 本文仅供自我学习使用,切勿转载和搬运,如有侵权,联系立删~ 方法总框架 首先由内容所有者生成原始图像像素点的标签映射; 然后数据隐藏者采用自适应霍夫曼编码将多个信息插入加密图像像素点;接收端进行数据提取和图像恢复。在数据提取之前,必须对标记的加密图像进行…

Digital Image processing (DIP)

Camera FOV: Filed of view DOV: deep of view 景深 被F f/D 衡量,f 是焦距,D 是光圈大小。 当确定好了景深后,如何光线较暗,则需要补光,或者适当延长曝光时间(快门) 分辨率、像素尺寸&…

qt-C++笔记之QSpinBox控件

qt-C笔记之QSpinBox控件 code review! 文章目录 qt-C笔记之QSpinBox控件1.运行2.main.cpp3.main.pro4.《Qt6 C开发指南》&#xff1a;4.4 QSpinBox 和QDoubleSpinBox 1.运行 2.main.cpp #include <QApplication> #include <QSpinBox> #include <QPushButton&g…

Hides for Mac:应用程序隐藏工具

Hides for Mac是一款功能强大的应用程序隐藏工具&#xff0c;专为Mac用户设计。它能够帮助用户快速隐藏当前正在运行的应用程序窗口&#xff0c;保护用户的隐私和工作内容&#xff0c;避免不必要的干扰。 软件下载&#xff1a;Hides for Mac下载 Hides for Mac的使用非常简单直…

2024年springboot+vue毕业设计选题推荐

2024年&#xff0c;随着技术的发展和市场需求的变化&#xff0c;基于Spring Boot和Vue的毕业设计选题可以更加注重新兴技术的融合和解决实际问题。以下是一些建议的选题方向&#xff1a; 1. 基于Spring Boot和Vue的智能健康管理系统 - 设计并实现一个集成了运动数据、睡眠监…

chrome 浏览器报错 This page will not function without javascript enabled

This page will not function without javascript enabled. Please enable javascript on your browser. 在访问公司spark history 页面时&#xff0c;发现页面加载不全&#xff0c;并提示如上报错&#xff0c;因此按照如下步骤&#xff0c;已解决问题。 在浏览器中启用 JavaS…

【ML】类神经网络训练不起来怎么办 5

【ML】类神经网络训练不起来怎么办 5 1. Saddle Point V.S. Local Minima(局部最小值 与 鞍点)2. Tips for training: Batch and Momentum(批次与 动量)2.1 Tips for training: Batch and Momentum2.2 参考文献:2.3 Gradient Descent2.4 Concluding Remarks(前面三讲)3.…

国际伦敦金行情分析中的趋势分析方法

国际伦敦金行情走势复杂多变。近期&#xff0c;金价曾经一度刷新历史的新高点至2222&#xff0c;但就在当天&#xff0c;金价又快速下跌跌超过30美元。不过这么多变的伦敦金行情也为我们的交易创造了空间&#xff0c;有空间就等于有机会&#xff0c;只要我们能够掌握国际伦敦金…

自然语言处理3(NLP)—— 机器学习

1. 自然语言处理在机器学习领域的主要任务 自然语言处理&#xff08;NLP&#xff09;在机器学习领域中扮演着至关重要的角色&#xff0c;旨在使计算机能够理解、解释和生成人类语言。以下是NLP在机器学习领域中的主要任务及其分类方法&#xff1a; 1.1 按照功能类型分类 1.1.…

http模块 url对象的主要属性

在 Node.js 中&#xff0c;URL 对象是一个内置类&#xff0c;用于解析和操作 URL 字符串。URL 对象具有多个属性&#xff0c;这些属性提供了对 URL 不同部分的访问。以下是URL对象的一些主要属性及其含义&#xff1a; &#xff08;1&#xff09;href 返回完整的 URL 字符串。…

鸿蒙OS开发问题:(ArkTS)【 RSA加解密,解决中文乱码等现象】

RSA加解密开始构建工具类就是举步维艰&#xff0c;官方文档虽然很全&#xff0c;但是还是有很多小瑕疵&#xff0c;在自己经过几天的时间&#xff0c;彻底解决了中文乱码的问题、分段加密的问题。 首先看官方示例代码(以RSA非对称加解密&#xff08;多次调用doFinal实现分段&a…

【前端】layui学习笔记

参考视频&#xff1a;LayUI 1.介绍 官网&#xff1a;http://layui.apixx.net/index.html 国人16年开发的框架,拿来即用,门槛低 … 2. LayUi的安装及使用 Layui 是一套开源的 Web UI 组件库&#xff0c;采用自身轻量级模块化规范&#xff0c;遵循原生态的 HTML/CSS/JavaScript…

AssertionError: extension access disabled because of command line flags解决方案

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里&#xff0c;订阅后可阅读专栏内所有文章。 大家好&#xff0c;我是水滴~~ 本文介绍在 Stable Diffusion WebUI 中安装插件时出现 AssertionError: extension access disabled because of comma…

【前端面试3+1】03深拷贝浅拷贝、let和var、css盒模型、【有效括号】

一、深拷贝浅拷贝 深拷贝和浅拷贝都是用于复制对象或数组的概念&#xff0c;但它们之间有着重要的区别&#xff1a; 1. 浅拷贝&#xff1a; 浅拷贝是指在拷贝对象或数组时&#xff0c;只会复制一层对象的属性或元素&#xff0c;而不会递归地复制嵌套的对象或数组。因此&#xf…

POJ3037 + HDU-6714

两道最短路好题 POJ3037 手玩一下 发现每一点的速度可以直接搞出来&#xff0c;就是pow(2,h[1][1]-h[i][j])*V 那么从这个点出发到达别的点的耗费的时间都是上面这个数的倒数&#xff0c;然后直接跑最短路就好了 #include<iostream> #include<vector> #include<…

SiteServer 学习笔记 Day03 添加栏目

1、添加栏目&#xff0c;信息管理->栏目管理->添加按钮&#xff0c;分别添加“关于我们”、“市场服务”、“制造服务”、“测试服务”、 “工程服务”、“补充服务”、“新闻动态”、“网站地图”、“博客”、“联系我们”、“质量保证”、“Banner”。如下图。 2、栏目…

Hack.Summit() 2024再添亮点:Morphism CEO Cecilia Hsueh确认出席

随着Web3技术的风起云涌&#xff0c;区块链行业正在全球范围内以前所未有的速度崭露头角。而在这场变革的浪潮中&#xff0c;备受瞩目的区块链盛会——Hack.Summit() 2024区块链开发者大会&#xff0c;将于2024年4月9日至10日&#xff0c;在香港数码港拉开帷幕。这不仅标志着Ha…

春秋云境CVE-2023-2130

简介 在SourceCodester采购订单管理系统1.0中发现了一项被分类为关键的漏洞。受影响的是组件GET参数处理器的文件/admin/suppliers/view_details.php中的一个未知函数。对参数id的操纵导致了SQL注入。可以远程发起攻击 正文 进入靶场我们可以尝试弱口令爆破&#xff0c;最后…