最新版Flink CDC MySQL同步MySQL(一)

news2024/7/7 17:21:32

1.概述

Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
在这里插入图片描述

2.支持的连接器

连接器数据库驱动
mongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4
mysql-cdcMySQL: 5.6, 5.7, 8.0.x、RDS MySQL: 5.6, 5.7, 8.0.x、PolarDB MySQL: 5.6, 5.7, 8.0.x、Aurora MySQL: 5.6, 5.7, 8.0.x、MariaDB: 10.x、PolarDB X: 2.0.1JDBC Driver: 8.0.28
oceanbase-cdcOceanBase CE: 3.1.x, 4.x、OceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.x
oracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0
postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1
sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8
tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27
db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0
vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

3.支持的 Flink 版本

下表显示了 Flink CDC Connectors 与 Flink ®的版本对应关系:

Flink CDC版本_Flink 版本_
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13.*、1.14.*
2.3.*1.13.*、1.14.*、1.15.*、1.16.0
2.4.*1.13.*、1.14*、1.15.*、1.16.*、1.17.0

4.特征

支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理。

DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。

5.表/SQL API 的用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

首先我们安装了 1.17+ 版本的 Flink 集群(java 8+)。

注意: 如果需要安装Flink请查看笔者对应的博客 flink高可用集群搭建(Standalone模式)
本文用到的jar包flink-connector-jdbc-3.1.1-1.17.jar和flink-sql-connector-mysql-cdc-2.2.1.jar

下载 连接器 SQL jar (或自行构建)。

将下载的jar包放在FLINK_HOME/lib/.

重启Flink集群。

注意:目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的

6.使用 Flink CDC 对 MySQL 进行流式 ETL

本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。

假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中

在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。

架构概述如下:
在这里插入图片描述

7.环境准备

需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)

注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程

8.在 Flink SQL CLI 中使用 Flink DDL 创建表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
在这里插入图片描述首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.50.163',
    'port' = '3306',
    'username' = 'root',
    'password' = '****',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

编辑目标数据库Flink Sql代码,如下所示:

CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc
    'connector' = 'jdbc',
    #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    #需要访问的数据库驱动
    'driver' = 'com.mysql.cj.jdbc.Driver',
    #目标数据库账号
    'username' = 'root',
    #目标据库密码
    'password' = '***',
    #目标数据库表
    'table-name' = 'product'
  );

在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系

-- Flink SQL
Flink SQL> CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'product'
  );

9.将源数据表加载到目标MySQL

使用Flink SQL将表product与 表查询products表写入目标MySQL。

-- Flink SQL
Flink SQL> insert into product select * from products;

具体操作步骤如下所示:
在这里插入图片描述

这是源数据库,操作添加数据,如下图所示:
在这里插入图片描述
目标数据库同步操作如下图
在这里插入图片描述

10.flink可视化界面查看Running JOBS

红框勾选为运行的同步任务
在这里插入图片描述
至此Flink CDC MySQL同步MySQL第一节讲解完毕,后面会更新其复杂操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/723750.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习神经网络学习笔记-论文研读-transformer及代码复现参考

摘要 优势序列转导模型基于复杂的循环或包括一个编码器和一个解码器的卷积神经网络。最好的表现良好的模型还通过attention 连接编码器和解码器机制。我们提出了一种新的简单的网络架构,Transformer, 完全基于注意力机制,省去了递归和卷积完…

在一个呼号前+B1/是啥意思?有人知道吗?

电台呼号有什么意义?呼号指配意义在于,识别各个不同的具体的电台或同一固定电台内使用2个以上频率时(包括两个频率)来识别每个不同频率,另外电台的类别和性质也取决于其呼号的组成形式,所以呼号一经确定&am…

Mycat2 使用教程(三)原始数据导入分库分表【MySQL分库分库分表】

Mycat2 使用教程(三)原始数据导入分库分表【MySQL分库分库分表】 本文主要描述mycat2完成分库分别数据源配置后,将数据导入的过程mysql 分库分表如果是新项目,则不用考虑本文内容mycat2如何配置分库分表?见上文 1.计…

排序链表问题

给你链表的头结点 head ,请将其按 升序 排列并返回 排序后的链表 。 示例 1: 输入:head [4,2,1,3] 输出:[1,2,3,4] 示例 2: 输入:head [-1,5,3,4,0] 输出:[-1,0,3,4,5] 示例 3:…

Hadoop下载安装(物理机)

1、下载Hadoop安装包## http://archive.apache.org/dist/hadoop/common 2、解压安装Hadoop 将hadoop-2-7.4.tar.gz包上传到/root/export/software目录 cd /root/export/software mkdir /root/exprot/servers tar -zxvf hadoop-2.7.4.tar.gz -C /root/export/servers/3、配置…

第三届DeepModeling黑客松竞赛

今年的Hackathon难度梯度设置很广,有偏向硬核开发的,有偏向应用的,还有面向初学者的教学布道赛道!欢迎大家来围观! 参赛链接

