Flink-cdc更好的流式数据集成工具

news2025/1/12 23:10:04

What’s Flink-cdc?

在这里插入图片描述

Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够对这些事件进行实时处理和分析。

Flink CDC提供了与各种数据源集成的功能,包括常见的关系型数据库(如MySQL、PostgreSQL、Oracle等)以及NoSQL数据库(如MongoDB、HBase等)。它通过监控数据库的日志或轮询方式来捕获数据变更,并将变更事件作为数据流发送到Flink的任务中进行处理。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

✅ 端到端的数据集成框架
✅ 为数据集成的用户提供了易于构建作业的 API
✅ 支持在 Source 和 Sink 中处理多个表
✅ 整库同步
✅具备表结构变更自动同步的能力(Schema Evolution)

在使用者的角度,就是Flink-cdc可以简化流处理的流程:

  • 引入Flink-cdc之前流处理流程
    在这里插入图片描述

  • 引入Flink-cdc之后后流处理流程
    在这里插入图片描述
    如上所示,在flink-cdc被引入后大大简化了流处理流程

Flink-cdc支持的链接及对应的版本

Pipeline Connectors
在这里插入图片描述
Source Connectors
在这里插入图片描述截止目前(2024-05-23)

Flink-cdc与Flink对应对影版本的关系

在这里插入图片描述截止目前(2024-05-23)

flink-connector-mysql-cdc 实例分析

示例代码

demo代码:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlSourceDemo {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("mysql-server-host")
                .port(3306)
                .databaseList("mydb") // 设置捕获的数据库
                .tableList("mydb.products") // 设置捕获的表,如果需要同步整个数据库,请将 tableList 设置为 ".*".
//                .tableList(".*") // 捕获整个数据库的表
//                .tableList("^(?!mysql|information_schema|performance_schema).*") // 设置捕获的表,排除系统库
//                .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整个my_db库
                .username("flink-cdc")
                .password("xxx")
                .serverId("5400-5405")
                .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
                .serverTimeZone("Asia/Shanghai") // 设置时区
                .startupOptions(StartupOptions.initial())
                .scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
//                .includeSchemaChanges(true) // 包括 schema 变更
                .build();

        org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
        config.setString("rest.port", "8081");
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境,调试用
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置 3s 的 checkpoint 间隔
        env.enableCheckpointing(3000);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系统
//        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // 设置 source 节点的并行度为 4
                .setParallelism(5)
                .print()
                .setParallelism(1); // 设置 sink 节点并行度为 1

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

maven依赖:

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.5</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope> <!--provided生命周期在test模式才可以运行,在main模式会找不到包-->
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

日志配置文件:
log4j.properties

log4j.rootCategory=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n

启动standalone Flink级群

# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8  \
jobmanager


# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=5 \
-Drest.flamegraph.enabled=true

分析说明

为每个 Reader 设置不同的 Server id

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 Reader 设置不同的 Server id , 假设 Source 并行度为 4,server id 配置必须:serverId(“5400-5405”),5405-5400=5 >= 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。

查看mysql链接发现
select * from information_schema.processlist where user = ‘flink-cdc’;
在这里插入图片描述Flink-cdc对mysql的影响
正常情况下,Flink-cdc是No-lock Read,主库可以继续处理事务和查询,而不会导致主库进程阻塞,不会对主库产生直接影响。但是,在某些情况下数据同步的过程中可能会对主库产生一些间接影响,比如:网络、IO、CPU负载以及mysql的并发连接数等资源消耗。但这些对主库的开销影响相对较小(全量同步阶段可能比较耗能,但时间相对比较短)。

断点续传

通过从checkpoint/savepoint 恢复,flink-cdc可以保证断点续传。

  • 从checkpoint/savepoint恢复,缩小同步范围,例如:从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”),应用更新生效。

  • 应用从checkpoint/savepoint恢复,扩大同步范围的部分不会生效,例如:从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),应用更新不生效生效。若想使动态加表生效,可以显示制定scanNewlyAddedTableEnabled(true) ,来启用扫描新添加的表功能。如没有特殊情况,建议在开发环境开启此配置。

