FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~

news2025/1/17 13:54:59

本文介绍了不同源单表-单表同步,不同源多表-单表同步。

Flink版本:1.16

环境:Linux CentOS 7.0、jdk1.8

基础文件:flink-1.16.2-bin-scala_2.12.tgz、flink-connector-jdbc-3.0.0-1.16.jar、flink-sql-connector-mysql-cdc-2.3.0.jar
安装Flink步骤详见文章第二篇

支持的mysql版本: 

一、 数据源ip为***.50的源表,同步数据到数据源ip为***.134的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo134 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   sourceLine int,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver',
>     'scan.fetch-size' = '1000'
>  );

[INFO] Execute statement succeed.

 'scan.fetch-size' = '1000'  的含义:

 在 Flink SQL 中,scan.fetch-size 属性用于配置批处理查询中的每次批量获取记录的大小。具体地说,它指定了每次从数据源读取的记录数。

如何设置:

以下是一些建议:
考虑数据源的吞吐量:如果你的数据源的吞吐量较高,网络延迟较低,可以适当增大 scan.fetch-size 的值,以减少网络往返次数和请求开销。
考虑网络环境和带宽限制:如果数据源位于远程服务器或网络环境较差,可以选择适当较小的 fetch size 值,以减少网络传输的负载,避免出现大量的网络超时和传输失败情况。
考虑内存开销:fetch size 值过大可能会占用较多的内存资源,特别是对于批处理查询。如果你的查询涉及大量的中间状态(intermediate state)或内存密集型操作,可以选择适当较小的 fetch size 值。
一般来说,可以先尝试将 scan.fetch-size 设置为一个较默认的值,例如 1000 或 5000。然后观察任务的性能和执行效果,根据实际情况进行微调。可以根据实际性能测试和系统资源情况,逐步调整 fetch size 值,以找到性能和资源利用的平衡点。
需要注意的是,scan.fetch-size 属性值是一个相对的配置,不同的数据源和查询场景可能有不同的最佳值。因此,针对具体的数据源和查询条件,最好进行一些实际的性能测试和调优,以获得最优的性能和资源使用。

 最后创建同步关系:

INSERT INTO target_alarminfo134 SELECT *,50 AS sourceLine FROM source_alarminfo50

 若目标表比源表结构少字段属性则执行完同步关系后如下:

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

 

数据同步效果:

源表:

目标表数据:首次数据全量,后面数据变更增量 

二、 数据源ip为***.50、***.51的两个源表,同步数据到数据源ip为***.134的目标表中,使用sourceLine 用于区分数据来源,需要以下几个步骤:

 1. 创建自定义初始化脚本文件 init.sql、flinkSqlInit.sql,flinkSqlInit.sql文件中包含了在FlinkSql中需要执行的语句,用于自动化创建临时表和视图,这两个放在flink的bin目录下:

init.sql内容如下:

SET execution.runtime-mode=streaming;
SET pipeline.name=my_flink_job;
SET parallism.default=4;

 SET execution.runtime-mode=streaming 设置了作业的运行模式为流处理模式。这表示作业将以流处理的方式运行,即实时处理每个输入事件,并根据输入数据的到达顺序进行处理。

SET pipeline.name=my_flink_job 设置了作业的流水线名称为 "my_flink_job"。流水线名称主要用于标识作业,以便在运行时进行管理和监控。

SET parallelism.default=4 设置了作业的默认并行度为 4。并行度表示同时执行作业任务的任务数量。通过设置并行度,可以控制作业在集群上使用的资源量和执行的并行度。默认并行度将应用于作业的所有算子,除非为某个算子单独指定了并行度。

这些设置属性可以在 Flink 的初始化脚本中使用,并在作业启动时生效。可以根据作业的需求和资源情况调整这些属性,以获得最佳的性能和资源利用率。

注:mysql-cdc和jdbc的区别:mysql-cdc 标注 数据来源的表,jdbc标注 同步到的目标表

flinkSqlInit.sql内容如下:

SET execution.checkpointing.interval = 60s;
drop table if exists  source_alarminfo50;
CREATE TABLE source_alarminfo50 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '**',
    'port' = '3306',
    'username' = '**',
    'password' = '**',
    'database-name' = 'alarm',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'alarminfo'
 );
