Spark SQL数据源的基本操作(更新ing)

news2025/1/11 23:35:40

文章目录

  • 一、基本操作
  • 二、默认数据源
    • (一)默认数据源Parquet
    • (二)案例演示读取Parquet文件
      • 1、在Spark Shell中演示
        • 练习1、将`student.txt`文件转换成`student.parquet`
        • 练习2、读取`student.parquet`文件得到学生数据帧,并显示数据帧内容
      • 2、在IntelliJ IDEA里演示
  • 三、手动指定数据源
    • (一)format()与option()方法概述
    • (二)案例演示读取不同数据源
      • 1、读取csv文件
      • 2、读取json,保存为parquet
      • 3、读取jdbc数据源,保存为json文件
  • 四、数据写入模式
    • (一)mode()方法
    • (二)枚举类SaveMode
    • (三)案例演示不同写入模式
  • 五、分区自动推断
    • (一)分区自动推断概述
    • (二)分区自动推断演示
      • 1、建四个文件
      • 2、读取表数据
      • 3、输出Schema信息
      • 4、显示数据帧内容
    • (三)分区自动推断注意事项


一、基本操作

Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。

二、默认数据源

(一)默认数据源Parquet

默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此不可以直接读取,文件中包括该文件的实际数据和Schema信息,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。Spark SQL可以很容易地读取Parquet文件并将其数据转为DataFrame数据集。

(二)案例演示读取Parquet文件

执行命令: cd $SPARK_HOME/examples/src/main/resources,查看Spark的样例数据文件users.parquet
在这里插入图片描述

用cat命令显示users.parquet文件内容,只会显示乱码
启动hdfs:start-dfs.sh
在这里插入图片描述

将数据文件users.parquet上传到HDFS的/datasource/input目录
在这里插入图片描述

1、在Spark Shell中演示

启动spark服务:start-all.sh
在这里插入图片描述

启动Spark Shell,执行命令:spark-shell --master spark://master:7077
在这里插入图片描述
执行命令:val userdf = spark.read.load("hdfs://master:9000/datasource/input/users.parquet")
在这里插入图片描述
执行命令:userdf.show,查看数据帧内容
在这里插入图片描述
执行命令:userdf.printSchema,查看数据帧模式
在这里插入图片描述
执行命令:userdf.select("name", "favorite_color").write.save("hdfs://master:9000/datasource/output"),对数据帧指定列进行查询,查询结果依然是数据帧,然后通过write成员的save()方法写入HDFS指定目录
在这里插入图片描述

查看HDFS上的输出结果
在这里插入图片描述

除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。

基于数据帧创建临时视图,执行命令:userdf.createTempView("t_user")
在这里插入图片描述
执行SQL查询,将结果写入HDFS,执行命令:spark.sql("select name, favorite_color from t_user").write.save("hdfs://master:9000/datasource/output2")
在这里插入图片描述
查看HDFS上的输出结果
在这里插入图片描述

练习1、将student.txt文件转换成student.parquet

解决思路:将student.txt转成studentDF,利用数据帧的save()方法保存到/datasource/output3目录,然后将文件更名复制到/datasource/input目录

得到学生数据帧 - studentDF
在这里插入图片描述

val ds = spark.read.textFile("hdfs://master:9000/student/input/student.txt")
case class Student(id: Int, name: String, gender: String, age: Int)
import spark.implicits._
val studentDS = ds.map(line => {
      val fields = line.split(",")
      val id = fields(0).toInt
      val name = fields(1)
      val gender = fields(2)
      val age = fields(3).toInt
      Student(id, name, gender, age)
   }
)
val studentDF = studentDS.toDF()
studentDF.show

