Flink-SQL 写入PostgreSQL 问题汇总

news2024/11/19 19:29:06

1.主键字段为空问题

  • 错误信息
org.apache.flink.table.api.TableException: Column 'bus_no' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.

在这里插入图片描述

  • 问题原因
    	sink 表定义了主键,flink-sql在使用jdbc 插入时,定义的主键中的属性存在空值
    PRIMARY  KEY (col,col2,col3,col4,col5,col6,col7) NOT ENFORCED
    
  • 解决
    确定主键属性是否有空值,若为空则确定是否可作为主键属性(若正确则将其置为默认值);若不符合业务情况,则重新定义主属性集合,确保不出现空值

2.flink-sql jdbc 并发写入Mysql出现死锁

  • 错误信息
 org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:240)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dm_hljy_sims_day_payment_info(sign_year_type, pay_source, campus_name, school_name, depart_name, pay_time, pay_channel_name, day_fee_amounts) VALUES ('2033', '3', '教育 ', ' 初级中学', ' 初级中学', '2022-02-18 00:00:00+08'::timestamp, '微信', '61310.00'::numeric) ON DUPLICATE KEY UPDATE day_fee_amounts=VALUES(day_fee_amounts) was aborted: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.  Call getNextException to see other errors in the batch.
	at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:215) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.processElement(SinkUpsertMaterializer.java:128) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	... 30 more

在这里插入图片描述

  • 错误原因
flink-sql在写入sql时定义了主键,flink-sql进行upser 操作(主键记录不存在则进行insert,存在则进行update)
flink-sql 插入时,会根据主键对sink表进行查询,若并发度大于1,则可能存在两个及以上线程查询同一条记录,出现死锁情况
  • 解决
 设置flink-sql jdbc 并发度为1,(flink jdbc 并发度跟随flink 作业并发度;也可以单独设置)
设置flink-sql 并发度 : parallelism.default: 1
也可以单独设置jdbc 并发度: (不跟随flink作业并发度) 
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:gaussdb://dws-hl-datalake.dws.myhuaweiclouds.com:8000/hl_datamart',
  'table-name' = 'dm_hljy.dm_hljy_sims_security_send_back_student_info',
   'connection.max-retry-timeout' = '600s',
  'sink.max-retries' = '6',
   'sink.parallelism'= '1',
   'username' = '用户名',
   'password' = ‘密码'
 );

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/694294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用Excel生成Sql:

用Excel生成Sql: 以如图为例:点击一行数据的后面一个单元格,在上面的fx部分输入,以等号开头证明这是一个公式。在等号的后面写上想要添加的数据,书写规范是这样:“&A2&”表示varchar类型;"&am…

你知道什么是生成对抗网络吗

生成对抗网络(GANs)是一种深度学习模型,已经显示出在许多生成相关任务中的卓越性能。最近几年,越来越多的研究人员将注意力集中于 GAN 的隐空间属性,并提出了许多利用这些属性进行语义图像编辑的方法。然而&#xff0c…

STM32CubeMX联合CLion开发环境搭建

STM32CubeMX联合CLion开发环境搭建 文章目录 STM32CubeMX联合CLion开发环境搭建1. STM32CubeMX与CLion简介1.1 STM32CubeMX1.2 HAL库1.3 CLion 2. 部署过程2.1 软件部署环境2.2 STM32CubeMX下载及安装2.2 OpenOCD下载与安装2.3 CLion设置 3. 第一个STM32项目 1. STM32CubeMX与C…

IntelliJ IDEA - 通过依赖名查找 Pom.xml 引入的源头坐标

问题描述 今天在新建项目的时候,发现一个注解(JsonInclude)不知道是哪个包的源头引入的,后来打开原来的老项目,查看对应的源文件,发现如图所示 但是这个 com.fasterxml.jackson.core:jackson-annotations:…

模型实战(13)之YOLOv8实现手语字母检测与识别+权重分享

YOLOv8实现手语字母检测与识别+权重分享 本文借助yolov8 实现手语字母的检测与识别:先检测手的ROI,进而对手语表达的字母含义进行识别全文将从环境搭建、模型训练及预测来展开对整个算法流程进行讲解文中给出了开源数据集链接及从 Roboflow 上的下载教程实现效果如下: 1. 环…

雪佛龙公司通过使用Liquid UI调动SAP EWM流程,在短短26天内将生产力提高了90%!

背景介绍 雪佛龙是一家美国跨国能源公司,最初被称为加州标准石油公司(Socal)。它的总部位于加利福尼亚州圣拉蒙,活跃于180多个国家。雪佛龙从事石油和天然气行业的各个方面,包括碳氢化合物勘探和生产;炼油、营销和运输…

