实时数据集成的完美搭档:CDC技术与Kafka集成的解决方案

news2024/7/6 6:04:30

目录

一、实时数据同步

二、可靠的数据传输

三、灵活的数据处理

四、解耦数据系统

五、主流免费CDC工具介绍

六、Flink CDC安装使用步骤:

七、ETL CDC安装使用步骤

八、写在最后


一、实时数据同步

随着企业数据不断增长,如何高效地捕获、同步和处理数据成为了业务发展的关键。在这个数字化时代,CDC技术与Kafka集成为企业提供了一种无缝的数据管理方案,为数据的流动和实时处理打开了全新的大门。

CDC技术与Kafka集成能够实现快速、可靠的实时数据同步。CDC技术可以捕获数据库事务日志中的数据变更,并将其转化为可靠的数据流。这些数据流通过Kafka的高吞吐量消息队列进行传输,确保数据的实时性和一致性。无论是从源数据库到目标数据库的同步,还是跨不同数据存储系统的数据传输,CDC技术与Kafka集成提供了高效且无缝的解决方案。

二、可靠的数据传输

Kafka作为一个分布式、可扩展的消息队列系统,提供了高度可靠的数据传输机制。借助Kafka的持久性存储和数据复制机制,数据不会丢失或损坏。即使在高并发的情况下,Kafka集成能够保证数据的完整性和可靠性。这为企业提供了强大的数据传输基础,确保数据在各个环节的安全传输。

三、灵活的数据处理

CDC技术与Kafka集成不仅提供了实时数据同步,还为企业提供了灵活的数据处理能力。Kafka的消息队列和流处理特性使得企业可以在数据传输的同时进行实时的数据处理和分析。借助Kafka的消费者应用程序,企业可以对数据流进行转换、过滤、聚合等操作,实现实时数据的清洗、加工和分析。这种实时数据处理能力为企业提供了即时的洞察力,帮助其做出快速而准确的决策。

四、解耦数据系统

CDC技术与Kafka集成还能帮助企业解耦数据系统。通过将CDC技术与Kafka作为中间层,不同的数据源和目标系统可以独立操作,彼此之间解除了紧耦合的依赖关系。这种解耦带来了极大的灵活性,使得企业能够更加容易地添加、移除或升级数据源和目标系统,而无需对整个数据流程进行重构。

CDC技术与Kafka集成为企业带来了数据管理的全新体验。它提供了高效、可靠的数据同步和实时处理,帮助企业实现数据驱动的成功。无论是数据同步、实时处理还是数据系统的解耦,CDC技术与Kafka集成为企业提供了强大而灵活的解决方案。

五、主流免费CDC工具介绍

介绍两款能够快速且免费实现CDC技术与Kafka集成的主流工具:Flink CDC和ETLCloud CDC。

测试前的环境准备:JDK8以上、Mysql数据库(开启BinLog日志)、kafka

六、Flink CDC安装使用步骤:

下载安装包

Flink官网,下载1.13.3版本安装包 flink-1.13.3-bin-scala_2.11.tgz。(Flink1.13.3支持flink cdc2.x版本,为兼容flink cdc)

解压

在服务器上创建安装目录/home/flink,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。tar -zxvf flink-1.13.3-bin-scala_2.11.tgz

启动

进入解压后的flink/lib目录,上传mysql和sql-connector驱动包。

 进入flink/bin目录,执行启动命令:./start-cluster.sh

 进入Flink可视化界面查看http://ip:8081

 测试

下面我们来新建一个maven工程做CDC数据监听的同步测试。

POM依赖

<!--    Flink CDC  -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>

新建Flink_CDC2Kafka类
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class Flink_CDC2Kafka {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1 设置 CK&状态后端
        //略
        //2.通过 FlinkCDC 构建 SourceFunction 并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("ip")  //数据库IP
                .port(3306) //数据库端口
                .username("admin")  //数据库用户名
                .password("pass")   //数据库密码
                .databaseList("test")   //这个注释,就是多库同步
                .tableList("test.admin") //这个注释,就是多表同步
                .deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
//                .deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        //3.打印数据并将数据写入 Kafka
        streamSource.print();
        String sinkTopic = "test";
        streamSource.addSink(getKafkaProducer("ip:9092",sinkTopic));
        //4.启动任务
        env.execute("FlinkCDC");
    }
    //kafka 生产者
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
        return new FlinkKafkaProducer<String>(brokers,topic,
                new SimpleStringSchema());
    }
}

自定义序列化类

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建 JSON 对象用于存储最终数据
        JSONObject result = new JSONObject();
        //2.获取库名&表名放入 source
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        JSONObject source = new JSONObject();
        source.put("database",database);
        source.put("table",tableName);
        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        //5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }
        //6.将字段写入 JSON 对象
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);
        //7.输出数据
        collector.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