drop table if exists  source_alarminfo51;
CREATE TABLE source_alarminfo51 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '**',
    'port' = '3306',
    'username' = '**',
    'password' = '**',
    'database-name' = 'alarm',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'alarminfo'
 );
drop table if exists  target_alarminfo134;
CREATE TABLE target_alarminfo134 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  sourceLine int,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
    'username' = '**',
    'password' = '**',
    'table-name' = 'alarminfo',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'scan.fetch-size' = '200'
 );

BEGIN STATEMENT SET;

INSERT INTO target_alarminfo134
SELECT *,50 AS sourceLine FROM source_alarminfo50
UNION ALL
SELECT *,51 AS sourceLine FROM source_alarminfo51;

END;

其中涉及flinksql的语法:

BEGIN STATEMENT SET 是 Flink SQL 中的一个特殊语法,用于将一组 SQL 语句作为一个事务进行处理。它用于将多个 SQL 语句作为一个原子操作执行,要么全部成功提交,要么全部回滚。
在 Flink SQL 中,可以使用 BEGIN STATEMENT SET 将多个 SQL 语句组合成一个事务,以确保这些语句的原子性。
以下是 BEGIN STATEMENT SET 的使用示例:
BEGIN STATEMENT SET;
-- SQL 语句 1
-- SQL 语句 2
-- ...
COMMIT;
在上述示例中,BEGIN STATEMENT SET 表示事务的开始,COMMIT 表示事务的提交。你可以在 BEGIN STATEMENT SET 和 COMMIT 之间编写需要执行的多个 SQL 语句。
如果在 BEGIN STATEMENT SET 和 COMMIT 之间的任何一条语句执行失败,整个事务将回滚,即已经执行的语句会被撤销。
需要注意的是,BEGIN STATEMENT SET 和 COMMIT 语句是 Flink SQL 的扩展语法,它们可能在某些特定的 Flink 版本或环境中才可用。在使用时,请确保你的 Flink 版本和环境支持该语法。 

检查点间隔设置:
SET execution.checkpointing.interval = 60s;
通过设置适当的检查点间隔,可以在容忍一定故障的同时,控制检查点的频率和资源使用。较短的检查点间隔可以提供更高的容错性,但也会增加系统开销。
检查点是 Flink 中用于实现容错性的机制,它会定期将作业的状态保存到持久化存储中,以便在发生故障时进行恢复。检查点间隔定义了两个连续检查点之间的时间间隔。 

 2. 重启Flink服务:

停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

 3.1 在flink的bin目录下执行初始化文件flinkSqlInit.sql:

有两种方式:

方式一:可设置job名称及资源参数配置

[root@localhost bin]#  ./sql-client.sh -i init.sql -f flinkSqlInit.sql  

 使用这个语句的好处是可以根据作业的需求和资源情况调整这些属性,以获得最佳的性能和资源利用率。

flink-UI页面效果:

 

方式二:不可设置job名称及资源参数配置

[root@localhost bin]#  ./sql-client.sh -f  flinkSqlInit.sql  

 

 4. 数据同步效果:

三、源表、目标表结构with下的属性介绍:

源表with下的属性:

chunk-key.even-distribution.factor.lower-bound:块键(Chunk Key)的均匀分布因子下限。

chunk-key.even-distribution.factor.upper-bound:块键的均匀分布因子上限。

chunk-meta.group.size:块元数据的分组大小。

connect.max-retries:连接重试的最大次数。

connect.timeout:连接的超时时间。

connection.pool.size:连接池的大小。

connector:使用的连接器的名称。

database-name:数据库的名称。

heartbeat.interval:心跳间隔时间。

hostname:主机名或 IP 地址。

password:连接到数据库或其他系统所需的密码。

port:连接的端口号。

property-version:属性版本。

scan.incremental.snapshot.chunk.key-column:增量快照的块键列。

scan.incremental.snapshot.chunk.size:增量快照的块大小。

scan.incremental.snapshot.enabled:是否启用增量快照。

scan.newly-added-table.enabled:是否启用新加入表的扫描。

scan.snapshot.fetch.size:从状态快照中获取的每次批量记录数。

scan.startup.mode:扫描启动模式。

scan.startup.specific-offset.file:指定启动位置的文件名。

scan.startup.specific-offset.gtid-set:指定启动位置的 GTID 集合。

scan.startup.specific-offset.pos:指定启动位置的二进制日志位置。

