使用Flink CDC实时监控MySQL数据库变更

news2025/1/11 18:32:14

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海
        .hostname("localhost") // MySQL的IP地址
        .port(3306) // MySQL的端口
        .username("root") // MySQL的用户名
        .password("123456") // MySQL的密码
        .databaseList("my_db") // 监控的数据库
        .tableList("my_db.user") // 监控的数据库下的表
        .deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化
        .startupOptions(StartupOptions.initial()) // 启动选项
        .build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
*  自定义DeserializationSchema进行反序列化。
*/

public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
   @Override
   public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
       //创建JSON对象用于存储最终数据
       JSONObject result = new JSONObject();
       String topic = sourceRecord.topic();
       String[] fields = topic.split("\\.");
       String database = fields[1];
       String tableName = fields[2];
       Struct value  = (Struct)sourceRecord.value();
       //获取before数据
       Struct before = value.getStruct("before");
       JSONObject beforeJson = getJson(before);
       //获取after数据
       Struct after = value.getStruct("after");
       JSONObject afterJson = getJson(after);
       //获取操作类型
       Envelope.Operation operation = Envelope.operationFor(sourceRecord);
       //将字段写入JSON对象
       result.put("database",database);
       result.put("tableName",tableName);
       result.put("type",operation);
       result.put("before",beforeJson);
       result.put("after",afterJson);
       //输出数据
       collector.collect(result.toJSONString());
   }
   /**
    *  获取字段值并写入result对象
    * @param before
    * @return
    */
   private JSONObject getJson(Struct before) {
       JSONObject jsonObject = new JSONObject();
       if(before != null){
           Schema beforeSchema = before.schema();
           List<Field> beforeFields = beforeSchema.fields();
           for (Field field : beforeFields) {
               Object beforeValue = before.get(field);
               jsonObject.put(field.name(), beforeValue);
           }
       }
       return jsonObject;
   }
   @Override
   public TypeInformation getProducedType() {
       return BasicTypeInfo.STRING_TYPE_INFO;

   }
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1854522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode 二分查找·系统掌握 搜索二维矩阵

题目&#xff1a; 题解&#xff1a; 一个可行的思路是使用~01~泛型对每一行的最后一个元素进行查找找到第一个大于等于target的那一行&#xff0c;判断查找结果如果“失败”返回false否则继续在改行进行常规二分查找target的值根据查找结果返回即可。 bool searchMatrix(vec…

基于Quartus Prime18.1的安装与FPGA的基础仿真(联合Modelsim)教程

Quartus是一种美国科技公司Intel&#xff08;英特尔&#xff09;公司开发的FPGA&#xff08;现场可编辑门阵列&#xff09;设计编译软件&#xff0c;用作设计、仿真、综合和布局、支持多种编程语言&#xff0c;包括VHDL、Verilog等&#xff0c;并具有丰富的功能和工具库&#x…

【Python机器学习】NMF——将NMF应用于模拟信号数据

假设我们对一个信号感兴趣&#xff0c;它是由三个不同信号源合成的&#xff1a; import matplotlib.pyplot as plt import mglearnSmglearn.datasets.make_signals() plt.figure(figsize(6,1)) plt.plot(S,-) plt.xlabel(Time) plt.ylabel(Signal) plt.show()不幸的是&#xff…

基于imx6ull开发板 移植opencv4.7.0

一、概述 本章节是针对opencv-4.7.0移植到Linux系统&#xff0c;运行在正点原子-I.MX6U ALPHA开发板 上&#xff0c;详细的移植流程如下。 二、环境要求 2.1 硬件环境 正点原子-I.MX6U ALPHA开发板虚拟机&#xff1a;VMware 2.2 软件环境 Ubuntu系统要求&#xff1a;20.0…

[SAP ABAP] 排序内表数据

语法格式 整表排序 SORT <itab> [ASCENDING|DESCENDING]. 按指定字段排序 SORT <itab> BY f1 [ASCENDING|DESCENDING] f2 [ASCENDING|DESCENDING] ... fn [ASCENDING|DESCENDING].<itab>&#xff1a;代表内表 不指定排序方式则默认升序排序 示例1 结果显…

Posix多线程编程总结

Posix在线文档&#xff1a; The Single UNIX Specification, Version 2 (opengroup.org) 本文主要参考这位大神的文章&#xff1a; Posix多线程编程学习笔记 - 凌峰布衣 - 博客园 (cnblogs.com) 线程安全问题 多线程编程中&#xff0c;经常遇到的就是线程安全问题&#xff0c;或…

React AntDesign Layout组件布局刷新页面错乱闪动

大家最近在使用React AntDesign Layout组件布局后刷新页面时&#xff0c;页面布局错乱闪动 经过组件属性的研究才发现&#xff0c;设置 hasSider 为 true 就能解决上面的问题&#xff0c;耽搁了半天的时间&#xff0c;接着踩坑接着加油&#xff01;&#xff01;&#xff01; …

