【大数据】Flink CDC 的概览和使用

news2025/1/11 17:09:56

Flink CDC 的概览和使用

  • 1.什么是 CDC
  • 2.什么是 Flink CDC
  • 3.Flink CDC 前生今世
    • 3.1 Flink CDC 1.x
    • 3.2 Flink CDC 2.x
    • 3.3 Flink CDC 3.x
  • 4.Flink CDC 使用
  • 5.Debezium 标准 CDC Event 格式详解

1.什么是 CDC

CDCChange Data Capture数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  • ✅ 插入(Insert):当新数据被插入到数据库表中时。
  • ✅ 更新(Update):当数据库表中的现有数据被修改时。
  • ✅ 删除(Delete):当数据从数据库表中被删除时。

2.什么是 Flink CDC

Flink CDC 是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步数据管道实时分析实时应用 等功能。

本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。

🚀 https://github.com/ververica/flink-cdc-connectors

3.Flink CDC 前生今世

3.1 Flink CDC 1.x

Flink CDC 1.x 开启了 Flink 在 CDC 上的实践之路,Flink CDC 1.x 第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x 只能说是可以用,但不能生产上用,为什么:

  • 1.x 版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。
  • 各方面功能还不够完善,比如自动加表、DDL 事件传递等。

在这里插入图片描述

总体而言 Flink CDC 1.x 只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

在这里插入图片描述

3.2 Flink CDC 2.x

2.x 版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。

在这里插入图片描述

3.3 Flink CDC 3.x

近期,Flink CDC 发布了全新的 3.0 版本,并宣布捐赠回 Flink 主项目,在新的 3.0 版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与 SeaTunnelDataXChunjun 等一系列老牌数据集成项目同台竞技,让我们拭目以待。

在这里插入图片描述

4.Flink CDC 使用

在本地启动一个 MySQL 的 Docker 环境。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

创建表:

create database cdc_test;
use cdc_test;

create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

在 IDEA 中新建一个Java 项目。

导入依赖:

<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-cdc.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
</dependency>

编写代码:

public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(60000L);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("cdc_test.cdc_table") // set captured table
            .username("root")
            .password("debezium")
            .includeSchemaChanges(true)
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
            .print();
        env.execute();
    }
}

添加日志配置:

<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<configuration>
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
          </encoder>
       </appender>

       <root level="INFO">
          <appender-ref ref="STDOUT" />
       </root>
</configuration>

5.Debezium 标准 CDC Event 格式详解

