SparkSQL数据源与数据存储综合实践

news2025/1/23 5:04:52

文章目录

  • 1. 打开项目
  • 2. 查看数据集
    • 2.1 查看JSON格式数据
    • 2.2 查看CSV格式数据
    • 2.3 查看TXT格式数据
  • 3. 添加单元测试依赖
  • 4. 创建数据加载与保存对象
    • 4.1 创建Spark会话对象
    • 4.2 创建加载JSON数据方法
    • 4.3 创建加载CSV数据方法
    • 4.4 创建加载Text数据方法
    • 4.5 创建加载JSON数据扩展方法
    • 4.6 创建加载CSV数据扩展方法
    • 4.7 创建加载Text数据扩展方法
    • 4.8 创建保存文本文件方法
    • 4.9 查看程序完整代码
  • 5. 实战小结

1. 打开项目

  • 打开SparkSQLDataSource项目
    在这里插入图片描述

2. 查看数据集

2.1 查看JSON格式数据

  • 查看users.json文件
    在这里插入图片描述
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

2.2 查看CSV格式数据

  • 查看users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,,45
童安格,,26
陈燕文,,18
王晓明,,32
张丽华,,29
刘伟强,,40
赵静怡,,22
孙强东,,35

2.3 查看TXT格式数据

  • 查看users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35

3. 添加单元测试依赖

  • pom.xml里添加单元测试框架依赖
    在这里插入图片描述
<dependency>                                    
    <groupId>junit</groupId>                    
    <artifactId>junit</artifactId>              
    <version>4.13.2</version>                   
</dependency>                                   
  • 刷新项目依赖
    在这里插入图片描述

4. 创建数据加载与保存对象

  • 创建net.huawei.practice
    在这里插入图片描述
  • practice子包里创建DataLoadAndSave对象
    在这里插入图片描述
  • 创建DataLoadAndSave伴生类
    在这里插入图片描述

4.1 创建Spark会话对象

  • 创建spark常量
    在这里插入图片描述
// 获取或创建Spark会话对象                                  
val spark = SparkSession.builder() // 创建Builder对象  
  .appName("DataLoadAndSave") // 设置应用程序名称          
  .master("local[*]") // 运行模式:本地运行                 
  .getOrCreate() // 获取或创建Spark会话对象                 

4.2 创建加载JSON数据方法

  • 创建loadJSONData()方法
    在这里插入图片描述