开启CDC监听

 Mysql中新增一条人员数据

 控制台捕获到增量数据

 增量数据也成功推送到kafka中

至此通过Flink CDC监听数据库增量数据推送到kafka的过程已经完成,可以看到整个过程需要一些编码能力,对于业务人员的使用比较痛苦。

下面我们来介绍ETLCloud这款产品如何通过可视化配置,快速实现上述的场景内容。

七、ETL CDC安装使用步骤

下载安装包

ETLCloud提供了一键快捷部署包,只需运行启动脚本即可完成安装产品部署。部署包下载可以登录ETLCloud官网自行下载。

安装

官网下载linux一键部署包,把一键部署包放到一个目录下解压并进入该目录。

对脚本文件进行赋权

chmod +x restcloud_install.sh

执行脚本

./restcloud_install.sh

 等待tomcat启动,当出现这个界面,则restcloud证明启动成功

数据源配置

新增MySql数据源信息

新增Kafka数据源信息

测试数据源

 监听器配置

新增数据库监听器

 监听器配置

 接收端配置(数据传输类型选择kafka)

 高级配置(默认参数)

 启动监听

 监听成功

 测试

打开Navicat可视化工具新增并修改一条人员信息

 实时数据中可以动态捕捉实时传输数据

 Kafka中查看新增数据

 Kafka中查看修改数据

 八、写在最后

上面我们通过两个CDC工具均实现了实时数据同步到kafka的功能,但通过对比Flink CDC和ETLCloud CDC,可以看出ETLCloud CDC提供了可视化配置的方式,使配置过程更加简单快捷,不需要编码能力。而Flink CDC则需要进行编码,对于业务人员可能会有一定的学习成本。

无论选择哪种工具,都可以实现CDC技术与Kafka集成,实时捕获数据库的增量数据变化,提供了方便和高效的数据同步和传输方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/767411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何将视频音频提取出来?这几个方法真的实用!

在视频制作和后期处理中&#xff0c;有时我们需要将视频中的音频提取出来&#xff0c;以便单独处理&#xff0c;如剪辑、增强声音等&#xff0c;而不影响视频内容。为了帮助您实现这一目标&#xff0c;下面将介绍几种常用的方法来提取视频音频。 方法一&#xff1a;记灵在线工…

SSH服务(二十六)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、特点 三、身份验证机制 四、验证过程 五、加密机制 六、基本参数 ​七、 身份验证机制 八、基本操作 1. ssh 2. scp 3. sftp 4. 密钥对验证 总结 前…

vue或react当中canvas实现电子签名组件和使用canvas进行图片压缩

<template><div><h1>vue3</h1><canvas id"canvasWrite"> 浏览器不支持Canvas,请升级浏览器 </canvas><div><button class"submit" click"submitWrite">提交签名</button><button clas…

DOM学习

1.DOM 简介 1.1 什么是 DOM 文档对象模型&#xff08;Document Object Model&#xff0c;简称 DOM&#xff09;&#xff0c;是 W3C 组织推荐的处理可扩展标记语言&#xff08;HTML 或者XML&#xff09;的标准编程接口。 W3C 已经定义了一系列的 DOM 接口&#xff0c;通过这些…

Spring项目创建与Bean的存储与读取(DL)

文章目录 一. Spring项目的创建1. 创建Maven项目2. 配置国内源3. 添加spring依赖4. 创建启动类 二.储存或读取Bean对象1. 添加spring配置文件2. 创建Bean对象3. 读取Bean对象 一. Spring项目的创建 1. 创建Maven项目 第一步&#xff0c;创建 Maven 项目&#xff0c;Spring 也…

LAZYSYSADMIN1靶机详解

LAZYSYSADMIN1靶机复盘 一个五分钟就能结束的靶机&#xff0c;非常快。 下载地址&#xff1a;https://download.vulnhub.com/lazysysadmin/Lazysysadmin.zip nmap对这个ip进行单独扫描后发现开启的服务挺多。 就直接枚举了一下这个服务器的内容&#xff0c;发现了一个用户名…

试验数据管理平台解决方案-MDM

试验是汽车研发、生产制造和售后保障过程中必不可少的重要环节。试验员曾到过寒区、热区、高原、沙漠和山路等恶劣环境下的场地来对试验车辆进行相关试验。这是因为汽车在不同的道路、地理和气候条件下行驶时&#xff0c;其性能、效率、可靠性和耐久性等技术特性也会发生变化。…

2023四大进销存软件推荐,第一款适合中小商户使用!

