SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

news2025/1/10 16:14:32

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.19.0</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;

import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;

@Configuration
public class InitAction02 {

    @PostConstruct
    public void run() throws Exception {
        DebeziumDeserializationSchema<String> deserializer =
                new JsonDebeziumDeserializationSchema();

        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("127.0.0.1")
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.student")
                        .username("postgres")
                        .password("password")
                        .slotName("flink")
                        .decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+
                        .deserializer(deserializer)
                        .includeSchemaChanges(true) // output the schema changes as well
                        .splitSize(2) // the split size of each snapshot split
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(
                postgresIncrementalSource,
                WatermarkStrategy.noWatermarks(),
                "PostgresParallelSource")
                .setParallelism(2).addSink(new CustomSink());
                //.print();

        env.execute("Output Postgres Snapshot");
    }

}

CustomSink.java

package com.sk.proxytest.init;

import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

@Log4j2
public class CustomSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("============数据发生变化:{}", value);
    }
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1965738.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

“等保测评下的数据加密与隐私保护“

在当今数字化时代&#xff0c;数据已成为企业最宝贵的资产之一。然而&#xff0c;数据泄露、隐私侵犯等事件频发&#xff0c;不仅给企业带来经济损失&#xff0c;更严重损害了公众信任。等保测评&#xff0c;作为国家信息安全等级保护制度的重要组成部分&#xff0c;对数据加密…

微信小程序开发:宿主环境—组件

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

Python安装与环境配置,2024最新,超详细保姆级教程!

安装Python 来到Python官网&#xff1a;https://www.python.org/ Downloads>Windows&#xff1a; 选择想要的版本后点击进去&#xff1a; 下载后点击安装&#xff1a; 在本地电脑输入命令提示符&#xff1a;winR 环境变量配置 若执行命令提示符&#xff0c;输入Python后&…

Centos7.9开机自启更新系统时间

在CentOS 7.9中设置自动同步系统时间&#xff0c;主要依赖于NTP&#xff08;Network Time Protocol&#xff09;服务。以下是详细的步骤&#xff1a; 1、安装NTP服务 首先&#xff0c;你需要确保NTP服务已经安装在系统上。如果尚未安装&#xff0c;可以通过以下命令进行安装&…

电脑怎么更新系统

电脑系统的更新不仅可以带来新的功能和改进&#xff0c;还能提升系统的安全性和稳定性。许多用户对于如何更新系统可能不太了解&#xff0c;本文将详细介绍电脑系统更新的方法和步骤&#xff0c;帮助大家轻松完成系统更新。 为什么要更新系统&#xff1f;在了解如何更新系统之…

532nm绿光激光模组技术原理及特征

在现代科技飞速发展的今天&#xff0c;绿光激光模组作为一种高精度、高效率的光学器件&#xff0c;在工业加工、指示测量、演示展示等领域展现出了其独特的优势。其中&#xff0c;532nm绿光激光模组以其稳定的光束质量和广泛的应用范围&#xff0c;成为了市场上的热门产品。接下…

如何将幻灯片中的图片背景设置为透明

在制作幻灯片时&#xff0c;我们经常需要插入图片来丰富内容&#xff0c;提升视觉效果。但有时&#xff0c;图片的背景可能会干扰幻灯片的整体设计&#xff0c;这时将图片背景设置为透明就显得尤为重要。本文将详细介绍如何在常用的幻灯片制作软件中实现这一效果&#xff0c;帮…

【Golang 面试 - 进阶题】每日 3 题(十)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

精选3款国内wordpress 主题,建站首选

WordPress作为一款功能强大且易于使用的建站平台&#xff0c;已经成为了许多企业和个人搭建网站的首选。为了帮助大家更好地选择适合自己的WordPress主题&#xff0c;小编将为大家推荐三款国内优秀的WordPress主题&#xff1a;子比主题、OneNav主题和RiTheme主题。 1.子比主题…

施耐德电气荣获HMS工业网络颁发的第1000万个Anybus模块奖

