使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

news2024/11/23 9:39:18

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

CSDN开发云
实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;

CREATE TABLE source_mysql_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (
    id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,
    user_id VARCHAR2(255) NOT NULL,
    action VARCHAR2(255) NOT NULL,
    timestamp VARCHAR2(255) NOT NULL,
    PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;

CREATE TABLE target_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>

    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>19.8.0.0</version>
    </dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;

public class UserAction {
    private int id;
    private String userId;
    private String action;
    private String timestamp;

    // Getters and setters
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
}

编写Flink任务类

package com.example.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

@Component
public class FlinkETLJob implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从MySQL读取数据
        DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());

        // 从Oracle读取数据
        DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());

        // 合并两个数据流
        DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);

        // 清洗和转换数据
        DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {
                // 进行清洗和转换
                value.setAction(value.getAction().toUpperCase());
                return value;
            }
        });

        // 将数据写入目标MySQL数据库
        transformedStream.addSink(new MySQLSink());

        // 执行任务
        env.execute("Flink ETL Job");
    }

    public static class MySQLSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_mysql_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class OracleSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_oracle_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
        private static final String JDBC_USER = "target_user";
        private static final String JDBC_PASSWORD = "target_password";
        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
            statement = connection.prepareStatement(sql);
        }

        @Override
        public void flatMap(UserAction value, Collector<Void> out) throws Exception {
            statement.setString(1, value.getUserId());
            statement.setString(2, value.getAction());
            statement.setString(3, value.getTimestamp());
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1834783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于VTK9.3.0+Visual Studio2017 c++实现DICOM影像MPR多平面重建

开源库&#xff1a;VTK9.3.0 开发工具&#xff1a;Visual Studio2017 开发语言&#xff1a;C 实现过程&#xff1a; void initImageActor(double* Matrix, double* center, vtkSmartPointer<vtkImageCast> pImageCast,vtkSmartPointer<vtkImageReslice> imageRe…

Python 库PySpark,一个超级强大的数据处理引擎

目录 01初识 PySpark 为什么选择 PySpark? 安装 PySpark 配置 PySpark 02基本操作 创建 RDD 基本 RDD 操作 03DataFrame 和 Spark SQL 创建 DataFrame 基本 DataFrame 操作 使用 Spark SQL 04机器学习与流处理 …

MacOS - 3 招快速去除桌面上的图标文件

在平时用 Mac 电脑的时候&#xff0c;会产生许多我们不用的或废弃的图标、文件&#xff0c;在 Mac 桌面上显得很乱&#xff0c;不仅影响美观也直接影响了我们工作的心情。下面我们分享 3 招快速去除桌面上的图标或文件的方法&#xff0c;有需要的朋友可以试一试。 1. 右键删除&…

QPushButton、QCheckBox、QRadioPutton、QLineEdit用法

实现LineEdit 文本的 居左、居中、居右设置 实现LineEdit 文本的粗体、斜体、下划线设置 实现LineEdit 控件的 ReadOnly、Enable、ClearButtonEnable的设置 创建资源文件&#xff0c;引入button需要的icon 总体布局 窗体使用垂直布局&#xff0c;每个组合控件内部是水平布局 2个…

游泳耳机品牌排行榜,10大实力超群的游泳耳机分享!

在当今快节奏的生活中&#xff0c;运动已成为许多人不可或缺的一部分&#xff0c;不仅为了健康&#xff0c;也是释放压力、提升生活品质的有效方式。而随着科技与健身的深度融合&#xff0c;智能穿戴设备尤其是专为运动设计的耳机&#xff0c;正逐渐成为运动爱好者的新宠。对于…

nodejs爬虫小红书评论区

发现好像还是爬虫的知识热度比较高&#xff0c;最近一直在加强JS这块。这两天脚本模拟爬BOSS的时候也想着怎么用nodejs&#xff0c;昨天都没更新文章&#xff0c;Q-Q&#xff0c;因为一直failed没啥成果。 使用模块 这边可以看到使用的模块其实也挺多&#xff0c;但主要还是ht…

vue大作业-实现学校官网

vue大作业-实现学校官网 基于vue2实现的学校官网 项目展示 学校官网介绍 欢迎访问我们学校的官方网站&#xff0c;这里为您提供了全面的信息和资源&#xff0c;帮助您更好地了解我们的教育理念、教学资源和学术活动。 首页 首页是您了解我们学校的起点。这里展示了学校的最…

单元测试的思考与实践

1. 什么是单元测试 通常来说单元测试&#xff0c;是一种自动化测试&#xff0c;同时包含一下特性&#xff1a; 验证很小的一段代码&#xff08;业务意义 或者 代码逻辑 上不可再分割的单元&#xff09;&#xff0c;能够更准确的定位到问题代码的位置 能够快速运行&#xff08;…

初始化一个Android项目时,Android Studio会自动生成一些文件和目录结构,以帮助你快速上手开发

当你初始化一个Android项目时&#xff0c;Android Studio会自动生成一些文件和目录结构&#xff0c;以帮助你快速上手开发。这些文件和目录各自有其特定的功能和用途。下面我为你解释一下这些自动生成的内容&#xff1a; 1. app 目录 这是你的应用模块的根目录&#xff0c;包…

C++之模板(三)

1、缺省模板参数 可以将数据结构类型传递进来&#xff0c;比如vectop<T>&#xff08;如果没传就是默认&#xff09; 把vector当作类型参数来传递&#xff0c;从而使用它的接口然后适配出新的接口。实际上这个Stack称为适配器。有时候可能需要vector&#xff0c;但是又需…

深入解析知识付费平台的核心功能模块:满足个性化学习需求的数字化教育新星

在数字化学习的大潮中&#xff0c;知识付费平台已成为教育行业的一颗新星&#xff0c;它以满足用户需求为核心&#xff0c;提供便捷高效的学习渠道。该平台汇聚了各类专业知识&#xff0c;覆盖职业技能、生活兴趣和人文社科等多个领域&#xff0c;满足不同用户的学习需求。同时…

【二】【QT开发应用】QMake和CMake介绍,GN,QT三个窗口类的区别,QMainWindow, QWidget,QDialog

QMake和CMake介绍 qmake&#xff1a;qt独有的代码构建工具, 是一种简洁的构建工具&#xff0c;主要用于生成 Qt 项目的跨平台编译配置文件&#xff0c;语法简单&#xff0c;适合小型和中型项目。 cmake&#xff1a;C通用的代码构建工具&#xff0c;绝大部分C开源项目都使用cm…

MySQL 8.0 版本更新 要点 列表 (8.0-8.0.23)

开头还是介绍一下群&#xff0c;如果感兴趣 PolarDB ,MongoDB ,MySQL ,PostgreSQL ,Redis, Oceanbase, Sql Server 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;&#xff08;共 2370 人左右…

楼顶气膜体育馆建设的关键问题解析—轻空间

随着城市化进程的加快和土地资源的日益紧张&#xff0c;楼顶气膜体育馆作为一种新兴的建筑形式备受关注。其轻盈美观、节省用地、施工便捷等特点&#xff0c;使其成为城市空间利用的理想选择。那么&#xff0c;在楼顶建设气膜体育馆有哪些关键问题需要考虑呢&#xff1f; 一、楼…

Simulink代码生成: 状态机的其他建模方法

本文研究状态机建模的一些方法和技巧。 文章目录 1 引入2 状态机建模方法2.1 状态机中的计时2.2 状态机中的计数2.3 转移顺序 3 总结 1 引入 博主一直很喜欢用Simulink中的状态机建模&#xff0c;在这里想记录一下自己平时使用Stateflow建模的心得。因为自身行业所限&#xff…

LayUI使用(二)处理表格会出现下拉框的问题

一、问题描述 如下&#xff0c;layui的表格渲染后&#xff0c;当鼠标悬停在表格项时会出现右侧的下拉框&#xff0c;layui版本较老&#xff0c;原因未知 二、处理办法 在cols里面加上width&#xff0c;也不用每个都加&#xff0c;加一部分表格项即可 注意&#xff1a;若想禁止…

全功能知识付费小程序源码系统 界面支持万能DIY装修 带完整的安装代码包以及搭建部署教程

系统概述 在当今数字化时代&#xff0c;知识付费已经成为一种重要的商业模式。为了满足市场对于便捷、高效、个性化的知识付费解决方案的需求&#xff0c;小编给大家分享一款全功能知识付费小程序源码系统。这一系统不仅具备界面支持万能 DIY 装修的独特优势&#xff0c;还配备…

推荐系统三十六式学习笔记:原理篇.矩阵分解11|facebook是怎么为十亿人互相推荐好友的?

目录 回顾矩阵分解交替最小二乘原理&#xff08;ALS&#xff09;隐式反馈推荐计算总结 上一篇中&#xff0c;我们聊到了矩阵分解&#xff0c;在这篇文章的开始&#xff0c;我再为你回顾一下矩阵分解。 回顾矩阵分解 矩阵分解要将用户物品评分矩阵分解成两个小矩阵&#xff0c…

帕金森患者在饮食上需要注意什么

帕金森病患者在饮食上应该遵循以下几个基本原则&#xff1a; 饮食清淡&#xff1a;应多吃新鲜的水果和蔬菜&#xff0c;如苹果、芹菜、菠菜等&#xff0c;以补充维生素和促进胃肠道蠕动。营养均衡&#xff1a;应多吃富含优质蛋白的食物&#xff0c;如鸡蛋、牛奶&#xff0c;以…

需要用来做3D家具展示的软件哪个网站更专业?

国内外的3D家具展示软件网站并且值得推荐的也就那么几家&#xff1a; 1、Cedreo&#xff0c;Cedreo 是一个在线3D家居设计平台&#xff0c;适合专业的房屋建筑商、改造商和室内设计师。它允许用户创建2D和3D平面图以及室内外效果图&#xff0c;拥有7000多件可定制的3D家具、材…