Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

news2025/1/21 12:13:56

文章目录

  • 零、本讲学习目标
  • 一、基本操作
  • 二、默认数据源
    • (一)默认数据源Parquet
    • (二)案例演示读取Parquet文件
      • 1、在Spark Shell中演示
      • 2、通过Scala程序演示
  • 三、手动指定数据源
    • (一)format()与option()方法概述
    • (二)案例演示读取不同数据源
      • 1、读取房源csv文件
      • 2、读取json,保存为parquet
      • 3、读取jdbc数据源,保存为json文件
  • 四、数据写入模式
    • (一)mode()方法
    • (二)枚举类SaveMode
    • (三)案例演示不同写入模式
  • 五、分区自动推断
    • (一)分区自动推断概述
    • (二)分区自动推断演示
      • 1、建四个文件
      • 2、读取表数据
      • 3、输出Schema信息
      • 4、显示数据帧内容
    • (三)分区自动推断注意事项

零、本讲学习目标

  1. 学会使用默认数据源
  2. 学会手动指定数据源
  3. 理解数据写入模式
  4. 掌握分区自动推断

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。

一、基本操作

  • Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。

二、默认数据源

(一)默认数据源Parquet

  • 默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此不可以直接读取,文件中包括该文件的实际数据和Schema信息,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。Spark SQL可以很容易地读取Parquet文件并将其数据转为DataFrame数据集。

(二)案例演示读取Parquet文件

  • 将数据文件users.parquet上传到master虚拟机/home
    在这里插入图片描述
  • 将数据文件users.parquet上传到HDFS的/input目录
    在这里插入图片描述

1、在Spark Shell中演示

  • 启动Spark Shell,执行命令:spark-shell --master spark://master:7077
    在这里插入图片描述
  • 加载parquet文件,返回数据帧
  • 执行命令:val userdf = spark.read.load("hdfs://master:9000/input/users.parquet")
    在这里插入图片描述
  • 执行命令:userdf.show(),查看数据帧内容
    在这里插入图片描述
  • 执行命令:userdf.select("name", "favorite_color").write.save("hdfs://master:9000/result"),对数据帧指定列进行查询,查询结果依然是数据帧,然后通过save()方法写入HDFS指定目录
    在这里插入图片描述
  • 查看HDFS上的输出结果
    在这里插入图片描述
  • 除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。
  • 基于数据帧创建临时视图,执行命令:userdf.createTempView("t_user")在这里插入图片描述
  • 执行SQL查询,将结果写入HDFS,执行命令:spark.sql("select name, favorite_color from t_user").write.save("hdfs://master:9000/result2")
    在这里插入图片描述
  • 查看HDFS上的输出结果
    在这里插入图片描述

2、通过Scala程序演示

  • 创建Maven项目 - SparkSQLDemo
    在这里插入图片描述
  • 在pom.xml文件里添加依赖与插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.hw.sparksql</groupId>
    <artifactId>SparkSQLDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • 创建net.hw.sparksql包,在包里创建ReadParquet对象
    在这里插入图片描述
package net.hw.sparksql

import org.apache.spark.sql.SparkSession

/**
 * 功能:Parquet数据源
 * 作者:华卫
 * 日期:2022年05月01日
 */
object ReadParquet {
  def main(args: Array[String]): Unit = {
    // 本地调试必须设置,否则会报Permission Denied错误
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    // 加载parquet文件,返回数据帧
    val usersdf = spark.read.load("hdfs://master:9000/input/users.parquet")
    // 显示数据帧内容
    usersdf.show()
    // 查询DataFrame中指定列,结果写入HDFS
    usersdf.select("name","favorite_color")
      .write.save("hdfs://master:9000/result3")
  }
}
  • 运行程序,查看控制台结果
    在这里插入图片描述
  • 在HDFS查看输出结果
    在这里插入图片描述

三、手动指定数据源

(一)format()与option()方法概述

  • 使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(JSON、Parquet、JDBC、ORC、Libsvm、CSV、Text)。
  • 通过手动指定数据源,可以将DataFrame数据集保存为不同的文件格式或者在不同的文件格式之间转换。
  • 在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数。

(二)案例演示读取不同数据源