将学生数据帧保存为parquet文件,studentDF.write.save(“hdfs://master:9000/datasource/output3”)
在这里插入图片描述
查看生成的parquet文件
在这里插入图片描述
复制parquet文件到/datasource/input目录
在这里插入图片描述

练习2、读取student.parquet文件得到学生数据帧,并显示数据帧内容

执行命令:val studentDF = spark.read.load(“hdfs://master:9000/datasource/input/student.parquet”)
在这里插入图片描述
执行命令:studentDF.show
在这里插入图片描述

2、在IntelliJ IDEA里演示

创建Maven项目
在这里插入图片描述

设置项目相关信息
在这里插入图片描述单击【Finish】按钮
在这里插入图片描述
将java目录改成scala目录
在这里插入图片描述

在pom.xml文件里添加相关依赖,设置源程序文件夹

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.army.sql</groupId>
    <artifactId>SparkSQLDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
    </build>
    
</project>

在resources目录里添加日志属性文件
在这里插入图片描述

log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

在resources目录里添加HFDS配置文件
在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <description>only config in clients</description>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>
</configuration>

创建net.army.sql.day01包,在包里创建ReadParquetFile对象
在这里插入图片描述

package net.army.sql.day01

import org.apache.spark.sql.SparkSession

/**
 * 功能:读取Parquet文件
 * 作者:梁辰兴
 * 日期:20230612*/
object ReadParquetFile {
  def main(args: Array[String]): Unit = {
    // 创建或得到Spark会话对象
    val spark = SparkSession.builder()
      .appName("ReadParquetFile")
      .master("local[*]")
      .getOrCreate()
    // 加载student.parquet文件,得到数据帧
    val studentDF = spark.read.load("hdfs://master:9000/datasource/input/student.parquet")
    // 显示学生数据帧内容
    studentDF.show
    // 查询20岁以上的女生
    val girlDF = studentDF.filter("gender = '女' and age > 20")
    // 显示女生数据帧内容
    girlDF.show
    // 保存查询结果到HDFS(保证输出目录不存在)
    girlDF.write.save("hdfs://master:9000/datasource/output")
    // 关闭Spark会话对象
    spark.stop()
  }
}

运行程序,查看控制台结果

三、手动指定数据源

(一)format()与option()方法概述

使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(JSON、Parquet、JDBC、ORC、Libsvm、CSV、Text)。

通过手动指定数据源,可以将DataFrame数据集保存为不同的文件格式或者在不同的文件格式之间转换。

在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数。

(二)案例演示读取不同数据源

1、读取csv文件

执行命令:cd $SPARK_HOME/examples/src/main/resources,查看Spark的样例数据文件people.csv
在这里插入图片描述
将people.csv文件上传到HDFS的/datasource/input目录,然后查看文件内
在这里插入图片描述
在Spark Shell里,执行命令:val peopleDF = spark.read.format(“csv”).load(“hdfs://master:9000/datasource/input/people.csv”),读取人员csv文件,得到人员数据帧
在这里插入图片描述执行命令:peopleDF.show,查看人员数据帧内容
在这里插入图片描述

大家可以看到,people.csv文件第一行是字段名列表,但是转成数据帧之后,却成了第一条记录,这样显然是不合理的,怎么办呢?就需要用到option()方法来传递参数,告诉Spark第一行是表头header,而不是表记录。

执行命令:val peopleDF = spark.read.format(“csv”).option(“header”, “true”).load(“hdfs://master:9000/datasource/input/people.csv”)
在这里插入图片描述执行命令:peopleDF.show,查看人员数据帧内容
在这里插入图片描述
由于csv文件默认分隔符是逗号,而people.csv的分隔符是分号,因此要利用option(“delimiter”, “;”)告诉Spark

执行命令:val peopleDF = spark.read.format(“csv”).option(“header”, “true”).option(“delimiter”, “;”).load(“hdfs://master:9000/datasource/input/people.csv”)
在这里插入图片描述执行命令:peopleDF.show,查看人员数据帧内容
在这里插入图片描述

2、读取json,保存为parquet

查看people.json文件
在这里插入图片描述
将people.json上传到HDFS的/datasource/input目录,并查看其内容
在这里插入图片描述在Spark Shell里,执行命令:val peopleDF = spark.read.format(“json”).load(“hdfs://master:9000/datasource/input/people.json”)
在这里插入图片描述

执行命令:peopleDF.show
在这里插入图片描述
执行命令:peopleDF.select(“name”, “age”).write.format(“parquet”).save(“hdfs://master:9000/datasource/output4”) (注意:format(“parquet”)其实可以省掉的)
在这里插入图片描述
查看生成的parquet文件(/datasource/output4/part-00000-a1e62c69-59e5-40b6-8391-89bdfffe61ff-c000.snappy.parquet)
在这里插入图片描述
将该parquet文件更名拷贝到/datasource/input目录,执行命令: hdfs dfs -cp /datasource/output4/part-00000-d0adfd21-9f55-49fc-a3dd-93bd313ea8e2-c000.snappy.parquet /datasource/input/people.parquet
在这里插入图片描述
现在读取/datasource/input/people.parquet文件得到人员数据帧
在这里插入图片描述
查看人员数据帧内容
在这里插入图片描述

3、读取jdbc数据源,保存为json文件

启动master的mysql服务
在这里插入图片描述

在Navicat创建mastermysql连接,连接到master虚拟机上安装的MySQL
在这里插入图片描述
查看student数据库里的user表
在这里插入图片描述
执行命令

val userDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "user")  
  .option("user", "root")  
  .option("password", "")
  .load()

结果报错,没有找到数据库驱动程序
在这里插入图片描述
上传数据驱动程序到$SPARK_HOME/jars目录(每个节点都需要上传)
在这里插入图片描述

查看上传的数据库驱动程序
在这里插入图片描述

执行命令

val userDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "user")  
  .option("user", "root")  
  .option("password", "")
  .load()

执行之后会有一个警告信息,通过设置useSSL=true来消除
在这里插入图片描述

执行命令

val userDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "user")  
  .option("user", "root")  
  .option("password", "")
  .load()

执行之后得到用户数据帧
在这里插入图片描述
执行命令:userDF.show,查看用户数据帧内容
在这里插入图片描述
执行命令:userDF.write.format(“json”).save(“hdfs://master:9000/datasource/output5”)
在这里插入图片描述
在虚拟机slave1查看生成的json文件,执行命令:hdfs dfs -cat /datasource/output5/*
在这里插入图片描述

四、数据写入模式

(一)mode()方法

在写入数据时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode。

使用SaveMode类,需要import org.apache.spark.sql.SaveMode;

(二)枚举类SaveMode

SaveMode.ErrorIfExists:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,就会抛出异常。

SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,会在原有的基础上进行追加。

SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,就会将其覆盖(包括数据或表的Schema)。

SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,就不会写入内容,类似SQL中的CREATE TABLE IF NOT EXISTS。

(三)案例演示不同写入模式

查看数据源:people.json
在这里插入图片描述
查询该文件name里,采用覆盖模式写入/result,/result目录里本来有东西的

五、分区自动推断

(一)分区自动推断概述

(二)分区自动推断演示

1、建四个文件

2、读取表数据

3、输出Schema信息

4、显示数据帧内容

(三)分区自动推断注意事项

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/643885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8S minikube本地安装

一. mac安装K8S 1.brew安装 brew install kubectl 2.查看版本 kubectl version --outputjson { "clientVersion": { "major": "1", "minor": "27", "gitVersion": "v1.27.2", &…

基于深度学习的高精度奶牛检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度奶牛检测识别系统可用于日常生活中或野外来检测与定位奶牛目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的奶牛目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5目标检测模型…

locked勒索病毒利用零日漏洞,企业服务器数据瞬间遭受致命加密

目录 引言&#xff1a; 事件概述&#xff1a; .locked勒索病毒加密算法&#xff1a; 数据恢复建议&#xff1a; locked勒索病毒数据恢复案例&#xff1a; 什么叫零日漏洞&#xff1f; 对策建议&#xff1a; 引言&#xff1a; 近日&#xff0c;网络安全界再次爆发了一起…

RK3588平台开发系列讲解(系统篇)开机启动原因

文章目录 一、系统开机启动原因二、开机启动场景沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要讲解平台系统开机启动原因介绍。 一、系统开机启动原因 开机原因记录文件在/proc/sys/kernel/boot_reason,那么开机后可以从这个文件中读取数值来获知本次开机…

锁升级:无锁、偏向锁、轻量级锁、重量级锁

锁升级 JDK 1.6之前&#xff0c;synchronized 还是一个重量级锁&#xff0c;是一个效率比较低下的锁。但是在JDK 1.6后&#xff0c;JVM为了提高锁的获取与释放效率对synchronized 进行了优化&#xff0c;引入了偏向锁和轻量级锁 &#xff0c;从此以后锁的状态就有了四种&#…

开源SCRM营销平台MarketGo-数据管理

一、概述 企业在私域运营的场景下&#xff0c;系统在运行中会产生一些用户数据和行为数据。 用户数据包含年龄&#xff0c;性别&#xff0c;生日&#xff0c;电话&#xff0c;用户标签&#xff0c;还有用户和员工的关系等信息。行为数据包含在SCRM中创建活动的用户事件&#…

自学黑客(网络安全)?一般人我劝你还是算了吧!

前言 博主本人 18年就读于一所普通的本科学校&#xff0c;21年 6 月在三年经验的时候顺利通过校招实习面试进入大厂&#xff0c;现就职于某大厂安全联合实验室。 我为啥说自学黑客&#xff08;网络安全&#xff09;&#xff0c;一般人我还是劝你算了吧。因为我就是那个不一般的…

【C++】c++11的新特性——右值引用/移动语义/lambda表达式

文章目录 C11介绍1. 统一的列表初始化1.1 {}初始化1.2 std::initializer_list 2. 一些关键字2.1 auto2.2 decltype2.3 nullptr 3. 范围for4. 右值引用和移动语义&#xff08;重点&#xff09;4.1 左值引用和右值引用4.2 右值引用的应用4.3 总结 5. 万能引用和完美转发6. 新的类…

mysql小表驱动大表

摘要&#xff1a; 小表驱动大表为了减少匹配时的数据量 判断谁做小表时&#xff0c;是比较算上过滤条件后的数量 left join时手动设置小表驱动大表 inner join时优化器会自动小表驱动大表 course–100条数据 student_info–100w条数据 优化器会选择小表驱动大表&#xff08;这里…

使用VMware Workstation一步一步安装Rocky Linux 9

目录 目录 背景 准备阶段 新建虚拟机 安装Rocky Linux 进入系统 背景 Rocky Linux 简介 企业Linux&#xff0c;社区方式。 Rocky Linux是一个开源的企业操作系统&#xff0c;旨在与红帽企业Linux100%兼容。社区正在大力发展。 Rocky Linux 9.2 于2023年5月16日发布&a…

计算机组成原理(六)指令系统

一、指令的基本格式 1.1机器指令的相关概念 指令集(Instruction Set) 某机器所有机器指令的集合 *定长指令集 指令集中的所有指令长度均相同!取指令控制简单*不定长指令集 指令集中的所有指令长度有长、有短 操作码 (1)长度固定 用于指令字长较长的情况RISC 如IBM370操作码8位…

第四章 Linux网络编程 4.1 网络结构模式 4.2MAC地址、IP地址、端口

第四章 Linux网络编程 4.1 网络结构模式 C/S结构 简介 服务器 - 客户机&#xff0c;即 Client - Server&#xff08;C/S&#xff09;结构。C/S 结构通常采取两层结构。服务器负责数据的管理&#xff0c;客户机负责完成与用户的交互任务。客户机是因特网上访问别人信息的机器…

Ubuntu16.04.7+Qt15.5.0环境配置(一条龙讲解)

目录 1、下载并安装Ubuntu 2、Qt下载与安装 3、Qt环境配置 4、设置编译套件 5、创建qt快速启动脚本 1、下载并安装Ubuntu Ubuntu16.04.7下载链接https://releases.ubuntu.com/xenial/ 安装步骤省略。 2、Qt下载与安装 在Qt5.15之后的版本&#xff0c;官方都不提供离线安装…

Allegro因为精度问题导致走线连接不上的解决办法

Allegro因为精度问题导致走线连接不上的解决办法 在用Allegro做PCB设计的时候,尤其是从其它单板上导数据过来的时候,有时会因为精度不一致导致连接不上,如下图 线和过孔因为精度有微小的连接偏差 一般来说,可以逐个重新连接一下,但是如果连接点位比较多的话,需要花费较多…

在windos中同时使用gitee与github

1.为什么这样做&#xff1f; 原因非常简单&#xff0c;我们遇到自己喜欢的git仓库后&#xff0c;通常会将他们克隆到我们本地电脑上&#xff0c;但这个时候会有一个问题&#xff0c;就是我们喜欢的仓库有可能是gitee仓库&#xff0c;也有可能是github仓库&#xff0c;这个时候…

Windows YOLO v8训练自己的数据集

YOLO v8 训练自己的数据集 环境准备YOLO v8创建自己的数据集1.首先准备了VOC 格式的数据集2.然后确定用于训练、测试的数据3.将VOC格式标注转为YOLO 标注4.配置数据文件 yaml 配置 YOLO v8安装和训练安装依赖包训练 环境准备 这里我的环境是Windows 环境 YOLO v8 下载链接&a…

dma-fence使用demo

dma-fence是内核中一种比较常用的同步机制,本身的实现和使用并不复杂&#xff0c;其只有两种状态signaled和unsignaled。可能正是因为其本身的精简&#xff0c;在融入其他概念中时&#xff0c;在不同的环境下&#xff0c;赋予了dma-fence不同的含义。所以通常需要根据dma-fence…

DragGAN论文阅读

文章目录 摘要问题3. 算法&#xff1a;3.1 基于点的交互式操作3.2 运动监督3.3 点跟踪 4. 实验4.1 质量评估4.2 量化评估4.3 讨论 结论 论文&#xff1a; 《Drag Your GAN: Interactive Point-based Manipulation on the Generative Image Manifold》 github&#xff1a; htt…

一文学会Git管理代码仓库

文章目录 一、预备知识1.Linux常用指令2.vim编辑器基本使用 二、Git基础1.工作区、暂存区、本地仓库和远程仓库2.git init3.git add4.git status5.git commit6.git push7.git pull8.git 分支管理(branch、checkout、merge)9.git clone和log10.git diff11.git fetch12.git rm13.…

汽车IVI中控开发入门及进阶(九):显示屏

前言: 显示屏Display panel和触控屏Touch panel,可以说随着汽车四化的进展,屏越来越多,越来越大,越来越高清,成为IVI中控、智能座舱系统的重要组成部分。比如如下一个电阻触摸屏。 正文: 显示屏 主要功能就是显示,车载内容和信息的传递全靠显示屏,目前车载显示屏的…