【抽奖实现源码】原生js实现简单九方格抽奖实现(附源码下载)

文章目录 写在前面涉及知识效果图1、搭建抽奖页面2、设置抽奖样式1)奖项区块颜色2)开始按钮背景色3)启动初始块颜色 3、编写抽奖功能4、源码下载1) 百度网盘2)123云盘 总结 写在前面 之前在一次线下活动大屏上看到一个…

招商银行、江苏银行争相入局AIGC,“老银行”能否讲出“新故事”?

文 | 新熔财经 作者 | 和花 由ChatGPT引发的“大语言模型热潮”还没有过去。 六月,A股市场ChatGPT概念指数入选后股价涨幅超过20%的就超过30支,涨幅超过50%也有将近20支,像昆仑万维、万兴科技、神州泰岳、汤姆猫等公司,更是借着…

Qt扫盲-QMouseEvent 鼠标事件

QMouseEvent 鼠标事件理论 一、概述二、鼠标事件的传递三、组合修饰符四、鼠标坐标位置五、使用方式 一、概述 当在QWidget窗口内的鼠标按钮被按下或释放,或者鼠标光标被移动时,就会发生鼠标事件。 鼠标按下释放没有什么特殊的,但是鼠标移动…

数据可视化利器:五个常见组件助您洞察数据

数据可视化组件是在数据分析展示中我们离不开的工具,可以帮助我们更直观地理解和分析数据。不同的数据可视化组件适用于不同类型的数据,本文介绍五个常见的可视化组件以及它们适合展示的数据类型。 柱状图(Bar Chart)&#xff1a…

美团太狠:100亿级分库分表,不停机迁移,如何处理?

说在前面 在40岁老架构师 尼恩的读者交流群(50)中,最近有小伙伴拿到了一线互联网企业如腾讯、美团、阿里、拼多多、极兔、有赞、希音的面试资格,遇到一几个很重要的面试题: 数据库如何不停机迁移?100亿级库表,如何不停机迁移&a…

开放式耳机和封闭式耳机有什么区别,那开放式耳机对耳朵伤害大吗?

开放式耳机从字面意思可以理解为:开放耳朵,不需要入耳就可以听见声音的耳机。这种耳机最大的优点就是不压迫不封闭耳道,而且在听耳机音的同时能够及时注意到周围环境的声音,从而可以避免到一些安全事故的发生。 第一点&#xff1a…

台灯选a级还是aa级?科普a级台灯和aa级台灯的区别

我们经常能听到a级台灯aa级台灯,那么里面所说的a级、aa级到底是什么意思呢?其实这里所说的是国A级照度和国AA级照度标准的台灯,是根据国家颁布的《读写作业台灯性能要求》区别的,只有符合其标准的台灯才能被称为护眼台灯&#xff…

学习笔记整理-UML建模与应用复习3-动态模型

动态模型是用来描述系统的动态行为,分为状态模型和交互模型。 其中状态模型分为状态图和活动图;交互模型分为时序图和协作图。 一、交互模型 时序图是描述信息交换时的时间顺序,而协作图是描述系统对象之间共同完成系统功能的要求。 1、时序…

【网络技术】TCP详解

1 TCP是什么 TCP是Transmission Control Protocol的缩写,即传输控制协议。 TCP是一种面向连接的、可靠的、基于字节流的传输协议,是互联网通信协议TCP/IP中的一个重要组成部分。 2 三次握手 三次握手的过程可以用以下图示表示: 2.1 详细介绍…

php命令执行漏洞加固

首先需要去看一下命令执行漏洞的代码: 就像这样,我们可以利用管道符来进行执行一些命令。 我们在网站中渗透一下: 配合上管道符,我们就可以进行执行命令。 这样的话我们总么进行加固呢,我们可以利用if语句来进行加固…

Hadoop启动后没有NameNode进程,日志报ulimit -a for user root

环境:CentOS7、Hadoop2.6.4 背景:安装后启动正常,一段时间在来启动发生这种情况。 现象: 启动后没有NameNode进程,于是查看日志,日志显示ulimit -a for user root,潜意识还以为文件不能读。&a…

期货反向跟单系统能精准捕捉买卖点嘛?什么人不适合做期货反向跟单?

期货反向跟单,就是指在期货市场上,根据某些大户或者专业机构的交易动向,采取与之相反的操作策略,以期获得利润。例如,当发现某个大户在买入某种期货合约时,就立即卖出同样数量的合约,反之亦然。…

MQTT协议学习

前言 最近在学习mqtt协议,看的是官方英文版的,写这篇博客就是为了将一些关键内容提取出来,以便日后的查询和复习,有需要的可以参考。官方的文档在这: MQTT Essentials - All Core Concepts explained (hivemq.com) …

svn下载及使用

下载 链接:https://pan.baidu.com/s/1RV6vAujA1anHhXJuKbItUQ?pwdflzx 提取码:flzx 这是svn安装包以及中文安装包,放在百度网盘了,有需要可以下载 注意:svn中文语言包安装的版本需要和svn的版本一致 他有两种安装…