【单片机】STM32单片机的矩阵键盘驱动,标准库,无阻塞方式的矩阵键盘读取

原理图: 从左到右、从上到下,按键是1到16,没有按键返回0: key.c #include "key.h"/* 按键初始化函数 */ void KEY_Init(void) {GPIO_InitTypeDef GPIO_InitStructure;RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, …

前端发送html字符串文本给后端,转PDF格式不正确

大无语事件,前端使用原始html table写出来在前端显示一直正确,但是一发给后端转PDF就失败(如图)。 想着是不是因为前端转义的问题,后来发现转不转的无所谓,然后发现后端本地转PDF也成功,但是通…

深入浅出设计模式 - 模板方法模式

博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌ Java知识图谱点击链接:体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收…

修复u盘怎么做?4步快速完成u盘修复!

我的u盘用了好多年了,里面存了很多重要的照片。但最近不知道为什么我的u盘出现了各种问题,大家有什么修复u盘的好方法吗?或者我应该怎么样才能恢复我U盘里的重要数据呢? U盘作为一个便捷的存储工具,为我们保存数据提供…

SpringBoot整合SpringSecurity

一、概述 1.1 Spring Security和Shiro Spring Security 是Spring家族中的一个安全管理框架。 相比与另外一个安全框架Shiro,它提供了更丰富的功能,社区资源也比Shiro丰富。 一般来说中大型的项目都是使用SpringSecurity 来做安全框架。 小项目有Shi…

ChatGPT:开放AI平台的最新进展和功能

第一章:引言 在过去的几年中,人工智能技术取得了长足的发展,其在各个领域的应用也日益广泛。而在AI技术中,自然语言处理(NLP)一直是备受关注的领域之一。ChatGPT作为OpenAI的开放AI平台上的一项重要技术&am…

【Duilib】通过xml文件布局界面

环境 VS版本:VS2013 概述 上一篇 【Duilib】入门 ,简单介绍了Duilib库的使用,这一篇测试一下通过xml布局界面。 步骤 1、创建工程 以 Win32\Win32项目 为模板创建TestByXml工程,步骤与上一篇基本一致。 2、创建MainWndFrame&a…

简单分享在微信上怎么实现分销功能

小程序分销开发怎么做?在如今的电商市场中,小程序分销成为了一种新兴的销售模式,通过分销模式,商家能够借助分销商的力量提高销售额,同时分销商也能不用投入大量资金和时间,就能在小程序上进行销售。那么&a…

FL Studio21中文版音频宿主软件下载教程

FL Studio是很适合新手上手的宿主软件,这得益于FL Studio独特的编曲逻辑。水果可以允许我们不使用音轨的思路来编曲。在FL Studio中我们创建一个pattern后,可以添加乐器或采样进去进行编写,编写完善后将Pattern拖进播放列表进行编排。不用区分…

高性能计算可以自学么?自学学完高性能计算能就业吗

随着超算互联网的认知越来越深,越来越多同学意识到高性能计算是未来有可能最好的就业方向之一。 高性能计算因其更偏底层、更不易被替代,因此广受广大大学生朋友和在职程序员的青睐。那么自学高性能计算可行吗?高性能计算可以自学吗&#xff…

2023年Java 毕业设计怎么选题,有哪些注意事项

个人简介:程序员徐师兄,7 年大厂程序员经历,擅长Java、微信小程序、Python、Android等,大家有这一块的问题可以一起交流! 各类成品java毕设 。javaweb,ssh,ssm,springboot等等项目框…

Microsoft Visual Studio × 出现错误,无去启动

Microsoft Visual Studio 出现错误,无去启动visual studio。 streamJsonRpc.RemotelnvocationException:cannot find service module info file ‘ldentitystorageservice.servicehub.service.json’ in ‘c:}ProgramFilesvicrosoft visual studio2022,Communitycommon7Servic…

【Java-15】反射知识总结

01_类的加载 类的加载过程类的加载时机 类的加载 当程序在运行后,第一次使用某个类的时候,会将此类的class文件读取到内存,并将此类的所有信息存储到一个Class对象中 说明:Class对象是指java.lang.Class类的对象,此类…

GaussDB WDR报告分析

标题 问题描述问题现象告警业务影响原因分析处理方法步骤 1步骤 2步骤 3步骤 4步骤 6步骤 7步骤 8步骤9步骤 10步骤 11步骤 12 问题描述 CPU使用率高。 问题现象 出现CPU使用率超过阈值,CPU使用率快速上涨或短时间持续较高水平等现象。 告警 CPU使用率告警。 …