Flink CDC数据同步

news2025/1/18 20:25:25

背景

随着信息化程度的不断提高,企业内部系统的数量和复杂度不断增加,因此,数据库系统的同步问题已成为越来越重要的问题。

缓存失效

在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。

简化单体应用

许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。

共享数据库

当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。

数据集成

数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用数据同步工具加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。

命令查询职责分离

在命令查询职责分离 [Command Query Responsibility Separation (CQRS)]架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧(update-side),这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序(totally-ordered)处理。Debezium和CDC可以使这种方式更可行:写操作被正常记录,但是Debezium捕获数据更改,并且持久化到全序流里,然后供那些需要异步更新只读视图的服务消费。写侧(write-side)表可以表示面向领域的实体(domain-oriented entities),或者当CQRS和 Event Sourcing 结合的时候,写侧表仅仅用做追加操作命令事件的日志。

Flink CDC

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink ®的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

image-20230827094534219

数据抓取

FlinkCDC 使用 MySQL 的 binlog 技术进行数据抓取。binlog 是 MySQL 用于记录数据库变更操作的日志,包括对表的增删改操作。FlinkCDC 通过对 binlog 进行解析和读取,得到最新的增量数据,并将其转换为 Flink 支持的数据格式,如 Avro 或 JSON。

如下代码可以帮我们监听数据库的变更日志:

JdbcIncrementalSource<String> oracleChangeEventSource =
                new OracleSourceBuilder()
                        .hostname("host")
                        .port(1521)
                        .databaseList("XE")
                        .schemaList("DEBEZIUM")
                        .tableList("DEBEZIUM.PRODUCTS")
                        .username("username")
                        .password("password")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(true) // output the schema changes as well
                        .startupOptions(StartupOptions.initial())
                        .debeziumProperties(debeziumProperties)
                        .splitSize(2)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000L);
        // set the source parallelism to 4
        env.fromSource(
                        oracleChangeEventSource,
                        WatermarkStrategy.noWatermarks(),
                        "OracleParallelSource")
                .setParallelism(4)
                .print()
                .setParallelism(1);
        env.execute("Print Oracle Snapshot + RedoLog");

数据同步

FlinkCDC 将抓取到的增量数据同步到 Flink 或者其他的计算引擎中进行处理。同步方式有两种:

pull 模式:FlinkCDC 在启动时会向 MySQL 中的某个位置开始读取 binlog,然后通过一个 HTTP 接口将增量数据暴露给 Flink。Flink 每隔一段时间就会调用该接口拉取增量数据。

push 模式:FlinkCDC 将增量数据通过一个 Kafka Topic 推送给 Flink。Flink 在消费 Kafka Topic 时,就可以直接消费到增量数据。

监听到数据变动,能拿到变更前后的数据对比,经过Sink数据转换成相应的INSERT、UPDATE、DELETE等相关SQL语句,并同步到目标数据库。

public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {

        System.out.println("监听到活动数据:" + LocalDateTime.now() + value);

        JSONObject jsonObject = JSONObject.parseObject(value);

        User before = jsonObject.getObject("before", User.class);
        User after = jsonObject.getObject("after", User.class);

        try {
            String table = jsonObject.getJSONObject("source").getString("table");

            SqlParse sqlParse = new SqlParse();
            String executeSQL = "";
            if(before == null){
                // 插入
                executeSQL = sqlParse.getInsert(after,table);
            }else if(after == null){
                // 删除
                executeSQL = sqlParse.getDeleteSQL(before,table);
            }
            else{
                // 更新
                executeSQL = sqlParse.getUpdateSQL(before,after,table);
            }
            SpringJDBC.executeSQL(executeSQL);
        }catch (Exception e){
            System.out.println("执行错误");
        }

    }

}

通用形SqlParse只能解析同构数据,异构数据需要单独处理。

增量数据的解析和处理

FlinkCDC 将抓取到的增量数据转换为 Flink 支持的数据格式后,交由 Flink 进行进一步的处理。Flink 可以对数据进行各种运算,如聚合、过滤、变换等,最终将处理结果输出到其他的存储介质中。

