数据实时获取方案之Flink CDC

news2025/1/11 22:01:13

目录

  • 一、方案描述
  • 二、Flink CDC
    • 1.1 什么是CDC
    • 1.2 什么是Flink CDC
    • 1.3 其它CDC
    • 1.4 FlinkCDC所支持的数据库情况
  • 二、使用Pipeline连接器实时获取数据
    • 2.1 环境介绍
    • 2.2 相关版本信息
    • 2.3 详细步骤
      • 2.3.1 实时获取MySQL数据并发送到Kafka
      • 2.3.2 实时获取MySQL数据并同步到Doris数据库

一、方案描述

在这里插入图片描述

由Flink CDC来监测到源数据库数据变更并将其发送到Kafka或同步到目标数据库中,再由后续消费者或其它应用来使用数据。

二、Flink CDC

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 什么是Flink CDC

官方文档地址:[项目介绍 | Apache Flink CDC](Introduction | Apache Flink CDC)
官方描述:Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

1.3 其它CDC

在这里插入图片描述

1.4 FlinkCDC所支持的数据库情况

在这里插入图片描述

Flink CDC 提供了可用于 YAML 作业的 Pipeline Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在 YAML Pipeline 定义中指定所需的连接器。
在这里插入图片描述

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件(其中一些组件是基于Debezium来获取数据变更,它可以充分利用Debezium的能力)。使用这些组件可以通过Flink SQL或代码开发的方式获取目标数据库的全量数据和增量变更数据。
ConnectoryDatabaseDrivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

二、使用Pipeline连接器实时获取数据

2.1 环境介绍

我们下面将主要展示通过使用Pipeline连接器来获取实时数据的流程。
准备环境:

  • 单节点的standalone模式的Flink集群
  • Flink CDC
  • 单节点Kafka
  • Doris快速体验版数据库
  • Mysql测试数据库

2.2 相关版本信息

  • Flink 1.18
  • Flink CDC 3.11
  • Kafka 3.6.1
  • Doris doris-2.0.3-rc06

2.3 详细步骤

引入所需依赖包
将 flink-cdc-pipeline-connector-doris-3.1.1.jar flink-cdc-pipeline-connector-kafka-3.1.1.jar flink-cdc-pipeline-connector-mysql-3.1.1.jar放入flink cdc的lib文件夹下

2.3.1 实时获取MySQL数据并发送到Kafka

1.编写同步变更配置文件
将yaml文件放入到flink-cdc下的job文件夹中

# 数据来源
source:
  type: mysql
  hostname: xxx.xxx.xxx.xxx
  port: 3306
  username: root
  password: "password"
  tables: doris_test.\.*
  server-id: 5400-5404
  server-time-zone: UTC+8

 # 数据去向
sink:
  type: kafka
  topic: test003
  properties.bootstrap.servers: xxx.xxx.xxx.xxx:9092
  format: json

pipeline:
  name: Sync MySQL Data to KAFKA
  parallelism: 2

2.启动Flink集群

# 在flink/bin下执行
./start-cluster.sh

3.启动Flink CDC 任务

# 在flink-cdc-3.1.1/bin下运行
./flink-cdc.sh ../job/mysql-to-kafka.yaml

启动成功
在这里插入图片描述

4.启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test003

5.在源数据库中修改数据并观察Kafka消费者
当在源数据库testfid表进行数据新增,删除或修改,Kafka消费者即能消费到对应数据
在这里插入图片描述

2.3.2 实时获取MySQL数据并同步到Doris数据库

1.编写同步变更配置文件

# 数据来源
source:
  type: mysql
  hostname: xxx.xxx.xxx.xxx
  port: 3306
  username: root
  password: "password"
  tables: doris_test.\.*
  server-id: 5400-5404
  server-time-zone: UTC+8

# 数据去向
sink:
  type: doris
  fenodes: xxx.xxx.xxx.xxx:8030
  username: root
  password: "password"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
     将yaml文件放入到flink-cdc下的job文件夹中

2.在Doris中创建数据库 doris_test

create database doris_test;

3.启动Flink CDC任务

# 在/app/path/flink-cdc-3.1.1/bin下执行
./flink-cdc.sh ../job/mysql-to-doris.yaml

