使用Flink实现MySQL到Kafka的数据流转换

news2025/1/18 3:30:32

使用Flink实现MySQL到Kafka的数据流转换

本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka,这是一个常见的用例,适用于需要实时数据connector的场景。
在这里插入图片描述

环境准备

在开始之前,确保你的环境中已经安装了以下软件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

MySQL数据库,初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

Kafka消息队列

1. 启动zookeeper
 zkServer start
2. 启动kafka服务
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. 消费数据
 kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定义MySQL数据源表:我们使用一个SQL语句创建了一个临时表t_stock_code_price,这个表代表了我们要从MySQL读取的数据结构和连接信息。

val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)

定义Kafka目标表:然后,我们定义了一个Kafka表re_stock_code_price_kafka,指定了Kafka的连接参数和表结构。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

数据转换和写入:最后,我们执行了一个插入操作,将从MySQL读取的数据转换(这里通过case when语句添加了一个新字段rise)并写入到Kafka中。这个可以实现任何的sql etl 来满足我们的需求。

    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

全部代码

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Mysql2Kafka {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)

    val result = tEnv.executeSql("select * from t_stock_code_price")
    result.print()


    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )
    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1557579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微服务】OpenFeign+Sentinel集中处理远程调用异常

文章目录 1.微服务基本环境调整1.对10004模块的application.yml调整2.启动nacos以及一个消费者两个提供者3.测试1.输入http://localhost:8848/nacos/index.html 来查看注册情况2.浏览器访问 http://localhost:81/member/nacos/consumer/get/13.结果 2.使用OpenFeign实现微服务模…

wpsword求和操作教程

wpsword求和怎么操作&#xff1a; 1、首先&#xff0c;单纯的数据是无法求和的&#xff0c;所以我们必须要“插入”一个“表格” 2、接着将需要求和的数据填入到表格中。 3、填完后&#xff0c;进入“布局”选项卡。 4、然后打开其中的“公式” 5、在其中选择求和公式“SUM”并…

【C语言】Infiniband驱动mlx4_reset

一、注释 这个 mlx4_reset 函数负责重置 Mellanox 设备。它保存了设备的 PCI 头信息&#xff0c;然后重置了设备&#xff0c;之后还原保存的 PCI 头信息。请注意&#xff0c;该函数是用英文注释的&#xff0c;下面提供中文注释的版本。以下是该函数的流程&#xff1a; 1. 为保…

制造出海,灵途科技助力割草机器人、泳池清洁机器人全方位感知

近年来&#xff0c;越来越多的中国企业开始对外开拓&#xff0c;走向海外市场、挖掘和满足全球消费者的需求。在消费机器人领域&#xff0c;中国企业出海成绩亮眼&#xff01;在2024 ces 和上海AWE展会上&#xff0c;多家机器人公司展示了家用智能割草机器人、泳池清洁机器人的…

vue2 el-table指定某些数据不参与排序

vue2 el-table指定某些数据不参与排序 1、需求描述2、配置属性方法3、详细代码如下 1、需求描述 最后一行总计不参与排序 2、配置属性方法 el-table 需要配置 sort-change"soltHandle" 方法 el-table-column 需要配置 sortable"custom"属性3、详细代码如…

牛客周赛 Round 38(A,B,C,D,E,F,G)

比赛链接 官方讲解&#xff08;不分P不分段直接两小时怼上来是坏文明 &#xff09; 这场的题很棒&#xff0c;思维有难度&#xff0c;考察的知识点广泛&#xff0c;有深度&#xff0c;很透彻。感觉学到了很多。建议补题。 A 小红的正整数自增 思路&#xff1a; 签到。 可以…

uniapp开发微信小程序分包问题

现象 当我们开发完成小程序后&#xff0c;上传时&#xff0c;出现上传失败&#xff0c;此时就需要我们进行分包处理。 1.未分包之前 我们可以点击本地代码&#xff0c;进行查看 可以看到都是主包&#xff0c;表示没有进行分包处理。 2.在HBuilder X中点击源码视图 3.在mp-we…

rabbitMQ版本问题与下载

都到现在了&#xff0c;大家不会安装东西还是不看版本吧 云服务器买的是centos7&#xff0c;而erlang在24版本后不支持centos7了 所以需要找24版本以下的erlang&#xff0c;而不同erlang对应不同rabbitmq所以需要对应 下载erlang 说实话&#xff0c;自己安装&#xff0c;还是…