全球可持续能源解决方案领导者施耐德电气&#xff0c;荣获 HMS 工业网络颁发的第 1000 万个 Anybus 模块奖。通过将 Anybus 网络连接接口集成到其变频器系列中&#xff0c;施耐德电气为客户提供了多种可根据性能和能效进行微调的变频器。这一双重优势不仅提升了客户的运营效率&…

【mongodb】mongodb副本集的搭建和使用

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》&#xff1a;python零基础入门学习 《python运维脚本》&#xff1a; python运维脚本实践 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8…

连锁门店收银系统源码

近年来&#xff0c;随着移动互联网的快速发展&#xff0c;越来越多的企业开始注重私域流量的积累和管理。尤其针对连锁零售门店尤为重要。自然对收银系统的要求也越来越多&#xff0c;要有丰富的营销活动来助力商户玩转私域营销。今天一起来看看收银系统需要具备哪些营销功能吧…

【C语言】进制转换无难事:二进制、十进制、八进制与十六进制的全解析与实例

目录 C语言进制转换详解精讲文章重点内容汇总表格1. 进制概念基础1.1 二进制&#xff08;Binary&#xff09;1.2 十进制&#xff08;Decimal&#xff09;1.3 八进制&#xff08;Octal&#xff09;1.4 十六进制&#xff08;Hexadecimal&#xff09; 2. 进制之间的相互转换2.1 十…

云WAF最新动态追踪,为您的网络安全防护加码升级!

技术创新引领防护升级 近年来&#xff0c;各大云运营商纷纷推出了基于人工智能和机器学习技术的增强型 WAF&#xff0c;能够在毫秒级内准确识别和抵御 OWASP排名前10的网络安全威胁&#xff0c;其中包括最近出现的各种病毒&#xff0c;保护效率和智能程度都提高到一个新的层次…

遗传算法与深度学习实战——生命模拟及其应用

遗传算法与深度学习实战——生命模拟及其应用 0. 前言1. 康威生命游戏1.1 康威生命游戏的规则1.2 实现康威生命游戏1.3 空间生命和智能体模拟 2. 实现生命模拟3. 生命模拟应用小结系列链接 0. 前言 生命模拟是进化计算的一个特定子集&#xff0c;模拟了自然界中所观察到的自然…

两种字符串有什么区别——字符数组和字符串,初步认识STL

两种字符串有什么区别——字符数组和字符串&#xff0c;初步认识STL 1.字符数组1.1.基础语法介绍1.2.例题1——自动修正题目描述输入格式输出格式输入输出样例输入 #1输出 #1 1.3.另解——getchar和putchar函数1.4.字符数组相关函数 2.字符串2.1.字符串简介和基础功能2.2.例题2…

MATLAB(6)水纹碰撞覆盖地形

前言 在MATLAB中模拟水纹&#xff08;如水波&#xff09;碰撞并覆盖地形的效果涉及到几个复杂的步骤&#xff0c;包括地形的生成、水波的模拟&#xff08;通常使用波动方程&#xff09;以及两者的交互。下面我将给出一个简化的示例&#xff0c;展示如何在MATLAB中创建一个基本的…

学琴笔记

1-初级钢琴入门课程介绍_哔哩哔哩_bilibili 一些乐理记录&#xff1a; 1.5&#xff1a;

大语言模型稀疏水印技术

面对大型语言模型&#xff08;LLMs&#xff09;在假新闻制造与作弊方面的潜在风险&#xff0c;研究者提出了稀疏水印&#xff08;Sparse Watermark&#xff09;——种创新的文本水印技术&#xff0c;旨在监测和追溯LLMs生成的内容。不同于传统水印方法在可检测性与文本质量间的…

CI522一款电动车仪表NFC开发方案

电动车NFC一键启动 NFC智能刷卡解锁&#xff0c;为你解决四处寻找钥匙的困扰&#xff0c;提升电动车智能化。 Ci522是一款高度集成的13.56MHz非接触式读写器芯片&#xff0c;专为电动车NFC一键启动系统&#xff08;包括仪表总成和电源锁&#xff09;而设计。这款芯片支持ISO/IE…