基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

news2024/11/23 21:49:05

简介

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:
• 数据同步:用于备份,容灾;
• 数据分发:一个数据源分发给多个下游系统;
• 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。
CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
• 基于查询的 CDC:
• 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
• 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
• 不保障实时性,基于离线调度存在天然的延迟。
• 基于日志的 CDC:
• 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
• 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
• 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
对比常见的开源 CDC 方案,我们可以发现:
• 对比增量同步能力,
• 基于日志的方式,可以很好的做到增量同步;
• 而基于查询的方式是很难做到增量同步的。
• 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
• 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
• 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
• 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
• 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
• 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
• 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
在这里插入图片描述
在这里插入图片描述

1.安装单机版

下载

yum install -y java-1.8.0-openjdk.x86_64
yum install -y  java-1.8.0-openjdk-devel
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
mkdir -p /opt/flink
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/flink 

下载jar复制到/opt/flink/flink-1.17.2/lib

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
    <scope>provided</scope>
</dependency>

配置

vim /opt/flink/flink-1.17.2/conf/flink-conf.yaml

rest.port: 8081
rest.bind-address: 0.0.0.0
jobmanager.execution.timezone: Asia/Shanghai

启动

/opt/flink/flink-1.17.2/bin/stop-cluster.sh
/opt/flink/flink-1.17.2/bin/start-cluster.sh 

访问http://10.6.8.227:8081/

2.创建 两个mysql 数据库

docker run -p 13306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

docker run -p 23306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

初始化mysql 表结构

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);

在源库中插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

3.CDC 步骤

启动 /opt/flink/flink-1.17.2/bin/sql-client.sh
只能一条语句一条语句的执行

CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '13306',
    'username' = 'root',
    'password' = 'mysql',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );


CREATE TABLE sink_products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:23306/mydb?serverTimezone=Asia/Shanghai',
    'username' = 'root',
    'password' = 'mysql',
    'table-name' = 'sink_products'
  );



insert into sink_products select * from products;

4.验证

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

参考文档

http://124.220.104.235/web/chatgpt

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1292892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【学习记录】从0开始的Linux学习之旅——字符型设备驱动及应用

一、概述 Linux操作系统通常是基于Linux内核&#xff0c;并结合GNU项目中的工具和应用程序而成。Linux操作系统支持多用户、多任务和多线程&#xff0c;具有强大的网络功能和良好的兼容性。基于前面应用与驱动的开发学习&#xff0c;本文主要讲述如何在linux系统上把应用与驱动…

随笔-这都是命吗

我与鹏哥、小付有个小群&#xff0c;前几天&#xff0c;鹏哥在群里发了一个图&#xff0c;是他那个城市准备扶持的高新产业&#xff0c;有元宇宙、量子信息、生物制药、人工智能什么的。 先前的时候鹏哥给我说过&#xff0c;当地准备了六百多亩地&#xff0c;准备发展高新产业…

Labelme2Yolo labelme格式的json标注转yolo格式txt

该工作适用于目标检测工作。 由于labelme标注出的文件是如下图的单个json文件格式&#xff0c;不符合yolo的训练格式&#xff0c;需要转格式。 观察发现labelme标注的json文件中有imageData&#xff0c;还挺大的&#xff0c;查阅后得知是base64后的图片数据&#xff0c;也就是…

多表操作、其他字段和字段参数、django与ajax(回顾)

多表操作 1 基于对象的跨表查 子查询----》执行了两句sql&#xff0c;没有连表操作 2 基于双下滑线的连表查 一次查询&#xff0c;连表操作 3 正向和反向 放在ForeignKey,OneToOneField,ManyToManyField的-related_namebooks&#xff1a;双下滑线连表查询&#xff0c;反向…

深圳锐杰金融:用金融力量守护社区健康

深圳市锐杰金融投资有限公司&#xff0c;作为中国经济特区的中流砥柱&#xff0c;近年来以其杰出的金融成绩和坚定的社会责任立场引人注目。然而&#xff0c;这并非一个寻常的金融机构。锐杰金融正在用自己的方式诠释企业责任和慈善精神&#xff0c;通过一系列独特的慈善项目&a…

定兴县第三实验小学开展“宪法宣传周”系列活动

2023年12月4日是我国第十个国家宪法日&#xff0c;我校集中深入学习宣传宪法&#xff0c;弘扬宪法精神&#xff0c;维护宪法权威&#xff0c;开展“宪法宣传周”系列活动。 宪法主题升旗仪式 五&#xff08;6&#xff09;班薛谨熙同学以《学法懂法 与我同行》为主题做国旗下讲…

【开源】基于JAVA语言的APK检测管理系统

项目编号&#xff1a; S 038 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S038&#xff0c;文末获取源码。} 项目编号&#xff1a;S038&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 开放平台模块2.3 软…

低代码你需要了解一下

低代码的概念可以追溯到1980年代&#xff0c;当时IBM的快速应用程序开发工具&#xff08;RAD&#xff09;被冠以新的名称——低代码&#xff0c;由此&#xff0c;低代码的概念首次面向大众。然而&#xff0c;在近40年的历程中&#xff0c;低代码发展经历了两个阶段&#xff1a;…