机器学习 - 创建多类别的数据

可以用到 scilit-learn 里的 make_blobs() 方法。这个方法用于生成聚类数据集&#xff0c;也用于测试和调试聚类算法。 import torch import matplotlib.pyplot as plt from sklearn.datasets import make_blobs from sklearn.model_selection import train_test_split NUM…

Mybatis-特殊SQL的执行

1. 模糊查询 在MyBatis中进行模糊查询时&#xff0c;有以下三种常见的实现方式&#xff1a; 1.1. 错误示范 先来个准备操作&#xff0c;并做一个错误示例 根据姓名&#xff0c;模糊查询用户&#xff0c;(x小x) 更新数据表 SQLMapper.java package com.sakurapaid.mybatis3…

pytest--python的一种测试框架--pytest初阶

前言 使用pytest去做测试时我们对文件名的命名其实是有规范的&#xff0c;要用test_开头&#xff01;&#xff01;&#xff01; 一、pytest初阶 def test_one():expect1actual1assert expectactual#测试专用语句&#xff1a;assert&#xff0c;识别期望与实际值是否相等这个…

Vue系列-el挂载

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>el:挂载点</title> </head> <body&g…

Mac上Matlab_R2023b ARM 版 启动闪退(意外退出)解决方法

安装好后&#xff0c;使用 "libmwlmgrimpl.dylib" 文件替换掉"/Applications/Matlab_R2023b.app/bin/maca64/matlab_startup_plugins/lmgrimpl"文件夹下的同名文件 在终端下执行如下命令&#xff1a; codesign --verbose --force --deep -s - /Applicat…

npm淘宝镜像源更新

目录 前情提要&#xff1a; 背景&#xff1a; 镜像源更新&#xff1a; 清楚缓存&#xff1a; 直接切换镜像源&#xff1a; 注&#xff1a;npm 补充&#xff1a; 错误解释&#xff1a; 解决方法&#xff1a; 前情提要&#xff1a; 2024 /1 /22 &#xff0c;registry.npm…

Python面对对象 - 类的反射机制

Python面对对象类的反射机制是面向对象编程语言中比较重要的功能&#xff0c;可以动态获取对象信息以及动态调用对象。通过字符串形式的类名或属性来访问对应类或属性。 一、对象的反射 1. getattr 获取指定字符串名称的对象属性、方法&#xff1a; 当访问的属性不存在时&#…

IPv6-重定向,PMTU,GRE隧道

IPv6-重定向&#xff0c;PMTU&#xff08;路径最大传输单元&#xff09;&#xff0c;GRE隧道&#xff08;Generic Routing Encapsulation&#xff0c;通用路由封装协议&#xff09; 重定向过程 触发重定向的条件&#xff1a; 1、报文的入接口&#xff0c;等于自身路由之后的…

深入理解SQLite:存储引擎、索引、事务与锁

文章目录 一、存储引擎二、索引的数据结构和类型2.1 B-Tree2.2 其他类型的索引2.3 小结 三、事务处理中的一致性问题3.1 脏读&#xff08;Dirty Read&#xff09;3.2 不可重复读&#xff08;Non-repeatable Read&#xff09;3.3 幻读&#xff08;Phantom Read&#xff09;3.4 小…

RVM安装Ruby笔记(Mac)

环境 硬件&#xff1a;Macbook Pro 系统&#xff1a;macOS 14.1 安装公钥 通过gpg安装公钥失败&#xff0c;报错如下&#xff1a; 换了几个公钥地址&#xff08;hkp://subkeys.pgp.net&#xff0c;hkp://keys.gnupg.net&#xff0c;hkp://pgp.mit.edu&#xff09;&#xff0c;…

mac怎么删除python

mac 默认安装了python2&#xff1b;自己后面又安装了python3&#xff1b;为了方便&#xff0c;现在想将python3换成Anaconda3。 Anaconda是一个开源的Python发行版本&#xff0c;其包含了conda、Python等180多个科学包及其依赖项。 Python3安装之后&#xff0c;在系统中不同目…

记录el-table的表格合并问题

项目需求背景&#xff1a; 利用el-table进行数据展示时&#xff0c;时常会有需要表格合并的情景&#xff0c;比如一个表格由5列组成&#xff1a; 类型 地区 金额 重量 长度 在这个表格里&#xff0c;如果同金额、重量的列可以合并到一起&#xff0c;这时应该怎么做呢&#xff1…