1、读取房源csv文件

  • 查看HDFS上/input目录里的house.csv文件
    在这里插入图片描述
  • 在spark shell里,执行命令:val house_csv_df = spark.read.format("csv").load("hdfs://master:9000/input/house.csv"),读取房源csv文件,得到房源数据帧
    在这里插入图片描述
  • 执行命令:house_csv_df.show(),查看房源数据帧内容
    在这里插入图片描述
  • 大家可以看到,house.csv文件第一行是字段名列表,但是转成数据帧之后,却成了第一条记录,这样显然是不合理的,怎么办呢?就需要用到option()方法来传递参数,告诉Spark第一行是表头header,而不是表记录。
  • 执行命令:val house_csv_df = spark.read.format("csv").option("header", "true").load("hdfs://master:9000/input/house.csv")
    在这里插入图片描述
  • 执行命令:house_csv_df.show(),查看房源数据帧内容
    在这里插入图片描述

2、读取json,保存为parquet

  • people.json上传到HDFS的/input目录
    在这里插入图片描述
  • 执行命令:val peopledf = spark.read.format("json").load("hdfs://master:9000/input/people.json")
    在这里插入图片描述
  • 执行命令:peopledf.show()
    在这里插入图片描述
  • 执行命令:peopledf.select("name", "age").write.format("parquet").save("hdfs://master:9000/result4")
    在这里插入图片描述
  • 查看生成的parquet文件
    在这里插入图片描述

3、读取jdbc数据源,保存为json文件

  • 查看student数据库里的t_user
    在这里插入图片描述
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()
  • 报错,找不到数据库驱动程序com.mysql.jdbc.Driver
    在这里插入图片描述
  • 解决问题,将数据库驱动程序拷贝到$SPARK_HOME/jars目录
    在这里插入图片描述
  • 将数据驱动程序分发到slave1和slave2虚拟机
    在这里插入图片描述
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()
  • 加载jdbc数据源成功,但是有个警告,需要通过设置useSSL=false来消除
    在这里插入图片描述
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()

