Flink CDC介绍

news2024/11/17 11:47:27

1.CDC概述


CDC(Change Data Capture)是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。

传统上,数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是,这种轮询的方式效率低下且不能实时反应变化。而 CDC 技术则通过在数据源上设置一种机制,使得变化的数据可以被实时捕获并传递给下游处理系统,从而实现了实时的数据变动监控。

Flink 作为一个强大的流式计算引擎,提供了内置的 CDC 功能,能够连接到各种数据源(如数据库、消息队列等),捕获其中的数据变化,并进行灵活的实时处理和分析。

通过使用 Flink CDC,我们可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。

Flink_CDC

 

2.CDC 的实现原理


通常来讲,CDC 分为主动查询和事件接收两种技术实现模式。对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用Debezium来实现变更数据的捕获(下图来自Debezium 官方文档如果使用的只有 MySQL,则还可以用Canal。
在这里插入图片描述

 

3.为什么选 Flink
从上图可以看到,Debezium 官方架构图中,是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块,因为 Flink 相对 Kafka Streams 而言,有如下优势:

强大的流处理引擎: Flink 是一个强大的流处理引擎,具备高吞吐量、低延迟、Exactly-Once 语义等特性。它通过基于事件时间的处理模型,支持准确和有序的数据处理,适用于实时数据处理和分析场景。这使得 Flink 成为实现 CDC 的理想选择。

内置的 CDC 功能: Flink 提供了内置的 CDC 功能,可以直接连接到各种数据源,捕获数据变化,并将其作为数据流进行处理。这消除了我们自行开发或集成 CDC 解决方案的需要,使得实现 CDC 变得更加简单和高效。

多种数据源的支持: Flink CDC 支持与各种数据源进行集成,如关系型数据库(如MySQL、PostgreSQL)、消息队列(如Kafka、RabbitMQ)、文件系统等。这意味着无论你的数据存储在哪里,Flink 都能够轻松地捕获其中的数据变化,并进行进一步的实时处理和分析。

灵活的数据处理能力: Flink 提供了灵活且强大的数据处理能力,可以通过编写自定义的转换函数、处理函数等来对 CDC 数据进行各种实时计算和分析。同时,Flink 还集成了 SQL 和 Table API,为用户提供了使用 SQL 查询语句或 Table API 进行简单查询和分析的方式。

完善的生态系统: Flink 拥有活跃的社区和庞大的生态系统,这意味着你可以轻松地获取到丰富的文档、教程、示例代码和解决方案。此外,Flink 还与其他流行的开源项目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和灵活性。

4.支持的连接器

 5.支持的 Flink 版本

 

6.Flink CDC特性
支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理
DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka
Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改
下表显示了连接器的当前特性:

 

7.用法实例

7.1DataStream API 的用法(推荐)

请严格按照上面的《5.支持的 Flink 版本》搭配来使用Flink CDC

<properties>
	<flink.version>1.13.0</flink.version>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-common</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-planner-blink_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-api-java-bridge_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>

请提前开启MySQL中的binlog,配置my.cnf文件,重启mysqld服务即可

my.cnf

[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30

ddl&dml.sql

create table test_cdc
(
    id   int          not null
        primary key,
    name varchar(100) null,
    age  int          null
);

INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);

FlinkDSCDC.java

package com.daniel.util;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author Daniel
 * @Date: 2023/7/25 10:03
 * @Description DataStream API CDC
 **/
public class FlinkDSCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // 这里一定要是db.table的形式
                .tableList("flink.test_cdc")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("FlinkDSCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;

打印出的日志

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以得出的结论:

1、日志中的数据变化操作类型(op)可以表示为 ‘u’,表示更新操作。在第一条日志中,发生了一个更新操作,对应的记录的 key 是 id=1,更新前的数据是 {id=1, name=Daniel, age=25},更新后的数据是 {id=1, name=Daniel, age=24}。在第二条日志中,也发生了一个更新操作,对应的记录的 key 是 id=3,更新前的数据是 {id=3, name=James, age=16},更新后的数据是 {id=3, name=Andy, age=16}。
2、每条日志还提供了其他元数据信息,如数据源(source)、版本号(version)、连接器名称(connector)、时间戳(ts_ms)等。这些信息可以帮助我们追踪记录的来源和处理过程。
3、日志中的 sourceOffset 包含了一些关键信息,如事务ID(transaction_id)、文件名(file)、偏移位置(pos)等。这些信息可以用于确保数据的准确顺序和一致性。

7.2Table/SQL API的用法

FlinkSQLCDC.java

package com.daniel.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Author Daniel
 * @Date: 2023/7/25 15:25
 * @Description
 **/
public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE test_cdc (" +
                " id int primary key," +
                " name STRING," +
                " age int" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = 'localhost'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'flink'," +
                " 'table-name' = 'test_cdc'" +
                ")");

        Table table = tableEnv.sqlQuery("select * from test_cdc");
        DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);
        dataStreamSource.print();
        env.execute("FlinkSQLCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);

打印出的日志

(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/947048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络-笔记-第五章-运输层

目录 五、第五章——运输层 1、运输层概述 2、运输层端口号、复用、分用 &#xff08;1&#xff09;熟知端口号、登记端口号、短暂端口号 &#xff08;2&#xff09;熟知端口号 &#xff08;3&#xff09;发送方复用、接收方分用 3、UDP与TCP对比 &#xff08;1&#x…

十二、集合(2)

本章概要 添加元素组集合的打印列表 List 添加元素组 在 java.util 包中的 Arrays 和 Collections 类中都有很多实用的方法&#xff0c;可以在一个 Collection 中添加一组元素。 Arrays.asList() 方法接受一个数组或是逗号分隔的元素列表&#xff08;使用可变参数&#xff…

8.react18并发模式与startTransition(搜索高亮思路)

React 18 之前,渲染是一个单一的,不间断的,同步的事务,一旦渲染开始,就不能被中断 React 18引入并发模式,它允许你将标记更新作为一个transitions,这会告诉React他们可以被中断执行.这样可以将紧急任务先更新,不紧急任务后更新. 将任务给紧急任务先执行, 优先级低的任务后执行…

avue实现用户本地保存自定义配置字段属性及注意事项

avue实现用户本地保存自定义配置字段属性及注意事项 先看一段基于vue-nuxt2的page代码&#xff1a; 代码文件AvueSaveOption.vue <template><div><p>用户保存自定义表格项</p><avue-crudref"crud":defaults.sync"defaults":opt…

华为云服务器前后端分离项目打包上传及nginx配置

目录 1、Spring Boot项目打包 2、后端上传到云服务器 3、前端打包 1&#xff09;前端请求路径修改 2&#xff09;打包上传 4、下载nginx 1&#xff09;添加源 2&#xff09;安装Nginx 3&#xff09;查看nginx安装目录和版本 4&#xff09;启动 重启nginx命令 5&#…

接口测试与功能测试的区别~

今天为大家分享的是我们在日常测试工作中, 一定会接触并且目前在企业中是主要测试内容的 功能测试与接口测试 一.功能测试与接口测试的基本概念。 1.1 什么是功能测试呢? 功能测试: 是黑盒测试的一方面, 检查实际软件的功能是否符合用户的需求 功能测试测试的内容包括以下…

工程制造领域:企业IT架构

一、500强IT部门的组织规划架构图 1.1 IT服务保证梯队与指导思想 二、工程制造领域&#xff0c;整体业务规划架构图 三、工程制造领域&#xff0c;数据化项目规划架构图 四、工程制造领域&#xff0c;IT应用系统集成架构图

15. 实现业务功能--帖子操作

1. 集成编译器 editor.md 支持 MarkDown 语法编辑&#xff0c;在需要用户输⼊内容的页面按以下代码嵌入编辑器 1.1 编写 HTML <!-- 引⼊编辑器的CSS --> <link rel"stylesheet" href"./dist/editor.md/css/editormd.min.css"> <!-- 引⼊编…

学习pytorch6 torchvision中的数据集使用

torchvision中的数据集使用 1. torchvision中的数据集使用官网文档注意点1 totensor实例化不要忘记加括号注意点2 download可以一直保持为True代码执行结果 2. DataLoader的使用 1. torchvision中的数据集使用 官网文档 注意左上角的版本 https://pytorch.org/vision/0.9/ 注…

行业趋势和新兴领域分析:分析当前网络安全行业的发展趋势,如IoT安全、AI安全、区块链安全等。

第一章&#xff1a;引言 随着数字化时代的迅速发展&#xff0c;网络安全已经成为各行各业不可忽视的重要领域。恶意攻击、数据泄露以及黑客入侵等威胁逐渐增多&#xff0c;推动着网络安全行业不断创新与进步。本文将深入探讨当前网络安全领域的发展趋势&#xff0c;聚焦于新兴…

肿瘤科医师狂喜,15分RNA修饰数据挖掘文章

Biomamba荐语 与这个系列的前面一些论文类似&#xff0c;这次给大家推荐的是一篇纯生物信息学数据挖掘的文章&#xff0c;换句话说&#xff0c;这又是一篇不需要支出科研经费&#xff08;白嫖&#xff09;的论文(当然&#xff0c;生信分析用的服务器还是得掏点费用的)。一般来…

springboot第37集:kafka,mqtt,Netty,nginx,CentOS,Webpack

image.png binzookeeper-server-start.shconfigzookeeper.properties.png image.png image.png 消费 image.png image.png image.png image.png image.png image.png image.png image.png image.png Netty的优点有很多&#xff1a; API使用简单&#xff0c;学习成本低。功能强大…

【操作系统】聊聊文件传输的零拷贝、PageCache、异步IO机制

在目前主流的系统中&#xff0c;其实大多数都是数据密集型系统&#xff0c;所以设计数据密集型应用一书非常经典&#xff0c;推荐一读。而大多数遇到的问题都是存储问题。CPU、内存 因为本身的读写速度比较快&#xff0c;所以磁盘就成为了一个性能瓶颈。 针对磁盘优化的技术层…

对class文件进行base64编码

使用以下代码 package org.springframework.cloud.gateway.sample;import org.springframework.util.Base64Utils;import java.io.*; import java.nio.charset.StandardCharsets;public class EncodeShell {public static void main(String[] args){byte[] data null;try {In…

大数据之linux入门

一、linux是什么 linux操作系统 开发者是林纳斯-托瓦兹&#xff0c;出于个人爱好编写。linux是一个基于posix和unix的多用户、多任务、支持多线程和多CPU的操作系统。 Unix是20世纪70年代初出现的一个操作系统&#xff0c;除了作为网络操作系统之外&#xff0c;还可以作为单…

6路液体水位检测芯片VK36W6D SOP16 抗电源干扰及手机干扰特性好

产品品牌&#xff1a;永嘉微电/VINKA 产品型号&#xff1a;VK36W6D 封装形式&#xff1a;SOP16/QFN16L 详细资料&#xff1a;13.5/5.474/4.703 概述 VK36W6D具有6个触摸检测通道&#xff0c;可用来检测6个点的水位。该芯片具有较高的集成度&#xff0c;仅需极少的外部组件便…

vscode GDB 调试linux内核 head.S

遇到的问题 此前参考如下文章 https://zhuanlan.zhihu.com/p/510289859 已经完成了在ubuntu 虚拟机用vscode 调试linux 内核。但是美中不足的是&#xff0c;断点最早只能加在__primary_switched() 函数。无法停在更早的断点上&#xff0c;比如ENTRY(stext) 位置。参考《奔跑吧…

C语言_初识C语言指针

文章目录 前言一、指针 ... 一个内存单元多大比较合适&#xff1f;二、地址或者编号如何产生&#xff1f;三、指针变量的大小 前言 内存是电脑上特别重要的存储器&#xff0c;计算机中程序的运行都是在内存中进行的。 所以为了有效的使用内存&#xff0c;就把内存划分成一个个…

记1次前端性能优化之CPU使用率

碰到这样的一个问题&#xff0c;用户反馈页面的图表一直加载不出来&#xff0c;页面还卡死 打开链接页面&#xff0c;打开控制台 Network 看到有个请求一直pending&#xff0c;结合用户描述&#xff0c;页面一直loading,似乎验证了我的怀疑&#xff1a;后端迟迟没有相应。 但是…

【工作笔记-0038】mongodb mongorestore 命令行导入 bson.gz数据

1. 导出的集合文件格式如下&#xff08;也就是导出的表文件&#xff09;&#xff1a; 例如&#xff1a; D:\Files\xxxx集合名称.bson.gz 怎样导出&#xff0c;这里不做介绍&#xff0c;用 mongodb compass 或者 studio 3t 都可以 2. 下载命令行导入工具&#xff1a; 官方…