// 加载JSON数据方法                                       
def loadJSONData(filePath: String): DataFrame = {   
  spark.read.json(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadJSONData()方法
    在这里插入图片描述
@Test                                                      
def testLoadJSONData(): Unit = {                           
  // 加载JSON数据                                              
  val df = DataLoadAndSave.loadJSONData("data/users.json") 
  // 显示数据                                                  
  df.show()                                                
}                                                          
  • 运行testLoadJSONData()测试方法,查看结果
    在这里插入图片描述

4.3 创建加载CSV数据方法

  • 创建loadCSVData()方法
    在这里插入图片描述
// 加载CSV数据方法                                           
def loadCSVData(filePath: String): DataFrame = {       
  spark.read                                           
    .option("header", "true")                          
    .option("inferSchema", "true")                     
    .csv(filePath)                                     
}                                                      
  • 在伴生类里创建单元测试方法testLoadCSVData()方法
    在这里插入图片描述
@Test                                                       
def testLoadCSVData(): Unit = {                             
  // 加载CSV数据                                                
  val df = DataLoadAndSave.loadCSVData("data/users.csv")    
  // 显示数据                                                   
  df.show()                                                 
}                                                           
  • 运行testLoadCSVData()测试方法,查看结果
    在这里插入图片描述

4.4 创建加载Text数据方法

  • 创建loadTextData()方法
    在这里插入图片描述
// 加载TEXT数据方法                                       
def loadTextData(filePath: String): DataFrame = {   
  spark.read.text(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadTextData()方法
    在这里插入图片描述
  • 运行testLoadTextData()测试方法,查看结果
    在这里插入图片描述

4.5 创建加载JSON数据扩展方法

  • 创建loadJSONDataExpand()方法
    在这里插入图片描述
// 加载JSON数据扩展方法                                         
def loadJSONDataExpand(filePath: String): DataFrame = { 
  spark.read.format("json").load(filePath)              
}                                                       
  • 在伴生类里创建单元测试方法testLoadJSONDataExpand()方法
    在这里插入图片描述
  • 运行testLoadJSONDataExpand()测试方法,查看结果
    在这里插入图片描述

4.6 创建加载CSV数据扩展方法

  • 创建loadCSVDataExpand()方法
    在这里插入图片描述
// 加载CSV数据扩展方法                                            
def loadCSVDataExpand(filePath: String): DataFrame = {    
  spark.read.format("csv")                                
    .option("header", "true")                             
    .option("inferSchema", "true")                        
    .load(filePath)                                       
}                                                         
  • 在伴生类里创建单元测试方法testLoadCSVDataExpand()方法
    在这里插入图片描述
  • 运行testLoadCSVDataExpand()测试方法,查看结果
    在这里插入图片描述

4.7 创建加载Text数据扩展方法

  • 创建loadTextDataExpand()方法
    在这里插入图片描述
//  加载TEXT数据扩展方法                                          
def loadTextDataExpand(filePath: String): DataFrame = {   
  spark.read.format("text").load(filePath)                
}                                                         
  • 在伴生类里创建单元测试方法testLoadTextDataExpand()方法
    在这里插入图片描述
  • 运行testLoadTextDataExpand()测试方法,查看结果
    在这里插入图片描述

4.8 创建保存文本文件方法

  • 创建saveTextFile()方法
    在这里插入图片描述
// 保存数据到文本文件方法                                                   
def saveTextFile(inputPath: String, outputPath: String): Unit = {
  // 加载文本数据                                                      
  val df = spark.read.format("text").load(inputPath)             
  // 保存文本数据                                                      
  df.write.mode("overwrite").format("text").save(outputPath)     
}                                                                
  • 在伴生类里创建单元测试方法testSaveTextFile()方法
    在这里插入图片描述
  • 运行testSaveTextFile()测试方法,查看结果
    在这里插入图片描述

4.9 查看程序完整代码

package net.huawei.practice

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test

/**
 * 功能:数据加载与保存
 * 作者:华卫
 * 日期:2025年01月18日
 */
object DataLoadAndSave {
  // 获取或创建Spark会话对象
  val spark = SparkSession.builder() // 创建Builder对象
    .appName("DataLoadAndSave") // 设置应用程序名称
    .master("local[*]") // 运行模式:本地运行
    .getOrCreate() // 获取或创建Spark会话对象

  // 加载JSON数据方法
  def loadJSONData(filePath: String): DataFrame = {
    spark.read.json(filePath)
  }

  // 加载CSV数据方法
  def loadCSVData(filePath: String): DataFrame = {
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(filePath)
  }

  // 加载TEXT数据方法
  def loadTextData(filePath: String): DataFrame = {
    spark.read.text(filePath)
  }

  // 加载JSON数据扩展方法
  def loadJSONDataExpand(filePath: String): DataFrame = {
    spark.read.format("json").load(filePath)
  }

  // 加载CSV数据扩展方法
  def loadCSVDataExpand(filePath: String): DataFrame = {
    spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(filePath)
  }

  //  加载TEXT数据扩展方法
  def loadTextDataExpand(filePath: String): DataFrame = {
    spark.read.format("text").load(filePath)
  }

  // 保存数据到文本文件方法
  def saveTextFile(inputPath: String, outputPath: String): Unit = {
    // 加载文本数据
    val df = spark.read.format("text").load(inputPath)
    // 保存文本数据
    df.write.mode("overwrite").format("text").save(outputPath)
  }
}

// 伴生类
class DataLoadAndSave {
  @Test
  def testLoadJSONData(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONData("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVData(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVData("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextData(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextData("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadJSONDataExpand(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVDataExpand(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextDataExpand(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testSaveTextFile(): Unit = {
    // 保存数据到文本文件
    DataLoadAndSave.saveTextFile("data/users.txt", "result/users")
  }
}

5. 实战小结

  • 在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()loadCSVData()loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2280708.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【回忆迷宫——处理方法+DFS】

题目 代码 #include <bits/stdc.h> using namespace std; const int N 250; int g[N][N]; bool vis[N][N]; int dx[4] {0, 0, -1, 1}; int dy[4] {-1, 1, 0, 0}; int nx 999, ny 999, mx, my; int x 101, y 101; //0墙 (1空地 2远方) bool jud(int x, int y) {if…

项目中使用的是 FastJSON(com.alibaba:fastjson)JSON库

从你的 pom.xml 文件中可以看到&#xff0c;项目明确依赖了以下 JSON 库&#xff1a; FastJSON&#xff1a; <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version> </depende…

高效安全文件传输新选择!群晖NAS如何实现无公网IP下的SFTP远程连接

文章目录 前言1. 开启群晖SFTP连接2. 群晖安装Cpolar工具3. 创建SFTP公网地址4. 群晖SFTP远程连接5. 固定SFTP公网地址6. SFTP固定地址连接 前言 随着远程办公和数据共享成为新常态&#xff0c;如何高效且安全地管理和传输文件成为了许多人的痛点。如果你正在寻找一个解决方案…

Windows第一次上手鸿蒙周边

端云一体所需装备 很重要&#xff1a;C/D/E/F盘要有二三十G的可用空间&#xff01; 硬件&#xff1a;华为鸿蒙实验箱&#xff08;基础版&#xff09;》飞机板核心板环境监测板 软件&#xff1a;Visual Studio Code写代码 终端编译 Hiburn烧录到开发板 MobaXterm &#xff08…

使用AI生成金融时间序列数据:解决股市场的数据稀缺问题并提升信噪比

“GENERATIVE MODELS FOR FINANCIAL TIME SERIES DATA: ENHANCING SIGNAL-TO-NOISE RATIO AND ADDRESSING DATA SCARCITY IN A-SHARE MARKET” 论文地址&#xff1a;https://arxiv.org/pdf/2501.00063 摘要 金融领域面临的数据稀缺与低信噪比问题&#xff0c;限制了深度学习在…

【Qt】05-菜单栏

做菜单 前言一、创建文件二、菜单栏 QMenuBar2.1 示例代码2.2 运行结果 三、工具栏 QToolBar3.1 运行代码3.2 结果分析 四、状态栏 QStatusBar4.1 运行代码4.2 运行结果 五、文本编辑框 QTextEdit5.1 运行代码5.2 运行结果 六、浮动窗口 addDockWidget6.1 运行代码6.2 运行结果…

细说STM32F407单片机电源低功耗StandbyMode待机模式及应用示例

目录 一、待机模式基础知识 1、进入待机模式 2、待机模式的状态 3、退出待机模式 二、待机模式应用示例 1、示例功能和CubeMX项目设置 &#xff08;1&#xff09; 时钟 &#xff08;2&#xff09; DEBUG、LED1、KeyRight、USART6、CodeGenerator &#xff08;3&#x…

中国综合算力指数(2024年)报告汇总PDF洞察(附原数据表)

原文链接&#xff1a; https://tecdat.cn/?p39061 在全球算力因数字化技术发展而竞争加剧&#xff0c;我国积极推进算力发展并将综合算力作为数字经济核心驱动力的背景下&#xff0c;该报告对我国综合算力进行研究。 中国算力大会发布的《中国综合算力指数&#xff08;2024年…

w-form-select.vue(自定义下拉框组件)(与后端字段直接相关性)

文章目录 1、w-form-select.vue 组件中每个属性的含义2、实例3、源代码 1、w-form-select.vue 组件中每个属性的含义 好的&#xff0c;我们来详细解释 w-form-select.vue 组件中每个属性的含义&#xff0c;并用表格列出它们是否与后端字段直接相关&#xff1a; 属性解释表格&…

前沿技术趋势洞察:2024年技术的崭新篇章与未来走向!

引言 时光飞逝&#xff0c;2024年已经来临&#xff0c;回顾过去一年&#xff0c;科技的迅猛进步简直让人目不暇接。 在人工智能&#xff08;AI&#xff09;越来越强大的今天&#xff0c;我们不再停留在幻想阶段&#xff0c;量子计算的雏形开始展示它的无穷潜力&#xff0c;Web …

消息队列篇--原理篇--RabbmitMQ(Exchange,消息转换器、docker部署,绑定和确认机制等)

RabbitMQ是一个基于AMQP协议的消息队列系统&#xff0c;支持多种消息传递模式&#xff0c;包括点对点&#xff08;P2P&#xff09;、发布/订阅&#xff08;Pub/Sub&#xff09;和路由模式。RabbitMQ 的设计目标是提供高可用性、可扩展性和可靠性&#xff0c;适用于企业级应用集…

C++入门 详细版

欢迎来到干货小仓库&#xff01;&#xff01; 一分耕耘一分收获&#xff0c;离自己的目标越来越近。 passion&#xff01;passion&#xff01;&#xff01;passion&#xff01;&#xff01;&#xff01; 1.命名空间 由于C语言无法避免名字或者函数重复等问题&#xff0c;当有多…

一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload以及webpackChunkName的使用

文章目录 一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload1. 建议按文章顺序从头看&#xff0c;一看到底&#xff0c;豁然开朗2. preload和prefetch的区别2. prefetch的使用3. preload的使用4. webpackChunkName 一文大白话讲清楚webpack基本使用——9——…

Android AutoMotive --CarService

1、AAOS概述 Android AutoMotive OS是谷歌针对车机使用场景打造的操作系统&#xff0c;它是基于现有Android系统的基础上增加了新特性&#xff0c;最主要的就是增加了CarService&#xff08;汽车服务&#xff09;模块。我们很容易把Android AutoMotive和Android Auto搞混&…

(三)线性代数之二阶和三阶行列式详解

在前端开发中&#xff0c;尤其是在WebGL、图形渲染、或是与地图、模型计算相关的应用场景里&#xff0c;行列式的概念常常在计算变换矩阵、进行坐标变换或进行图形学算法时被使用。理解二阶和三阶行列式对于理解矩阵运算、旋转、平移等操作至关重要。下面&#xff0c;我将结合具…

Linux Bash 中使用重定向运算符的 5 种方法

注&#xff1a;机翻&#xff0c;未校。 Five ways to use redirect operators in Bash Posted: January 22, 2021 | by Damon Garn Redirect operators are a basic but essential part of working at the Bash command line. See how to safely redirect input and output t…

蓝桥杯算法日常|c\c++常用竞赛函数总结备用

一、字符处理相关函数 大小写判断函数 islower和isupper&#xff1a;是C标准库中的字符分类函数&#xff0c;用于检查一个字符是否为小写字母或大写字母&#xff0c;需包含头文件cctype.h&#xff08;也可用万能头文件包含&#xff09;。返回布尔类型值。例如&#xff1a; #…

MySQL(4)多表查询

引言&#xff1a;为什么需要多表的查询&#xff1f; A&#xff1a;提高效率&#xff0c;多线进行。 高内聚、低耦合。 一、多表查询的条件 1、错误的多表查询&#xff1a; SELECT employee_id,department_name FROM employees,departments; SELECT employee_id,department…

‘list‘ object has no attribute ‘mul‘

原来运行得好好的&#xff0c;突然出现错误&#xff1a;list object has no attribute mul &#xff0c;更换一个输入路径&#xff0c;又没问题&#xff0c;改一个路径&#xff0c;还是出现错误&#xff0c;很奇怪&#xff0c;后来又没有问题&#xff0c;记录一下。 参考这文…

【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的旅行社在线预订与行程管理系统

开题报告 本文旨在设计研究与开发一个旅行社在线预订与行程管理系统。首先&#xff0c;本文介绍了旅行社在线预订与行程管理系统相关技术集成开发环境、软硬件环境&#xff0c;SpringBoot框架、Vue.js框架的基本原理和优势&#xff0c;以及在旅行社在线预订与行程管理系统开发…