Flink CDC 实时mysql到mysql

news2025/1/5 8:41:46

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

mysqlcdc需要mysql开启binlog,找到my.cnf,在[mysqld]中加入如下信息

[mysqld]

server-id=1

log-bin=mysql-bin

binlog-format=row

重启数据库。

2.创建springboot项目,pom添加依赖

<properties>
<java.version>1.8</java.version>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.6</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.1</version>
</dependency>

</dependencies>

<build>
<plugins>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Flink cdc实现mysql到mysql代码

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 注册源表和目标表
tEnv.executeSql("create table sourceTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'database-name' = 'quarant_db',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'password' = 'admin'\n" +
")");
// Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM quarantine");
// tEnv.registerTable("sourceTable",result);
tEnv.executeSql("create table targetTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'password' = 'admin'\n" +
")");
// 执行CDC过程
String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
tEnv.executeSql(query).print();
}
}

运行Main方法

Flink会同步源表数据到目标表,后续源表的增删改都会实时同步至目标表中。

3.将程序打包成flink jar

idea使用快捷键control+alt+shift+s,点击Artifacts->JAR

 选择Main class,点击ok

 然后选择上面菜单栏Build Artifacts

 点击build

 生成的jar在项目目录下面有个out目录

至此,flink jar程序就写好了,可以把jar丢到flink上运行了 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/627999.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP报文段结构

TCP报文段结构 源端口号和目的端口号&#xff1a;含义从名字就能看出来。 序号和确认号&#xff1a;这二个字段被 TCP 发送方和接收方用来实现可靠数据传输服务&#xff0c;每个字段都是32比特。 接收窗口&#xff1a;该字段用于流量控制&#xff0c;大小为16比特。 首部长度…

VTK学习之vtkProp

vtkProp。渲染场景中数据的可视表达&#xff08;Visible Depictions&#xff09;是由vtkProp的子类负责。 也就是说&#xff0c;数据想要进行可视化显示&#xff0c;需要一个转换过程&#xff0c;这个过程就是转换为vtkProp 这样才能进行渲染展示出来。 而vtkProp子类是vtkA…

一阶电路和二阶电路的时域分析(1)——“电路分析”

小雅兰期末加油冲冲冲&#xff01;&#xff01;&#xff01; 动态电路的方程及其初始条件 动态电路&#xff0c;物理学名词&#xff0c;是指含有储能元件L、C的电路&#xff0c;动态电路方程的阶数通常等于电路中动态元件的个数。 动态电路是指含有储能元件的电路。当动态电路状…

openpnp - 底部相机矫正(subject not found)的原因总结

文章目录 openpnp - 底部相机矫正(subject not found)的原因总结概述问题的由来相机的选择相机焦距的选择相机初始安装距离位置的选择相机安装支柱接触面过大会影响相机模组PCB的安装相机支柱的绝缘问题安装相机模组时的平整度问题相机轴垂直度的问题相机成像时间矫正时的Z轴位…

全球顶尖科学家陈松蹊院士出任百分点数据科学研究院名誉院长

近日&#xff0c;百分点科技正式宣布聘请北京大学数学科学学院、光华管理学院教授&#xff0c;中国科学院院士陈松蹊担任百分点数据科学研究院名誉院长。公司将以此深化布局数据科学领域&#xff0c;助推数字中国建设。 全球顶尖科学家 陈松蹊院士主要从事超高维大数据统计分析…

卷S人的Java岗!全靠这份1000页的面试手册,拿了28K的offer

大家好&#xff0c;最近有不少朋友给鄙人留言&#xff0c;说今年面试实在是太卷了&#xff0c;不知道从何下手&#xff01; 不论是跳槽涨薪&#xff0c;还是学习提升&#xff01;先给自己定一个小目标&#xff0c;然后再朝着目标去努力就完事儿了&#xff01;为了帮大家节约时…

JVM内存结构及程序执行的内存分析过程

一. JVM内存结构 1. JVM的内存结构大概分为 堆&#xff08;Heap&#xff09; 线程共享。所有的对象实例以及数组都要在堆上分配。回收器主要管理的对象。 方法区&#xff08;Method Area&#xff09; 线程共享。存储类信息、常量、静态变量、即时编译器编译后的代码。 方…

机器学习-12 卷积神经网络简介

卷积神经网络 引言深度学习发展历程深度应用领域深度学习vs传统机器学习深度神经网络vs浅层神经网络深度学习概述 卷积神经网络CNNBP神经网络CNN概述卷积神经网络大致结构卷积神经网络大致过程 局部连接权值共享非线性映射ReLU&#xff08;Rectified Linear Units&#xff09;池…

盘点一下架构师主流的画图工具(附地址)

