基于Flink CDC datastream mysql to mysql 序列化sql 数据同步

news2025/1/12 13:23:37

基于Flink CDC datastream mysql to mysql 序列化sql 数据同步

Flink CDC有两种方式同步数据库:

1. 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步;
2. 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行,可以多表多库。

本方案使用DataStream方法,同步两表中的数据。
不需要部署Flink,可单独使用。

主要工作

工程下载连接,点击进入

  • 运行简单,只需要配置源数据库与目标数据库信息,运行MysqlCDC文件中的main函数即可使用。
  • 修复了mysql数据时区问题、修复了deatatime同步到数据库报错问题
  • 使用分为以下三步:

一、 修改源数据库信息

在这里插入图片描述

二、 修改目标数据库信息

在这里插入图片描述

三、 启动服务

在这里插入图片描述
主要源码:

package org.apache.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jvnet.hk2.annotations.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author dake
 */
@Service
public class MysqlSink extends RichSinkFunction<String> {
    private Connection connection = null;
    Statement sqlExecute;



    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            if (connection == null) {
                Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
                connection = DriverManager.getConnection("jdbc:mysql://IP地址:3306/test_target?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true"
                        , "账号", "密码");//获取连接
                sqlExecute = connection.createStatement();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(String value, Context context) {
        try {
            sqlExecute.execute(value);
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("value = " + value);
    }

    @Override
    public void close() throws Exception {
        super.close();

        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
package org.apache.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.config.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

public class MysqlCDC {
    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("IP地址")
                .port(3306)
                .startupOptions(StartupOptions.initial()) //全量同步
                .scanNewlyAddedTableEnabled(true) // 开启支持新增表
                .databaseList("test_master_resource") // set captured database
                .tableList("test_master_resource.test1,test_master_resource.test2") // set captured table
                .username("账号")
                .password("密码")
                .debeziumProperties(getDebeziumProperties())
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        Configuration configuration = new Configuration();
        // 生产环境夏下,改成参数传进来
//        configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // enable checkpoint
        env.enableCheckpointing(3000);
        // 设置本地
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .addSink(new MysqlSink());
//                .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/388193.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为机试题:HJ106 字符逆序(python)

文章目录&#xff08;1&#xff09;题目描述&#xff08;2&#xff09;Python3实现&#xff08;3&#xff09;知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方…

面试+算法:罗马数字及Excel列名与数字互相转换

概述 算法是一个程序员的核心竞争力&#xff0c;也是面试最重要的考查环节。 试题 判断一个罗马数字是否有效 罗马数字包含七种字符&#xff1a;I&#xff0c;V&#xff0c;X&#xff0c;L&#xff0c;C&#xff0c;D和M&#xff0c;如下 字符数值I1V5X10L50C100D500M1000…

libgdx导入blender模型

具体就是参考 官网 https://libgdx.com/wiki/graphics/3d/importing-blender-models-in-libgdx blender 教程可以看八个案例教程带你从0到1入门blender【已完结】 这里贴一下过程图。 1.初始环境搭建略过。 2.打开blender 选中摄像机和灯光&#xff0c;右键进行删除。 3.选中…

ES 7.7.0 数据迁移

本文使用 elasticdump 做数据迁移&#xff0c;支持在线和离线俩种方式&#xff0c;适用于数据量比较小的情况。 1、Node 安装 由于elasticdump 依赖于 node&#xff0c;首先需要安装下node。 1.1、 Linux 安装 $ wget https://nodejs.org/dist/v10.15.0/node-v10.15.0-linu…

[数据结构]:10-二叉排序树(无头结点)(C语言实现)

目录 前言 已完成内容 二叉排序树实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-BinarySearchTreeCommon.cpp 04-BinarySearchTreeFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容&#xff0c;除其中使用到C引用外&#xff0c;全为C语言…

基于支持向量机SVM的房价预测,基于支持向量机SVM的回归分析

目录 支持向量机SVM的详细原理 SVM的定义 SVM理论 SVM应用实例,基于SVM的房价预测 支持向量机SVM的详细原理 SVM的定义 支持向量机(support vector machines, SVM)是一种二分类模型,它的基本模型是定义在特征空间上的间隔最大的线性分类器,间隔最大使它有别于感知机;…

【算法题】1958. 检查操作是否合法

插&#xff1a; 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家一起学习鸭~~~ 题目&#xff1a; 给你一个下标从 0 开始的 8 x 8 网…

Vue响应式原理————Object.defineProperty()和proxy的用法分享

Vue框架一个比较核心的功能就是我们的数据是响应式的&#xff0c;这样我们在修改数据的时候&#xff0c;页面会自动帮我们更新&#xff0c;那么想要实现这个功能就要实现对一个数据的劫持&#xff0c;即在取值和设置值的同时我们能够检测到即数据劫持。vue2响应式的实现原理所依…

用原生js手写分页功能

分页功能如下&#xff1a; 数据分页显示&#xff0c;每页显示若干条数据&#xff0c;默认当前页码为第一页。例如&#xff1a;每页5条数据&#xff0c;则第一页显示 1-5 条&#xff0c;第二页显示 6-10 条&#xff0c;依此类推。当页码为第一页时&#xff0c;上一页为禁用状态…

IronPDF for .NET 2023.2.4 Crack

适用于 .NET 2023.2.4 的 IronPDF 添加对增量 PDF 保存的支持。 2023 年 3 月 2 日 - 10:23新版本 特征 添加了对 IronPdfEngine Docker 的支持。 添加了对增量 PDF 保存的支持。 重新设计了 PDF 签名和签名。 删除了 iTextSharp 依赖项。 在文本页眉/页脚中添加了 DrawDivider…

laravel8多模块、多应用和多应用路由

1、安装多应用模块 composer require nwidart/laravel-modules2、执行命令&#xff0c;config文件夹下生成一个modules.php配置文件 php artisan vendor:publish --provider"Nwidart\Modules\LaravelModulesServiceProvider"3、修改config文件夹下的modules.php&am…

python爬虫学习之路

【2023.3.3】一、爬虫概念 通过编写程序&#xff0c;模拟浏览器上网&#xff0c;然后让其去互联网上抓取数据的过程。 价值&#xff1a; 抓取互联网上的数据&#xff0c;为我所用&#xff0c;有了大量的数据&#xff0c;就如同有了一个数据银行一样&#xff0c;下一步做的就是如…

程序员怎么写出亮眼的简历?

要随时与正能量的人保持同行&#xff0c;因为他的心一直在靠近远方 什么是简历 我们先看下百度百科对于简历的描述&#xff1a; 可以看出&#xff0c;简历是受法律规定&#xff0c;因为简历本身的信息真实性较高&#xff0c;所以简历一直是早期被市场上进行数据交易的重要载…

windows 下 python 和repo 下载安装环境变量配置

repo 安装成功&#xff0c;但是下载代码 repo init的时候出错 不知道是不是repo windows版本有问题 python 最好下载2.6-2.7版本的 Python Releases for Windows | Python.org 不然下载代码会有问题&#xff0c;下不了&#xff0c;会提示安装2.6-2.7版本的 Windows下成功安…

人机界面艺术设计

人机界面艺术设计 2.1人机界面艺术设计思路 人们经常有意通过某种工具或创造来解决难题&#xff0c;然而这并不意味着人们乐于接受别人或其他事情&#xff0c;他们很难提出问题。在用户使用网页或软件的时候&#xff0c;他们有明确的目标&#xff0c;他们利用电脑来帮助自己达…

Hbase RegionServer的核心模块

RegionServer是HBase系统中最核心的组件&#xff0c;主要负责用户数据写入、读取等基础操作。RegionServer组件实际上是一个综合体系&#xff0c;包含多个各司其职的核心模块&#xff1a;HLog、MemStore、HFile以及BlockCache。 一、RegionServer内部结构 RegionServer是HBas…

Altium Designer PCB孤岛铜的去除方法教程

孤岛铜&#xff0c;也叫死铜&#xff0c;是指在PCB中孤立无连接的铜箔&#xff0c;一般都是在敷铜的时候产生&#xff0c;不利于生产。解决的办法比较简单&#xff0c;可以手工连线将其与同网络的铜箔相连&#xff0c;也可以通过打过孔的方式将其与同网络的铜箔相连。无法解决的…

Biomod2 (下):物种分布模型建模

这里写目录标题1.给出一个线性回归模型并求出因子贡献度2.biomod22.1 pseudo-absences:伪不存在点&#xff08;PA&#xff09;2.1.1 random2.2.2 disk2.2.3 user.defined method3.使用网格划分区域3.1 计算质心4. 完整案例1.给出一个线性回归模型并求出因子贡献度 ##---------…

【游戏逆向】FPS游戏玩家对象数据分析

玩家健康值 查找玩家健康值,玩家健康值是100,但是我们并不知道数值类型,我们可以使用精确搜索方式搜索100-所有类型 CE搜索 结果很多,我们可以使用手雷来减少血量 我们会得到两个结果 我们可以去尝试改变数值,最终发现一个是我们的客户端健康值,一个是服务器健康值,…

Java——N皇后问题

题目链接 leetcode在线oj题——N皇后 题目描述 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff…