FlinkCDC初体验

news2025/1/13 7:37:13

一、CDC简介
1.1 什么是CDC?

CDC是 Change Data Capture(变更数据获取 )的简称。 核心思想是,监测并捕获数据库的
变动(包括数据或数据表的插入 、 更新 以及 删除等),将这些变更按发生的顺序完整记录下
来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 CDC的种类

CDC主要分为 基于查询 和 基于 Binlog两种方式,这两种之间的区别:
在这里插入图片描述

1.3 Canal、 Maxwell、 Debezium、FlinkCDC 之间的区别
在这里插入图片描述

1.4. 什么是FlinkCDC?

1.5 FlinkCDC之DataStream
1.5.1 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink-CDC</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version> 
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs> </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.5.2 创建FlinkCDC测试类

package com.lzl;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @author lzl
 * @create 2023-05-04 11:21
 * @name FlinkCDC
 */
public class FlinkCDC {

    public static void main(String[] args) throws Exception{

        //1.获取Flink的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("ch01")
                .port(3306)
                .username("root")
                .password("xxb@5196")
                .databaseList("cdc_test")
                .tableList("cdc_test.user_info")  //表前一定要加上库名
                .deserializer(new StringDebeziumDeserializationSchema())  //这是官网的,我们后面要自己定义反序列
                .startupOptions(StartupOptions.initial())
                .build();

        //3.使用 CDC Source从 MySQL读取数据
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //4.数据打印
        dataStreamSource.print();

        //5.启动任务
        env.execute("FlinkCDC");
    }
}

1.5.3 开启Binlog日志并创建数据库

[root@ch01 ~]# vim /etc/my.cnf

添加这部分:
在这里插入图片描述
然后重启MySQL服务器:

[root@ch01 ~]# systemctl restart mysqld

在/var/mysql下可以看到mysql-bin.index的文件(一般)
我的是在/data/mysql/mysql-bin
在这里插入图片描述
或者打开MySQL客户端,输入以下命令:

mysql> show variables like 'log_%';
+----------------------------------------+-----------------------------+
| Variable_name                          | Value                       |
+----------------------------------------+-----------------------------+
| log_bin                                | ON                          |
| log_bin_basename                       | /data/mysql/mysql-bin       |
| log_bin_index                          | /data/mysql/mysql-bin.index |
| log_bin_trust_function_creators        | OFF                         |
| log_bin_use_v1_row_events              | OFF                         |
| log_builtin_as_identified_by_password  | OFF                         |
| log_error                              | /var/log/mysql/mysql.log    |
| log_error_verbosity                    | 3                           |
| log_output                             | FILE                        |
| log_queries_not_using_indexes          | OFF                         |
| log_slave_updates                      | OFF                         |
| log_slow_admin_statements              | OFF                         |
| log_slow_slave_statements              | OFF                         |
| log_statements_unsafe_for_binlog       | ON                          |
| log_syslog                             | OFF                         |
| log_syslog_facility                    | daemon                      |
| log_syslog_include_pid                 | ON                          |
| log_syslog_tag                         |                             |
| log_throttle_queries_not_using_indexes | 0                           |
| log_timestamps                         | UTC                         |
| log_warnings                           | 2                           |
+----------------------------------------+-----------------------------+
21 rows in set (0.00 sec)

mysql> show binary logs; 
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       154 |
+------------------+-----------+
1 row in set (0.00 sec)
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 | cdc_test     |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

mysql> show binlog events;
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos | Event_type     | Server_id | End_log_pos | Info                                  |
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
| mysql-bin.000001 |   4 | Format_desc    |         1 |         123 | Server ver: 5.7.11-log, Binlog ver: 4 |
| mysql-bin.000001 | 123 | Previous_gtids |         1 |         154 |                                       |
+------------------+-----+----------------+-----------+-------------+---------------------------------------+
2 rows in set (0.00 sec)

show binary logs; #获取binlog文件列表
show master status;#查看当前正在写入的binlog文件
show binlog events; #只查看第一个binlog文件的内容
show binlog events in ‘mysql-bin.000002’; #查看指定binlog文件的内容

创建MySQL数据库:cdc_test

mysql> create database cdc_test;
Query OK, 1 row affected (0.00 sec)

1.5.4 创建用户表user_info

mysql> use cdc_test;
Database changed
mysql> CREATE TABLE user_info (
        `id`  int(2)  primary key comment 'id',
        `name` varchar(255) comment '姓名',
        `sex` varchar(255) comment '性别'
    )ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='用户信息表';
Query OK, 0 rows affected (0.01 sec)

没插入数据之前,mysql-bin.000001的位置是在695.
在这里插入图片描述
在表中插入一行数据之后看它有何变化?
在这里插入图片描述
在这里插入图片描述
说明MySQL日志有监控到了。同时位置在983那里。

1.5.5 启动FlinkCDC程序
在这里插入图片描述

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1683182998, file=mysql-bin.000001, pos=983}} 
ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.cdc_test.user_info.Key:STRUCT}, 
value=Struct{after=Struct{id=1,name=关羽,sex=male},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1683182998091,snapshot=last,db=cdc_test,table=user_info,server_id=0,file=mysql-bin.000001,pos=983,row=0},op=r,ts_ms=1683182998099}, 
valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

我们再新增一条数据看看:
在这里插入图片描述
在这里插入图片描述我们修改一条数据:把赵云改成张飞
在这里插入图片描述
在这里插入图片描述
op改成了u,即op=u。
在这里插入图片描述
多了一个before和after。更新前和更新后的数据。

我们删除一条数据看看:
在这里插入图片描述
就只有before了。且op=d。

1.6 开启CK模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/488420.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Flask】Python基于Flask应用

Flask介绍 Flask 是一款发布于2010年非常流行的 Python Web 框架。 特点 微框架、简洁&#xff0c;给开发者提供了很大的扩展性。Flask和相应的插件写得很好&#xff0c;用起来很爽。 开发效率非常高&#xff0c;比如使用 SQLAlchemy 的 ORM 操作数据库可以节省开发者大量书…

【LeetCode】数据结构题解(5)[分割链表]

分割链表 1.题目来源2.题目描述3.解题思路4.代码展示 所属专栏&#xff1a;玩转数据结构题型 博主首页&#xff1a;初阳785 代码托管&#xff1a;chuyang785 感谢大家的支持&#xff0c;您的点赞和关注是对我最大的支持&#xff01;&#xff01;&#xff01; 博主也会更加的努力…

聊一聊 GDB 调试程序时的几个实用命令

一&#xff1a;背景 1. 讲故事 用惯了宇宙第一的 Visual Studio 再用其他的开发工具还是有一点不习惯&#xff0c;不习惯在于想用的命令或者面板找不到&#xff0c;总的来说还是各有千秋吧&#xff0c;今天我们来聊一下几个在调试中比较实用的命令&#xff1a; 查看内存硬件…

B站java、计算机学习整理(菜鸟版本)

B站java、计算机学习整理&#xff08;菜鸟版本&#xff09; 简介1、入门篇2、工具篇3、数据库篇4、框架篇5、JVM 篇6、源码篇7、算法与数据结构8、操作系统9、计算机组成原理10、计算机网络11、 设计模式 简介 处在互联网时代&#xff0c;是一种幸福&#xff0c;因为各式各样的…

Win10系统开机自动蓝屏无法使用怎么U盘重装系统?

Win10系统开机自动蓝屏无法使用怎么U盘重装系统&#xff1f;今天和大家一起来分享Win10系统蓝屏之后怎么去进行修复的方法。很多用户都有遇到电脑蓝屏无法启动的问题&#xff0c;那么遇到这个问题之后怎么去重装系统呢&#xff1f;接下来我们来看看以下的解决方法分享吧。 准备…

Python突破某网游游戏JS加密限制,进行逆向解密,实现自动登录

前言 大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 今天来分享一下如何使用Python突破某网游游戏JS加密限制&#xff0c;进行逆向解密&#xff0c;实现自动登录。 逆向目标 目标&#xff1a;某 7 网游登录 主页&#xff1a;aHR0cHM6Ly93d3cuMzcuY29tLw 接口&#xff1a;aHR…

牛客_华为_ HJ63 DNA序列

HJ63 DNA序列 st input() n int(input())max_ratio 0 ratio 0 res for i in range(0,len(st)-n1):s st[i:in]ratio s.count(C)s.count(G)if ratio > max_ratio:max_ratio ratiores s print(res)

cPanel XSS漏洞分析研究(CVE-2023-29489)

一、漏洞原理 漏洞简述 cPanel 是一套在网页寄存业中最享负盛名的商业软件&#xff0c;是基于于 Linux 和 BSD 系统及以 PHP 开发且性质为闭源软件&#xff1b;提供了足够强大和相当完整的主机管理功能&#xff0c;诸如&#xff1a;Webmail 及多种电邮协议、网页化 FTP 管理、…

【考前看几题】系统集成项目管理师-2022年上半年-上午真题(广东卷)

前言 汇总知识点、重点问题、难点 由问题引出知识点 软件技术、其他技术、管理基础、整体管理、范围管理、成本管理、人力资源管理 干系人管理、合同管理、采购管理、配置管理、质量管理、风险管理、安全管理 文章目录 前言软件技术、其他技术管理基础整体管理范围管理成本管理…