在这里插入图片描述

  • 执行命令:userdf.show()
    在这里插入图片描述
  • 执行命令:userdf.write.format("json").save("hdfs://master:9000/result5")
    在这里插入图片描述
  • 在虚拟机slave1查看生成的json文件,执行命令:hdfs dfs -cat /result5/*,

四、数据写入模式

(一)mode()方法

  • 在写入数据时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode
  • 使用SaveMode类,需要import org.apache.spark.sql.SaveMode;

(二)枚举类SaveMode

  • SaveMode.ErrorIfExists默认值。当向数据源写入一个DataFrame时,如果数据已经存在,就会抛出异常。
  • SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,会在原有的基础上进行追加。
  • SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,就会将其覆盖(包括数据或表的Schema)。
  • SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,就不会写入内容,类似SQL中的CREATE TABLE IF NOT EXISTS

(三)案例演示不同写入模式

  • 查看数据源:people.json
    在这里插入图片描述
  • 查询该文件name里,采用覆盖模式写入/result/result目录里本来有东西的
    在这里插入图片描述
  • 执行命令:val peopledf = spark.read.format("json").load("hdfs://master:9000/input/people.json")
    在这里插入图片描述
  • 导入SaveMode类,执行命令:peopledf.select("name").write.mode(SaveMode.Overwrite).format("json").save("hdfs://master:9000/result")
    在这里插入图片描述
  • 在slave1虚拟机上查看生成的json文件
    在这里插入图片描述
  • 查询age列,以追加模式写入HDFS的/result目录,执行命令:peopledf.select("age").write.mode(SaveMode.Append).format("json").save("hdfs://master:9000/result")
    在这里插入图片描述
  • 在slave1虚拟机上查看追加生成的json文件
    在这里插入图片描述

五、分区自动推断

(一)分区自动推断概述

  • 表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。
  • 以people作为表名,gender和country作为分区列,给出存储数据的目录结构
    在这里插入图片描述

(二)分区自动推断演示

1、建四个文件

  • 在master虚拟机上/home里创建如下目录及文件,其中目录people代表表名,gendercountry代表分区列,people.json存储实际人口数据
    在这里插入图片描述

2、读取表数据

  • 执行命令:spark-shell,启动Spark Shell
    在这里插入图片描述
  • 执行命令:val peopledf = spark.read.format("json").load("file:///home/people")
    在这里插入图片描述

3、输出Schema信息

  • 执行命令:peopledf.printSchema()
    在这里插入图片描述

4、显示数据帧内容

  • 执行命令:peopledf.show()
    在这里插入图片描述
  • 从输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gendercountry,并将这两列的值添加到了数据帧peopledf中。

(三)分区自动推断注意事项

  • 分区列的数据类型是自动推断的,目前支持数字、日期、时间戳、字符串数据类型。若不希望自动推断分区列的数据类型,则可以在配置文件中将spark.sql.sources.partitionColumnTypeInference.enabled的值设置为false(默认为true,表示启用)。当禁用自动推断时,分区列将使用字符串数据类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/527382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解密Netty中的Reactor模式

文章目录 单线程Reactor模式多线程Reactor模式Reactor模式中IO事件的处理流程Netty中的通道ChannelNetty中的反应器ReactorNetty中的处理器HandlerNetty中的通道Channel和处理器Handler的协作组件Pipeline Reactor(反应器)模式是高性能网络编程在设计和架构方面的基础模式.Doug…

【MySQL速通篇004】这可能最详细的关于MySQL基础知识点的文章了

&#x1f341;前言 &#x1f451;作者主页&#xff1a;CSDN丨博客园 &#x1f3c6;学习交流&#xff1a;在下周周ovoの社区 &#x1f48e;这篇8000多字的博客也是花了我比较久的时间&#xff0c;基本覆盖很大一部分的MySQL知识点&#xff0c;可以说是非常的详细&#xff0c;感谢…

【我的创作纪念日】IC人仍在路上,不停歇……

机缘 平台今天提示我已经坚持创作3年了。如果不提醒的话&#xff0c;我自己都没什么感觉。三年时间说长也不长&#xff0c;说短呢&#xff0c;其实也不短了。截止今天我在CSDN累计发文213篇&#xff0c;上传资源117个。涉及领域包含&#xff1a;数字信号处理、FPGA设计、IC设计…

【自制视频课程】C++OpnecV基础35讲——序言

OpenCV简介 OpenCV是一个开源的计算机视觉库&#xff0c;它可以用于图像处理、计算机视觉、机器学习等领域。OpenCV最初是由英特尔公司开发的&#xff0c;后来成为了开源项目&#xff0c;现在由OpenCV开源社区维护。OpenCV提供了丰富的图像处理和计算机视觉算法&#xff0c;包括…

【YOLO】Windows 下 YOLOv8 使用 TensorRT 进行模型加速部署

本文全文参考文章为 win10下 yolov8 tensorrt模型加速部署【实战】 本文使用的代码仓库为 TensorRT-Alpha 注&#xff1a;其他 Yolov8 TensorRT 部署项目&#xff1a;YOLOv8 Tensorrt Python/C部署教程 一、前期准备工作 安装Visual Studio 2019或者Visual Studio 2022、Nvidi…

Shell脚本文本三剑客之awk编辑器(人类从不掩饰探索星空的愿望)

文章目录 一、awk简介二、awk工作原理三、awk命令格式四、awk命令的使用1.print操作按行输出文本2.print操作按字段截取输出文本3.使用BEGIN和END指定操作5.使用操作getline6.使用操作OFS7.配合数组使用 一、awk简介 awk是linux的一个强大的命令&#xff0c;具备强大的文本格式…

puppeteer-不需重构,无痛加强vue单页面应用的SEO,提升百度收录排名

背景 最近产品觉得我们网站在百度收录上排名太靠后了&#xff0c;又不肯花钱&#xff0c;就让我们想办法提升网站的SEO。由于项目是用vue3写的&#xff0c;并且已经迭代多个版本了&#xff0c;用nuxt实在不适宜&#xff0c;当然俺的开发水平也不够&#xff0c;周期也会拉得很长…

字典翻译EasyTrans简单使用分享

前言 最近太忙了&#xff0c;一直按在项目上摩擦&#xff0c;都没有时间写分享了。今天终于市把所有负责的模块都写完了&#xff0c;本次迭代引入了字典翻译&#xff0c;借这个机会顺便分享下。 一、什么是字典翻译 所谓的字典翻译其实简单理解就是一些不常更新的有键值对属性的…

什么是 Java 的内存模型?如何保证安全

Java 的内存模型定义了多线程程序中&#xff0c;不同线程之间如何共享和访问共享变量的规则。Java 内存模型的设计旨在保证线程安全和可见性&#xff0c;同时保证程序的性能。本文将介绍 Java 内存模型的基本概念、线程安全的实现方法以及如何使用 synchronized 和 volatile 关…

yolo 训练

这里写目录标题 分配训练集&Validation数量数据集读取读取全部文件夹替换路径 loss weightNMSBBox_IOUEIou Optimizer 分配训练集&Validation数量 validation_size training_size * validation_ratio / (1 - validation_ratio)training_size 219 validation_ratio …

基于Java+SpringMVC+vue+element实现前后端分离校园失物招领系统详细设计

基于JavaSpringMVCvueelement实现前后端分离校园失物招领系统详细设计 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获…

单轴丝杠平台实现搬运功能

1. 功能说明 本文示例将实现R279样机单轴丝杠平台搬运的功能。 该机构是由一个丝杠模组和一个 舵机关节模组 组合而成&#xff0c;关节模组上安装了一个电磁铁。 注意限位开关【①触碰传感器、②近红外传感器】的安装位置&#xff1a; 2. 丝杠传动机构原理 丝杠传动机构是一个将…

基于海思Hi3531 ARM+K7 FPGA高性能综合视频图像处理平台

板卡概述 XM703是自主研制的一款基于PCIE总线架构的高性能综合视频图像处理平台&#xff0c;该平台采用Xilinx的高性能Kintex UltraScale系列FPGA加上华为海思的高性能视频处理器来实现。 华为海思的HI3531DV200是一款集成了ARM A53四核处理器性能强大的神经网络引擎&#xff…

最新域名查询-中文域名注册到期查询软件

最新域名查询 最新域名查询指的是查询最新注册的域名或者快速确认某个域名是否被注册等相关信息的工具。以下是一些常用的最新域名查询工具&#xff1a; 域名Whois查询工具&#xff1a;Whois查询是一种查询域名注册信息的方式&#xff0c;可以查询已经注册的域名的所有信息&am…

http/https

http 基本概念 超文本传输协议&#xff0c;是互联网应用最广泛的协议之一&#xff0c;用于从 WWW 服务器传输超文本到本地浏览器的传输协议&#xff0c;它可以使浏览器更加高效&#xff0c;使网络传输减少。 https 基本概念 HTTPS是HTTP over SSL的简称&#xff0c;即工作…

LTI连续线性时不变系统能控性证明(格拉姆判据、秩判据)

一、能控性和能达性 1.1、能控性和能达性的定义 能控性&#xff1a;如果在一个有限的时间间隔内&#xff0c;可以用幅值没有限制的输入作用&#xff0c;使偏离系统平衡状态的某个初始状态回复到平衡状态&#xff0c;就称这个初始状态是能控的。 能达性&#xff1a;系统在外控…

【网红营销】海外网红营销怎么做?及注意事项?

随着互联网的发展和全球化的进程&#xff0c;海外网红营销逐渐成为企业推广产品和服务的重要途径。海外网红可以借助其社交媒体平台上的影响力&#xff0c;帮助企业扩大品牌知名度、提升销售业绩。然而&#xff0c;海外网红营销存在着一定的挑战和风险&#xff0c;企业需要制定…

探索将大语言模型用作推荐系统

编者按&#xff1a;目前大语言模型主要问答、对话等场景&#xff0c;进行被动回答。是否可以将大模型应用于推荐系统&#xff0c;进行主动推送呢&#xff1f; 这篇文章回顾了可以将大模型作为推荐系统的理论基础&#xff0c;并重点描述了基于英文和阿拉伯语的购物数据集微调T5-…

菜鸟健身-新手使用哑铃锻炼手臂的动作与注意事项

目录 一、前言 二、哑铃锻炼手臂的好处 三、哑铃锻炼手臂的注意事项 四、哑铃锻炼手臂的基本动作 1. 哑铃弯举 2. 哑铃推举 3. 哑铃飞鸟 五、哑铃锻炼手臂的进阶动作 1. 哑铃侧平举 2. 哑铃俯身划船 六、哑铃锻炼手臂的训练计划 七、总结 一、前言 哑铃是一种非常…

2023年5月天津/南京/成都/深圳CDGA/CDGP数据治理认证报名

6月18日DAMA-CDGA/CDGP数据治理认证考试开放报名中&#xff01; 考试开放地区&#xff1a;北京、上海、广州、深圳、长沙、呼和浩特、杭州、南京、济南、成都、西安。其他地区凑人数中… DAMA-CDGA/CDGP数据治理认证班进行中&#xff0c;报名从速&#xff01; DAMA认证为数据管…