在当今竞争激烈的商业环境中&#xff0c;企业或商户迫切需要使用进销存管理软件&#xff0c;来优化库存控制、提高运营效率、降低成本&#xff0c;并加强与供应链合作伙伴之间的协作和沟通。 为了帮助企业选择适合自己需求的进销存软件&#xff0c;我们特别整理了2023年市面上比…

选择适合的接口测试工具,助力高效在线接口测试

选择适合的接口测试工具&#xff0c;助力高效在线接口测试 接口测试工具在软件开发中扮演着重要的角色。本文将重点介绍不同类型的接口测试工具&#xff0c;特别是在线接口测试工具&#xff0c;探讨其优势和适用场景&#xff0c;帮助您选择适合的工具&#xff0c;提高接口测试…

4、深入理解ribbon

一、负载均衡的两种方式 服务器端负载均衡 传统的方式前端发送请求会到我们的的nginx上去&#xff0c;nginx作为反向代理&#xff0c;然后路由给后端的服务器&#xff0c;由于负载均衡算法是nginx提供的&#xff0c;而nginx是部署到服务器端的&#xff0c;所以这种方式又被称为…

【Linux】多线程(二)

文章目录 生产者消费者模型为何要使用生产者消费者模型生产者消费者模型优点基于BlockingQueue的生产者消费者模型条件变量条件变量代码 POSIX信号量基于环形队列的生产消费模型 生产者消费者模型 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和…

qt与opencv学习记录

qtopencv开发入门&#xff1a;4步搞定环境配置-1_哔哩哔哩_bilibili qtopencv开发入门&#xff1a;4步搞定opencv环境配置2_哔哩哔哩_bilibili 文章内容来自上面两个视频&#xff0c;感谢创作者。 ps&#xff1a;配置环境的过程中&#xff0c;遇到了很多问题&#xff0c;我…

UML 图

统一建模语言&#xff08;Unified Modeling Language&#xff0c;UML&#xff09;是用来设计软件的可视化建模语言。它的特点是简单、统一、图形化、能表达软件设计中的动态与静态信息。 UML 从目标系统的不同角度出发&#xff0c;定义了用例图、类图、对象图、状态图、活动图…

网络流量监控分析

网络管理员是维护健全网络基础设施的关键&#xff0c;这通常是一项艰巨的任务&#xff0c;因为管理员需要 24x7 全天候监控和管理网络和服务器。但是&#xff0c;即使进行全天候监控&#xff0c;每个网络也容易受到带宽占用的影响&#xff0c;如果导致关键业务应用程序变慢&…

CSS 实现 Turbo 官网 3D 网格线背景动画

转载请注明出处&#xff0c;点击此处 查看更多精彩内容 查看 Turbo 官网 时发现它的背景动画挺有意思&#xff0c;就自己动手实现了一下。下面对关键点进行解释说明&#xff0c;查看完整代码及预览效果请 点击这里。 简单说明原理&#xff1a;使用 mask-image 遮罩绘制网格&a…

二叉树--C语言实现数据结构

本期带大家一起用C语言实现二叉树&#x1f308;&#x1f308;&#x1f308; 1、二叉树的定义 二叉树是一种特殊的树状数据结构&#xff0c;它由节点组成&#xff0c;每个节点最多有两个子节点&#xff0c;分别称为左子节点和右子节点 二叉树的链式存储结构是指用 链表 来表示…

公司私服Maven踩坑,项目配置都OK但是包就是下载不下来【已解决】

我的问题是公司的私服Maven下载不下来&#xff0c;因为公司保密协议&#xff0c;这里用阿里云为例&#xff01; 具体的至少参考&#xff1a;(32条消息) 这篇博文只讲MirrorOf_Java软件工程师的博客-CSDN博客 1&#xff1a;Java的Maven爆红了就找到资源库&#xff0c;然后把对于…

2.10messagebox弹窗

2.10messagebox弹窗 messagebox部件 其实这里的messagebox就是我们平时看到的弹窗。 我们首先需要定义一个触发功能&#xff0c;来触发这个弹窗 这里我们就放上以前学过的button按钮 tk.Button(window, texthit me, commandhit_me).pack()通过触发功能&#xff0c;调用messa…

超高性能协议框架fury完爆protostuff(附性能测试对比)

简单介绍: 序列化框架是系统通信的基础组件&#xff0c;在大数据、AI 框架和云原生等分布式系统中广泛使用。当对象需要跨进程、跨语言、跨节点传输、持久化、状态读写、复制时&#xff0c;都需要进行序列化&#xff0c;其性能和易用性影响运行效率和开发效率。 Fury 是一个基于…

3.2.18 DIR函数的补充说明

【分享成果&#xff0c;随喜正能量】人与人之间都是相互的&#xff0c;你给人搭桥&#xff0c;别人为你铺路&#xff1b;你让人难堪&#xff0c;别人给你添堵。。 我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的劳动效…