{
    "before": null,
    "after": {
        "id": 1,
        "name": "xing.yu",
        "age": 26,
        "new_column": "dewu"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1702723640000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "cdc_table",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2394,
        "row": 0,
        "thread": 39,
        "query": null
    },
    "op": "c",
    "ts_ms": 1702723640483,
    "transaction": null
}
{
    // 表数据更新前的值,update/delete
    "before": {},
    // 表数据更新后的值,create/update
    "after": {},
    // 元数据信息
    "source": {},
    // 操作类型 c/d/u
    "op": "",
    // 记录解析时间
    "ts_ms": "",
    "transaction": ""
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1363712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

汽车电子行业的 C 语言编程标准

前言 之前分享了一些编程规范相关的文章&#xff0c;有位读者提到了汽车电子行业的MISRA C标准&#xff0c;说这个很不错。 本次给大家找来了一篇汽车电子行业的MISRA C标准的文章一同学习下。 什么是MISRA&#xff1f; MISRA (The Motor Industry Software Reliability Ass…

Linux-文件系统管理实验2

1、将bin目录下的所有文件列表放到bin.txt文档中&#xff0c;并将一共有多少个命令的结果信息保存到该文件的最后一行。统计出文件中以b开头的所有命令有多少个&#xff0c;并将这些命令保存到b.txt文档中。将文档中以p结尾的所有命令保存到p.txt文件中&#xff0c;并统计有多少…

【linux】Ubuntu 22.04.3 LTS截屏

一、快捷键 交互式录屏 ShiftCtrltAltR 交互式截图 Print 对窗口进行截图 AltPrint 截图 ShiftPrint 快捷键可能取决于使用的桌面环境和个人的键盘快捷键设置。如果上述快捷键不起作用&#xff0c;可能需要检查系统设置中的键盘快捷键部分&#xff0c;以了解系统中截图的…

法线变换矩阵的推导

背景 在冯氏光照模型中&#xff0c;其中的漫反射项需要我们对法向量和光线做点乘计算。 从顶点着色器中读入的法向量数据处于模型空间&#xff0c;我们需要将法向量转换到世界空间&#xff0c;然后在世界空间中让法向量和光线做运算。这里便有一个问题&#xff0c;如何将法线…

问答领域的基本了解

问答领域是人工智能领域中的一个重要研究方向&#xff0c;旨在让计算机能够理解人类提出的问题&#xff0c;并以自然语言形式回答这些问题。问答系统可以应用于各种场景&#xff0c;包括搜索引擎、虚拟助手、智能客服等。 一.目标 目标&#xff1a; 问答系统的主要目标是使计…

逆向一个Go程序

前奏 事先声明&#xff0c;自导自演&#xff0c;纯属为了演示基本的逆向思维 用Go写一段模拟登录的代码&#xff1a; package mainimport ("fmt" )func main() {pass : ""fmt.Print("input password:")fmt.Scan(&pass)if pass "hel…

基于YOLOv3开发构建道路交通场景下CCTSDB2021交通标识检测识别系统

交通标志检测是交通标志识别系统中的一项重要任务。与其他国家的交通标志相比&#xff0c;中国的交通标志有其独特的特点。卷积神经网络&#xff08;CNN&#xff09;在计算机视觉任务中取得了突破性进展&#xff0c;在交通标志分类方面取得了巨大的成功。CCTSDB 数据集是由长沙…

柠檬Lemon测评机的配置和测试方法

柠檬Lemon测评机的配置和测试方法 只需3步,即可配置好柠檬 第一步:选择g++,点击下一步 第二步:找到g++的目录,添加编译器,点击下一步 第三步:检查结果,点击完成。(此时,配置完成) 只需3步,即可用柠檬做考试测试 第一步:新建比赛

如何实现公网访问GeoServe Web管理界面共享空间地理信息【内网穿透】

文章目录 前言1.安装GeoServer2. windows 安装 cpolar3. 创建公网访问地址4. 公网访问Geo Servcer服务5. 固定公网HTTP地址 前言 GeoServer是OGC Web服务器规范的J2EE实现&#xff0c;利用GeoServer可以方便地发布地图数据&#xff0c;允许用户对要素数据进行更新、删除、插入…

普中STM32-PZ6806L开发板(资料收集...)

简介 逐渐收集一些开发过程中使用到的文档资料数据手册 DS18B20 数据手册 DS18B20 Datasheet 开发文档 STM32F1各种文档 https://www.st.com/en/embedded-software/stm32cubef1.html#documentation HAL库文档开发文档 你使用的HAL文档, 在STM32CubeMX生成过程的最下面有…

uniapp 解决安卓App使用uni.requestPayment实现沙箱环境支付宝支付报错

背景&#xff1a;uniapp与Java实现的安卓端app支付宝支付&#xff0c;本想先在沙箱测试环境测支付&#xff0c;但一直提示“商家订单参数异常&#xff0c;请重新发起付款。”&#xff0c;接着报错信息就是&#xff1a;{ "errMsg": "requestPayment:fail [pa…

基于springboot智慧食堂管理系统源码和论文

随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff0c;网络化和电子化。网上管理&#xff0c;它将是直接管理“智慧食堂”系统的最新形式。本论文是以构建“智慧食堂”系统为目标&#xff0c;使用java技术制作&…

Spring之循环依赖底层源码(一)

文章目录 一、简介1. 回顾2. 循环依赖3. Bean的生命周期回顾4. 三级缓存5. 解决循环依赖的思路 二、源码分析三、相关问题1. Async情况下的循环依赖解析2. 原型Bean情况下的循环依赖解析3. 构造方法导致的循环依赖解析 一、简介 1. 回顾 前面首先重点分析了Spring Bean的整个…

消息队列-RocketMQ-概览与搭建

RocketMQ 领域模型 RockeMQ整体结构预览 RocketMQ 中的一些概念 Topic&#xff1a;主题&#xff0c;可以理解为类别、分类的概念 MessageQueue&#xff1a;消息队列&#xff0c;存储数据的一个容器&#xff08;队列索引数据&#xff09;&#xff0c;默认每个 Topic 下有 4 个队…

Danil Pristupov Fork(强大而易用的Git客户端) for Mac/Windows

在当今软件开发领域&#xff0c;团队协作和版本控制是非常重要的方面。在这个过程中&#xff0c;Git成为了最受欢迎的版本控制工具之一。然而&#xff0c;对于Git的使用&#xff0c;一个好的客户端是至关重要的。 今天&#xff0c;我们要为大家介绍一款强大而易用的Git客户端—…

C++ 软件常用分析工具及项目实战问题分析案例集锦

目录 1、库依赖关系查看工具Dependency Walker 2、GDI对象查看工具GDIview 3、PE信息查看工具PeViewer/MiTeC EXE Explorer 4、进程信息查看工具Process Explorer 5、进程监控工具Process Monitor 6、API函数调用监测工具API Monitor C软件异常排查从入门到精通系列教程&…

Linux-v4l2框架

框架图 从上图不难看出&#xff0c;v4l2_device作为顶层管理者&#xff0c;一方面通过嵌入到一个video_device中&#xff0c;暴露video设备节点给用户空间进行控制&#xff1b;另一方面&#xff0c;video_device内部会创建一个media_entity作为在media controller中的抽象体&a…

基于springboot的停车场管理系统-计算机毕业设计源码82061

摘要 由于数据库和数据仓库技术的快速发展&#xff0c;停车场管理系统建设越来越向模块化、智能化、自我服务和管理科学化的方向发展。停车场管理系统对处理对象和服务对象&#xff0c;自身的系统结构&#xff0c;处理能力&#xff0c;都将适应技术发展的要求发生重大的变化。停…

【读书笔记】网空态势感知理论与模型(九)

对分析人员数据分类分流操作的研究 1.概述 本章节介绍一种以人员为中心的智能数据分类分流系统&#xff0c;该系统利用了入侵检测分析人员的认知轨迹。整合了3个维度的动态网络-人系统&#xff08;cyber-humber system&#xff09;&#xff1a;网空防御分析人员、网络监测数据…

基于天牛须算法优化的Elman神经网络数据预测 - 附代码

基于天牛须算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于天牛须算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于天牛须优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&#x…