利用Spark将Kafka数据流写入HDFS

news2025/1/19 16:16:12

利用Spark将Kafka数据流写入HDFS

在当今的大数据时代,实时数据处理和分析变得越来越重要。Apache Kafka作为一个分布式流处理平台,已经成为处理实时数据的事实标准。而Apache Spark则是一个强大的大数据处理框架,它提供了对数据进行复杂处理的能力。
本篇博客将介绍如何使用Spark来读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。
环境准备
在开始之前,确保你的开发环境中已经安装了以下软件:

Apache Kafka

#启动zookeeper
zkServer start
#启动kafka服务
kafka-server-start /opt/homebrew/etc/kafka/server.properties

Apache Spark

<properties>
      <scala.version>2.12.17</scala.version>
      <spark.version>3.0.0</spark.version>
 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Kafka Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

Hadoop HDFS

#启动hdfs
start-dfs.sh

Java开发环境
此外,你需要在项目中包含Spark和Kafka的依赖库。

代码实现
首先,我们定义一个Scala case class Job 来表示从Kafka读取的每条记录的数据结构。

case class Job(
  Position: String,
  Company: String,
  Salary: String,
  Location: String,
  Experience: String,
  Education: String,
  Detail: String
)

接下来,我们编写一个Kafka2Hdfs对象,并在其中实现main方法。这个方法将创建一个SparkSession,配置Kafka读取选项,并从Kafka中读取数据流。

object Kafka2Hdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()

我们使用subscribe选项指定Kafka中的topic名称,这里我们使用的是flume。startingOffsets选项设置为earliest,意味着我们从最早的记录开始读取数据。

接下来,我们将Kafka中的数据转换成DataFrame。我们首先将每条记录的value字段转换为字符串,然后使用map函数将每条记录解析为Job对象。

val jobDs = stream.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(line => {
    val fields = line.split(",")
    Job(
      Position = fields(0),
      Company = fields(1).trim,
      Salary = fields(2).trim,
      Location = fields(3).trim,
      Experience = fields(4).trim,
      Education = fields(5).trim,
      Detail = fields(6).trim
    )
  }).toDF()

现在,我们已经有了一个包含Job对象的DataFrame。接下来,我们将这个DataFrame以CSV格式写入到HDFS中。我们使用writeStream方法,并设置format为csv,同时指定输出路径和检查点位置。

val query: StreamingQuery = jobDs.writeStream
  .format("csv")
  .option("header", "false")
  .option("path", "/")
  .option("checkpointLocation", "/ck")
  .start()

注意,我们在这里将header选项设置为false,因为我们不打算在CSV文件中包含列名。path选项指定了输出文件的存储路径,而checkpointLocation选项指定了检查点的存储路径,这对于流处理的可靠性非常重要。

最后,我们调用awaitTermination方法来等待流处理的结束。在实际的生产环境中,你可能希望将这个流处理任务部署到一个集群上,并让它持续运行。

query.awaitTermination()

总结
在这篇博客中,我们介绍了如何使用Spark读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。这种方法可以用于各种实时数据处理场景,例如日志分析、事件监控等。通过这种方式,我们可以将实时数据转换为静态数据,以便进行更深入的分析和处理。

完整代码:

package com.lhy.sparkkafka2hdfs

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Row, SparkSession}



case class Job(Position:String,Company:String,Salary:String,Location:String,Experience:String,Education:String,Detail:String)
object Kafka2Hdfs{
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()


    val jobDs = stream.selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(line => {
        val fields = line.split(",")
        Job(
          Position = fields(0),
          Company = fields(1).trim,
          Salary = fields(2).trim,
          Location = fields(3).trim,
          Experience = fields(4).trim,
          Education = fields(5).trim,
          Detail = fields(6).trim
        )
      }).toDF()
//    val query = jobDs.writeStream.format("console").start()