4.进行数据变更并观察结果
先查看任务启动前源库MySQL和目标库Doris的数据情况,源库MySQL中共有两个表且表中已存在一些数据,Doris中没有表
在这里插入图片描述
在这里插入图片描述
启动任务后,两个表及数据都已同步到Doris中,当源表数据变更及表结构变更时,也都会实时同步到Doris中
在这里插入图片描述

5.进行路由变更后再进行测试并观察结果
Flink CDC Pipeline连接器也支持将两个同样表结构表的数据同步到目标数据库的一个表中

source:
  type: mysql
  hostname: xxx.xxx.xxx.xxx
  port: 3306
  username: root
  password: "password"
  tables: doris_test.\.*
  server-id: 5400-5404
  server-time-zone: UTC+8

sink:
  type: doris
  fenodes: xxx.xxx.xxx.xxx:8030
  username: root
  password: "password"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

# 将源库中testfid和testfid_copy1表的数据同步到目标库的表route_test中
route:
  - source-table: doris_test.testfid
    sink-table: doris_test.route_test
  - source-table: doris_test.testfid_copy1
    sink-table: doris_test.route_test

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

源数据
在这里插入图片描述
启动任务后Doris中的数据
在这里插入图片描述

源库中两个表的数据被合并同步到目标库的一个表中,但这只适用于相同表结构的合并,如果是不同表结构合并会造成数据错乱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1937426.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

240718_使用Labelme制作自己的图像分割数据集