参考:
flink-cdc
flink-cdc docs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1686234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

客厅落地台灯怎么摆放好?五款精品护眼落地灯分享

客厅落地台灯怎么摆放好&#xff1f;在追求品质生活的今天&#xff0c;许多人都选择入手了落地台灯&#xff0c;它不仅能够帮助补充光线&#xff0c;还能够提供敞亮的照明效果&#xff0c;特别在采光不是很好的地方&#xff0c;而客厅落地台灯怎么摆放好&#xff1f;其实大路灯…

SQL刷题笔记day1

1题目 我的代码&#xff1a; select * from employees order by hire_date desc limit 2,1 标准代码&#xff1a; select * from employees where hire_date (select distinct hire_date from employees order by hire_date desc limit 2,1) 复盘&#xff1a;因为按照入…

DOS学习-目录与文件应用操作经典案例-copy

欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一.前言 二.使用 三.案例 一.前言 copy命令的功能是复制一个或多个已经存在的文件到新的位置&#xff0c;或者将多个文件的内容整合后保存为一个单独的文件&#xff0c;亦或者用于创建批…

docker部署kafka实战

目录 一、部署kafaka、zookeeper 二、测试信息发送与接收 三、kafka进阶 一、部署kafaka、zookeeper 请提前安装docker、docker-compose 安装docker&#xff1a;docker--安装docker-ce-CSDN博客 安装docker-compose&#xff1a; 安装docker-compose_安装 docker-compose-CSD…

URL化00

题目链接 URL化 题目描述 注意点 字符串长度在 [0, 500000] 范围内假定该字符串尾部有足够的空间存放新增字符 解答思路 因为该字符串尾部有足够的空间存放新增字符&#xff0c;所以直接使用大小为s.length()的char数组进行操作&#xff0c;使用idx记录当前操作char数组的…

Vue学习笔记2——创建一个Vue项目

Vue项目 1、创建一个Vue项目2、Vue项目的目录结构3、模版语法4、属性绑定5、条件渲染 1、创建一个Vue项目 vue官方文档&#xff1a; https://cn.vuejs.org/打开命令行界面&#xff08; “winR"再输入"cmd”&#xff09;&#xff0c;切换位置到指定的位置创建vue项目…

Z缓冲技术在AI去衣中的关键角色

引言&#xff1a; 人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;为图像处理领域带来了革命性的变化。其中&#xff0c;AI去衣技术作为一种新兴的应用&#xff0c;引起了广泛关注。它不仅在多媒体内容的编辑、虚拟现实和增强现实等领域具有重要的应用价值&…

【B站 heima】小兔鲜Vue3 项目学习笔记Day03

文章目录 Home1.Home整体结构搭建和分类实现2. banner轮播图功能3. Home 面板组件封装4.新鲜好物和人气推荐实现5. 图片懒加载指令实现6. Home- product产品列表实现7. Home-GoodsItem 组件封装 一级路由1. 整体认识和路由配置2. 面包屑导航3. 一级分类 - 轮播图的实现4. 激活状…

MacPro中Ubuntu安装GNOME桌面

第一步&#xff0c;先在MacPro中安装UTM虚拟机。 查看另一文章&#xff1a; https://blog.csdn.net/qq_38382925/article/details/139157877?spm1001.2014.3001.5502 第二步&#xff0c;在虚拟机中安装Ubuntu ARM64 server 查看另一文章&#xff1a; https://blog.csdn.net/qq…

Golang | Leetcode Golang题解之第100题相同的树

题目&#xff1a; 题解&#xff1a; func isSameTree(p *TreeNode, q *TreeNode) bool {if p nil && q nil {return true}if p nil || q nil {return false}queue1, queue2 : []*TreeNode{p}, []*TreeNode{q}for len(queue1) > 0 && len(queue2) > …