第12届蓝桥杯国赛真题剖析-2021年5月29日Scratch编程初中级组

[导读]&#xff1a;超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成&#xff0c;后续会不定期解读蓝桥杯真题&#xff0c;这是Scratch蓝桥杯真题解析第128讲。 第12届蓝桥杯Scratch国赛真题&#xff0c;这是2021年5月29日举办的全国总决赛&#xff0c;比赛仍然采取线上…

【java】Java 异常处理的十个建议

文章目录 前言一、尽量不要使用e.printStackTrace(),而是使用log打印。二、catch了异常&#xff0c;但是没有打印出具体的exception&#xff0c;无法更好定位问题三、不要用一个Exception捕捉所有可能的异常四、记得使用finally关闭流资源或者直接使用try-with-resource五、捕获…

TCP协议特性讲解

文章目录 TCP报文结构确认应答超时重传三次握手与四次挥手滑动窗口流量控制拥塞控制延时应答捎带应答面向字节流 - 粘包问题异常处理 - 心跳包 TCP报文结构 16位源端口号&#xff1a;表示数据从哪来的。 16位目的端口号&#xff1a;表示数据要到哪里去。 32位序号&#xff1a;由…

Centos7 安装 MySql8

1、查看是否安装 mariadb rpm -qa | grep mariadb 显示&#xff1a;mariadb-libs-5.5.56-2.el7.x86_64 2、卸载 mariadb rpm -e --nodeps mariadb-libs-5.5.56-2.el7.x86_64 3、安装 mysql 依赖包 yum install libaio 4、创建 mysql 安装目录 mkdir /usr/local/mysql 创…

【完整攻略】OPPO手机无密码解锁方法

全世界有数百万人拥有 OPPO 手机。它以其经济实惠但功能强大的智能手机而闻名。但是&#xff0c;与许多其他人一样&#xff0c;您可能会在某些日子后忘记密码。那么&#xff0c;如果您忘记了 OPPO 手机的密码或图案怎么办&#xff1f;你将如何解锁它&#xff1f;这是一个大问题…

Java+大数据学习笔记分享!

今天给大家分享一个笔记网站&#xff01; 我在码云中整理超全面学习笔记内容&#xff01; Gitee地址&#xff1a;https://gitee.com/fanggaolei/learning-notes-warehouse 项目中目前包含了大数据和Java后端方向的全部学习笔记&#xff0c;同样也是一个学习路线&#xff0c;笔记…

1-photoshop--修改图片内容--填充的使用

一、photoshop--修改图片内容--填充的使用&#xff1a; 在photohsop中没有发现图形填充功能&#xff0c;如何实现功能&#xff1f; 需要把这个图片中的原有文字覆盖 &#xff0c;1.使用吸管工具 &#xff0c;获取图片背景颜色&#xff0c;前景色为需要的颜色 2.在需要的选择进…

采购管理怎么做(详解采购工作流程)

阅读本文您将了解&#xff1a;1.采购管理的意义&#xff1b;2.采购工作流程&#xff08;中小企业&#xff09;&#xff1b;3.采购管理未来发展趋势。 一、采购管理的意义 采购管理是指企业为满足生产经营需要&#xff0c;对外部供应商进行的采购活动的规划、组织、实施和控制…

RocketMQ-ONS 内存占用过大问题处理

RocketMQ-ONS 内存占用过大问题处理 1、问题环境描述2、问题现像描述3、问题分析3.1、问题定位阶段1&#xff08;确认内存占用原因&#xff09;3.2、问题定位阶段2&#xff08;缓存参数配置无效问题&#xff09;3.3、问题定位阶段2&#xff08;分析占用原因&#xff09; 4、解决…

详解文件操作和 IO

&#x1f397;️ 主页&#xff1a;小夜时雨 &#x1f397;️ 专栏&#xff1a;javaEE初阶 &#x1f397;️ 如何优雅的活着&#xff0c;是我找寻的方向 目录 一、认识文件二、文件路径三、文件系统操作四、文件内容的操作读写 - 数据流4.1 InputStream 概述4.2 OutputStream 概…

快讯 | ALVA Systems 参加 Open Bosch 首届 Demo Day 活动

4 月 27 日&#xff0c;博世&#xff08;中国&#xff09;投资有限公司&#xff08;下以“博世”简称&#xff09;在上海总部举办 Open Bosch 第一届 Demo Day 活动&#xff0c;展示与初创伙伴的合作项目成果&#xff0c;宣传合作理念&#xff0c;为初创伙伴搭建参观交流的开放…