使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

news2025/1/22 19:38:31

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark_project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12.12</scala.version>
        <spark.version>3.2.0</spark.version>
        <kafka.version>2.8.1</kafka.version>
    </properties>

    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>

        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>   
    </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs

import org.apache.spark.sql.SparkSession

import java.util.Properties

object Mysql2Kafka {

  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("MySQLToKafka")
      .master("local[*]")
      .getOrCreate()

    // 设置 MySQL 连接属性
    val mysqlProps = new Properties()
    mysqlProps.setProperty("user", "root")
    mysqlProps.setProperty("password", "12345678")
    mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")

    // 从 MySQL 数据库中读取数据
    val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)

    // 将 DataFrame 转换为 JSON 字符串
    val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")


    // 将数据写入 Kafka
    jsonDF.show()
    jsonDF
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "comment")
      .save()

    // 停止 SparkSession
    spark.stop()
  }

}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfs

import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Comment(author_name:String,
                   fans:String,
                   comment_text:String,
                   comment_time:String,
                   location:String,
                   user_gender:String)

object kafka2Hdfs {
  def main(args: Array[String]): Unit = {
    // 设置 SparkConf
    val sparkConf = new SparkConf()
      .setAppName("KafkaToHDFS")
      .setMaster("local[*]")

    // 创建 StreamingContext,每秒处理一次
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // 设置 Kafka 相关参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", // Kafka broker 地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-consumer-group", // Spark 消费者组
      "auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量
    )

    // 设置要订阅的 Kafka 主题
    val topics = Array("comment")

    // 创建 Kafka Direct Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )


    // 从 Kafka 中读取消息,然后将其写入 HDFS
    stream.map({rdd=>
      val comment = JSON.parseObject(rdd.toString(), classOf[Comment])
      comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender
    }).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        println(rdd)
        rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")
      }
    }

    // 启动 Spark Streaming
    ssc.start()
    ssc.awaitTermination()
  }

}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1672643.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Linux][网络][高级IO][一][五种IO模型][同步通信][异步通信][非阻塞IO]详细讲解

目录 0.预备知识 && 思考问题1.五种IO模型0.形象理解五种模型1.阻塞IO2.非阻塞IO3.信号驱动IO4.多路转接/多路复用5.异步IO 2.高级IO重要概念1.同步通信 vs 异步通信2.阻塞 vs 非阻塞 3.非阻塞IO1.fcntl()2.实现SetNonBlock 0.预备知识 && 思考问题 网络通信本…

华为认证存储HCIE有用吗?

首先&#xff0c;对于个人来说&#xff0c;获得华为存储认证可以证明其具备信息存储技术的专业能力 1.专业认可&#xff1a;获得华为存储认证&#xff0c;尤其是HCIE-Storage级别的证书&#xff0c;意味着持有者对信息存储技术有着全面深入的理解&#xff0c;能够设计、部署、…

不同品牌的SSL证书价格差异大吗?

在数字化时代&#xff0c;网络安全的重要性日益凸显&#xff0c;SSL证书作为保护网站和用户数据安全的重要工具&#xff0c;其价值不言而喻。SSL证书通过加密技术保障网站与用户之间的通信安全&#xff0c;防止敏感信息的泄露和篡改。对于网站运营者而言&#xff0c;了解SSL证书…

LearnOpenGL(十三)之网格

一、网格 组合模型的每个单独的形状就叫做一个网格(Mesh)。比如说有一个人形的角色&#xff1a;艺术家通常会将头部、四肢、衣服、武器建模为分开的组件&#xff0c;并将这些网格组合而成的结果表现为最终的模型。一个网格是我们在OpenGL中绘制物体所需的最小单位&#xff08;…

Android Compose 三:切换主题

1 先瞅瞅lightColorScheme / darkColorScheme 创建项目后 自带一个Theme.kt 判断了 使用哪个ColorScheme 下面看看LightColorScheme &#xff08;亮/暗主题差不多&#xff0c;就是颜色不一样罢了&#xff09; private val LightColorScheme lightColorScheme(primary Pur…

StackQueue+泛型简单理解

&#x1f341; 个人主页&#xff1a;爱编程的Tom&#x1f4ab; 本篇博文收录专栏&#xff1a;Java专栏&#x1f449; 目前其它专栏&#xff1a;c系列小游戏 c语言系列--万物的开始_ &#x1f389; 欢迎 &#x1f44d;点赞✍评论⭐收藏&#x1f496;三连支持一…

网络安全快速入门(十一)vi/vim

11.1 了解vi 前面我i们已经在基础命令中大致了解了vi&#xff0c;本章我们针对vi来细讲一下&#xff0c;vi和vim 11.1.1 什么是vi/vim&#xff1f; vi和vim&#xff0c;都是一个模块化的文本编辑工具&#xff0c;换句话讲&#xff0c;通过vi下的一系列的命令&#xff0c;可以实…

群晖NAS本地搭建Bitwarden密码管理服务并实现远程同步密码托管