STM32学习 修改系统主频

前面时钟树的学习说明单片机的主频是可以修改的&#xff0c;那么怎么更改系统的主频&#xff0c;这里做一个简单的介绍。首先要明白&#xff0c;单片机的程序是如何运行&#xff0c;这里简单说明一下。 对应的代码在startup_stm32....文件里面&#xff0c;这里是复位程序的汇编…

第T2周:彩色图片分类

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 &#x1f449; 要求&#xff1a; 学习如何编写一个完整的深度学习程序了解分类彩色图片会灰度图片有什么区别测试集accuracy到达72% &#x1f9be;我的环境&am…

QT事件处理系统之五:自定义事件的发送案例 sendEvent和postEvent接口

1、案例 双击窗口,会发送 自定义事件,然后在事件过滤中心进行拦截处理自定义事件。 2、核心代码 /*解释:双击窗口时,将产生双击事件,然后该事件被包裹成一个对象,随后将会被发往event事件中心,然后进行事件的处理(Widget对象);因为m_lineEdit开启了事件过滤机制,所…

【UML用户指南】-21-对基本行为建模-活动图

目录 1、概念 2、组成结构 2.1、动作 2.2、活动节点 2.3、控制流 2.4、分支 2.5、分岔和汇合 2.6、泳道 2.7、对象流 2.8、扩展区域 3、一般用法 3.1、对工作流建模 3.2、对操作建模 一个活动图从本质上说是一个流程图&#xff0c;展现从活动到活动的控制流 活动图…

图像编辑技术的新篇章:基于扩散模型的综述

在人工智能的浪潮中&#xff0c;图像编辑技术正经历着前所未有的变革。随着数字媒体、广告、娱乐和科学研究等领域对高质量图像编辑需求的不断增长&#xff0c;传统的图像编辑方法已逐渐无法满足日益复杂的视觉内容创作需求。尤其是在AI生成内容&#xff08;AIGC&#xff09;的…

【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法

目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 SCI二区|鲸鱼优化算法&#xff08;WOA&#xff09;原理及实现【附完整Matlab代码】 2.改进点 混沌反向学习策略 将混沌映射和反向学习策略结合&#xff0c;形成混沌反向学习方法&#xff0c;通过该方 法…

三十八篇:架构大师之路:探索软件设计的无限可能

架构大师之路&#xff1a;探索软件设计的无限可能 1. 引言&#xff1a;架构的艺术与科学 在软件工程的广阔天地中&#xff0c;系统架构不仅是设计的骨架&#xff0c;更是灵魂所在。它如同建筑师手中的蓝图&#xff0c;决定了系统的结构、性能、可维护性以及未来的扩展性。本节…

LSSS算法实现,基于eigen和pbc密码库【一文搞懂LSSS,原理+代码】

文章目录 一. LSSS简介1.1 概述1.2 线性秘密分享方案&#xff08;LSSS&#xff09;与 Shamir的秘密分享方案对比LSSS1.2.1 Shamir的秘密分享方案1.2.2 线性秘密分享方案&#xff08;LSSS&#xff09;1.2.3 主要区别 二. 基于矩阵的LSSS加解密原理分析2.1 LSSS矩阵构造2.1.1 定义…

【python】python基于微博互动数据的用户类型预测(随机森林与支持向量机的比较分析)(源码+数据集+课程论文)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

群体优化算法---电磁共振优化算法(EROA)介绍包含示例滤波器设计

介绍 电磁共振优化算法&#xff08;Electromagnetic Resonance Optimization Algorithm, EROA&#xff09;是一种新型的元启发式优化算法&#xff0c;其灵感来源于电磁共振现象。电磁共振是一种物理现象&#xff0c;当一个系统在特定频率下响应最大时&#xff0c;这个频率被称…

【算法】优先级队列-基础与应用

优先级队列&#xff08;Priority Queue&#xff09;是一种特殊的队列类型&#xff0c;它允许在其元素中分配优先级。与传统的先进先出&#xff08;FIFO&#xff09;队列不同&#xff0c;优先级队列中元素的出队顺序取决于它们的优先级。优先级较高的元素会被优先处理&#xff0…

LLaMA:挑战大模型Scaling Law的性能突破

实际问题 在大模型的研发中,通常会有下面一些需求: 计划训练一个10B的模型,想知道至少需要多大的数据?收集到了1T的数据,想知道能训练一个多大的模型?老板准备1个月后开发布会,给的资源是100张A100,应该用多少数据训多大的模型效果最好?老板对现在10B的模型不满意,想…

完美解决找不到steam_api64.dll无法执行代码问题

游戏缺失steam_api64.dll通常意味着该游戏依赖于Steam平台的一些功能或服务&#xff0c;而这个DLL文件是Steam客户端的一部分&#xff0c;用于游戏与Steam平台之间的交互。如果游戏中缺失这个文件&#xff0c;可能会出现无法启动、崩溃或其他问题。 一&#xff0c;详细了解stea…