FlinkCDC详解

news2025/1/13 7:53:09

1、FlinkCDC是什么

1.1 CDC是什么

CDC是Chanage Data Capture(数据变更捕获)的简称。其核心原理就是监测并捕获数据库的变动(例如增删改),将这些变更按照发生顺序捕获,将捕获到的数据,写入数据库种如神策数据的核心kudu、doris、mysql、kakfa等。

1.2 CDC的实现方式

1.2.1 基于查询的CDC

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。

1.2.2 基于日志的CDC

  • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

1.2.3 常见的开源的CDC方案比较

在这里插入图片描述

1.2.4 个人对于CDC领域的一些浅见

其实对于CDC领域在数仓行业中很常见,无论是离线数仓也好还是实时数仓也好,或者说是业务系统也好,例如京东就是使用CDC方案来同步优惠卷的。其实在很多的CDC的同步方案中,大部分公司其实选用的是第一种,查询同步方案,为什么这么做呢,很多人可能会问,实时同步不好吗,我想说的是实时的CDC太复杂,虽然一致性不高,但是其实运营或者其他人员并不需要这么高的实时性,可能某些领域需要,当然也有很多的表结构设计没有update_time字段,这样的话如果同步一张表,可能会有点麻烦,但是并非是不能同步,如果数据量不大的话,或者有其他自增键的话会很方便,但是如果没有的话就会很麻烦,也可以做,可以做整行的md5这里我就不一一赘述了,在进行查询cdc同步的一些情况。日志cdc呢,其实根本原理就是监控类似于mysql的binlog。可以让整个数据的增删改,进行捕获,从而可以达到两个数据的一致性,当然这个一致性并不是实时的,哪怕是mysql的主从都有可能延迟,更别提咱们监控binlog了,当然这种延迟几乎很少见,业务也不会发现,这种CDC虽然听上去很好,但是实现较为困难,限制比较大,例如下游的数据源要支持改,不像离线可以用拉链表来解决。但是这种方式真的很好,如果开发人员和架构设计人员以及数据设计人员的设计比较好,这种方式效果是最棒的,我司的mysql同步器就支持这两种方式,根据使用人员的喜好来进行选择。

2、Flink CDC的原理

2.1 1.x Flink CDC

Flink1.x的cdc依赖于Debezium组件,debezium为了保证数据的一致性,在全量读取时,会加锁。
此时呢会分为全局锁权限和无全局锁权限。

在这里插入图片描述那么为什么debezium为什么要这么做呢,要加上全局锁呢,因为数据一致性问题,这就涉及到数据库的全局锁和表锁了,数据库的全局锁,以mysql为例,全局锁就是对整个数据库实例加锁。MySQL 提供了一个加全局读锁的方法,命令是Flush tables with read lock (FTWRL)。
当你需要让整个库处于只读状态的时候,可以使用这个命令,之后其他线程的以下语句会被阻塞:数据更新语句(数据的增删改)、数据定义语句(包括建表、修改表结构等)和更新类事务的提交语句。一般全局锁的使用场景在数据库备份上,当然如果主库加锁的话,会导致一些问题。例如加锁后,这个数据库实例无法更新,业务基本就停止了。从库呢,也不能从binlog拉取数据,这就导致了主从延迟,假如有的业务使用的是从库的话就会出现问题。当然全局锁有问题,那么不加锁会导致什么问题呢,数据不一致问题:
比如手机卡,购买套餐信息

这里分为两张表 u_acount (用于余额表),u_pricing (资费套餐表)
步骤:
1. u_account 表中数据 用户A 余额:300
    u_pricing 表中数据 用户A 套餐:空
2. 发起备份,备份过程中先备份u_account表,备份完了这个表,这个时候u_account 用户余额是300
3. 这个时候套用户购买了一个资费套餐100,餐购买完成,写入到u_print套餐表购买成功,备份期间的数据。
4. 备份完成

可以看到备份的结果是,u_account 表中的数据没有变, u_pricing 表中的数据 已近购买了资费套餐100.

哪这时候用这个备份文件来恢复数据的话,用户A 赚了100 ,用户是不是很舒服啊。但是你得想想公司利益啊。  
也就是说,不加锁的话,备份系统备份的得到的库不是一个逻辑时间点,这个数据是逻辑不一致的。

当然mysql的备份工具,mysqldump可以在备份的时候支持更新,基于MVCC的机制。MVCC (Multiversion Concurrency Control),多版本并发控制。顾名思义,MVCC 是通过数据行的 多个版本 管理来实现数据库的 并发控制。这项技术使得在InnoDB的事务隔离级别下执行 一致性读操 作有了保证。换言之,就是为了查询一些正在被另一个事务更新的行,并且可以看到它们被更新之前的值,这样在做查询的时候就不用等待另一个事务释放锁。
不再深入解释mysql的核心机制了。
表锁是什么呢,顾名思义就是锁住了整张表。在加表锁的表上,无法进行DDL、DML操作。当然在mysql5.5以后,有一个表锁是MDL,MDL不需要显示的使用,在访问一个表的时候会被自动加上。MDL 的作用是,保证读写的正确性。你可以想象一下,如果一个查询正在遍历一个表中的数据,而执行期间另一个线程对这个表结构做变更,删了一列,那么查询线程拿到的结果跟表结构对不上,肯定是不行的。因此,在 MySQL 5.5 版本中引入了 MDL,当对一个表做增删改查操作的时候,加 MDL读锁;当要对表做结构变更操作的时候,加 MDL 写锁。

  • 读锁之间不互斥,因此你可以有多个线程同时对一张表增删改查。
  • 读写锁之间、写锁之间是互斥的,用来保证变更表结构操作的安全性。因此,如果有两个线程要同时给一个表加字段,其中一个要等另一个执行完才能开始执行。