Ray构建GPU隔离的机器学习平台

Ray框架介绍 Ray 是一个开源分布式计算框架,在 机器学习基础设施中发挥着至关重要的作用。Ray 促进分布式机器学习训练,使机器学习从业者能够有效利用多个 GPU 的能力。 Ray可以在集群上分布式地运行任务,并且可以指定任务运行时需要使用的GPU数量。Ray可与Nvidia-docker等…

Adobe系列软件:创意之旅的得力助手

在数字创意领域&#xff0c;Adobe系列软件一直以其卓越的性能和广泛的应用而备受瞩目。从图像处理、视频编辑到音频编辑&#xff0c;从网页开发到排版设计&#xff0c;这些软件都提供了强大的功能和工具&#xff0c;帮助用户实现他们的创意。 让我们详细介绍这些软件的作用&…

文件管理:每个文件夹只移入1个文件要怎样操作?批量移动文件技巧

在文件管理过程中&#xff0c;有时要将多个文件分别移动到不同的文件夹中&#xff0c;每个文件夹只包含一个文件。这样的需求可能出现在许多场景中&#xff0c;比如整理文件、备份资料或者进行特定的项目处理。如果每个手动去移动文件就会出现丢失的情况&#xff0c;以及太过耗…

【设计模式-3.1】结构型——外观模式

说明&#xff1a;本文介绍设计模式中结构型设计模式中的&#xff0c;外观模式&#xff1b; 亲手下厨还是点外卖&#xff1f; 外观模式属于结构型的设计模式&#xff0c;关注类或对象的组合&#xff0c;所呈现出来的结构。以吃饭为例&#xff0c;在介绍外观模式之前&#xff0…

谷歌ARCore认证,什么是ARCore认证

一、谷歌ARCore认证介绍 谷歌ARCore 是 Google 推出的用于打造增强现实体验的平台,利用移动设备的传感器以及相机通过不同的 API 让您的手机能够感知其所处环境、了解世界并进行信息交互。设备要使用谷歌的ARCore功能&#xff0c;需要进行测试并通过认证后方可预载或使用Googl…

Python编程技巧 – 异常处理

Python编程技巧 – 异常处理 Python Programming Skills – Exception Handling By JacksonML 每一个程序都未必是健壮的&#xff0c;有时候很脆弱。只有在人的理想思维状况下&#xff0c;返回的结果才是正确的&#xff0c;如意的。 1. 错误发生及异常输出 面对种种编写有疏…

人工智能_机器学习061_KKT条件公式理解_原理深度解析_松弛变量_不等式约束---人工智能工作笔记0101

然后我们再来看,前面我们,拉格朗日乘子法,把带有条件的,问题,优化成了等式问题,从而, 构建拉格朗日乘子公式,进行实现了求解,但是在现实生活中,往往也有,很多不等式问题. 比如上面的这个,就是要求是h(x)<=0的情况下,函数f(x)的最小值. 可以看到,这个带有一个不等式的条件,…

揭秘C语言结构体:通往内存对齐的视觉之旅

揭秘C语言结构体&#xff1a;通往内存对齐的视觉之旅 引言 在C语言的编程旅程中&#xff0c;结构体&#xff08;structs&#xff09;是一个关键而强大的概念。结构体不仅允许我们组织和存储不同类型的数据&#xff0c;而且通过深入了解内存对齐&#xff0c;我们可以更好地优化…

ZKP Understanding Nova (2) Relaxed R1CS

Understanding Nova Kothapalli, Abhiram, Srinath Setty, and Ioanna Tzialla. “Nova: Recursive zero-knowledge arguments from folding schemes.” Annual International Cryptology Conference. Cham: Springer Nature Switzerland, 2022. Nova: Paper Code 2. Unders…

micro_ros_setup包镜像及部分注释(我觉得此包支持很有限)

GitHub - micro-ROS/micro_ros_setup at humble README.md This ROS 2 package(这是一个包) is the entry point for building micro-ROS apps for different embedded platforms. Supported platforms Standalone build system toolsDependenciesQuick startBuilding Creati…

QML与C++之间结构体输出

1.定义带有结构体的头文件TrackClass.h #ifndef TRACKCLASS_H #define TRACKCLASS_H#include <QGuiApplication> #include "QObject" #include <QVector>struct TrackPoint {Q_GADGETQ_PROPERTY(qreal lat MEMBER lat)Q_PROPERTY(qreal lon MEMBER lon)…

中山大学李华山、王彪课题组开发 SEN 机器学习模型,高精度预测材料性能

内容一览&#xff1a;了解全局晶体对称性并分析等变信息&#xff0c;对于预测材料性能至关重要&#xff0c;但现有的、基于卷积网络的算法尚且无法完全实现这些需求。针对于此&#xff0c;中山大学的李华山、王彪课题组&#xff0c;开发了一款名为 SEN 的机器学习模型&#xff…