总的来说,FlinkCDC 的原理就是通过解析 MySQL 中的 binlog,抓取到最新的增量数据,并将其转换为 Flink 支持的数据格式,然后将增量数据同步到 Flink 或者其他的计算引擎中进行处理。通过 Flink 的强大计算能力,可以对增量数据进行各种计算,从而实现实时数据处理和分析的功能。

优缺点比较

优点:

  • 能监听多种数据源:MySQL、Oracle、PgSQL等;
  • 支持流式处理,可以实现数据的实时处理和分析;
  • 支持增量更新,可以实现数据的实时同步;
  • 支持容错处理,可以实现数据的高可靠性;

缺点:

  • 对Oracle支持不太友好,需要将开启归档日志,并且部分字段解析需要了解其语义;
  • 对于大表的查询性能较差;
  • 对于大规模数据的处理效率较低;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/934569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

“返璞归真,数字排毒”,放下智能手机,美国功能手机卷土重来

近年来&#xff0c;智能手机的普及已经改变了人们的生活方式和沟通方式。然而&#xff0c;随着科技的不断进步和不断涌现的各种新应用程序&#xff0c;一些年轻人开始感到疲惫和厌倦。他们觉得智能手机带来了太多的干扰和依赖&#xff0c;也让人们容易沉迷于社交媒体和短视频。…

Rabbitmq的Federation Exchange

(broker 北京 ) &#xff0c; (broker 深圳 ) 彼此之间相距甚远&#xff0c;网络延迟是一个不得不面对的问题。有一个在北京的业务(Client 北京 ) 需要连接 (broker 北京 ) &#xff0c;向其中的交换器 exchangeA 发送消息&#xff0c;此时的网络延迟很小&#xff0c;(C…

全球边缘计算大会的十大至暗时刻

来源网友X小缘 ① 背景板文字全球边缘计算大会&#xff0c;被广告公司改为全球边缘计算机大会&#xff0c;因为他觉得少了个机字&#xff1b; ② 明天开会&#xff0c;今天遇到恶劣天气&#xff0c;讲师主持人一整晚滞留外地机场&#xff1b; ③ 视频直播的时候声音通道没开&am…

Redis数据结构全解析【超详细万字分析】

文章目录 前言一、SDS1、SDS的必要性2、SDS的数据结构3、SDS的优势O&#xff08;1&#xff09;复杂度获取字符串长度二进制安全不会发生缓冲区溢出节省空间 二、链表1、结构设计2、优缺点 三、压缩列表1、结构设计2、连续更新3、压缩列表的缺陷 四、哈希表1、结构设计2、哈希冲…

236. 二叉树的最近公共祖先-优化

本期我们对该题进行优化&#xff0c;不知道题目的小伙伴建议先看看之前的 236. 二叉树的最近公共祖先_KLZUQ的博客-CSDN博客 我们要将时间复杂度优化为O(N) class Solution { public:bool FindPath(TreeNode* root, TreeNode* x,stack<TreeNode*>& path){if(rootnul…

Kubernetes(K8s)基本环境部署

此处只做学习使用&#xff0c;配置单master环境。 一、环境准备 1、ip主机规划&#xff08;准备五台新机&#xff09;>修改各个节点的主机名 注意&#xff1a;关闭防火墙与selinux 节点主机名ip身份joshua1 kubernetes-master.openlab.cn 192.168.134.151masterjoshua2k…

无涯教程-分类算法 - 朴素贝叶斯

朴素贝叶斯算法是一种基于应用贝叶斯定理的分类技术&#xff0c;其中强烈假设所有预测变量彼​​此独立。简而言之&#xff0c;假设是某个类中某个要素的存在独立于同一类中其他任何要素的存在。 在贝叶斯分类中&#xff0c;主要的兴趣是找到后验概率&#xff0c;即给定某些观…

抽象类和接口有什么区别?

在 Java 中&#xff0c;抽象类和接口是两种不同的类类型。它们都不能直接实例化&#xff0c;并且它们都是用来定义一些基本的属性和方法的&#xff0c;但它们有以下几点不同&#xff1a; 定义不同&#xff1a;定义的关键字不同&#xff0c;抽象类是 abstract&#xff0c;而接口…

Linux操作系统--包管理yum