MDL锁有一些问题,假如在多个读session中进行更改表结构操作的话,可能会卡死。

这个就是debezium在flink1.x中的应用。

2.2 2.x Flink CDC

Flink 2.x不仅引入了增量快照读取机制,还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍:

增量快照读取:Flink 2.x引入了增量快照读取机制,这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中,Flink首先根据表的主键将其划分为多个块(chunk),然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。
精确一次性处理:Flink 2.x引入了Exactly-Once语义,确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器,可以利用Flink的checkpoint机制来确保精确一次性处理。
动态加表:Flink 2.x支持动态加表,通过使用savepoint来复用之前作业的状态,解决了动态加表的问题。
无主键表的处理:Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中,Flink可以通过一些额外的字段来识别数据记录的唯一性,从而实现准确的数据读取和处理。

对于Flink 2.x的CDC方案呢,可以理解为全量读取时,在划分chunk块的时候,采用了查询读,他是将主键进行切分的。默认一个chunk8096条数据,知道这些就可以了。
2.x的 Flink cdc实现较为复杂,这里就不一一赘述了。

3、FlinkCDC的使用

3.1 导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
              <!-- 可以将依赖打到jar包中 -->
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3.2 代码实操

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        //1.获取Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("cdc_test")	//监控的数据库
                .tableList("cdc_test.user_info")	//监控的数据库下的表
                .deserializer(new StringDebeziumDeserializationSchema())//反序列化
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC");
    }
}

4、Flink CDC输出数据解析

4.1 数据的数据结构

在这里插入图片描述flink cdc的输出结果大概可以分为 before、after、
before代表变更前数据,after代表变更后数据。

还有个op,这个op代表的是事务的操作:
r:读取历史
d:删除
c:创建
u:更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue | (二)Vue组件化编程 | 尚硅谷Vue2.0+Vue3.0全套教程

文章目录 &#x1f4da;模块与组件、模块化与组件化&#x1f4da;非单文件组件&#x1f407;基本使用&#x1f407;关于组件的几个注意点&#x1f407;组件的嵌套 &#x1f4da;单文件组件&#x1f407;一个.vue 文件的组成&#x1f407;实例 学习链接&#xff1a;尚硅谷Vue2.0…

多线程、分布式运行用例

python多线程 threading模块 多线程实例 # -*- coding: utf-8 -*- # Time : 2024/2/7 15:50 # Author : 居里夫人吃橘子 # File : class01.py # Software: PyCharm import threading from time import sleepdef run(name):print(name 该起床了)sleep(2)print(name …

EXCEL使用VBA一键批量转换成PDF

