Flink安装和Flink CDC实现数据同步

news2024/11/13 7:46:35

 一,Flink 和Flink CDC

1, Flink

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。

中文文档 Apache Flink Documentation | Apache Flink

官方文档 :https://flink.apache.org
Flink 中文社区视频课程:https://github.com/flink-china/flink-training-course
Flink 中文社区 :https://www.slidestalk.com/FlinkChina
ververica 教程 :https://training.ververica.com/
ververica 教程中文文档:https://ci.apache.org/projects/flink/flink-docs-master/zh/
源码:https://github.com/apache/flink
Flink 知识图谱 https://developer.aliyun.com/article/744741

2,Flink CDC

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。

目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:

  •  数据迁移:常用于数据库备份、容灾等;
  •  数据分发:将一个数据源分发给多个下游,常用于业务解耦、微服务;
  •  数据采集:将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。

目前业界主流的 CDC 实现机制可以分为两种:

2.1 基于查询的 CDC

  • 离线调度查询作业,批处理。依赖表中的更新时间字段,每次执行查询去获取表中最新的数据;
  • 无法捕获删除事件,从而无法保证数据一致性;
  • 无法保障实时性,基于离线调度存在天然的延迟。

2.2 基于日志的 CDC

  • 实时消费日志,流处理。例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

2.3 主流CDC技术对比

1) DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但在场景支持上仍不完善。

2) 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。

3) 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此 Flink CDC 作为Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构,在大数据场景下容易面临性能瓶颈的问题。

4) 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联打宽。Flink CDC 依托强大的 Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。

5) 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统中,也支持灵活的自定义 connector。

二,安装Flink

1,为了运行Flink,只需提前安装好 Java 11或以上版本

java -version

2,下载Flink

下载地址:Index of /dist/flinkicon-default.png?t=O83Ahttps://archive.apache.org/dist/flink

可以找到你要安装的版本

wget https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz

3,解压

 tar -xzf flink-1.19.1-bin-scala_2.12.tgz

4,进入到flink-1.19.1目录,启动集群

./bin/start-cluster.sh

5,如果想访问WebUI,可以看下面

【问题解决】Flink在linux上运行成功但是无法访问webUI界面_flink web ui的ip访问-CSDN博客

6,停止集群

./bin/stop-cluster.sh

三,运行Flink示例

在examples目录下有很多示例,可以试着运行

1,运行单词统计示例

./bin/flink run examples/streaming/WordCount.jar

进入到log目录,查看日志

tail -f flink-root-taskexecutor-0-rocky8-template.out

四,运行Flink CDC示例

1,通过编写sql脚本来实现同步数据

SET execution.checkpointing.interval = 5s;

drop table if exists  order_01;
CREATE TABLE order_01 (
  id INT NOT NULL,
  `order_id`  VARCHAR,
  amount VARCHAR,
  remark VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'x.x.x.x',
    'port' = '3306',
    'username' = 'root',
    'password' = 'p4ssword',
    'database-name' = 'account',
    'server-time-zone' = 'UTC',
    'table-name' = 'order_01'
 );

drop table if exists  order_02;
CREATE TABLE order_02 (
  id INT NOT NULL,
  `order_id`  VARCHAR,
  amount VARCHAR,
  remark VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://x.x.x.x:3306/account?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    'username' = 'root',
    'password' = 'p4ssword',
    'table-name' = 'order_02',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'scan.fetch-size' = '200'
 );

INSERT INTO order_02
SELECT id,order_id,amount,remark
FROM order_01;

编写以上脚本,命名为flinkCdc2Mysql.sql,上传到flink的sql目录下,这里的sql是我新建的,你可以自己指定。

需要下载flink-connecter和flink-sql-connector包

下载地址:Central Repository: com/ververica

 通过以下命令执行

./bin/sql-client.sh -f ./sql/flinkCdc2Mysql.sql

就可以完成数据从order_01表同步到order_02表。

2, 编写Java类,编译成jar,执行jar来实现数据同步

AccountVoucherSumaryDWSSQL类
package com.xxx.demo

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @date 2024/10/31 下午3:57
 */
public class AccountVoucherSumaryDWSSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(5000);

        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS account");

        // 动态表,此为source表
        tEnv.executeSql("CREATE TABLE account.order_01 (\n" +
            "    id                           INT,\n" +
            "    order_id                     VARCHAR,\n" +
            "    amount                       VARCHAR,\n" +
            "    remark                       VARCHAR,\n" +
            "    PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = 'x.x.x.x',\n" +
            "  'port' = '3306',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'p4ssword',\n" +
            "  'database-name' = 'account',\n" +
            "  'table-name' = 'order_01',\n" +
            "  'server-time-zone' = 'UTC',\n" +
            "  'scan.incremental.snapshot.enabled' = 'false'\n" +
            ")");

        // 动态表,此为sink表。sink表和source表的connector不一样
        tEnv.executeSql("CREATE TABLE account.order_02 (\n" +
            "    id                           INT,\n" +
            "    order_id                     VARCHAR,\n" +
            "    amount                       VARCHAR,\n" +
            "    remark                       VARCHAR,\n" +
            "    PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:mysql://x.x.x.x:3306/account?useSSL=false&allowPublicKeyRetrieval=true&connectionTimeZone=UTC',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'p4ssword',\n" +
            "  'table-name' = 'order_02',\n" +
            "  'sink.buffer-flush.max-rows' = '1',\n" +
            "  'sink.buffer-flush.interval' = '1s',\n" +
            "  'sink.max-retries' = '3' \n"+
            ")");

        tEnv.executeSql("INSERT INTO account.order_02 (id, order_id, amount, remark)\n" +
            " select t1.id,\n" +
            "       t1.order_id,\n" +
            "       t1.amount,\n" +
            "       t1.remark\n" +
            " from account.order_01 t1 \n");

        env.execute();
    }
}

pom文件
 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxx.demo</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12</scala.version>
        <java.version>8</java.version>
        <flink.version>1.19.1</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.15</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <!-- 打fatjar配置 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!--这里指定要运行的main类-->
                            <mainClass>com.xxx.demo.AccountVoucherSumaryDWSSQL</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- 此处指定继承合并 -->
                        <phase>package</phase> <!-- 绑定到打包阶段 -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

完成之后,通过maven来打包。

上传包到flink下examples目录下,这里也新建一个目录CDC

通过以下命令来运行

./bin/flink run examples/cdc/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

上面两种运行方式运行之后,都可以去Flink的Web页面查看到运行的任务。

五,小结

上面只是搭建了Flink,并运行了示例项目,初步体验了一下Flink的功能,还完全到不了实际项目中运用的程度。后续会一步步去探索Flink在项目中应用的技巧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

车机版 Android Audio 框架笔记

车机版Android Audio 框架涉及的知识点很多&#xff0c;在工作中涉及的功能板块也及其繁杂&#xff0c;后面我会根据工作中的一些实际遇到的实例&#xff0c;逐步拆解 Android Audio的知识点&#xff0c;这里从网上整理了一些思维导图&#xff0c;可以做为未来的一个研究方向&a…

ubuntu 22.04 镜像源更换

双11抢了个云服务器&#xff0c;想要整点东西玩玩&#xff0c;没想到刚上来就不太顺利 使用sudo apt update更新软件&#xff0c;然后发生了如下报错 W: Failed to fetch http://mirrors.jdcloudcs.com/ubuntu/dists/jammy/InRelease 理所当然想到可能是镜像源连接不是很好&…

浅谈Agent

目录 什么是大模型 Agent &#xff1f; 大模型Agent 有哪些部分组成? 规划&#xff08;Planning&#xff09; Planning类型 不依赖反馈的计划 基于反馈的计划 拆解子目标和任务分解方法 COT TOT GOT LLMP 反思和完善 ReAct(融合推理与执行的能力) Reflexion(动态…

NAT网络工作原理和NAT类型

NAT基本工作流程 通常情况下&#xff0c;某个局域网中&#xff0c;只有路由器的ip是公网的&#xff0c;局域网中的设备都是内网ip&#xff0c;内网ip不具备直接与外部应用通信的能力。 处于内网的设备如何借助NAT来实现访问外网的应用&#xff1f; 对于开启了NAT功能的局域网…

Jenkins插件使用问题总结

Git Push插件 插件介绍 主要是用于git推送代码到远程仓库中使用&#xff0c;插件地址 pipeline中使用 官方说明中只有一句代码gitPush(gitScm: scm, targetBranch: env.BRANCH_NAME, targetRepo: origin) 流水线语法中也做的不齐全所以一开始我老是设置错&#xff0c;导致代…

GPT-5 终于来了 —— 人们的预期与现实

高智慧人工智能的两面性&#xff0c;利用AI和被AI利用 前言&#xff1a;人工智能的热度持续升温&#xff0c;似乎已无处不在&#xff0c;但大家对它的感知却并不显著。这种状况有点像美国 2024 年的总统大选&#xff0c;投票前人们彼此不清楚支持谁&#xff0c;直到最终计票才发…

微服务透传日志traceId

问题 在微服务架构中&#xff0c;一次业务执行完可能需要跨多个服务&#xff0c;这个时候&#xff0c;我们想看到业务完整的日志信息&#xff0c;就要从各个服务中获取&#xff0c;即便是使用了ELK把日志收集到一起&#xff0c;但如果不做处理&#xff0c;也是无法完整把一次业…

