33.flink cdc 实时数据同步利器

news2025/1/19 23:19:20

什么是flink cdc?

对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。
数据的同步目前对mysql来说比较常见是方式是使用:datax 和 canal配合, 为什么需要这两个框架配合呢?
因为datax不支持实时的同步, datax只能定义一个范围去同步,而且同步结束后程序就结束了。但是我想要的是数据仓库中的数据近乎实时的和mysql中的数据保持一致又该怎么办? 答案是再加上canal, canal和datax相反,它只支持指定一个binlog同步,然后会一直同步到现在,并且程序不会结束,会一直同步。 这样datax+canal就可以达到实时同步的功能。
这是业界比较常用的同步方式,datax同步历史数据,canal+kafka同步最新的数据,而且还要有一个程序去读取kafka中的binlog json数据(可以用flink或者spark又或者是flume)。可以看到这个链路比较长,不是很好。
下面是目前常见的cdc同步方案以及对比:
在这里插入图片描述

  1. DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但
    在场景支持上仍不完善。
  2. 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
  3. 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此 Flink CDC 作为
    Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构,
    在大数据场景下容易面临性能瓶颈的问题。
  4. 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联打
    宽? Flink CDC 依托强大的 Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而
    Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。
  5. 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接
    MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统
    中,也支持灵活的自定义 connector。
  6. 我们看到flink cdc 是比较友好的方案, 其内部实现上用的是Debezium去采集binlong, 而且可通过参数scan.startup.mode 来控制同步行为:
  1. initial (默认):在第一次启动时对受监视的数据库表执行全量同步,并继续读取最新的 binlog。
  2. earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  3. latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  4. specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  5. timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

一个demo

对flink_01 和flink_02 进行两个分表进行同步合并到:flink_merge