uni-app 微信 支付宝 小程序 使用 longpress 实现长按删除功能,非常简单 只需两步

1、先看效果 2、直接上代码 ui结构 <view class"bind" longpress"deleteImage" :data-index"index"><view class"bind_left">绑定设备</view><view class"bind_right"><view class"bind_t…

三方登录- iOS Twitter登录

背景 在现代移动应用中&#xff0c;集成第三方登录已经成为一种常见的需求&#xff0c;它不仅能提高用户体验&#xff0c;还能减少用户注册的阻力。我们选择了 Twitter 作为示例&#xff0c;但类似的步骤和逻辑也适用于其他第三方登录服务。希望这篇博客能为你提供清晰的指导&…

【Spring】spring入门程序

案例要求&#xff1a;创建一个 Studentservice 类&#xff0c;其中需要使用 studentDao 接口的保存方法&#xff0c;来存储一个Student 类的对象&#xff0c;StudentDao 接口有两个不同的实现类&#xff0c;通过 Spring 的方式&#xff0c;为 Student类创建对象并为属性赋值&am…

2024电工杯B题食谱评价与优化模型思路代码论文分析

2024年电工杯数学建模竞赛B题论文和代码已完成&#xff0c;代码为B题全部问题的代码&#xff0c;论文包括摘要、问题重述、问题分析、模型假设、符号说明、模型的建立和求解&#xff08;问题1模型的建立和求解、问题2模型的建立和求解、问题3模型的建立和求解&#xff09;、模型…

vmware - 主机向虚拟机拷贝文件的临时方法

文章目录 vmware - 主机向虚拟机拷贝文件的临时方法概述笔记确认主机/虚拟机之间网络是通的在虚拟机中新建一个文件夹(e.g. c:\test), 将这个文件夹设为共享文件夹。查看虚拟机中的当前用户(远程登录要用)远程登录END vmware - 主机向虚拟机拷贝文件的临时方法 概述 程序打包…

go 微服务框架kratos错误处理的使用方法及原理探究

通过go语言原生http中响应错误的实现方法&#xff0c;逐步了解和使用微服务框架 kratos 的错误处理方式&#xff0c;以及探究其实现原理。 一、go原生http响应错误信息的处理方法 处理方法&#xff1a; ①定义返回错误信息的结构体 ErrorResponse // 定义http返回错误信息的…

vscode 插件开发指南

1安装nodejs、vscode 2安装插件脚手架 npm install -g yo generator-code 3使用命令创建插件项目 yo code 4在vscode中打开项目 5运行调试&#xff0c;按F5键 6在新打开的窗口中按shiftctrlp 然后执行命令 7配置右键菜单命令 遇到问题&#xff1a; 1.package.json中vsc…

【ELK日志收集过程】

文章目录 为什么要使用ELK收集日志ELK具体应用场景ELK日志收集的流程 为什么要使用ELK收集日志 使用 ELK&#xff08;Elasticsearch, Logstash, Kibana&#xff09;进行日志收集和分析有多种原因。ELK 堆栈提供了强大、灵活且可扩展的工具集&#xff0c;能够满足现代 IT 系统对…

B端概念稿,贼靓!像概念车一样未必落地,但是潮流引领。

概念稿在UI设计中往往难以落地&#xff0c; 主要有以下几个原因&#xff1a; 抽象性&#xff1a;概念稿通常是设计师在初始阶段为了表达和传达设计理念而创建的&#xff0c;它们往往比较抽象和概念化。这使得概念稿在实际落地时需要进一步细化和具体化&#xff0c;以便开发人员…

ChatGPT类大模型应用入门了解与使用

一 前言 ChatGPT大众热情逐渐褪去&#xff0c;但在后台技术人的探索还处于热火朝天状态。如果我们生活的世界是一杯清水&#xff0c; 那类似ChatGPT的语言大模型技术的横空出世就如滴入水杯的一滴墨汁&#xff0c;第一滴很显眼&#xff0c;但实际上是后续墨汁慢慢扩散渗透才是…