一文弄懂Flink CDC

news2024/11/22 14:49:22

文章目录

      • 1.CDC概述
      • 2.CDC 的实现原理
      • 3.为什么选 Flink
      • 4.支持的连接器
      • 5.支持的 Flink 版本
      • 6.Flink CDC特性
      • 7.用法实例
        • 7.1DataStream API 的用法(推荐)
        • 7.2Table/SQL API的用法

1.CDC概述

CDC(Change Data Capture)是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。

传统上,数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是,这种轮询的方式效率低下且不能实时反应变化。而 CDC 技术则通过在数据源上设置一种机制,使得变化的数据可以被实时捕获并传递给下游处理系统,从而实现了实时的数据变动监控。

Flink 作为一个强大的流式计算引擎,提供了内置的 CDC 功能,能够连接到各种数据源(如数据库、消息队列等),捕获其中的数据变化,并进行灵活的实时处理和分析。

通过使用 Flink CDC,我们可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。

Flink_CDC

2.CDC 的实现原理

通常来讲,CDC 分为主动查询和事件接收两种技术实现模式。对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用Debezium来实现变更数据的捕获(下图来自Debezium 官方文档如果使用的只有 MySQL,则还可以用Canal。

在这里插入图片描述

3.为什么选 Flink

从上图可以看到,Debezium 官方架构图中,是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块,因为 Flink 相对 Kafka Streams 而言,有如下优势:

  • 强大的流处理引擎: Flink 是一个强大的流处理引擎,具备高吞吐量、低延迟、Exactly-Once 语义等特性。它通过基于事件时间的处理模型,支持准确和有序的数据处理,适用于实时数据处理和分析场景。这使得 Flink 成为实现 CDC 的理想选择。

  • 内置的 CDC 功能: Flink 提供了内置的 CDC 功能,可以直接连接到各种数据源,捕获数据变化,并将其作为数据流进行处理。这消除了我们自行开发或集成 CDC 解决方案的需要,使得实现 CDC 变得更加简单和高效。

  • 多种数据源的支持: Flink CDC 支持与各种数据源进行集成,如关系型数据库(如MySQL、PostgreSQL)、消息队列(如Kafka、RabbitMQ)、文件系统等。这意味着无论你的数据存储在哪里,Flink 都能够轻松地捕获其中的数据变化,并进行进一步的实时处理和分析。

  • 灵活的数据处理能力: Flink 提供了灵活且强大的数据处理能力,可以通过编写自定义的转换函数、处理函数等来对 CDC 数据进行各种实时计算和分析。同时,Flink 还集成了 SQL 和 Table API,为用户提供了使用 SQL 查询语句或 Table API 进行简单查询和分析的方式。

  • 完善的生态系统: Flink 拥有活跃的社区和庞大的生态系统,这意味着你可以轻松地获取到丰富的文档、教程、示例代码和解决方案。此外,Flink 还与其他流行的开源项目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和灵活性。

4.支持的连接器

连接器数据库Driver
mongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4
mysql-cdcMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.28
oceanbase-cdcOceanBase CE: 3.1.x, 4.x
OceanBase EE: 2.x, 3.x, 4.x
OceanBase Driver: 2.4.x
oracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0
postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1
sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8
tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27
db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0
vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

5.支持的 Flink 版本

Flink CDC版本Flink 版本_
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13.* , 1.14.*
2.3.*1.13.* , 1.14.* , 1.15.* , 1.16.0
2.4.*1.13.* , 1.14.* , 1.15.* , 1.16.* , 1.17.0

6.Flink CDC特性

  1. 支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理
  2. DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka
  3. Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改

下表显示了连接器的当前特性:

连接器无锁读并行读一次性语义读增量快照读
MongoDB-CDC
mysql-cdc
Oracle-CDC
Postgres-CDC
sqlserver-cdc
Oceanbase-CDC
TiDB-CDC
db2-cdc
vitess-cdc

7.用法实例

7.1DataStream API 的用法(推荐)

请严格按照上面的《5.支持的 Flink 版本》搭配来使用Flink CDC

<properties>
	<flink.version>1.13.0</flink.version>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-common</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-planner-blink_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-api-java-bridge_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>

请提前开启MySQL中的binlog,配置my.cnf文件,重启mysqld服务即可

my.cnf

[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30

ddl&dml.sql

create table test_cdc
(
    id   int          not null
        primary key,
    name varchar(100) null,
    age  int          null
);

INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);

FlinkDSCDC.java

package com.daniel.util;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author Daniel
 * @Date: 2023/7/25 10:03
 * @Description DataStream API CDC
 **/
public class FlinkDSCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // 这里一定要是db.table的形式
                .tableList("flink.test_cdc")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("FlinkDSCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;

打印出的日志

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以得出的结论:

  1. 日志中的数据变化操作类型(op)可以表示为 ‘u’,表示更新操作。在第一条日志中,发生了一个更新操作,对应的记录的 key 是 id=1,更新前的数据是 {id=1, name=Daniel, age=25},更新后的数据是 {id=1, name=Daniel, age=24}。在第二条日志中,也发生了一个更新操作,对应的记录的 key 是 id=3,更新前的数据是 {id=3, name=James, age=16},更新后的数据是 {id=3, name=Andy, age=16}。
  2. 每条日志还提供了其他元数据信息,如数据源(source)、版本号(version)、连接器名称(connector)、时间戳(ts_ms)等。这些信息可以帮助我们追踪记录的来源和处理过程。
  3. 日志中的 sourceOffset 包含了一些关键信息,如事务ID(transaction_id)、文件名(file)、偏移位置(pos)等。这些信息可以用于确保数据的准确顺序和一致性。

7.2Table/SQL API的用法

FlinkSQLCDC.java

package com.daniel.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Author Daniel
 * @Date: 2023/7/25 15:25
 * @Description
 **/
public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE test_cdc (" +
                " id int primary key," +
                " name STRING," +
                " age int" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = 'localhost'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'flink'," +
                " 'table-name' = 'test_cdc'" +
                ")");

        Table table = tableEnv.sqlQuery("select * from test_cdc");
        DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);
        dataStreamSource.print();
        env.execute("FlinkSQLCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);

打印出的日志

(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/786078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

刘铁猛C#语言教程——语句1

语句的定义 以下是对该文档的翻译 一条语句对应着一条汇编语言指令或者一条语句对应着一系列有着内在逻辑关联的汇编指令&#xff0c;对于这句话的理解&#xff0c;我们可以观察C#编译器编译的C#程序后得到的汇编语言代码&#xff0c;这样便可以看到语句与指令的关系&#xff…

【Arduino】Teensy® USB Development Board 板子介绍

文章目录 1. Features2. Pins Name3. Getting started Teesy by Arduino1. Install Arduino IDE Software2. Install Teensyduino Software3. Running Blink Program 4. IMPORTANT INFORMATION BEFORE GOING FURTHER WITH USING TEENSY 4.11. I/O 仅耐受 3.3V&#xff01;2. 电…

【MyBatis 学习一】认识MyBatis 第一个MyBatis查询

目录 一、认识MyBatis 1、MyBatis是什么&#xff1f; 2、为什么要学习MyBatis? 二、配置MyBatis环境 1、建库与建表 2、创建新项目 3、xml文件配置 &#xff08;1&#xff09;配置数据库连接 &#xff08;2&#xff09;配置 MyBatis 中的 XML 路径 三、测试&#x…

基于4G网络的嵌入式设备远程升级系统设计与实现(学习一)

摘要 随着无线通信技术的不断更新发展&#xff0c;嵌入式设备的联网应用领域得以大规模扩大&#xff0c;远程升级功能成为产品开发中必不可少的一部分。 本文对嵌入式设备远程升级进行了研究&#xff0c;在不改变设备硬件集成度基础上&#xff0c;设计实现了分离式升级的远程…

在Vue-Element中引入jQuery的方法

一、在终端窗口执行安装命令 npm install jquery --save执行完后&#xff0c;npm会自动在package.json中加上jquery 二、在main.js中引入&#xff08;或者在需要使用的页面中引入即可&#xff09; import $ from jquery三、使用jquery

结构型设计模式之亨元模式【设计模式系列】

系列文章目录 C技能系列 Linux通信架构系列 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 设计模式系列 期待你的关注哦&#xff01;&#xff01;&#xff01; 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everythi…

第16章 控制脚本

CtrlC组合键会发送SIGINT信号&#xff0c;停止shell中当前运行的进程。 CtrlZ组合键会生成一个SIGTSTP信号&#xff0c;停止shell中运行的任何进程。停止进程会让程序继续保留在内存中&#xff0c;并能从上次停止的位置继续运行。 方括号中的数字是shell分配的作业号&#xff0…

PHP之Smarty使用以及框架display和assign原理

一、Smarty的下载 进入Smarty官网下载&#xff0c;复制目录libs目录即可http://www.smarty.net/http://www.smarty.net/ 二、使用Smarty&#xff0c;创建目录demo,把libs放进去改名为Smarty 三、引入Smarty配置,创建目录&#xff0c;index.php文件配置 <?php…

VuePress在生产环境跳转子页报错 Failed to execute ‘appendChild‘ on ‘Node‘

记录一个使用VuePress时遇到的问题 使用VuePress做了一个文档网页&#xff0c;在开发环境的时候一切正常&#xff0c;但是发布到生产环境后&#xff0c;直接跳转二级页面会报错Failed to execute appendChild on Node 比如主页是http://sun/docs/.vuepress/dist/index.html#/…

【算法】递增序列

对于一个字母矩阵&#xff0c;我们称矩阵中的一个递增序列是指在矩阵中找到两个字母&#xff0c;它们在同一行&#xff0c;同一列&#xff0c;或者在同一 45 度的斜线上&#xff0c;这两个字母从左向右看、或者从上向下看是递增的 对于下面的 30 行 50列的矩阵&#xff0c;请问…

AtcoderABC234场

A - Weird FunctionA - Weird Function 题目大意 要求计算 f(f(f(t)t)f(f(t))) &#xff0c;其中 t 是一个给定的整数。 函数 f(x) 定义为 f(x) x^2 2x 3。 思路分析 定义实现函数 f(int t)&#xff0c;并嵌套调用。 时间复杂度 O(1) AC代码 #include<bits/stdc.…

STN:Spatial Transformer Networks

1.Abstract 卷积神经网络缺乏对输入数据保持空间不变的能力&#xff0c;导致模型性能下降。作者提出了一种新的可学习模块&#xff0c;STN。这个可微模块可以插入现有的卷积结构中&#xff0c;使神经网络能够根据特征图像本身&#xff0c;主动地对特征图像进行空间变换&#x…

Toyota Programming Contest 2023#4(AtCoder Beginner Contest 311)(A-G)

Contest Duration: 2023-07-22(Sat) 20:00 - 2023-07-22(Sat) 21:40 (local time) (100 minutes) 头文件和宏 #include<iostream> #include<string> #include<vector> using namespace std; #define int long long #define fer(i,a,b) for(int ia;i<b;i…

Python实现HBA混合蝙蝠智能算法优化随机森林回归模型(RandomForestRegressor算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 蝙蝠算法是2010年杨教授基于群体智能提出的启发式搜索算法&#xff0c;是一种搜索全局最优解的有效方法…

信息安全与网络空间安全 - 保障您的在线安全

数据参考&#xff1a;CISP官方 目录&#xff1a; 信息与信息安全 信息安全属性 网络安全发展阶段 网络空间安全保障 一、信息与信息安全 1、什么是信息&#xff1f; 定义&#xff1a;信息是通过传递和处理的方式&#xff0c;用于传达知识、事实、数据或观点的内容。形…

k8s安装prometheus

安装 在目标集群上&#xff0c;执行如下命令&#xff1a; kubectl apply -f https://gitee.com/i512team/dhorse/raw/main/conf/kubernetes-prometheus.yml使用 1.在浏览器访问地址&#xff1a;http://master_ip:30000&#xff0c;如下图所示&#xff1a; 2.查看k8s自带的…

模拟Stevens Lewis描述的小型飞机纵向动力学的非线性动态反演控制器研究(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码实现 &#x1f4a5;1 概述 针对Stevens和Lewis描述的小型飞机纵向动力学的非线性动态&#xff0c;研究非线性动态反演控制器可以是一个有趣的课题。动态反演控制器的目标…

智慧供水调度工控平台-业务数据化,数据模型化

平台概述 智慧供水调度工控平台是以物联感知技术、大数据、智能控制、云计算、人工智能、数字孪生、AI算法、虚拟现实技术为核心&#xff0c;以监测仪表、通讯网络、数据库系统、数据中台、模型软件、前台展示、智慧运维等产品体系为支撑&#xff0c;以城市水资源、水生态、水…

微信小程序quickstartFunctions中云函数的应用

1、在quickstartFunctions文件中新建文件夹和文件 2、index.js 文件书写 const cloud require(wx-server-sdk);cloud.init({env: cloud.DYNAMIC_CURRENT_ENV }); const db cloud.database();// 链表查询试卷和对应的题库 exports.main async (event, context) > {retu…

现货白银投资如何挂单

如果现货白银投资者不想时时刻刻都在盯盘&#xff0c;盯紧进场的机会&#xff0c;可以采用提前挂单的方式来交易&#xff0c; 这样做的好处很多&#xff0c;尤其是在市场行情波动大的时候&#xff0c;投资者如何手动下单&#xff0c;很难在自己期望的价格成交以&#xff0c;导致…