CREATE TABLE `flink_01` (
  `indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',
  `indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',
  `indicator_code` int NOT NULL COMMENT '指标编码',
  `table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',
  `window_start` datetime NOT NULL COMMENT '窗口开始时间',
  `window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建更新时间',
  `indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',
  PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 0, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');

CREATE TABLE `flink_02` (
  `indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',
  `indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',
  `indicator_code` int NOT NULL COMMENT '指标编码',
  `table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',
  `window_start` datetime NOT NULL COMMENT '窗口开始时间',
  `window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建更新时间',
  `indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',
  PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 1, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');

CREATE TABLE `flink_merge` (
  `indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',
  `indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',
  `indicator_code` int NOT NULL COMMENT '指标编码',
  `table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',
  `window_start` datetime NOT NULL COMMENT '窗口开始时间',
  `window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建更新时间',
  `indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',
  PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

所需要的依赖jar:

  1. mysql 的驱动请自行下载
  2. flink-sql 的连接器
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ 在这里下载flinksql 连接器
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.13.6</version>
</dependency>

  1. flink-cdc 依赖
    https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc%28ZH%29.html#a-name-id-002-a 在这里下载
    在这里插入图片描述

下载后的jar统一放在flink安装目录下的lib目录下即可。

运行程序

package com.test.demo.table.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkcdc {
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
//                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // 'table-name' = 'flink.*' 意思是读取tablename以flink开头的所有的表
        tableEnv.executeSql("CREATE TABLE `source_table`\n" +
                "(\n" +
                "    `indicator_name`        STRING,\n" +
                "    `indicator_value`       STRING,\n" +
                "    `indicator_code`        INT,\n" +
                "    `table_name`            STRING,\n" +
                "    `window_start`          TIMESTAMP(0),\n" +
                "    `window_end`            TIMESTAMP(0),\n" +
                "    `create_time`           TIMESTAMP,\n" +
                "    `indicator_description` STRING,\n" +
                "    PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = '172.18.3.135',\n" +
                "   'scan.startup.mode' = 'initial',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '123456',\n" +
                "   'database-name' = 'test',\n" +
                "   'table-name' = 'flink.*'\n" +
                ")");

//        tableEnv.sqlQuery("select * from MyTable").execute().print();
//查询的时候定义event_time窗口
       tableEnv.executeSql("CREATE TABLE `flink_merge`\n" +
               "(\n" +
               "    `indicator_name`        STRING,\n" +
               "    `indicator_value`       STRING,\n" +
               "    `indicator_code`        INT,\n" +
               "    `table_name`            STRING,\n" +
               "    `window_start`          TIMESTAMP(0),\n" +
               "    `window_end`            TIMESTAMP(0),\n" +
               "    `create_time`           TIMESTAMP,\n" +
               "    `indicator_description` STRING,\n" +
               "    PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +
               ") WITH (\n" +
               "      'connector' = 'jdbc',\n" +
               "      'url' = 'jdbc:mysql://172.18.3.135:3306/test',\n" +
               "      'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
               "      'username' = 'root',\n" +
               "      'password' = '123456',\n" +
               "      'table-name' = 'flink_merge'\n" +
               ")");//直接sql查询



       tableEnv.executeSql("insert into flink_merge select * from source_table");




    }
}

总结

按照上面的步骤就可以进行实时同步了, 如果你要在生产环境用建议配置上savepoint 和checkpoint, 这样可以达到断点续传的功能。 文件比较简短适合有一定flink基础的人快速开发,如果你对flink还不是很了解建议先去学下flink相关的知识,再来进行cdc的实验。 flink cdc可以说是以后数据同步的主流,和其他方式相比架构比较简单,而且通过参数控制是否是全量同步,十分友好。
多说一句,目前对flinksql我们公司已不用写代码进行开发了,而是用的streamx框架,streamx框架可以很方便配置savepoint/chekpoints, 以及启动参数,而且可以在web页面启动flinksql 不需要在控制台写一堆参数提交到yarn上,很方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/106117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2-2-3-7、FutureCompletableFuture详解

Runnable 线程的任务接口&#xff0c;用于定义被执行任务方法的抽象&#xff0c;是函数式接口&#xff08;仅存在一个需要实现方法的接口&#xff09;&#xff0c;其方法为run方法通过对并发编程中java线程的了解&#xff0c;Thread调用start方法&#xff0c;最后操作系统会通过…

Confluence 调整会话超时(session timeout)

文章目录前言一、概括二、实际场景应用1.更改空闲超时2.更改记住我 cookie 的生命周期3.在用户通过身份验证后的某个时间强制注销用户总结前言 在 Confluence 中有两个会话 Cookie&#xff1a; JSESSIONID: 由 Tomcat 使用和管理。 默认情况下&#xff0c;这被视为会话 cooki…

类与对象(中)

类与对象类的6个默认成员函数构造函数概念特性析构函数概念特性拷贝构造函数概念特性赋值运算符重载运算符重载赋值运算符重载前置 后置 重载const成员函数取地址及const取地址操作符重载类的6个默认成员函数 当类中没有任何成员时&#xff0c;称作空类 但是呢&#xff0c;编译…

Docker使用(容器、镜像相关命令)

虚拟化 在计算机中&#xff0c;虚拟化&#xff08;英语&#xff1a;Virtualization&#xff09;是一种资源管理技术&#xff0c;是将计算机的各种实体资源&#xff0c;如服务器、网络、内存及存储等&#xff0c;予以抽象、转换后呈现出来&#xff0c;打破实体结构间的不可切割…

虹科方案|将以太网连接添加到Dell EMC PowerVault™ ML3 SAS库

一、Dell EMC 和 ATTO 磁带解决方案 Dell EMC 和 ATTO 提供了业界唯一的商用解决方案&#xff0c;可将高速以太网连接添加 到标准 SAS LTO 磁带驱动器。ATTO XstreamCORE ET 8200 智能网桥允许您使用 iSCSI 和 iSER 协议通过标准以太网远程连接到 SAS 磁带驱动器。当与采用最新…

花 2 个月备战字节跳动Java岗,3 轮面试拿下 60W Offer

最近收到一位刚入职字节的 Java 工程师朋友投稿——以下内容来自其亲身经历&#xff0c;某双非硕士拿到 字节 60W offer &#xff0c;感谢他的走心分享&#xff08;文末附硬货笔记&#xff09; PART1&#xff1a;个人情况简介 菜 J 一枚&#xff0c;本硕都是计算机&#xff08…

[论文阅读] SqueezeSeg V1

文章目录1. 主要思想2. 具体方法2.1 数据处理方式2.2 网络架构3. 实验支撑4. 总结启示5. 相关文献paper 原论文的链接 code: 源代码链接 paper全称&#xff1a;SqueezeSeg: Convolutional Neural Nets with Recurrent CRF for Real-Time Road-Object Segmentation from 3D LiDA…

【02】FreeRTOS获取10.4.6源码+移植到STM32F407步骤

目录 1.获取FreeRTOS源码 1.1 FreeRTOS官网下载步骤 1.2FreeRTOS源码内容 1.3FreeRTOS内核文件 1.3.1Demo文件夹 1.3.2Source文件夹 2.FreeRTOS移植 2.1添加FreeRTOS源码 2.1.1复制FreeRTOS源码 2.1.2将文件添加到工程 2.1.3添加头文件路径 2.2添加FreeRTOS.h 2.3修改SYS…

vpp process类型节点调度过程

vpp节点类型 VLIB_NODE_TYPE_PROCESS&#xff1a;process类型节点可以被挂起也可以被恢复&#xff0c;main线程上调度 &#xff08;免费订阅,永久学习&#xff09;学习地址: Dpdk/网络协议栈/vpp/OvS/DDos/NFV/虚拟化/高性能专家-学习视频教程-腾讯课堂 process节点注册 pro…

【MC】新加载器 Quilt 好用吗?和 Fabric 相比好在哪?

在今年四月 (2022/4/20) &#xff0c;一个船新加载器 Quilt 发布了第一个测试版。 Quilt officially entered its first beta today, attracting an influx of new users and an amazing amount of support and positive feedback. By the end of the day, Quilt was happily l…

Go语言设计与实现 -- 字符串

Go语言的字符串与Java和python是一样的。具有不可变性。是一个只读的字节数组&#xff0c;如图所示。 因为Go的字符串具有不可变性&#xff0c;所以我们只能通过string和[]byte类型之间反复转换实现修改。 将这一段内存复制到栈上将变量的类型转换成[]byte后并修改字节数据将修…

功能上新|使用 Excel 低门槛进行指标分析!

Kyligence Zen 功能上新啦&#xff01;用户不仅可以在 Kyligence Zen 中定义、分析和管理指标&#xff0c;还可直接使用 Excel 插件来分析 Kyligence Zen 中已经定义好的指标&#xff0c;学习无门槛&#xff0c;上手更轻松&#xff01;欢迎访问 http://zen.kyligence.io 申请免…

实验二A 图像的空域(源代码一站式复制粘贴)

实验二A 图像的空域一、实验目的二、实验原理三、实验内容与要求四、实验的具体实现一、实验目的 1.掌握图像滤波的基本定义及目的。 2.理解空间域滤波的基本原理及方法。 3.掌握进行图像的空域滤波的方法。 二、实验原理 1.空域增强 空域滤波是在图像空间中借助模板对图像进…

阳哥JUC并发编程之AQS后篇全网最详细源码笔记

文章目录AQS后序课程笔记AQS源码ReentryLock锁的原理分析公平锁以及非公平锁源码详解Aquire方法调用原码流程分析第一步、tryAquire第二步、addwrite第三步&#xff1a;aquireQueuedAQS释放锁的过程第一步、释放锁第二步进入aquireQueueAQS异常情况下走Cancel流程分析第一种队尾…

ECharts项目实战:全球GDP数据可视化

【课程简介】 可视化是前端里一个几乎可以不用写网页&#xff0c;但又发展得非常好的方向。在互联网产品里&#xff0c;无论是C端中常见的双十一购物节可视化大屏&#xff0c;还是B端的企业中后台管理系统都离不开可视化。国家大力推动的智慧城市、智慧社区中也有很多可视化的…

对于DDoS攻击防御有哪些误区?

​  DDoS攻击是属于常见网络攻击之一&#xff0c;也是一种较难防御的网络攻击。它的特点就是易于启动、难防御、有害、难跟踪等。因此DDoS攻击也是众多站长最怕遇见的网络攻击。那么大家在使用海外服务器时&#xff0c;会有哪些DDoS攻击防御的误区呢? 1、防御全部DDoS攻击 防…

CSS -- 09. 移动WEB开发之flex布局

文章目录移动WEB开发之flex布局1 flex布局原理2 常见的父项属性2.1 设置主轴方向 flex-direction2.2 设置主轴上的子元素排列方式 justify-content2.3 设置元素是否换行 flex-wrap2.4 设置侧轴上的子元素的排列方式&#xff08;单行&#xff09; align-items2.5 设置侧轴上的子…

【矩阵论】6.范数理论——基本概念——矩阵范数生成向量范数谱范不等式

6.1.3 矩阵范数产生向量范数 CnnC^{n\times n}Cnn 上任一矩阵范数 ∥∙∥\Vert \bullet\Vert∥∙∥ 都产生一个向量范数 φ(X)∥X∥V\varphi(X)\Vert X\Vert_Vφ(X)∥X∥V​ 矩阵范数与向量范数的相容性&#xff1a;φ(Ax)≤∥A∥φ(x)\varphi(Ax)\le \Vert A\Vert\varphi(x)φ…

MySQL SSL安全解读

安全一直是不可不重视的问题。目前MySQL这方面应大方向上技术手段都具备。如&#xff1a;网络链接&#xff0c;权限控制&#xff0c;key秘钥认证&#xff0c;数据加密脱敏 等方式。综合考虑&#xff0c;虽然很多环境无法所有这些安全策略全部应用上&#xff0c;但在可控范围内尽…

【C语言数据结构(基础版)】第五站:树和二叉树

目录 一、树的概念及结构 1.树的概念 2.树的表示 3.树在实际中的应用 二、二叉树概念及结构 1.概念 2.特殊的二叉树 3.二叉树的性质 4.二叉树的存储结构 &#xff08;1&#xff09;顺序存储 &#xff08;2&#xff09;链式存储 三、二叉树链式结构的实现 1.二叉树的…