    val query: StreamingQuery = jobDs.writeStream
      .format("csv")
      .option("header", "false")
      .option("path", "/")
      .option("checkpointLocation", "/ck")
      .start()

    query.awaitTermination()

  }

在这里插入图片描述
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1568845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux操作系统之nfs网络文件系统

目录 一、NFS简介 1.2 安装配置NFS 一、NFS简介 nfs类似于windows文件共享 将linux的一个目录共享到网络中&#xff0c;网络中的其他所有主机都可以使用这个共享目录中的文件 samba 文件共享 可以在linux中通过samba共享一个目录&#xff0c;然后在linux中可以访问这个共享 …

55 npm run serve 和 npm run build 的分包策略

前言 这里我们来看一下 vue 这边 打包的时候的一些 拆分包的一些策略 我们经常会使用到 npm run build 进行服务的打包 然后 打包出来的情况, 可能如下, 可以看到 chunk-vendors 是进行了包的拆分, 我们这里就是 来看一下 这里 npm run build 的时候的, 一个分包的策略 测试…

【HTML】简单制作一个唱片动画效果

目录 前言 开始 HTML部分 CSS部分 效果图 总结 前言 无需多言&#xff0c;本文将详细介绍一段代码&#xff0c;具体内容如下&#xff1a; 开始 首先新建文件夹&#xff0c;创建两个文本文档&#xff0c;其中HTML的文件名改为[index.html]&#xff0c;CSS的…

Matlab|储能辅助电力系统调峰的容量需求研究

目录 1 主要内容 目标函数 约束条件 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序参考文献《储能辅助电力系统调峰的容量需求研究》&#xff0c;主要是对火电、风电和储能等电力设备主体进行优化调度&#xff0c;在调峰能力达不到时采用弃负荷&#xff0c;程序以…

第十四届省赛大学B组(C/C++)子串简写

原题链接&#xff1a;子串简写 程序猿圈子里正在流行一种很新的简写方法&#xff1a; 对于一个字符串&#xff0c;只保留首尾字符&#xff0c;将首尾字符之间的所有字符用这部分的长度代替。 例如 internationalization 简写成 i18n&#xff0c;Kubernetes 简写成 K8s&#…

【贪玩巴斯】Mac的M芯片(M1/2...)下载homebrew方法(24年最新且已验证可行)

1. 按照目前广为流传的方法&#xff08;M1会出现一些问题&#xff09;&#xff1a; 终端输入&#xff1a; /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" 使用国内镜像下载。 2. 输入后按照要求步骤执行即可&#xff…

docker-compose运行springinitializr用来创建springboot2

前言 spring initializr官方的地址是: https://start.spring.io/ &#xff0c;这是一个用来创建springboot脚手架的一个工具&#xff0c;但是目前这个工具已经更新到springboot3&#xff0c;而我还没学springboot3&#xff0c;目前还想继续创建springboot2&#xff0c;我就想能…

如何预防自己网站被流量劫持?HTTPS加密是否可行?

如何预防自己网站被流量劫持&#xff1f;HTTPS加密是否可行&#xff1f; 文章背景&#xff1a; 所谓的流量劫持&#xff0c;就是利用各种恶意软件修改浏览器、锁定主页或不停弹出新窗口&#xff0c;强制用户访问某些网站&#xff0c;从而造成用户流量损失的情形。 流量劫持是一…

前端三剑客 —— CSS (第一节)

目录 CSS 什么是CSS CSS的几种写法&#xff1a; 行内样式 内嵌样式 外链样式 import 加载顺序 CSS选择器*** 基本选择器 ID选择器 标签选择器 类选择器 通用选择器 包含选择器 上节内容中提到了 前端三剑客 —— HTML 超文本标记语言&#xff0c;这节内容 跟大家…

开发一个Java项目常用的工具类推荐