Matlab实现鲸鱼优化算法优化随机森林算法模型 (WOA-RF)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1内容介绍 鲸鱼优化算法&#xff08;Whale Optimization Algorithm, WOA&#xff09;是受座头鲸捕食行为启发而提出的一种新型元启发式优化算法。该算法通过模拟座头鲸围绕猎物的螺旋游动和缩小包围圈的方式&#xff0c;在…

【学习笔记】网络设备(华为交换机)基础知识 10 —— 信息中心 ① 简介

提示&#xff1a;学习华为交换机信息中心的概述&#xff08; 包括信息中心的概念、功能、以及信息的分类、分级、和输出 &#xff09; &#xff1b;还包括信息中心常用的命令 &#xff08; 使能信息中心、命名信息通道、配置信息过滤、清除统计信息、查看信息中心相关信息的命令…

【unity】unity2021 URP管线下 SceneView没有MipMaps选项了怎么办?扩展Rendering Debugger工具

一、前言 之前项目 Unity打开后 Scene窗口 有一个MipMaps选项模式&#xff0c; 可以查看哪些贴图正常距离下发红 &#xff0c;说明用不到那么大&#xff0c;可以缩一下尺寸。 但 新的项目在Unity2021上&#xff0c;用了URP&#xff0c; 就没见过这个选项。 查了一篇介绍详细的…

前端代码分析题(选择题、分析题)——JS事件循环分析、await和作用域分析

Promise其实也不难-CSDN博客 Promise 的执行顺序分析 Promise 对象的执行是异步的&#xff0c;但其执行器函数内部的代码是立即执行的&#xff0c;而 then方法注册的回调函数则是在 Promise 状态改变后执行的。 const myPromise new Promise((resolve, reject) > {conso…

DAY24|回溯算法Part03|LeetCode:93.复原IP地址、78.子集、90.子集II

目录 LeetCode:93.复原IP地址 基本思路 C代码 LeetCode:78.子集 基本思路 C代码 LeetCode:90.子集II 基本思路 C代码 通过used实现去重 通过set实现去重 不使用used和set版本 LeetCode:93.复原IP地址 力扣代码链接 文字讲解&#xff1a;LeetCode:93.复原IP地…

ts 将100个元素,每行显示9个元素,然后显示出所有行的元素,由此我们延伸出一个项目需求的简单算法实现。

1、先看一下baidu ai出的结果&#xff1a; 2、我们将上面的代码修改下&#xff0c;定义一个数组&#xff0c;然后记录每行的行号及相应的元素&#xff1a; <template><div>console</div> </template> <script setup lang"ts"> import …

17、论文阅读:VMamba:视觉状态空间模型

前言 设计计算效率高的网络架构在计算机视觉领域仍然是一个持续的需求。在本文中&#xff0c;我们将一种状态空间语言模型 Mamba 移植到 VMamba 中&#xff0c;构建出一个具有线性时间复杂度的视觉主干网络。VMamba 的核心是一组视觉状态空间 (VSS) 块&#xff0c;搭配 2D 选择…

用 Python 从零开始创建神经网络(三):添加层级(Adding Layers)

添加层级&#xff08;Adding Layers&#xff09; 引言1. Training Data2. Dense Layer Class 引言 我们构建的神经网络变得越来越受人尊敬&#xff0c;但目前我们只有一层。当神经网络具有两层或更多隐藏层时&#xff0c;它们变成了“深度”网络。目前我们只有一层&#xff0c…

推荐一款功能强大的视频修复软件:Apeaksoft Video Fixer

Apeaksoft Video Fixer是一款功能强大的视频修复软件&#xff0c;专门用于修复损坏、不可播放、卡顿、画面失真、黑屏等视频问题。只需提供一个准确且有效的样本视频作为参考&#xff0c;该软件就能将受损视频修复到与样本视频相同的质量。该软件目前支持MP4、MOV、3GP等格式的…

Web前端开发--HTML语言

文章目录 前言1.介绍2.组成3.基本框架4.常见标签4.1双标签4.1.1.标题标签4.2.2段落标签4.1.3文本格式化标签4.1.4超链接标签4.1.5视频标签4.1.6 音频标签 4.2单标签4.2.1换行标签和水平线标签4.2.2 图像标签 5.表单控件结语 前言 生活中处处都有网站&#xff0c;无论你是学习爬…

[ DOS 命令基础 2 ] DOS 命令详解-网络相关命令

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

gitlab无法创建合并请求是所有分支都不显示

点击Merge Requests ------> New merge request 创建新的合并请求时&#xff0c;在Source branch和Target branch中一个分支都不显示 排查思路&#xff1a; 1.怀疑是权限问题。 发现只有我的一个账号出现&#xff0c;检查了账号的权限&#xff0c;尝试了master、develop角色…

【温度表达转化】

【温度表达转化】 C语言代码C代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 利用公式 C5∗(F−32)/9 &#xff08;其中C表示摄氏温度&#xff0c;F表示华氏温度&#xff09; 进行计算转化。 输出 输出一行&#x…