240718_使用Labelme制作自己的图像分割数据集 从目标检测入门的朋友们可能更熟悉的是LabelImg,这里要注意做好区分,LabelImg和Labelme不是一个东西,如下经典图: (a)图像分类(目标检测&#xff…

机器学习·概率论基础

概率基础 这部分太简单,直接略过 条件概率 独立性 独立事件A和B的交集如下 非独立事件 非独立事件A和B的交集如下 贝叶斯定理 先验 事件 后验 在概率论和统计学中,先验概率和后验概率是贝叶斯统计的核心概念 简单来说后验概率就是结合了先验概率的前提…

院内影像一体化平台PACS源码,C#语言的PACS/RIS系统,二级医院应用案例

全院级PACS系统源码,一体化应用系统整合,满足放射、超声、内窥镜中心、病理、检验等多个科室的工作流程和需求,为不同科室提供专业的解决方案,实现了全院乃至区域内信息互联互通、数据统一存储与管理等功能,做到以病人…

微软研发致胜策略 05:进度狂

这是一本老书,作者 Steve Maguire 在微软工作期间写了这本书,英文版于 1994 年发布。我们看到的标题是中译版名字,英文版的名字是《Debugging the Development Process》,这本书详细阐述了软件开发过程中的常见问题及其解决方案&a…

免费视频批量横转竖

简介 视频处理器 v1.3 是一款由是貔貅呀开发的视频编辑和处理工具,提供高效便捷的视频批量横转竖,主要功能: 导入与删除文件:轻松导入多个视频文件,删除不必要的文件。暂停与继续处理:随时暂停和继续处理。…

大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli

点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: HadoopHDFSMapReduceHiveFlumeSqoopZookeeperHBaseRedis (正在更新) 章节内容 上一…

python中的数据类型-适合新手-比较完善(写了好久……)

作者的话 首先,我先申明,以下思路仅为个人理解,如有不同,望指导,谢谢。 数据类型它是什么,有什么用,怎么用就是它的全部内容,知识框架串联起来之后就是这三部分内容,没有…

【网络安全科普】勒索病毒 防护指南

勒索病毒简介 勒索病毒是一种恶意软件,也称为勒索软件(Ransomware),其主要目的是在感染计算机后加密用户文件,并要求用户支付赎金以获取解密密钥。这种类型的恶意软件通常通过电子邮件附件、恶意链接、下载的软件或漏洞…

数论基础知识

整除 辗转相除法 同余 模计算机 一次同余方程 费马小定理|欧拉定理|威尔逊定理 孙子定理(中国剩余定理) 快速指数算法(快速模乘法)(反复平方乘) 模重复平方法 二次剩余 Legendre符号欧拉判别法 原根 gc…

golang 解压带密码的zip包

目录 Zip文件详解ZIP 文件格式主要特性常用算法Zip格式结构图总览Zip文件结构详解数据区本地文件头文件数据文件描述 中央目录记录区(核心目录记录区 )中央目录记录尾部区 压缩包解压过程方式1 通过解析中央目录区来解压方式2 通过读取本地文件头来解压两…

JVM常用工具中jmap实现手动进行堆转储(heap dump文件)并使用MAT(Memory Analyzer Tool)进行堆分析-内存消耗分析

场景 JVM-常用工具(jps、jstat、jinfo、jmap、jhat、jstack、jconsole、jvisualvm)使用: JVM-常用工具(jps、jstat、jinfo、jmap、jhat、jstack、jconsole、jvisualvm)使用_jvm分析工具-CSDN博客 上面讲了jmap的简单使用。 下面记录其常用功能,实现堆…

C#+layui+echarts实现动态生成折线图

概要 C#layuiecharts实现动态生成折线图 整体架构流程 后端是c#语言编写的业务流程,前端是layui和echarts 技术细节 1.先看echarts折线图需要什么样子的数据,在想后端怎么处理 2.后端代码 List<ValveTempData> list new List<ValveTempData>(); string …

Spring Cloud中怎么使用Resilience4j RateLimiter对接口进行限流

在微服务架构中&#xff0c;限流是保护系统稳定性的重要手段之一。限流可以防止某个服务因流量过大而过载&#xff0c;影响整个系统的稳定性和性能。Resilience4j 提供了多种限流策略&#xff0c;其中 RateLimiter 是一种常用的限流机制。本文将详细介绍如何在 Spring Cloud 项…

组内第一次会议

会议内容 1、科研平台使用 增删改查对文件 cp -r /root/mmdetection/dataset/ /root/user/wbzExperiment/mmdetection/ rm -r /root/user/yolov5-master tar -czvf test03.tar.gz test03/ unzip abc.zip 上传文件、解压文件&#xff1a;要在自己的目录中&#xff0c;进入…

普中51单片机:LED点阵屏组成结构及实现方法详解(九)

文章目录 引言什么是LED点阵屏&#xff1f;工作原理74HC595移位寄存器基本引脚作用级联工作原理 电路图代码演示——16*16LED点阵屏轮播点亮每行LED代码演示——显示数字0代码演示——16*16游动字幕显示 引言 LED点阵屏作为一种广泛应用于现代显示技术的设备&#xff0c;因其能…

Linux_线程的使用

目录 1、线程与进程的关系 2、线程的优缺点 3、创建线程 4、查看启动的线程 5、验证线程是共享地址空间的 6、pthread_create的重要形参 6.1 线程id 6.2 线程实参 7、线程等待 8、线程退出 9、线程取消 10、线程tcb 10.1 线程栈 11、创建多线程 12、__th…

学生信息管理系统设计

学生信息管理系统的设计是一个综合性的项目&#xff0c;涉及到数据的存储、检索、更新和删除等基本操作&#xff0c;同时也需要考虑系统的易用性、安全性和扩展性。以下是一些关键步骤和要素&#xff0c;用于指导设计这样一个系统&#xff1a; 1. 需求分析 目标用户&#xff…

wls2下的centos使用桥接模式连接宿主机网络独立静态ip

前提&#xff1a;wsl2已安装&#xff0c;可正常更新 1.在控制面板中&#xff0c;打开开启或关闭windows功能&#xff0c;将里面的 Hyper-V功能打开&#xff0c;此处涉及重启 2. 按一下win键&#xff0c;输入hy&#xff0c;上面可以看到Hyper-V Manager,点进去 3.选择右边的 Vi…

Redis系列命令更新--Redis有序集合命令

Redis有序集合&#xff08;sorted set&#xff09; &#xff08;1&#xff09;说明&#xff1a; A、Redis有序集合和集合一样也是string类型元素的集合&#xff0c;且不允许重复的成员&#xff1b;不同的是每个元素都会关联一个double类型的分数&#xff1b;redis正式通过分数…

Java语言程序设计——篇五(1)

数组 概述数组定义实例展示实战演练 二维数组定义数组元素的使用数组初始化器实战演练&#xff1a;矩阵计算 &#x1f4ab;不规则二维数组实战演练&#xff1a;杨辉三角形 概述 ⚡️数组是相同数据类型的元素集合。各元素是有先后顺序的&#xff0c;它们在内存中按照这个先后顺…