flink cdc mysql整理与总结

news2024/12/28 3:16:52

文章目录

  • 一、业务中常见的需要数据同步的场景
    • CDC是什么
    • FlinkCDC是什么
    • CDC原理
    • 为什么是FlinkCDC
    • 业务场景
    • flink cdc对应flink的版本
  • 二、模拟案例
    • 1.阿里云flink sql
    • 2.开源flink sql(单机模式)
    • flink 安装
    • 安装mysql
    • 3.flink datastream
  • 三、总结


提示:以下是本篇文章正文内容,下面案例可供参考

一、业务中常见的需要数据同步的场景

1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。

2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再Mysql的数据中,后来为了方便检索需要写入到ES,或者为了缓存需要写入到Redis,或者是Mysql分表的数据合并写入到Doris中。

3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。

4、应急场景。如果不采专用CDC的方案,那么要达到实时查询的效果,只能在BFF层的代码调用多个中心层的查询API,然后再BFF层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询API,临时开发的话,会让开发进度不可控。

总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。

示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。

CDC是什么

CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。

目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。

FlinkCDC是什么

官网地址:官网FlinkCDC

官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义,FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。

CDC原理

CDC的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。

为什么是FlinkCDC

1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。

2、Flink的DataStream和SQL比较成熟和易用

3、Flink支持状态后端(State Backends),允许存储海量的数据状态

4、Flink有更好的生态,更多的Source和Sink的支持

业务场景

  • 数据合并流向:
    在这里插入图片描述
  • 数据多写流向:
    在这里插入图片描述
  • 单数据源写单表流向:
    在这里插入图片描述
  • 数据链路对比
    通过下图,我们可以看到Canal处理数据的链路比FlinkCDC更长,数据链路一旦变长意味着,出错的可能性更高。
    在这里插入图片描述
    在这里插入图片描述

flink cdc对应flink的版本

在这里插入图片描述

二、模拟案例

1.阿里云flink sql

  • 验证mysql开启binlog
    在这里插入图片描述
    flink sql 定义binlog源数据,拿到数据处理(业务逻辑)再写表,后面很简单了
    在这里插入图片描述

2.开源flink sql(单机模式)

背景:win10电脑安装vmware(虚拟化)软件,虚拟机中安装
linux节点一个,flink,mysql(yum默认安装的最新版本:8.0.37),java环境(此处安装java环境略,网上有)

flink 安装

