Flink CDC MySQL同步MySQL错误记录

news2025/1/23 12:14:55

1、启动 Flink SQL

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

在这里插入图片描述

2、新建源表

问题1:Encountered “(”
处理方法:去掉int(11),改为int

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int(11) DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
Was expecting one of:
    "CONSTRAINT" ...
    "NOT" ...
    "NULL" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "COMMENT" ...
    "METADATA" ...
    ")" ...
    "," ...
    "MULTISET" ...
    "ARRAY" ...

Flink SQL> 

问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "COMMENT" ...
    "METADATA" ...
    ")" ...
    "," ...
    "MULTISET" ...
    "ARRAY" ...

Flink SQL> 

问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
Was expecting one of:
    "CONSTRAINT" ...
    "NOT" ...
    "NULL" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "COMMENT" ...
    "METADATA" ...
    ")" ...
    "," ...
    "MULTISET" ...
    "ARRAY" ...

Flink SQL> 

问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` datetime ,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'

Flink SQL> 

创建成功:

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` TIMESTAMP(3),
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[INFO] Execute statement succeed.

Flink SQL> 

3、创建目标表

Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );

4、将源表加载到目标表

错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.

Flink SQL> insert into t_user select * from ods_t_user;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.

Flink SQL> 

原因:方向搞反了,插入表应该是目标表

Flink SQL> insert into ods_t_user select * from t_user;
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data

Flink SQL> 

错误2:unexpected block data
解决办法:
(1)更新jar包如下

[appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
total 223320
-rw-r--r-- 1 appuser appuser    196491 May 19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 appuser appuser    542620 May 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 appuser appuser    266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
-rw-r--r-- 1 appuser appuser    345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
-rw-r--r-- 1 appuser appuser    102472 May 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 appuser appuser   8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1 appuser appuser    180248 May 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 appuser appuser  21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 appuser appuser  15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 appuser appuser  38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 appuser appuser   3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 appuser appuser    208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser    301872 May 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser   1790452 May 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 appuser appuser     24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-r--r-- 1 appuser appuser   2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
[appuser@whtpjfscpt01 flink-1.17.1]$

(2)重启flink

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host whtpjfscpt01.
Starting taskexecutor daemon on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

(3)重新执行

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE `t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
>  );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );
[INFO] Execute statement succeed.

Flink SQL>

(4)成功执行

Flink SQL> insert into ods_t_user select * from t_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c2e69d061f3777c031b0acb4ec03d13a

在这里插入图片描述

错误3:无目标表
在这里插入图片描述

 CREATE TABLE demo.ods_t_user (
  `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
  `did` int(11) DEFAULT NULL COMMENT 'dept id',
  `username` varchar(14) DEFAULT NULL,
   `add_time` datetime DEFAULT NULL,
  PRIMARY KEY (`uid`) 
) 

在这里插入图片描述
源表添加新纪录

INSERT INTO test.t_user(did,username)values('3','test'); 

目标表自动同步数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1041211.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于DDR协议的一些操作的理解4

address 1.DDR中的地址,下表中的*4/ *8/ *16表示的是颗粒位宽。不同位宽的颗粒的行列地址的分步是不一样的。图中的page size表示的就是一行所存储的内容,以64MB*16格式为例,一行一共有10列,每一列存储16bit,也就是2B…

每日一题 146. LRU 缓存

难度:中等 由于周日没做,今天又是困难题,所以假装今天是周日 思路: 在字典结构的基础之上完成三个要求显然题目要求构建一个有序字典(当然不使用OrderedDict),由于 key 是唯一的,…

TLS/SSL(六) 非对称密码应用 PKI 证书体系

一 PKI 证书体系 概念: PKI、CA、数字证书、证书链、数字签名之前讲解的公钥不同于https站点所获取的证书,公钥只是数字证书的一部分信息说明: 以下内容仅作为个人笔记 华为云证书管理服务 CCM ① 基础 PKI目前有一系列标准规范定义,主要包括: ② …

lv5 嵌入式开发-6 线程的取消和互斥

目录 1 线程通信 – 互斥 2 互斥锁初始化 – pthread_mutex_init 3 互斥锁销毁 pthread_mutex_destroy 4 申请锁 – pthread_mutex_lock 5 释放锁 – pthread_mutex_unlock 6 读写锁 7 死锁的避免 8 条件变量(信号量) 9 线程池概念和实现 9.1 …

全栈工程师必须要掌握的前端JavaScript技能

作为一名全栈工程师,在日常的工作中,可能更侧重于后端开发,如:C#,Java,SQL ,Python等,对前端的知识则不太精通。在一些比较完善的公司或者项目中,一般会搭配前端工程师&a…

暗月中秋靶场活动writeup

前言 暗月在中秋节搞了个靶场活动,一共有4个flag,本着增长经验的想法参加了本次活动,最终在活动结束的时候拿到了3个flag,后面看了其他人的wp也复现拿到第四个flag。过程比较曲折,所以记录一下。 靶场地址 103.108.…

【sgUploadTileImage】自定义组件:浏览器端生成瓦片图,并转换为File文件序列上传瓦片图