文章目录 新建Java项目pom.xml添加依赖添加代理仓库项目打jar包并上传配置Springboot依赖MySQL数据库相关依赖lombok接口文档Swagger相关其他常用工具类Hutool插件 完整的pom参考其他参考&#xff1a; 每次新起一个Java项目&#xff0c;都需要经历一系列的繁琐步骤去初始化这个…

jupyter Notebook 默认路径修改

1. anaconda prompt 中运行 jupyter notebook --generate-config 命令&#xff0c;将在 C:\Users\Think\.jupyter文件下生成 jupyter_notebook_config.py 文件。 2.在jupyter_notebook_config.py 文件中&#xff0c;找c.NotebookApp.notebook_dir 这个变量&#xff0c; (1)若…

基于Python的微博舆论分析,微博评论情感分析可视化系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

【SaaS,PaaS? XaaS -微参考】

介绍 以下是关于各种云服务模式的简要介绍&#xff0c;包括全称、定义、典型场景和应用&#xff1a; 缩写全称定义关键词典型场景和应用SaaSSoftware as a Service将软件以服务的形式交付给用户&#xff0c;用户通过互联网访问软件。提供软件电子邮件、在线办公套件&#xff…

Golang | Leetcode Golang题解之第3题无重复字符的最长子串

题目&#xff1a; 题解&#xff1a; func lengthOfLongestSubstring(s string) int {// 哈希集合&#xff0c;记录每个字符是否出现过m : map[byte]int{}n : len(s)// 右指针&#xff0c;初始值为 -1&#xff0c;相当于我们在字符串的左边界的左侧&#xff0c;还没有开始移动r…

Jenkins首次安装选择推荐插件时出现”No such plugin cloudbees-folder”解决方案

安装Jenkins成功之后&#xff0c;首次启动Jenkins后台管理&#xff0c;进入到安装插件的步骤&#xff0c;选择"推荐安装"&#xff0c;继续下一步的时候出现错误提示&#xff1a; 出现一个错误 安装过程中出现一个错误&#xff1a;No such plugin&#xff1a;cloudb…

C++ //练习 11.14 扩展你在11.2.1节练习(第378页)中编写的孩子姓到名的map,添加一个pair的vector,保存孩子的名和生日。

C Primer&#xff08;第5版&#xff09; 练习 11.14 练习 11.14 扩展你在11.2.1节练习&#xff08;第378页&#xff09;中编写的孩子姓到名的map&#xff0c;添加一个pair的vector&#xff0c;保存孩子的名和生日。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#x…

Java中生成一个唯一的文件名的方法

使用java.util.UUID&#xff08;通用唯一识别码&#xff09;的randomUUID()方法&#xff1a; import java.util.UUID;public class Test {public static void main(String[] args) {for (int i 0; i < 100; i) {String fileName UUID.randomUUID().toString();System.out…

【JAVA】JAVA快速入门(长期维护)

下面是java的一些入门基础知识&#xff0c;有需要借鉴即可。 课程&#xff1a;B站黑马程序员&#xff0c;JAVA入门LINK 一、初识JAVA 1.java概述 概念&#xff1a;java是由sun公司研发&#xff0c;在2009年被oracle收购&#xff0c;祖师爷詹姆斯高斯林&#xff0c;是一种高级…

腾讯云轻量4核8G价格多少钱?4核8G12M轻量服务器646元一年3个月

2024年腾讯云4核8G服务器租用优惠价格&#xff1a;轻量应用服务器4核8G12M带宽646元15个月&#xff0c;CVM云服务器S5实例优惠价格1437.24元买一年送3个月&#xff0c;腾讯云4核8G服务器活动页面 txybk.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云4核8G服务器优惠价格 轻…

51单片机入门之独立按键

目录 1.按键简介 2.独立按键控制LED亮灭 3.独立按键控制LED移位 1.按键简介 在生活中&#xff0c;我们常常会见到各种按键&#xff0c;我们的开发板上也有按键&#xff0c;就在左下角有四个按键&#xff0c;我们把它们叫做独立按键。 独立按键的原理比较简单&…