使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

news2025/1/18 13:56:59

使用Flink实现Kafka到MySQL的数据流转换

在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍如何使用Apache Flink来实现这一过程。

在这里插入图片描述

环境准备

在开始之前,请确保你的开发环境中已经安装并配置了以下组件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

Kafka消息队列

1. 启动zookeeper
 zkServer start
2. 启动kafka服务
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生产数据
 kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL数据库
初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定义Kafka数据源表:我们使用一个SQL语句创建了一个Kafka表re_stock_code_price_kafka,这个表代表了我们要从Kafka读取的数据结构和连接信息。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定义MySQL目标表:然后,我们定义了一个MySQL表re_stock_code_price,指定了与MySQL的连接参数和表结构。

val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)

数据转换和写入:最后,我们执行了一个插入操作,将从Kafka读取的数据转换并写入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")

result.print()

全部代码

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Kafka2Mysql {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")


    val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")


    result.print()
    print("数据打印完成!!!")
  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1557463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序如何进行npm导入组件

文章目录 目录 文章目录 前言 一、安装node 二、微信小程序通过npm安装组件&#xff08;以Vant-weapp为例&#xff09; 一、Vant-weapp下载 二 、修改 app.json 三 、修改 project.config.json 四 、 构建 npm 包 前言 微信小程序使用npm导入有很多的教程&#xff0c;我…

Vue_08事件处理

最近在学一点前端的Vue。这篇文章来说说Vue中事件处理 我理解的事件处理就是说能够让用户与我们的系统实现交互操作&#xff0c;我们人发出的动作就是事件&#xff0c;我们需要编写vue来处理我们人类发出的事件。 一、事件处理基础 v-on指令 这里使用的是使用按钮触发一个弹…

Kafka入门到实战-第四弹

Kafka入门到实战 Kafka集群搭建官网地址Kafka概述使用Kraft搭建Kafka集群更新计划 Kafka集群搭建 官网地址 声明: 由于操作系统, 版本更新等原因, 文章所列内容不一定100%复现, 还要以官方信息为准 https://kafka.apache.org/Kafka概述 Apache Kafka 是一个开源的分布式事件…

计算机网络——31数据链路层和局域网引论和服务

数据链路层和局域网 WAN&#xff1a;网络形式采用点到点链路 带宽大&#xff0c;距离远&#xff08;延迟大&#xff09; 贷款延迟积大 如果采用多点连接方式 竞争方式&#xff1a;一旦冲突代价大令牌等协调方式&#xff1a;在其中协调节点的发送代价大 点到点链路的链路层服…

学习日记(SSM整合流程_SpringMVC_part_two)

目录 大致流程如下 1、创建工程 2、SSM配置类结构 3、功能模块 代码部分 整体结构 Jdbc.Config MyBatisConfig ServletConfig SpringConfig SpringMvcConfig BookController BookDao Book BusinessException SystemException Cord Result BookService BookserviceImpl jd…

手写红黑树【数据结构】

手写红黑树【数据结构】 前言版权推荐手写红黑树一、理论知识红黑树的特征增加删除 二、手写代码初始-树结点初始-红黑树初始-遍历初始-判断红黑树是否有效查找增加-1.父为黑&#xff0c;直接插入增加-2. 父叔为红&#xff0c;颜色调换增加-3. 父红叔黑&#xff0c;颜色调换&am…

java Web洗衣店管理系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 洗衣店管理系统是一套完善的web设计系统&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&#xff0c;使用…

优化选址问题 | 基于帝国企鹅算法求解工厂-中心-需求点三级选址问题含Matlab源码

目录 问题代码问题 "帝国企鹅算法"并不是一个广为人知的优化算法,可能是一个特定领域或者特定情境下提出的方法。不过,对于工厂-中心-需求点三级选址问题,它可能是一种启发式优化方法,用于在多个候选位置中选择最优的工厂、中心和需求点位置。 这类问题通常涉及…

IPv4地址

IP v4 由32位二进制构成、可以用点分十进制表示。 例如&#xff1a;192.168.1.1 11000000101010000000000100000001 由网络位和主机位组成。为了区分网络位和主机位&#xff0c;需要用子网掩码&#xff0c;子网掩码也是由32位二进制构成&#xff0c;连续的1对应网络位&#…

PHP的定时任务框架的taskPHP3.0学习记录2(环境要求、配置Redis、crontab执行时间语法、命令操作以及Screen全屏窗口管理器)

环境要求 php版本> 5.5开启socket扩展开启pdo扩展开启shmop扩展 echo <pre>; echo --; $requiredVersion 5.6.0; $currentVersion phpversion(); if (version_compare($currentVersion, $requiredVersion, >)) {echo "1.PHP版本满足要求&#xff0c;当前版…

【笔记】动⼿学深度学习(花书)|| Aston Zhang Mu Li Zachary C. LiptonAlexander J. Smola

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 前言 第一章 深度学习简介 第二章 P 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 系列文章目录前言本书…

python实战之异常篇

一. try-except语句 二. 多个except代码块 三. 多重异常捕获 四. try-except语句嵌套 五. 使用finally代码块释放资源 六. 自定义异常类 七. 手动引发异常 使用关键字 raise 手动抛异常

Android RecyclerView canScrollVertically方向与返回值,Kotlin

Android RecyclerView canScrollVertically方向与返回值&#xff0c;Kotlin import android.os.Bundle import android.util.Log import android.view.LayoutInflater import android.view.View import android.view.ViewGroup import android.widget.TextView import androidx…

Node | Node.js 版本升级

目录 Step1&#xff1a;下载 Step2&#xff1a;安装 Step3&#xff1a;换源 发现其他博客说的 n 模块不太行&#xff0c;所以老老实实地手动安装 Step1&#xff1a;下载 Node 中文官网&#xff1a;https://nodejs.cn/download 点击后&#xff0c;将会下载得到一个 .msi 文件…

Go-React做一个todolist(服务端)【一】项目初始化

后端仓库地址 地址 项目依赖 # gin go get -u github.com/gin-gonic/gin # viper日志 go get -u github.com/spf13/viper # 数据库和gorm go get -u gorm.io/driver/mysql go get -u gorm.io/gorm # uuid go get -u github.com/google/uuid # token go get -u github.com/go…

蓝桥杯2015年第十三届省赛真题-三羊献瑞

一、题目 观察下面的加法算式&#xff1a; 祥 瑞 生 辉 三 羊 献 瑞 ---------------------- 三 羊 生 瑞 气 (如果有对齐问题&#xff0c;可以参看【图1】) 其中&#xff0c;相同的汉字代表相同的数字&#xff0c;不同的汉字代表不同的数字。 请你填写“三羊献瑞”所…

C语言-编译和链接

目录 1.前言2.编译2.1预处理&#xff08;预编译&#xff09;2.1.1 #define 定义常量2.1.2 #define 定义宏2.1.3带有副作用的宏参数2.1.4宏替换规则2.1.5 #和##2.1.5.1 #运算符2.1.5.2 ## 运算符 2.1.6 命名约定2.1.7 #undef2.1.8 条件编译2.1.9 头文件的包含2.1.9.1 本地文件包…

电商系列之取消订单

> 插&#xff1a;AI时代&#xff0c;程序员或多或少要了解些人工智能&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家…

每日面经分享(Spring Boot: part2 DAO层)

1. Spring Boot DAO层的作用 a. 封装数据访问逻辑&#xff1a;DAO层的主要责任是封装与数据访问相关的逻辑。负责处理与数据库的交互&#xff0c;包括数据的增删改查等操作。通过将数据访问逻辑统一封装在DAO层中&#xff0c;可以提高代码的可维护性和可重用性。 b. 解耦业务逻…

java Web 疫苗预约管理系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 疫苗预约管理系统是一套完善的web设计系统&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&#xff0c;使…