scan.startup.specific-offset.skip-events:跳过的事件数量。

scan.startup.specific-offset.skip-rows:跳过的行数。

scan.startup.timestamp-millis:指定启动时间戳(毫秒)。

server-id:服务器 ID。

server-time-zone:服务器时区。

split-key.even-distribution.factor.lower-bound:切分键(Split Key)的均匀分布因子下限。

split-key.even-distribution.factor.upper-bound:切分键的均匀分布因子上限。

table-name:表名。

username:连接到数据库或其他系统所需的用户名。

Sink目标表with下的属性:

connection.max-retry-timeout:连接重试的最大超时时间。

connector:使用的连接器的名称。

driver:JDBC 连接器中使用的数据库驱动程序的类名。

lookup.cache:查找表的缓存配置。

lookup.cache.caching-missing-key:是否缓存查找表中的缺失键。

lookup.cache.max-rows:查找表缓存中允许的最大行数。

lookup.cache.ttl:查找表缓存中行的生存时间。

lookup.max-retries:查找操作的最大重试次数。

lookup.partial-cache.cache-missing-key:是否缓存查找表部分缺失的键。

lookup.partial-cache.expire-after-access:查找表部分缓存中行的访问到期时间。

lookup.partial-cache.expire-after-write:查找表部分缓存中行的写入到期时间。

lookup.partial-cache.max-rows:查找表部分缓存中允许的最大行数。

password:连接到数据库或其他系统所需的密码。

property-version:属性版本。

scan.auto-commit:是否自动提交扫描操作。

scan.fetch-size:每次批量获取记录的大小。

scan.partition.column:用于分区的列名。

scan.partition.lower-bound:分区的下限值。

scan.partition.num:要扫描的分区数量。

scan.partition.upper-bound:分区的上限值。

sink.buffer-flush.interval:将缓冲区的数据刷新到目标系统的时间间隔。

sink.buffer-flush.max-rows:缓冲区中的最大行数,达到此值时将刷新数据。

sink.max-retries:写入操作的最大重试次数。

sink.parallelism:写入任务的并行度。

table-name:表名。

url:连接到数据库或其他系统的 URL。

username:连接到数据库或其他系统所需的用户名。

 

 最后FlinkCDC目前不支持整库同步:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/724008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机也能做3D建模人物,你知道吗?

3D建模人物是当今设计界越来越流行的一个领域。3D建模人物可以指的是使用3D技术来建立和表现各种类型的人物模型。无论是在游戏设计、电影特效还是虚拟现实等领域,3D建模人物都扮演着重要的角色。 与传统的手工绘图相比,现代设计师可以使用各种3D建模软…

全网最细,Fiddler抓包实战教程-辅助接口测试(三)

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 AutoResponder 请…

计算机丢失msvcr110.dll解决办法,那种更加简单

"msvcr110.dll" 是一个动态链接库文件,用于支持 Microsoft Visual C 运行时库(Runtime Library)版本 11.0。它包含了在 Visual C 程序中使用的函数和变量。当一个程序编译完成后,它仍然需要一些运行时库来在操作系统上运…

LLM模型中英文评测基准

文章目录 中文评测基准C-EvalGaokaoAGIEvalPromptCBLUE 英文评测基准MMLUOpen LLM Leaderboard 中文评测基准 Awesome-Chinese-LLM:https://github.com/HqWu-HITCS/Awesome-Chinese-LLM 该项目收集和梳理中文LLM相关的开源模型、应用、数据集及教程等资料&#xff…

软件测试常用设计模式

