FlinkCDC 实现 MySQL 数据变更实时同步

news2024/12/24 4:08:57

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (
  `id` bigint NOT NULL COMMENT '主键id',
  `from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',
  `to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',
  `relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',
  `content` varchar(500) DEFAULT NULL COMMENT '消息内容',
  `msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',
  `extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies>
    <!-- Flink connector连接器基础包 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink CDC MySQL源 -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink DataStream数据流API -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink客户端-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!--Flink WebUI,端口8081(默认没有开启)-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySinkHandler extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
    }

    @Override
    public void close() throws Exception {
    }
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MySqlSourceExample {
    @PostConstruct
    public void init() throws Exception {
        // 配置监听数据源
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("8.141.28.132")
                .port(3306)
                // 数据库集合,可以配置多个
                .databaseList("quick_chat")
                // 表集合,可以配置多个
                .tableList("quick_chat.quick_chat_msg")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true)
                .build();

        // 配置 Flink WebUI
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // 检查点间隔时间
        // checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。
        env.enableCheckpointing(5000);
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new MySinkHandler());
        env.execute();
    }
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{
    "before": null,
    "after": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135279000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 2452,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "c",
    "ts_ms": 1729135278633,
    "transaction": null
}
# 修改
{
    "before": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "after": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊,小猫咪",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135289000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 2825,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "u",
    "ts_ms": 1729135288473,
    "transaction": null
}
# 删除
{
    "before": {
        "id": 3,
        "from_id": "dog",
        "to_id": "cat",
        "relation_id": "dog:cat",
        "content": "你好啊,小猫咪",
        "msg_type": 1,
        "extra_info": null,
        "create_time": 1729164075000,
        "deleted": 0
    },
    "after": null,
    "source": {
        "version": "1.6.4.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1729135301000,
        "snapshot": "false",
        "db": "quick_chat",
        "sequence": null,
        "table": "quick_chat_msg",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000002",
        "pos": 3247,
        "row": 0,
        "thread": null,
        "query": null
    },
    "op": "d",
    "ts_ms": 1729135300692,
    "transaction": null
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2217323.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

结构体通讲——数据结构解惑

文章目录 一.第一种写法二.第二种三.第三种四.-> 结构&#xff1a;一个变量里包含很多变量 一.第一种写法 int a[]&#xff1b;//一个数组中可以包含许多相同类型的数组 想让一个数组中包含很多不同类型的变量用结构 struct {int a;char bc; }t,ti;//t和ti拥有了前面所定…

谷歌审核放宽,恶意软件不再封号?是反垄断案影响还是开发者们的错觉

最近&#xff0c;谷歌因其“垄断”案而成为科技行业的焦点&#xff0c;这个案件可能导致谷歌业务的重大调整。同时&#xff0c;在Google Play上&#xff0c;一些开发者发现谷歌审核好像放宽了不少&#xff0c;这是不是与反垄断有关&#xff0c;谷歌应用上架或将迎来春天&#x…

MySQL-12.DQL-条件查询

一.DQL-条件查询 -- DQL:条件查询 -- 1.查询 姓名 为 杨逍 的员工 select id, username, password, name, gender, image, job, entrydate, create_time, update_timefrom tb_emp where name 杨逍;-- 2.查询 id小于等于5 的员工信息 select * from tb_emp where id < 5;-…

HT3382 2x75W D类立体声音频功放

1、特点 输出功率(BTL) 2x60W (VDD24V,RL4Ω,THDN1%) 2x75W(VDD24V,RL4Ω,THDN10%) 输出功率(PBTL) 115W(VDD24V,RL2Ω,THDN1%) 140W(VDD24V,RL2Ω,THDN10%) 单电源系统&#xff0c;4.5V-26V宽电压输入范围 超过93%效率&#xff0c;需散热器 扩频功能 MUTE功能 模拟差分/单端输…

LLM - 使用 Neo4j 可视化 GraphRAG 构建的 知识图谱(KG) 教程

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/142938982 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 Neo4j …

中科大科大讯飞开源OpenMusic:音乐生成更高质量,更有乐感

文章链接&#xff1a;https://arxiv.org/pdf/2405.15863 代码链接&#xff1a;https://github.com/ivcylc/qa-mdt Huggingface链接&#xff1a;https://huggingface.co/spaces/jadechoghari/OpenMusic Demo链接&#xff1a;https://qa-mdt.github.io/ &#xff08;chatgpt * 3…

苹果 AI 及国产大模型之争:悬念不再?

AI基本盘&#xff0c;牢牢把握在苹果手里 苹果终于公布了最新的AI进程。 一个月前&#xff0c;正如此前预期的那样&#xff0c;人工智能是今年 WWDC 发布会的焦点。全程105分钟的主题演讲&#xff0c;就有40多分钟用于介绍苹果的AI成果。 苹果似乎还有意玩了一把“谐音梗”&…

人脸识别-特征算法

文章目录 一、LBPH算法1.基本原理2.实现步骤3.代码实现 二、Eigenfaces算法1.特点2.代码实习 三、FisherFaces算法1.算法原理2.算法特点3.代码实现 四、总结 人脸识别特征识别器是数字信息发展中的一种生物特征识别技术&#xff0c;其核心在于通过特定的算法和技术手段&#xf…

跟李沐学AI—pytorch版本锚框代码解析

网上大佬的解释 https://fkjkkll.github.io/2021/11/23/%E7%9B%AE%E6%A0%87%E6%A3%80%E6%B5%8BSSD/?highlight%E9%94%9A%E6%A1%86 w torch.cat((size_tensor * torch.sqrt(ratio_tensor[0]),sizes[0] * torch.sqrt(ratio_tensor[1:])))\* in_height / in_widthh torch.cat((…

当贝投影双十一战报揭晓:天猫投影品类销量稳居首位

相比往年&#xff0c;2024年双11提前了10天&#xff0c;于10月14日正式拉开帷幕。其中&#xff0c;作为国内智能投影头部品牌之一的当贝投影&#xff0c;首战告捷&#xff0c;迎来开门红&#xff0c;战绩相当喜人&#xff01; 根据当贝投影官方数据显示&#xff0c;10月至今当贝…

基于SpringBoot的在线视频教育平台的设计与实现(论文+源码)_kaic

摘 要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于在线视频教育平台当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了在线视频教育平台&#xff0c;它彻底改变了过…

vue 2.0 使用 html2canvas + jspdf +ant-design-vue 1.x + echarts + 高德地图 导出数据报告

思路&#xff1a; 1. 因为html转为图片再加入到PDF中 会导致截断&#xff0c;因此前端自定义分页添加 类 &#xff08;page&#xff09; 2. 通过page生成图片 在加入PDF中<template><div id"pdf-content"><div class"page first-page">&…

Jmeter接口测试企业级项目实战day2

1.JMeter接口关联 含义&#xff1a;把上一个接口的响应内容&#xff0c;作为下一个接口的请求参数 思路&#xff1a;通过变量来传递数据 步骤&#xff1a; 1.创建&#xff1a;上一个接口&#xff0c;添加【后置处理器】&#xff1a;提取数据创建变量 2.使用&am…

项目分析:自然语言处理(语言情感分析)

在这个信息爆炸的时代&#xff0c;我们每天都在与海量的文本数据打交道。从社交媒体上的帖子、在线评论到新闻报道&#xff0c;文本信息无处不在。然而&#xff0c;这些文本不仅仅是文字的堆砌&#xff0c;它们背后蕴含着丰富的情感和观点。如何有效地理解和分析这些情感&#…

leetcode48:旋转矩阵

题目&#xff1a; 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5…

昆仑虚 - NextJS 项目如何进行部署?

引言 NextJS 是一个构建于 NodeJS 之上的一个 Web 开发框架。它基于 React 特性进行了一些列的扩展!! 在社区中也很是火热, 前段时间 「昆仑虚」 也终于完成了项目的迁移(React > NexJS)!! 那么接下来就是项目部署, NextJS 相比常规的前端部署还是有所区别的: 常规的前端…

便捷实用的桌面时钟 让你随时掌握时间 美观且大气

便捷实用的桌面时钟 让你随时掌握时间 美观且大气。桌面时钟顾名思义就是可以放在桌面上的时钟&#xff0c;这是一款界面优美,功能实用,易于操作的桌面时钟工具芝麻时钟&#xff08;下载地址&#xff1a;https://clock.zhimasoft.cn/?bi&#xff09; 找个好看的桌面时钟&…

【R语言】随机森林+相关性热图组合图

数据概况文末有获取方式 随机森林部分 #调用R包 library(randomForest) library(rfPermute) library(ggplot2) library(psych) library(reshape2) library(patchwork) library(reshape2) library(RColorBrewer) ​ ​ #读取数据 df<-read.csv("F:\\EXCEL-元数据\\2020…

Spring Boot与JavaWeb协同:在线考试系统的实现“

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理基于JavaWeb技术的在线考试系统设计与实现…

【02】Windows特殊权限-Trustedinstaller

知识点&#xff1a; “TrustedInstaller” 是 Windows 操作系统中的一个特殊账户&#xff0c;用于管理和保护重要的系统文件。它是 Windows 模块安装程序 (Windows Modules Installer) 的一部分&#xff0c;负责安装、修改和删除 Windows 更新和可选组件。默认情况下&#xff…