flink cdc2.2.1同步postgresql表

news2025/2/15 23:36:05

目录

  • 简要说明
  • 前置条件
  • maven依赖
  • 样例代码

简要说明

在flink1.14.4 和 flink cdc2.2.1下,采用flink sql方式,postgresql同步表数据,本文采用的是上传jar包,利用flink REST api的方式进行sql执行。

前置条件

1.开启logical
确保你的 postgresql.conf 文件中的相关设置允许逻辑复制和插件的使用。特别是下面几个配置项:
wal_level 应该设置为 logical。
max_replication_slots 需要大于0。
配置文件修改完毕后,重启 PostgreSQL 服务
SHOW wal_level; 命令查看日志等级是否修改
2.创建逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot(‘flink_slot’, ‘pgoutput’);
flink_slot 为槽名
pgoutput 是从PostgreSQL 10开始提供的一个内置输出插件,用于逻辑解码
验证逻辑复制槽:SELECT * FROM pg_replication_slots;
查询逻辑复制状态:SELECT * FROM pg_stat_replication;
3.更改复制标识包含更新和删除之前值(目的是为了确保表 xxxx(tableName) 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 xxxx 表可能无法实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE xxxx REPLICA IDENTITY FULL;
4.修改类加载机制
在flink的flink-conf.yaml文件,classloader.resolve-order: child-first,将 child-first 改为 parent-first

maven依赖

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.4</flink.version>
        <flink-cdc.version>2.2.1</flink-cdc.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
<dependencies>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.14.4</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!-- flink cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <!-- database driver -->
        <!-- postgresql -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>

        <!-- json -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9.3</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <!-- junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

样例代码

sql:
CREATE TABLE `new_table1_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'debezium.database.tablename.case.insensitive'='false',
'debezium.log.mining.continuous.mine'='true',
'password'='*****',
'hostname'='***.**.**.***',
'debezium.log.mining.strategy'='online_catalog',
'connector'='postgres-cdc',
'port'='5432',
'schema-name'='public',
'database-name'='test',
'table-name'='new_table1',
'username'='******',
'slot.name'='flink_slot',
'decoding.plugin.name'='pgoutput'
);
CREATE TABLE `new_table1_bak_37877` (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'password'='*****',
'connector'='jdbc',
'table-name'='public.new_table1_bak',
'url'='jdbc:postgresql://地址:5432/test',
'username'='用户'
);
insert into new_table1_bak_37877 select * from new_table1_37877;
参数类:
@Data
public class InputOutputParams {

    /**
     * 作业名称
     */
    private String jobName;
    
    /**
     * 代码文本,分号分隔的flink sql语句
     */
    private String codeText;
    
}
main方法:
public class FlinkMain {

    /**
     * flink job 运行入口
     *
     * @param args 运行参数
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length == 0) {
            throw new RuntimeException("运行参数为空");
        }

        // 取第一个参数(必须是json字符串)为运行参数
        String json = args[0];
        ObjectMapper objectMapper =
                new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        InputOutputParams params = objectMapper.readValue(json, InputOutputParams.class);

        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启快照点,每 3 * 60秒保存一次快照
        env.enableCheckpointing(3 * 60 * 1000L);

        //检查点可容忍失败阈值
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        //检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 重启策略,最多尝试重启3次,每次重启的时间间隔为20秒
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(20L, TimeUnit.SECONDS)));
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // 获取表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", params.getJobName());

         // 执行操作sql
        String codeText = params.getCodeText();
        if (codeText == null || codeText.trim().isEmpty()) {
            throw new RuntimeException("flink sql is empty");
        }
        String[] flinkSqlArr = codeText.split(";");
        for (String flinkSql : flinkSqlArr) {
            if (flinkSql != null && !flinkSql.trim().isEmpty()) {
                tEnv.executeSql(flinkSql);
            }
        }

    }
}

将项目打包成不带依赖的jar

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <!-- 复制依赖jar包 -->
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- 依赖jar包输出目录 -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- main方法所在主类 -->
                            <mainClass>com.test.FlinkMain</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

然后将lib下的依赖全部拷贝到flink的lib下,将刚才打包好的jar界面上传
在这里插入图片描述
然后通过postman调用flink的REST api接口提交sql,接口文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2298970.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纪念日倒数日项目的实现-【纪念时刻-时光集】

纪念日/倒数日项目的实现## 一个练手的小项目&#xff0c;uniappnodemysql七牛云。 在如今快节奏的生活里&#xff0c;大家都忙忙碌碌&#xff0c;那些具有特殊意义的日子一不小心就容易被遗忘。今天&#xff0c;想给各位分享一个“纪念日”项目。 【纪念时刻-时光集】 一…

WPF的MVVMLight框架

在NuGet中引入该库&#xff1a; MVVMLight框架中的命令模式的使用&#xff1a; <StackPanel><TextBox Text"{Binding Name}"/><TextBox Text"{Binding Title}"/><Button Content"点我" Command"{Binding ShowCommand…

DeepSeek从入门到精通(清华大学)

​ DeepSeek是一款融合自然语言处理与深度学习技术的全能型AI助手&#xff0c;具备知识问答、数据分析、编程辅助、创意生成等多项核心能力。作为多模态智能系统&#xff0c;它不仅支持文本交互&#xff0c;还可处理文件、图像、代码等多种格式输入&#xff0c;其知识库更新至2…

【DeepSeek】DeepSeek R1 本地windows部署(Ollama+Docker+OpenWebUI)

1、背景&#xff1a; 2025年1月&#xff0c;DeepSeek 正式发布 DeepSeek-R1 推理大模型。DeepSeek-R1 因其成本价格低廉&#xff0c;性能卓越&#xff0c;在 AI 行业引起了广泛关注。DeepSeek 提供了多种使用方式&#xff0c;满足不同用户的需求和场景。本地部署在数据安全、性…

windows平台上 oracle简单操作手册

一 环境描述 Oracle 11g单机环境 二 基本操作 2.1 数据库的启动与停止 启动: C:\Users\Administrator>sqlplus / as sysdba SQL*Plus: Release 11.2.0.4.0 Production on 星期五 7月 31 12:19:51 2020 Copyright (c) 1982, 2013, Oracle. All rights reserved. 连接到:…

【弹性计算】弹性计算的技术架构

弹性计算的技术架构 1.工作原理2.总体架构3.控制面4.数据面5.物理设施层 虽然弹性计算的产品种类越来越多&#xff0c;但不同产品的技术架构大同小异。下面以当前最主流的产品形态 —— 云服务器为例&#xff0c;探查其背后的技术秘密。 1.工作原理 云服务器通常以虚拟机的方…

RAG(检索增强生成)落地:基于阿里云opensearch视线智能问答机器人与企业知识库

文章目录 一、环境准备二、阿里云opensearch准备1、产品文档2、准备我们的数据3、上传文件 三、对接1、对接文本问答 一、环境准备 # 准备python环境 conda create -n opensearch conda activate opensearch# 安装必要的包 pip install alibabacloud_tea_util pip install ali…

【踩坑】pytorch模型导出部署onnx问题记录

问题1&#xff1a;repeat_interleave 无法转译 具体报错为&#xff1a; TypeError: torch._C.Value object is not iterable (Occurred when translating repeat_interleave).原因是我的模型代码中有&#xff1a; batch_indices torch.repeat_interleave(torch.arange(can…

DeepSeek vs ChatGPT:AI对决中的赢家是……人类吗?

DeepSeek vs ChatGPT&#xff1a;AI对决中的赢家是……人类吗&#xff1f; 文章目录 DeepSeek vs ChatGPT&#xff1a;AI对决中的赢家是……人类吗&#xff1f;一、引言1. 背景2. 问题 二、DeepSeek vs ChatGPT&#xff1a;谁更胜一筹&#xff1f;2.1 语言生成能力评测对比场景…

基于ollama搭建本地deepseek大模型服务

基于ollama搭建本地deepseek大模型服务 简介准备工作系统要求ollama的安装ollama 模型ollama 安装流程ollama 如何运行大模型前端部署注意事项简介 本指南旨在帮助初学者在本地环境中设置和运行DeepSeek大模型服务。本文将使用Ollama平台来简化这一过程,确保即使是新手也能顺…

elementUI rules 判断 el-cascader控件修改值未生效

今天修改一个前端项目&#xff0c;增加一个多选字段&#xff0c;使用的是el-cascader控件&#xff0c;因页面是通过引用子页面组件形式使用&#xff0c;出现一个点选后再勾选原有值&#xff0c;输入框内不展示或取消后的也未正常隐藏&#xff0c;如果勾选的值是全新的则其他已选…

讯方·智汇云校华为授权培训机构的介绍

官方授权 华为授权培训服务伙伴&#xff08;Huawei Authorized Learning Partner&#xff0c;简称HALP&#xff09;是获得华为授权&#xff0c;面向公众&#xff08;主要为华为企业业务的伙伴/客户&#xff09;提供与华为产品和技术相关的培训服务&#xff0c;培养华为产业链所…

DeepSeek4j 已开源,支持思维链,自定义参数,Spring Boot Starter 轻松集成,快速入门!建议收藏

DeepSeek4j Spring Boot Starter 快速入门 简介 DeepSeek4j 是一个专为 Spring Boot 设计的 AI 能力集成启动器&#xff0c;可快速接入 DeepSeek 大模型服务。通过简洁的配置和易用的 API&#xff0c;开发者可轻松实现对话交互功能。 环境要求 JDK 8Spring Boot 2.7Maven/Gr…

MySQL数据库误删恢复_mysql 数据 误删

2、BigLog日志相关 2.1、检查biglog状态是否开启 声明: 当前为mysql版本5.7 当前为mysql版本5.7****当前为mysql版本5.7 2.1.1、Navicat工具执行 SHOW VARIABLES LIKE LOG_BIN%;OFF 是未开启状态&#xff0c;如果不是ON 开启状态需要开启为ON。{默认情况下就是关闭状态} 2.…

避雷,Ubuntu通过ollama本地化部署deepseek,open-webui前端显示

0.如题&#xff0c;预期在Ubuntu上本地化部署DeepSeek&#xff0c;通过浏览器访问达到chatgpt的对话效果。 1.裸机&#xff0c;安装Ubuntu。 原有的系统盘采用大白菜&#xff0c;下载24.04.1的镜像&#xff0c;插到电脑上&#xff0c;无法识别&#xff0c;重新查到笔记本&…

HCIA项目实践--静态路由的拓展配置

7.7 静态路由的拓展配置 网络中的两个重要思想&#xff1a; &#xff08;1&#xff09; 实的不行来虚的&#xff1b; &#xff08;2&#xff09; 范围太大&#xff0c;划分范围。&#xff08;分治&#xff09; 7.7.1 负载均衡 &#xff08;1&#xff09;定义 负载均衡是一种网…

缓存三大问题及其解决方案

缓存三大问题及其解决方案 1. 前言 ​ 在现代系统架构中&#xff0c;缓存与数据库的结合使用是一种经典的设计模式。为了确保缓存中的数据与数据库中的数据保持一致&#xff0c;通常会给缓存数据设置一个过期时间。当系统接收到用户请求时&#xff0c;首先会访问缓存。如果缓…

Unity崩溃后信息结合符号表来查看问题

目录 SO文件符号表对调试和分析的重要性调试方面分析方面 错误数据安装Logcat解释符号表设置符号文件路径生成解析 相关参考 SO文件 so 文件&#xff08;Shared Object File&#xff0c;共享目标文件&#xff09;和符号表紧密相关&#xff0c;它们在程序的运行、调试和分析过程…

DeepSeek官方发布R1模型推荐设置

今年以来&#xff0c;DeepSeek便在AI领域独占鳌头&#xff0c;热度一骑绝尘。其官方App更是创造了惊人纪录&#xff0c;成为史上最快突破3000万日活的应用&#xff0c;这一成绩无疑彰显了它在大众中的超高人气与强大吸引力。一时间&#xff0c;各大AI及云服务厂商纷纷投身其中&…

STM32 ADC介绍(硬件原理篇)

目录 背景 AD转换器 采样与保持 量化 编码 AD转换器转换原理 DA转换原理 AD转换原理 1.逐次逼近型AD转换器 2.并联比较型AD转换器 编码器 同步D触发器和边沿D触发器 基本RS触发器 同步RS触发器 同步D触发器 边沿型D触发器&#xff08;维持-阻塞D触发器&#xff…