一、默认数据源
案例演示读取Parquet文件
查看Spark的样例数据文件users.parquet
1、在Spark Shell中演示
启动Spark Shell
查看数据帧内容
查看数据帧模式
对数据帧指定列进行查询,查询结果依然是数据帧,然后通过write成员的save()方法写入HDFS指定目录
查看HDFS上的输出结果
执行SQL查询
查看HDFS上的输出结果
课堂练习1、将4.1节的student.txt文件转换成student.parquet
得到学生数据帧 - studentDF
将学生数据帧保存为parquet文件
查看生成的parquet文件
复制parquet文件到/datasource/input目录
课堂练习2、读取student.parquet文件得到学生数据帧,并显示数据帧内容
2、在IntelliJ IDEA里演示
在这里插入图片描述
将java目录改成scala目录
在pom.xml文件里添加相关依赖,设置源程序文件夹
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.cxf.sql</groupId>
<artifactId>SparkSQLDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
</build>
</project>
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<description>only config in clients</description>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>
创建net.cxf.sql.day01包,在包里创建ReadParquetFile对象
package net.cxf.sql.day01
import org.apache.spark.sql.SparkSession
/**
* 功能:读取Parquet文件
* 作者:cxf
* 日期:2023年06月07日
*/
object ReadParquetFile {
def main(args: Array[String]): Unit = {
// 创建或得到Spark会话对象
val spark = SparkSession.builder()
.appName("ReadParquetFile")
.master("local[*]")
.getOrCreate()
// 加载student.parquet文件,得到数据帧
val studentDF = spark.read.load("hdfs://master:9000/datasource/input/student.parquet")
// 显示学生数据帧内容
studentDF.show
// 查询20岁以上的女生
val girlDF = studentDF.filter("gender = '女' and age > 20")
// 显示女生数据帧内容
girlDF.show
// 保存查询结果到HDFS(保证输出目录不存在)
girlDF.write.save("hdfs://master:9000/datasource/output")
}
}
运行程序,查看控制台结果
在HDFS查看输出结果
二、手动指定数据源
(一)案例演示读取不同数据源
1、读取csv文件
查看Spark的样例数据文件people.csv
将people.csv文件上传到HDFS的/datasource/input目录,然后查看文件内
读取人员csv文件,得到人员数据帧
查看人员数据帧内容
查看人员数据帧内容
2、读取json,保存为parquet
查看people.json文件
将people.json上传到HDFS的/datasource/input目录,并查看其内容
查看生成的parquet文件
现在读取/datasource/input/people.parquet文件得到人员数据帧
查看人员数据帧内容
3、读取jdbc数据源,保存为json文件
查看student数据库里的t_user表
报错,找不到数据库驱动程序com.mysql.jdbc.Driver
将数据库驱动程序上传到master虚拟机的/opt目录
将数据库驱动程序拷贝到$SPARK_HOME/jars目录,
加载jdbc数据源成功,但是有个警告,需要通过设置useSSL=false来消除
执行命令:userdf.show()
在虚拟机slave1查看生成的json文件
三、数据写入模式
案例演示不同写入模式
查看数据源
查询该文件name里,采用覆盖模式写入/result,/result目录里本来有东西的
导入SaveMode类
在slave1虚拟机上查看生成的json文件
查询age列
在slave1虚拟机上查看追加生成的json文件
四、分区自动推断
(一)分区自动推断概述
以people作为表名,gender和country作为分区列,给出存储数据的目录结构
(二)分区自动推断演示
1、建四个文件
在master虚拟机上/home里创建如下目录及文件,其中目录people代表表名,gender和country代表分区列,people.json存储实际人口数据
2、读取表数据
启动Spark Shell
3、输出Schema信息
4、显示数据帧内容