1.概述 YUM(全称为 Yellow dog Updater, Modified)是一个在 Fedora 和 RedHat 以及 CentOS中的 Shell 前端软件包管理器。基于 RPM 包管理,能够从指定的服务器自动下载 RPM 包并且安装,可以自动处理依赖性关系,并且一次安装所有依赖的软件包,无须繁琐地一次次下载、安装。…

软考:中级软件设计师:数据库并发控制,完整性约束,数据库安全

软考&#xff1a;中级软件设计师:数据库并发控制 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准备的…

CrossOver 23 新功能介绍 CrossOver 23 版本更新了哪些功能

本次发布的CrossOver 23为用户带来了许多令人期待的新功能和优化&#xff0c;特别是对游戏方面的支持&#xff0c;更是让广大Mac游戏玩家兴奋。CrossOver 23包括对Wine 8.0.1的更新&#xff0c;带来了5000多处改动&#xff0c;对各种应用程序进行了改进。该版本还包括 Wine Mon…

AD9361配置采用纯PL方式,QT编写的小软件可以快速实现

采用ADI官方的API函数&#xff0c;虽然能够快速的实现AD9361配置&#xff0c;让我们不必关注9361的内部寄存器的配置过程&#xff0c;但是在实际的项目开发过程中&#xff0c;也在一定程度上限制了AD9361与PL之间数据交互的灵活性。 今天给大家推荐采用AD9361官方提供的配置软…

Linux操作系统--克隆虚拟机

1.概述 我们在搭建大数据或者是集群的过程中,需要使用到许多配置相同或者相类似的环境。这一个时候就需要使用到克隆虚拟机的功能。 2.克隆虚拟机过程 (1).从现有虚拟机(关机状态)克隆出新虚拟机,右键选择管理=>克隆,如下所示 (2).直接点击下一步。如下所示 (3).选择…

什么是浮动(float)?请解释清楚浮动对父元素和兄弟元素的影响。

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 浮动&#xff08;Float&#xff09;⭐ 浮动对父元素的影响⭐ 浮动对兄弟元素的影响⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入…

Centos8的NAT网络设置

1、先将虚拟机设置为NAT模式 2、打开虚拟网络编辑器&#xff0c;记录以下信息 NAT设置&#xff1a;子网掩码、网关 DHCP设置&#xff1a;I P 范围 (自动时) 3、进入Centos8的网络设置页面&#xff0c;按照记录的信息进行配置 4、重载、重启网卡 nmcli c reload ensl60 n…

什么是字体图标(Icon Font)?如何在网页中使用字体图标?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 字体图标&#xff08;Icon Font&#xff09;⭐ 如何在网页中使用字体图标⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&a…

Linux操作系统--常用指令(系统定时任务)

(1).概述 在实际的运维过程中,这一个功能使用的还是很多,比如,我们需要在定点的时候进行系统的备份、清理、存盘等工作。这一个时候就需要使用系统定时任务。 (2).查看crontab是否开启 (3). crontab 定时任务设置 功能:设置定时的任务 语法: crontab [选项] 选项: -e 编…

《向量数据库指南》——什么叫“AI 向量数据库”,它和我们日常理解的数据库有什么不同?

我认为"AI 向量数据库"这个概念非常切合实际,它类似于关系数据库在交易领域的作用。个人观点是,向量数据库实际上是为了人工智能而生的。一方面,向量数据库的数据完全源自于人工智能技术。另一方面,对于 AI 应用而言,向量数据库也是至关重要的基础设施。 至于…

【CSS】CSS 背景设置 ( 背景半透明设置 )

一、背景半透明设置 1、语法说明 背景半透明设置 可以 使用 rgba 颜色值设置半透明背景 ; 下面的 CSS 样式中 , 就是 设置黑色背景 , 透明度为 20% ; background: rgba(0, 0, 0, 0.2);颜色的透明度 alpha 取值范围是 0 ~ 1 之间 , 在使用时 , 可以 省略 0.x 前面的 0 , 直接…

美国慌了,满世界找稀土替代却找不到,最终还是得求中国

中国先后对稀土、镓、锗等稀有金属材料的出口采取限制措施&#xff0c;美国一开始并不慌&#xff0c;毕竟全球还有美国自己、澳大利亚、蒙古等国家拥有稀土矿藏&#xff0c;因此美国以为可以迅速找到替代&#xff0c;然而如今却发现事情并不简单。 中国占有的稀土矿藏确实不算最…