EXCEL使用VBA一键批量转换成PDF 上图是给定转换路径 Sub 按钮1_Click() Dim a(1 To 1000) As String Dim a2 As String Dim myfile As String Dim wb As Workbook a2 Trim(Range("a2"))myfile Dir(a2 & "\" & "*.xls")k 0Do While m…

【LeetCode】树的BFS(层序遍历)精选6题

目录 1. N 叉树的层序遍历&#xff08;中等&#xff09; 2. 二叉树的锯齿形层序遍历&#xff08;中等&#xff09; 3. 二叉树的最大宽度&#xff08;中等&#xff09; 4. 在每个树行中找最大值&#xff08;中等&#xff09; 5. 找树左下角的值&#xff08;中等&#xff09…

2024.2.20

使用多进程完成两个文件的拷贝&#xff0c;父进程拷贝前一半&#xff0c;子进程拷贝后一半&#xff0c;父进程回收子进程的资源 #include<myhead.h> int main(int argc, const char *argv[]) {char str[100]"";puts("please input str:");//从终端读…

手动实现new操作符

<script>//前置知识// 每一个函数在创建之初就会有一个prototype属性&#xff0c;这个属性指向函数的原型对象// function abc(){// }// abc.prototype--> {constructor: f}// 在JS中任意的对象都有内置的属性叫做[[prototype]]这是一个私有属性&#xff0c;这个私有属…

GEE数据集——美国两个主要石油和天然气(OG)产区内与石油和天然气(OG)相关的基础设施的位置

该数据集提供了美国两个主要石油和天然气&#xff08;O&G&#xff09;产区内与石油和天然气&#xff08;O&G&#xff09;相关的基础设施的位置&#xff1a;德克萨斯州西部和新墨西哥州南部二叠纪盆地的特拉华子盆地以及犹他州的乌因塔盆地。前言 – 人工智能教程 石油…

deep learning 代码笔记

1. pandas数据读取和预处理 # import pandas and load dataset import pandas as pd names [Sex, Length, Diameter, Height, Whole_weight, Shucked_weight, Viscera_weight, Shell_weight, Rings] data pd.read_csv(data_file, headerNone, namesnames) print(data) …

【前沿】头戴式光场显示技术研究进展

摘要&#xff1a;光场显示器旨在通过重建三维场景在不同方向发出的几何光线来渲染三维场景的视觉感知&#xff0c;从而为人的视觉系统提供自然舒适的视觉体验&#xff0c;解决传统平面立体三维显示器中的聚散调节冲突问题。近年来&#xff0c;多种光场显示方法被尝试应用到头戴…

沁恒CH32V30X学习笔记11---使用外部时钟模式2采集脉冲计数

使用外部时钟模式2采集脉冲计数 使用外部触发模式 2 能在外部时钟引脚输入的每一个上升沿或下降沿计数。将 ECE 位置位时,将使用外部时钟源模式 2。使用外部时钟源模式 2 时,ETRF 被选定为 CK_PSC。ETR 引脚经过可选的反相器(ETP),分频器(ETPS)后成为 ETRP,再经过滤波…

Stable Diffusion——stable diffusion基础原理详解与安装秋叶整合包进行出图测试

前言 在2022年&#xff0c;人工智能创作内容&#xff08;AIGC&#xff09;成为了AI领域的热门话题之一。在ChatGPT问世之前&#xff0c;AI绘画以其独特的创意和便捷的创作工具迅速走红&#xff0c;引起了广泛关注。随着一系列以Stable Diffusion、Midjourney、NovelAI等为代表…

【Linux权限】 Linux权限管理 | 粘滞位

文章目录 Linux权限管理什么是权限 ❓&#x1f4a6; 文件访问者的分类(人)&#x1f4a6; 文件类型和访问权限(事物属性) &#x1f4a6; 文件访问权限的相关设置方法目录的权限 粘滞位 Linux权限管理 什么是权限 ❓ 权限本质上是决定某件事情&#xff0c;某人能否做。 Linux下…

【GameFramework框架内置模块】2、数据节点(Data Node)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录&#xff1a; https://blog.csdn.net/q7…

AMD FPGA设计优化宝典笔记(1)触发器

高亚军老师的这本书《AMD FPGA设计优化宝典》&#xff0c;他主要讲了两个东西&#xff1a; 第一个东西是代码的良好风格&#xff1b; 第二个是设计收敛等的本质。 这个书的结构是一个总论&#xff0c;加上另外的9个优化&#xff0c;包含的有&#xff1a;时钟网络、组合逻辑、触…

从starrocks安装说起和Oracle的OLAP殊途同归

StarRocks是一款分析型数据库&#xff08;他的定语也很多&#xff0c;分布式存算分离等等&#xff09;。作为数据库他的运行和维护迟早也是落在我这里的。对于做数据库的人都知道什么是交易型数据库&#xff0c;什么是分析型数据库。以及什么是事务分析混合型数据库。但是对于非…

UE5 C++ 静态加载资源和类

一.上篇文章创建组件并绑定之后 在Actor中加载初始化了组件&#xff0c;现在在组件中赋值。使用static ConstructorHelpers::FObjectFinder<T>TempName(TEXT("Copy Reference"))&#xff1b;再用TempName.Object //静态加载资源static ConstructorHelpers::FOb…

2024.2.10 HCIA - Big Data笔记

1. 大数据发展趋势与鲲鹏大数据大数据时代大数据的应用领域企业所面临的挑战和机遇华为鲲鹏解决方案2. HDFS分布式文件系统和ZooKeeperHDFS分布式文件系统HDFS概述HDFS相关概念HDFS体系架构HDFS关键特性HDFS数据读写流程ZooKeeper分布式协调服务ZooKeeper概述ZooKeeper体系结构…

拿捏c语言指针(下)

前言 此篇讲解的主要是函数与指针的那些事~ 书接上回 拿捏c语言指针&#xff08;上&#xff09;和 拿捏c语言指针&#xff08;中&#xff09; ​​​​​​没有看的小伙伴要抓紧喽~ 欢迎关注​​个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#x…

GNU 图像处理程序 (GIMP) - 颜色拾取工具

GNU 图像处理程序 [GIMP] - 颜色拾取工具 References 选择 颜色拾取工具 在图片上选取要拾取位置的颜色&#xff0c;前景颜色会跟着改变 选择画笔工具&#xff0c;在图片上绘制的是选取的颜色 工具选项 -> 设置背景颜色&#xff0c;在图片上拾取为背景颜色 使用 橡皮工具&am…

SQL Developer 小贴士:显示Trace文件

SQL Developer可以识别trace文件&#xff0c;而无需利用tkprof进行转换。 在数据库服务器上生产trace文件。例如&#xff1a; alter session set tracefile_identifierdemo01_02;alter session set sql_tracetrue;-- your SQL here, for example select * from hr.employees;a…