盘点一下架构师主流的画图工具&#xff08;附地址&#xff09; 转发我个人微信公众号的内容&#xff0c;后续优先公众号。 一、文章来源 写这篇文章的目的是做个关于常用画图工具的总结。 起源是在架构组时为了降低沟通成本和提高作战效率&#xff0c;我们频繁用图交流&…

13. 精灵动画Sprite和SpriteSequence的基本使用

1. 说明&#xff1a; 在unity二维游戏开发中&#xff0c;有一种精灵类的玩家角色&#xff0c;通过一系列动作的静态图片可以合成该精灵的某一个动作。在QML当中也有一个控件可以实现这种精灵类动画的制作&#xff0c;主要使用到三个控件&#xff1a;Sprite和SpriteSequence和A…

一文解释python中的实例方法,类方法和静态方法作用和区别是啥?该如何使用

我们都知道 &#xff0c;python类中有三种常见的方法 &#xff0c;分别是实例方法 &#xff0c;类方法和静态方法 。那么这几个方法到底有什么作用 &#xff1f; 它们之间有什么区别 &#xff1f;该如何使用 &#xff1f; 带着这些问题 &#xff0c;下面我们就来了解下这三种方…

windows平台python脚本执行环境搭建笔记

1.python脚本环境下载 这里是原始发布源&#xff1a; https://www.python.org/downloads/release/python-3114/https://www.python.org/downloads/release/python-3114/安装时记得添加进系统path&#xff0c;这样你可以随时调用python环境。 2.扩展模块的安装 step1.找到py…

35款优秀的 SpringBoot/SpringCloud 开源项目,开发脚手架,总有一款适合你...

简介 SpringBoot 是一个非常流行的 Java 框架&#xff0c;它可以帮助开发者快速构建应用程序。他不仅继承了 Spring 框架原有的优秀特性&#xff0c;而且还通过简化配置来进一步简化了 Spring 应用的整个搭建和开发过程。 最近&#xff0c;小编蹲点各大开源网站、社区等&#x…

iOS App的打包和上架流程

转载&#xff1a;iOS App的打包和上架流程 - 掘金 1. 创建账号 苹果开发者账号几种开发者账号类型 个人开发者账号 费用&#xff1a;99 美元/年&#xff08;688.00元&#xff09;协作人数&#xff1a;仅限开发者自己不需要填写公司的邓百氏编码&#xff08; D-U-N-S Number…

源码分析spring容器启动销毁资源

文章目录 一、InitializingBean二、SmartInitializingSingleton三、PostConstruct四、DisposableBean五、PreDestroy六、BeanPostProcessor七、ApplicationContextAware八、Bean初始化销毁过程 spring项目启动时&#xff0c;在bean的生命周期内&#xff0c;可以添加一些前置、后…

ATE测试工程师的前景怎么样?能转DFT工程师吗?

最近后台不少同学私信想要咨询ATE这个岗位&#xff0c;想了解这个岗位的薪资&#xff0c;前景&#xff0c;以及相关的技能&#xff0c;下面就来一起了解一下~ 什么是ATE&#xff1f; ATE是&#xff08;Automatic Test Equipment&#xff09;的缩写&#xff0c; 于半导体产业意…

OOD 使用基于提示的特征映射生成用于视频异常检测

paper link 本文提出了使用提示引导特征映射的生成式视频异常检测框架&#xff0c;作者来自中山大学&#xff0c;文章发表在cvpr2023 作者首先分析了现有方法并指出当前面临的两个问题 两个关键挑战 大多数视频异常检测方法通过在训练阶段学习正常事件的分布,并在测试阶段检…

别找了!前端那些好用的网站都在这里了!【文末送书】

&#x1f340;前言 好用的网站千千万万&#xff0c;如果你还发现好用的网站&#xff0c;欢迎在评论区中留言分享&#x1f601;&#xff0c;赠书活动在文末哟&#xff0c;中奖者可以从给出的五本书中任意挑选自己喜欢的那本 文章目录 &#x1f340;前言 &#x1f340;一、渐变…

简化 Hello World:Java 新写法要来了

OpenJDK 的 JEP 445 提案正在努力简化 Java 的入门难度。 这个提案主要是引入 “灵活的 Main 方法和匿名 Main 类” &#xff0c;希望 Java 的学习过程能更平滑&#xff0c;让学生和初学者能更好地接受 Java 。 提案的作者 Ron Pressler 解释&#xff1a;现在的 Java 语言非常…

快速上手Flutter

目录 一、Flutter介绍 1.高效开发 2.优异的性能 3.较低的开发成本 4.社区活跃 二、Flutter使用 1.Dart 语言 2.什么是Dart语言 3.Flutter 组件库 4.Layout 布局 5.Flutter 工具 6.Flutter社区 三、Flutter使用技巧 四、总结 一、Flutter介绍 Flutter是谷歌的移动…