特性&#xff1a; 支持自定义瓦片图尺寸支持显示预览最小尺寸100x100像素大小&#xff0c;切换为实际切割尺寸支持获取切割后的文件Files数组 sgUploadTileImage源码 <template><div :class"$options.name"><div class"sg-ctrl"><di…

使用datax将数据从InfluxDB抽取到TDengine过程记录

1. 编写InfluxDB数据查询语句 select time as ts,device as tbname, ip,device as district_code from "L2_CS" limit 1000 2. 创建TDengine表 create database if not exists sensor; create stable if not exists sensor.water(ts timestamp, ip varchar(50), …

App Inventor 2 模拟sleep函数

App Inventor 2 原生没有 sleep 及相关函数&#xff0c;需要模拟实现&#xff0c;经过测试这里给出一个既简单又相对高效率的实现方案&#xff1a; 需要用到计时器组件&#xff1a; 实现代码如下&#xff1a; 代码原理非常简单&#xff0c;就是计算好要 sleep 到的时刻&#x…

MySQL - 关于约束类型和作用的介绍

约束的概念&#xff1a;约束是作用于表中字段上的规则&#xff0c;用于限制存储在表中的数据。 约束的作用&#xff1a;用于保证数据库中数据的正确性、完整性和一致性。 约束分类&#xff1a; 约束类型作用关键字非空约束限制该字段的数据不能为nullnot null唯一约束保证该…

UE5 ChaosVehicles载具研究

一、基本组成 载具Actor类名称&#xff1a;WheeledVehiclePawn Actor最原始的结构 官方增加了两个摇臂相机&#xff0c;可以像驾驶游戏那样切换多机位、旋转观察 选择骨骼网格体、动画蓝图类、开启物理模拟 二、SportsCar_Pawn 角阻尼&#xff1a;物体旋转的阻力。数值越大…

C# OpenCvSharp 基于直线检测的文本图像倾斜校正

效果 项目 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using OpenCvSharp;namespace OpenCvSharp_基于直线检测的文…

.balckhoues-V-XXXXXXX勒索病毒数据怎么处理|数据解密恢复

引言&#xff1a; 随着网络犯罪的不断演进&#xff0c;勒索病毒已成为当前数字时代的威胁之一&#xff0c;其中包括.balckhoues-V-XXXXXXX勒索病毒。本文将深入介绍.balckhoues-V-XXXXXXX勒索病毒的特点、数据恢复方法以及预防措施&#xff0c;以帮助您更好地理解和应对这一威…

【区块链 | DID】白话数字身份

《十四五数字经济发展规划》提出建立健全政务数据共享协调机制&#xff0c;加快数字身份统一认证和电子证照、电子签章、电子公文等互信互任&#xff0c;推进发票电子化改革&#xff0c;促进政务数据共享、流程优化和业务协同。在数字经济逐渐成形的背景下&#xff0c;推进数字…

【RabbitMQ实战】05 RabbitMQ后台管理

一、多租户与权限 1.1 vhost的概念 每一个 RabbitMQ服务器都能创建虚拟的消息服务器&#xff0c;我们称之为虚拟主机(virtual host),简称为 vhost。每一个 vhost本质上是一个独立的小型RabbitMQ服务器&#xff0c;拥有自己独立的队列、交换器及绑定关系等&#xff0c;并且它拥…

高级时钟项目(2)Json文件解析学习---C语言版本

笔者来介绍一下json文件解析 1、背景介绍 笔者在获取天气数据的时候&#xff0c;是通过MCU的WIFI去获取&#xff0c;但是获取到的数据json数据&#xff0c;需要解析&#xff0c;C语言没那么解析库&#xff0c;所以就需要找一些开源的解析库。 笔者找到cjson这个适用于C语言…

洗衣行业在线预约小程序系统源码搭建 支持直播功能+在线预约下单+上门取件

目前&#xff0c;人们对生活品质的追求不断提高&#xff0c;但生活节奏却也不断加快。对品质的追求遇到了忙碌的生活节奏&#xff0c;人们更渴望以最简单、便捷的方式达到追求品质的目的。同时&#xff0c;由于线上支付的普及&#xff0c;大家更希望足不出户就可以解决自己生活…

基于规则架构-架构案例2019(三十九)

电子商务 某电子商务公司为了更好地管理用户&#xff0c;提升企业销售业绩&#xff0c;拟开发一套用户管理系统。该系统的基本功能是根据用户的消费级别、消费历史、信用情况等指标将用户划分为不同的等级&#xff0c;并针对不同等级的用户提供相应的折扣方案。在需求分析与架…

AGV小车、机械臂协同作业实战06-任务分配算法(图解蚁群算法)代码示例java

什么是蚁群算法&#xff1f; 蚁群系统(Ant System(AS)或Ant Colony System(ACS))是由意大利学者Dorigo、Maniezzo等人于20世纪90年代首先提出来的。他们在研究蚂蚁觅食的过程中&#xff0c;发现蚁群整体会体现一些智能的行为&#xff0c;例如蚁群可以在不同的环境下&#xff0c…

计算机竞赛 深度学习乳腺癌分类

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…