###在linux下载flink包
[root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加压包到当前目录下
[root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz 
[root@slave2 ~]# cd flink-1.16.3
[root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar

修改flink 文件
[root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改内容如下(如果不修改则win10本地电脑无法访问flink web UI,这里浪费很多时间):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0

###启动flink
[root@slave2 flink-1.16.3]# bin/start-cluster.sh
###启动客户端
root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded 

在这里插入图片描述

安装mysql

安装mysql遇到很多小问题

[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。

失败的软件包是:mysql-community-libs-8.0.37-1.el7.x86_64

在这里插入图片描述

###用于跳过GPG签名检查 可以安装成功
[root@slave2 ~]# yum -y install mysql-server --nogpgcheck   
###验证myqsl是否可用
[root@slave2 ~]# systemctl start mysqld
[root@slave2 ~]# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> 

### 配置开启binlog
[root@slave2 ~]# vim /etc/my.cnf
log_bin=mysql_bin 
binlog-format=Row 
server-id=1   
###重启mysql
[root@slave2 ~]# systemctl restart mysqld
###重新登陆mysql并查看binlog开启情况
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

在这里插入图片描述
mysql创建新用户等

###创建新用户时发现一直报错。查看mysql建密码时要求比较高(新版本mysql对密码要求高)
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+--------+
| Variable_name                                   | Value  |
+-------------------------------------------------+--------+
| validate_password.changed_characters_percentage | 0      |
| validate_password.check_user_name               | ON     |
| validate_password.dictionary_file               |        |
| validate_password.length                        | 10     |
| validate_password.mixed_case_count              | 1      |
| validate_password.number_count                  | 1      |
| validate_password.policy                        | MEDIUM |
| validate_password.special_char_count            | 1      |
+-------------------------------------------------+--------+
8 rows in set (0.00 sec)
###修改密码要求(在工作生产环境不建议这么做)
mysql> SET GLOBAL validate_password.length = 3;
Query OK, 0 rows affected (0.03 sec)
mysql> SET GLOBAL validate_password.policy = LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看对新建用户密码要求
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+-------+
| Variable_name                                   | Value |
+-------------------------------------------------+-------+
| validate_password.changed_characters_percentage | 0     |
| validate_password.check_user_name               | ON    |
| validate_password.dictionary_file               |       |
| validate_password.length                        | 4     |
| validate_password.mixed_case_count              | 1     |
| validate_password.number_count                  | 1     |
| validate_password.policy                        | LOW   |
| validate_password.special_char_count            | 1     |
+-------------------------------------------------+-------+
8 rows in set (0.00 sec)
###新建用户:root1234
mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234';
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.02 sec)

mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.01 sec)

mysql> SELECT User, Host FROM mysql.user;
+------------------+-----------+
| User             | Host      |
+------------------+-----------+
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
| root1234         | localhost |
+------------------+-----------+
5 rows in set (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)

建mysql库表(数据源头库表)

Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]

###mysql 默认只有这4个库(当时直接用默认库mysql建表导致flink 报一些上面奇诡的错)
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
###新建一个库:test
mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)

mysql> use test;
Database changed
###建表
mysql>  CREATE TABLE `test_cdc` (
    ->   `id` int NOT NULL AUTO_INCREMENT,
    ->   `name` varchar(255) DEFAULT NULL,
    ->   PRIMARY KEY (`id`)
    ->  ) ENGINE=InnoDB ;
Query OK, 0 rows affected (0.04 sec)

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_cdc       |
+----------------+
1 row in set (0.00 sec)

##开始验证

##mysql 使用上面创建的用户密码登录mysql
[root@slave2 ~]# mysql -u root -p'root1234'
mysql> use test
##flink登录
[root@slave2 flink-1.16.3]# bin/stop-cluster.sh
[root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded 
##在flink sql中定义mysql源
CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc', 
  'hostname' = '127.0.0.1', 
  'port' = '3306', 
  'username'='root1234', 
  'password'='root1234', 
  'database-name'='test', 
  'table-name'='test_cdc'
);
###查询flink 接收到的binlog数据
Flink SQL> select * from test_flink_cdc;

###到mysql sql界面向test_cdc表插入数据
mysql> INSERT INTO test_cdc VALUES (001, 'test01');
Query OK, 1 row affected (0.02 sec)

mysql> INSERT INTO test_cdc VALUES (002, 'test02');
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO test_cdc VALUES (003, 'test03');
Query OK, 1 row affected (0.04 sec)

mysql> INSERT INTO test_cdc VALUES (004, 'test04');
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO test_cdc VALUES (005, 'test05');
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO test_cdc VALUES (006, 'test06');
Query OK, 1 row affected (0.00 sec)


向mysql表中插入数据
在这里插入图片描述
flink sql这时可以接到binlog数据
在这里插入图片描述
查看flink UI job情况
在这里插入图片描述
在这里插入图片描述
小结:当flink可以拿到mysql binlog源头数据,下面就好做了,根据自己的业务处理sink到任何数据库或组件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。

下载链接:
1.mysql jdbc jar包驱动下载
2.flink cdc驱动下载
3.flink下载

3.flink datastream

datastream 比较灵活简单,下面是举例代码片段(datastream的CDC比flink sql还简单,打个jar包在flink web UI界面上传运行即可,此处不做举例)

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

三、总结

  • 公司用的cdc不能分享出来,所以搭建上面案例时遇到很多问题,遇到很多坑。
  • 工作中使用的就是如下截图流程,没有使用canal和kafka,使用的是logtail和SLS(阿里云的组件,类似kafka,但要比kafka等功能强大)
  • 上面说了很多关于flink cdc的优点。我结合工作中使用的flink cdc说一些缺点(自己结合业务场景)。

1.使用flink cdc不适合直观观察binlog的数据(例如脏数据,数据断流,不能直观看到最近的binglog情况,表的更新频率不高等造成的困扰)。


2.使用阿里云SLS收集mysql binlog数据,将数据保存近1个月(保持时间可设置),同时可以在logstore查看数据结构样式(sls支持sql语法查询日志数据),方便后续flink代码开发,也方便flink sql和datastream代码debug。
3.使用flink cdc做数据质量监控比较后知后觉(只能监控已表数据),我这边做法是直接监控sls原始数据质量,发现有数据质量问题会报出来。sls也支持实时断流提醒。
4.一般对于简单的不太重要的业务适合使用flink cdc,这样开发快,数据流不用咋校验。对于复杂数据或复杂业务或重要数据,需要观察binlog数据结果(不信任上游其他部门数据)还是使用类似sls比较好。方便查询数据变化与汇总,方便做数据报警等。
5.总之还是要根据自己的业务场景和自己公司现有技术组件组合着使用比较好。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1700284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MFC扩展库BCGControlBar Pro v34.1新版亮点:日历和计划表等功能升级

BCGControlBar库拥有500多个经过全面设计、测试和充分记录的MFC扩展类。 我们的组件可以轻松地集成到您的应用程序中&#xff0c;并为您节省数百个开发和调试时间。 BCGControlBar专业版 v34.1已正式发布了&#xff0c;这个版本包含了对Windows 10/11字体图标的支持、功能区和…

huggingface 笔记:查看GPU占用情况

0 准备部分 0.1 创建虚拟数据 import numpy as npfrom datasets import Datasetseq_len, dataset_size 512, 512 dummy_data {"input_ids": np.random.randint(100, 30000, (dataset_size, seq_len)),"labels": np.random.randint(0, 1, (dataset_size…

PHP+Lucky+Baby母婴用品网站的设计与实现75554-计算机毕业设计项目选题推荐(附源码)

摘 要 近年来&#xff0c;随着移动互联网的快速发展&#xff0c;电子商务越来越受到网民们的欢迎&#xff0c;电子商务对国家经济的发展也起着越来越重要的作用。简单的流程、便捷可靠的支付方式、快捷畅通的物流快递、安全的信息保护都使得电子商务越来越赢得网民们的青睐。现…

中国低调海外巨头,实力超乎想象!

在全球化的浪潮中&#xff0c;中国公司正以前所未有的速度和规模走向世界。他们或许低调&#xff0c;但却实力非凡&#xff0c;在国际市场上掀起了一股不可小觑的“中国风暴”。今天&#xff0c;就让我们揭开那些在国外牛逼到爆炸的中国公司的神秘面纱&#xff0c;深度解析他们…

IP编址、进制转换、IP地址分类、变长子网掩码VLSM、无类域间路由CIDR

前言 网络层位于数据链路层与传输层之间。网络层中包含了许多协议&#xff0c;其中最为重要的协议就是IP协议。网络层提供了IP路由功能。理解IP路由除了要熟悉IP协议的工作机制之外&#xff0c;还必须理解IP编址以及如何合理地使用IP地址来设计网络。 IP编址 每个网段上都有两…

滤波电路应用笔记

滤波器 什么是滤波器&#xff1f; 滤波器是一种使某些频率或频带上的电信号通过而阻止其他信号通过的装置。 滤波电路的作用是尽可能减少脉动直流电压中的交流分量&#xff0c;保持其直流分量&#xff0c;降低输出电压的纹波系数&#xff0c;并使波形相对平滑。整流电路的输出…

商标注册申请名称的概率,多想名称选通过率好的!

近日给深圳客户申请的商标初审下来了&#xff0c;两个类别都下的初审&#xff0c;和当初的判断基本一致&#xff0c;普推知产老杨当时沟通说需要做担保申请注册也可以&#xff0c;后面选择了管家注册&#xff0c;最近大量的帮客户检索商标名称&#xff0c;分享下经验。 两个字基…

PC端应用订阅SDK接入攻略

本文档介绍了联想应用联运sdk接入操作指南&#xff0c;您可在了解文档内容后&#xff0c;自行接入应用联运sdk。 1. 接入前准备 1. 请先与联想商务达成合作意向。 2. 联系联想运营&#xff0c;提供应用和公司信息&#xff0c;并获取商户id、app id、key&#xff08;公私钥、…

【设计模式深度剖析】【4】【结构型】【组合模式】| 以文件系统为例加深理解

&#x1f448;️上一篇:适配器模式 设计模式深度剖析-专栏&#x1f448;️ 目 录 组合模式定义英文原话直译如何理解&#xff1f; 3个角色UML类图代码示例 组合模式的优点组合模式的使用场景示例解析&#xff1a;文件系统 组合模式 组合模式&#xff08;Composite Pattern&a…

【经典论文阅读10】MNS采样——召回双塔模型的最佳拍档

这篇发表于2020 WWW 上的会议论文&#xff0c;提出一种MNS方式的负样本采样方法。众所周知&#xff0c;MF方法难以解决冷启动问题&#xff0c;于是进化出双塔模型&#xff0c;但是以双塔模型为基础的召回模型的好坏十分依赖负样本的选取。为了解决Batch内负样本带来的选择性偏差…

IT廉连看——UniApp——条件渲染

IT廉连看——UniApp——条件渲染 什么是条件渲染&#xff1f; 顾名思义&#xff0c;满足一定的条件他才会进行渲染。 这是我们上节事件绑定保留的代码。 一、现在我有这样一个需求&#xff1a; 增加一个按钮&#xff0c;当我点击这个按钮&#xff0c;这里的文本&#xff0…

ICML 2024 | 即插即用!无需训练!基于球面高斯约束引导的条件扩散模型

©PaperWeekly 原创 作者 | 杨凌霄 单位 | 上海科技大学信息学院 论文标题&#xff1a; Guidance with Spherical Gaussian Constraint for Conditional Diffusion 论文作者&#xff1a; 杨凌霄、丁枢桐、蔡逸凡、虞晶怡、汪婧雅、石野 通讯作者&#xff1a; 石野 论文链接…

【数据结构】二叉树-堆(上)

个人主页~ 二叉树-堆 一、树的概念及结构1、概念2、相关概念3、树的表示4、树的实际应用 二、二叉树的概念和结构1、概念2、特殊二叉树3、二叉树的性质4、二叉树的存储结构&#xff08;1&#xff09;顺序存储&#xff08;2&#xff09;链式存储 三、二叉树的顺序结构以及实现1、…

【Linux-时间管理和内核定时器】

Linux-时间管理和内核定时器 ■ 设置系统节拍率■ 高节拍率和低节拍率的优缺点&#xff1a;■ jiffies 系统节拍数■ get_jiffies_64 这个函数可以获取 jiffies_64 的值■ 处理绕回■ 使用 jiffies 判断超时 ■ jiffies 和 ms、 us、 ns 之间的转换函数在这里插入代码片■ 内核…

pytest:指定测试用例执行顺序

在自动化测试中&#xff0c;测试用例的执行顺序有时对测试结果具有重要影响。本文将介绍如何在pytest框架中使用pytest-ordering插件以及Collection hooks来控制测试用例的执行顺序。 方式1&#xff1a; 使用pytest-ordering插件控制执行顺序 1.1 安装pytest-ordering插件 首…

Android Studio开发之路(十三)主题影响Button颜色问题解决及button自定义样式

一、问题描述 在开发过程中发现安卓的默认主题色是紫色&#xff0c;并且会导致button也是紫色&#xff0c;有时直接在xml布局文件中直接设置button的背景色或者设置背景图片不起效果 方案一、如果是app&#xff0c;可以直接设置主题颜色 比如&#xff0c;将主题设置为白色&a…

【STM32】 独立看门狗配置方法

什么是看门狗 看门狗&#xff08;watchdog&#xff09;指的是一种监控系统或程序&#xff0c;用于定期检测和监控其他系统或程序的运行状态&#xff0c;并在出现问题或故障时采取相应的措施。它可以是硬件设备&#xff0c;也可以是软件程序。 在计算机领域中&#xff0c;看门狗…

Java Swing + MySQL图书借阅管理系统

系列文章目录 Java Swing MySQL 图书管理系统 Java Swing MySQL 图书借阅管理系统 文章目录 系列文章目录前言一、项目展示二、部分代码1.Book2.BookDao3.DBUtil4.BookAddInternalFrame5.Login 三、配置 前言 项目是使用Java swing开发&#xff0c;界面设计比较简洁、适合作…

Taipy快速打造数据驱动的Web应用

Taipy&#xff1a; 用Taipy&#xff0c;让数据洞察与应用开发无缝对接- 精选真开源&#xff0c;释放新价值。 概览 Taipy快速打造数据驱动的 Web 应用。这是一个基于 Python 和 Flask 的项目&#xff0c;结合了 React 等前端技术&#xff0c;为开发者提供了一个简洁、高效的开…

【计算机毕业设计】基于SSM+Vue的新能源汽车在线租赁管理系统【源码+lw+部署文档】

摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;新能源汽车在线租赁当然也不能排除在外。新能源汽车在线租赁是以实际运用为开发背景&#xff0c;运用软件工程开发方法&…