设计模式的重要原则就是:高内聚、低耦合;通常程序结构中各模块的内聚程度越高,模块间的耦合程度就越低。 数据驱动测试:Data Driven Testing,简称DDT; 数据驱动指的是从数据文件(如数据库、Ex…

RabbitMQ-基础学习

在虚拟机上安装Erlang的GCC环境,装erlong,然后安装rabbitmq 参考:安装说明链接 安装web端面板 创建交换机 先学习一下工作模式(详细介绍可见官网) 上代码 1.Hello Word模式 写在测试类中: Providucer T…

3 springboot更改tomcat的端口和启动时的banner

3.1 更改tomcat端口 点击resources下的application.properties。 然后,添加以下信息,即可把端口号更改为8081。 # 更改项目的端口号 server.port80813.2 更改启动时的banner 首先,进入网站:https://www.bootschool.net/ascii-art…

git切换账户问题

之前一直用另一个github账户提交代码 今天新创建了一个github账户 用这个账户git项目修改后,push时有问题 1 先执行下面命令,切换了用户 git config --local user.name “xxx” git config --local user.email “xxx” 执行 git config user.name 查看…

电脑端anconda的安装和配置

1.下载官网Anaconda | The World’s Most Popular Data Science Platform 1.1如果上述不行就去清华园源下载镜像Index of / 2.点击下载软件安装,按照图的安装步骤就可以了 安装完毕后点击next就可以了 3.测试是否安装配置成功 WINR键调出运行窗口,输入…

AI免费写作

随着科技的不断发展,人工智能(AI)正逐渐渗透进各个领域,包括以前我们认为只有人类才能胜任的创作型任务——写作。本文将通过深入浅出的方式,为大家剖析AI写作的具体运作机制,并结合案例,带大家一起探索AI写作的无穷可…

方向盘脱手检测原理及主流方案

随着高阶辅助驾驶逐渐普及,逐渐从驾驶员驾驶过渡到人机共驾最终到自动驾驶。而目前阶段受限于技术以及发规等,主要还是人机共驾,由于车辆是辅助人来进行驾驶,因此驾驶员还需要起到主要的监测作用,此时对驾驶员的监控变…

凝心聚力,奋楫启程—易我文化系列课《战略方向定位》讲座圆满举行

易我文化系列课自开课以来,受到了易我员工的一致好评和热烈欢迎。2023年6月20日,易我文化系列课再次发力,《战略方向定位》讲座如期举行,并且取得了圆满的成功。 本次讲座特别邀请易我总经理——万建华先生为大家授课&#xff0c…

Github Pages 快速搭建个人网站教程

官方教程&#xff1a;https://pages.github.com/ 1 创建仓库 命名为 你的名字.github.io 克隆项目 git clone https://github.com/username/username.github.io加入index.html页面 在克隆的项目中&#xff0c;加入一个index.html html文件简单写几个dom <!DOCTYPE html…

最牛,python接口自动化测试-fixtures固件使用详细(实战)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 全局设置请求头部…

2023年7月实时获取地图边界数据方法,省市区县街道多级联动【附实时geoJson数据下载】

首先&#xff0c;来看下效果图 在线体验地址&#xff1a;https://geojson.hxkj.vip&#xff0c;并提供实时geoJson数据文件下载 可下载的数据包含省级geojson行政边界数据、市级geojson行政边界数据、区/县级geojson行政边界数据、省市区县街道行政编码四级联动数据&#xff0…

@Data失效 Lombok使用与失效

Data失效 1注入pom </dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>2下载插件&#xf…

极速冲浪:影视网站推荐

在快节奏的现代生活中&#xff0c;影视娱乐成为了人们放松和娱乐的重要方式。随着高速互联网的普及&#xff0c;极速冲浪在各种影视网站上成为了我们追逐电影、剧集和综艺节目的常态。这些影视网站不仅提供了海量的内容资源&#xff0c;还通过便捷的在线观看和下载功能&#xf…

Go语言程序设计(五)切片

一、切片的定义 在Go语言中,切片(Slice)是数组的一个引用,它会生成一个指向数组的指针,并通过切片长度关联到底层数组部分或者全部元素。切片还提供了一系列对数组的管理功能(append、copy)&#xff0c;可以随时动态扩充存储空间&#xff0c;并且可以被随意传递而不会导致所管理…

[RapidVideOCR周边] RapidVideOCR初级教程(界面版 下载解压即可使用)

引言 考虑到提取视频字幕的小伙伴大多不是程序员行当&#xff0c;为了降低使用门槛&#xff0c;特此推出界面版的RapidVideOCR Desktop.RapidVideOCR Desktop需要搭配VideoSubFinder使用。它们两个关系如下图所示&#xff1a; #mermaid-svg-keuknVOG1YkfjOkw {font-family:&qu…

在Linux中部署Ansible

Ansible是自动化运维工具&#xff0c;基于模块化工作&#xff0c;本身没有批量部署的能力。 Ansible只是提供一种框架&#xff0c;Ansible运行的模块才有批量部署的能力。 Ansible使用SSH协议对设备进行管理&#xff0c;只需在主控端部署Ansible环境&#xff0c;被控端无需做…