文章目录 1. 拉取Bitwarden镜像2. 运行Bitwarden镜像3. 本地访问4. 群晖安装Cpolar5. 配置公网地址6. 公网访问Bitwarden7. 固定公网地址8. 浏览器密码托管设置 Bitwarden是一个密码管理器应用程序&#xff0c;适用于在多个设备和浏览器之间同步密码。自建密码管理软件bitwarde…

如何用时尚新姿讲好中国品牌故事?

品牌建设在推动高质量发展中扮演了双重角色&#xff0c;既是高质量发展的重要“承载者”&#xff0c;也是强有力的“助推器”。5月10日-11日&#xff0c;中国时尚品牌URBAN REVIVO&#xff08;以下简称UR&#xff09;以中国品牌日为起点&#xff0c;联合天猫超级品牌日&#xf…

paddle ocr v4 2.6.1实战笔记

目录 效果图&#xff1a; 安装 模型权重是自动下载&#xff0c;如果提前下载会报错。 识别orc&#xff0c;并opencv可视化结果&#xff0c;支持中文可视化 官方原版预测可视化&#xff1a; 效果图&#xff1a; 安装 安装2.5.2识别结果为空 pip install paddlepaddle-gpu…

操作系统实战(三)(linux+C语言实现)

实验目的 加深对进程调度概念的理解&#xff0c;体验进程调度机制的功能&#xff0c;了解Linux系统中进程调度策略的使用方法。 练习进程调度算法的编程和调试技术。 实验说明 1.在linux系统中调度策略分为3种 SCHED_OTHER&#xff1a;默认的分时调度策略&#xff0c;值为0…

弹幕播放器源码

下 载 地 址 &#xff1a; runruncode.com/php/19761.html 1. 将弹幕播放器的源码上传到服务器。 2. 通过访问你的域名/dmku/install/index.php来进行弹幕库的安装。 3. 修改播放器后台的密码&#xff0c;配置文件为/config.php&#xff0c;并配置json接口。 4. 后台账号为…

国内环境也可以开发好玩的LLM应用 - 环境准备篇

在开发基于LLM(大语言模型)的AI应用前, 我们首先要准备好必要的环境. 主要就是Python环境以及大模型应用开发部署环境. 01 Python开发环境准备 Python开发环境有如下四种, 根据个人喜好选其一即可: 本地安装Python及IDE, 适合学习测试开发; 本地安装Python环境, 使用Jupyter …

享元模式详解

享元模式 1 概述 定义&#xff1a; ​ 运用共享技术来有效地支持大量细粒度对象的复用。它通过共享已经存在的对象来大幅度减少需要创建的对象数量、避免大量相似对象的开销&#xff0c;从而提高系统资源的利用率。 2 结构 享元&#xff08;Flyweight &#xff09;模式中存…

【卫星影像三维重建-全流程代码实现】点云Mesh重构

点云—>Mesh模型 1.介绍1.1 背景1.2 效果示意 2 算法实现2.1 依赖库2.2 实验数据2.3 代码实现2.4 实验效果 3.总结 1.介绍 1.1 背景 &#xff08;1&#xff09;本文主要内容是将三维点云&#xff08;离散的三维点&#xff09;进行表面重建生成Mesh网格&#xff0c;之前有篇…

UIKit常用API:Transform

需求 使用Transform系列的API&#xff0c;该API中提供了旋转、平移等功能。其中函数中带make的效果是只变化一次&#xff0c;不带make可变化多次。此外&#xff0c;还有恢复函数&#xff1a;CGAffineTransformIdentity。 代码实现 注意按钮绑定的是同一个响应事件&#xff0…

【AIGC】Mac Intel 本地 LLM 部署经验汇总(CPU Only)

书接上文&#xff0c;在《【AIGC】本地部署 ollama(gguf) 与项目整合》章节的最后&#xff0c;我在 ollama 中部署 qwen1_5-14b-chat-q4_k_m.gguf 预量化模型&#xff0c;在非 Stream 模式下需要 89 秒才完成一轮问答&#xff0c;响应速度实在是太慢&#xff0c;后续需要想办法…

Qt与QWebEngineView 交互-调试窗口-JS拓扑图完整示例参考

1&#xff1a;介绍&#xff1a; Qt与QWebEngineView的交互 简介之前文章解释过&#xff0c;链接在下面 传送门&#xff1a;Qt与QWebEngineView 交互完整示例参考_qt qwebview-CSDN博客 一般在使用这种方式时&#xff0c;可能会出现各种问题而不好调试&#xff0c;如果能够像…

【C++】继承相关(基类与派生类的继承关系以及细节整理)

目录 00.引言 01.继承的定义 02.基类和派生类对象 03.继承中的作用域 04.派生类的默认成员函数 05.友元、静态成员 00.引言 继承是面向对象编程中的一个重要概念&#xff0c;它的作用是创建一个新的类&#xff0c;该类可以从一个已存在的类&#xff08;父类/基类&#x…

sipeed 的 MaixCam显示图片

WiFi联网后&#xff0c;把固件升级到最新 一根tpyc-c连接线为MaixCam供电&#xff0c;点击液晶屏settings 在WiFi中设置确保联网&#xff0c;在更新MaixPy中升级固件 可以选择国内源加速&#xff0c;将固件升级到最新版 MaixVision的操作 1&#xff0c;在MaixVision左下角…