一、SparkSQL
前面的文章中使用 RDD
进行数据的处理,优点是非常的灵活,但需要了解各个算子的场景,需要有一定的学习成本,而 SQL
语言是一个大家十分熟悉的语言,如果可以通过编写 SQL
而操作RDD
,学习的成本便会大大降低,在大数据领域 SQL
已经是数一个非常重要的范式,在 Hadoop
生态圈中,我们可以通过 Hive
进而转换成 MapReduces
进行数据分析,在后起之秀的 Flink
中也有 FlinkSQL
来简化数据的操作。
SparkSQL
可以理解成是将 SQL
解析成:RDD + 优化
再执行。
SparkSQL 对比 Hive
SparkSQL | Hive | |
---|---|---|
计算方式 | 基于 RDD 在内存计算 | 转化为 MapReduces 需要磁盘IO读写 |
计算引擎 | Spark | MR、Spark、Tez |
性能 | 快 | 慢 |
元数据 | 无自身的元数据,可以与Hive metastore连接 | Hive metastore |
缓存表 | 支持 | 不支持 |
视图 | 支持 | 支持 |
ACID | 不支持 | 支持(hive 0.14) |
分区 | 支持 | 支持 |
分桶 | 支持 | 支持 |
SparkSQL 的适用场景
数据类型 | 说明 |
---|---|
结构化数据 | 有固定的 Schema ,例如:关系型数据库的表 |
半结构化数据 | 没有固定的 Schema,但是有结构,数据一般是自描述的,例如:JSON 数据 |
理解 DataFrame 和 DataSet
SparkSQL
的数据抽象是 DataFrame
和 DataSet
,底层都是RDD
。
DataFrame
可以理解为是一个分布式表,包括:RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化
。
DataSet
在 DataFrame
的基础上增加了泛型的概念。
例如:有文本数据,读取为 RDD
后,可以拥有如下数据:
1 | 小明 | 110@qq.com |
---|---|---|
2 | 小张 | 120@qq.com |
3 | 小王 | 130@qq.com |
如果转化为 DataFrame
,那就就可以拥有下面数据:
ID:bigint | 姓名:String | 邮箱:String |
---|---|---|
1 | 小明 | 110@qq.com |
2 | 小张 | 120@qq.com |
3 | 小王 | 130@qq.com |
如果转化为 DataSet ,那就就可以拥有下面数据:
ID:bigint | 姓名:String | 邮箱:String | 泛型 |
---|---|---|---|
1 | 小明 | 110@qq.com | user |
2 | 小张 | 120@qq.com | user |
3 | 小王 | 130@qq.com | user |
DataSet
跟DataFrame
还是有挺大区别的,DataFrame
开发都是写SQL
,但是DataSet
可以使用类似RDD
的API
。也可以理解成DataSet
就是存了个数据类型的RDD
。
二、通过 RDD 使用SparkSQL
如果是使用 Scala
或 Java
语言开发,需要引入 SparkSQL
的依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
假如现在有如下文本文件,分别对应含义为:ID、名称、年龄、邮箱
1 小明 20 110.@qq.com
2 小红 29 120.@qq.com
3 李四 25 130.@qq.com
4 张三 30 140.@qq.com
5 王五 35 150.@qq.com
6 赵六 40 160.@qq.com
下面还是使用前面文章的方式读取文本为 RDD
,不过不同的是,我们将 RDD 转为 DataFrame
使用 SQL
的方式处理:
- Scala:
object SQLRddScala {
case class User(id: Int, name: String, age: Int, email: String)
def main(args: Array[String]): Unit = {
//声明 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate()
//通过 SparkSession 获取 SparkContext
val sc = spark.sparkContext
//读取文件为 RDD
val text = sc.textFile("D://test/input1/")
//根据空格拆分字段
val rdd = text.map(_.split(" ")).map(s => User(s(0).toInt, s(1), s(2).toInt, s(3)))
//转化为 DataFrame,并指定 Schema
val dataFrame = spark.createDataFrame(rdd)
//打印 Schema
dataFrame.printSchema()
//查看数据
dataFrame.show()
//DSL 风格查询
dataFrame.select("id","name").filter("age >= 30").show()
//SQL 风格
//注册表
dataFrame.createOrReplaceTempView("user")
//执行 SQL 语言
spark.sql("select * from user where age >= 30").show()
//关闭资源
spark.stop()
}
}
- Java:
public class SQLRddJava {
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
private Integer id;
private String name;
private Integer age;
private String email;
}
public static void main(String[] args) {
// 声明 SparkSession
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
// 通过 SparkSession 获取 SparkContext
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// 读取文件为 RDD
JavaRDD<String> text = sc.textFile("D://test/input1/");
//根据空格拆分字段
JavaRDD<User> rdd = text.map(s -> s.split(" ")).map(s -> new User(Integer.parseInt(s[0]), s[1], Integer.parseInt(s[2]), s[3]));
//转化为 DataFrame,并指定 Schema
Dataset<Row> dataFrame = spark.createDataFrame(rdd, User.class);
//打印 Schema
dataFrame.printSchema();
// 查看数据
dataFrame.show();
//DSL 风格查询
dataFrame.select("id","name").filter("age >= 30").show();
// SQL 风格
dataFrame.createOrReplaceTempView("user");
// 注册表
spark.sql("select * from user where age >= 30").show();
// 执行 SQL 语言
spark.stop();
}
}
- Python:
from pyspark.sql import SparkSession
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
# 通过 SparkSession 获取 SparkContext
sc = spark.sparkContext
# 读取文件为 RDD
text = sc.textFile("D:/test/input1/")
# 根据空格拆分字段
rdd = text.map(lambda s: s.split(" "))
# 转化为 DataFrame,并指定 Schema
dataFrame = spark.createDataFrame(rdd, ["id", "name", "age", "email"])
# 打印 Schema
dataFrame.printSchema()
# 查看数据
dataFrame.show()
# DSL 风格查询
dataFrame.select(["id","name"]).filter("age >= 30").show()
# SQL 风格
# 注册表
dataFrame.createOrReplaceTempView("user")
# 执行 SQL 语言
spark.sql("select * from user where age >= 30").show()
#关闭资源
spark.stop()
打印的 Schema 信息:
全部数据内容:
DSL 查询结果:
SQL 查询结果:
三、多数据源交互
在 SparkSession
中可以通过: spark.read.格式(路径)
的方式, 获取 SparkSQL
中的外部数据源访问框架 DataFrameReader
,DataFrameReader
有两种访问方式,一种是使用 load
方法加载,使用 format
指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc
等,这两种方式本质上一样,都是 load 的封装。
注意:如果使用 load
方法加载数据, 但是没有指定 format
的话, 默认是按照 Parquet
文件格式读取。
对于写数据SparkSQL
中增加了一个新的数据写入框架 DataFrameWriter
,同样也有两种使用方式,一种是使用 format
配合 save
,还有一种是使用封装方法,例如 csv, json, saveAsTable
等,参数如下:
组件 | 说明 |
---|---|
source | 写入目标, 文件格式等, 通过 format 方法设定 |
mode | 写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定 |
extraOptions | 外部参数, 例如 JDBC 的 URL, 通过 options, option 设定 |
partitioningColumns | 类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定 |
bucketColumnNames | 类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定 |
sortColumnNames | 用于排序的列, 通过 sortBy 设定 |
其中一个很重要的参数叫做 mode
,表示指定的写入模式,可以传入Scala 对象表示或字符串表示,有如下几种方式:
Scala 对象表示 | 字符串表示 | 说明 |
---|---|---|
SaveMode.ErrorIfExists | “error” | 将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错 |
SaveMode.Append | “append” | 将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中 |
SaveMode.Overwrite | “overwrite” | 将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标 |
SaveMode.Ignore | “ignore” | 将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS |
注意:如果没有指定 format
, 默认的 format
是 Parquet
1. 读写 CSV 格式
准备 CSV 文件:
- Scala:
object SQLCSV {
def main(args: Array[String]): Unit = {
//声明 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate()
//读取 CSV
val csv = spark
.read
.schema("id int, name string, age int, email string")
.option("header", "true") //第一行为标题
.csv("D:/test/input1/test.csv")
csv.printSchema()
csv.show()
// SQL 操作
csv.createOrReplaceTempView("csv")
spark.sql("select * from csv where age >= 30").show()
//如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
val csv1 = spark
.read
.schema("id int, name string, age int, email string")
.option("delimiter", " ")
.csv("D:/test/input1/test.txt")
csv1.printSchema()
csv1.show()
//写出CSV文件
csv1.write.mode(SaveMode.Overwrite).json("D:/test/output")
//写出查询结果
spark.sql("select * from csv where age <= 30")
.write.mode(SaveMode.Overwrite).csv("D:/test/output1")
spark.stop()
}
}
- Java:
public class SQLCSVJava {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
//读取 CSV
Dataset<Row> csv = spark.read()
.schema("id int, name string, age int, email string")
.option("header", "true") //第一行为标题
.csv("D:/test/input1/test.csv");
csv.printSchema();
csv.show();
// SQL 操作
csv.createOrReplaceTempView("csv");
spark.sql("select * from csv where age >= 30").show();
//如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
Dataset<Row> csv1 = spark.read()
.schema("id int, name string, age int, email string")
.option("delimiter", " ") //第一行为标题
.csv("D:/test/input1/test.txt");
csv1.printSchema();
csv1.show();
//写出CSV文件
csv1.write().mode(SaveMode.Overwrite).json("D:/test/output");
//写出查询结果
spark.sql("select * from csv where age <= 30")
.write().mode(SaveMode.Overwrite).csv("D:/test/output1");
spark.close();
}
}
- Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
# 读取 CSV
csv = spark.read \
.schema("id int, name string, age int, email string") \
.option("header", "true") \
.csv("D:/test/input1/test.csv")
csv.printSchema()
csv.show()
# SQL 操作
csv.createOrReplaceTempView("csv")
spark.sql("select * from csv where age >= 30").show()
# 如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
csv1 = spark.read \
.schema("id int, name string, age int, email string") \
.option("delimiter", " ") \
.csv("D:/test/input1/test.txt")
csv1.printSchema()
csv1.show()
# 写出CSV文件
csv1.write.mode("overwrite").json("D:/test/output")
# 写出查询结果
spark.sql("select * from csv where age <= 30").write.mode("overwrite").csv("D:/test/output1")
#关闭资源
spark.stop()
存储的 csv :
2. 读写Parquet 格式文件
先将上面 csv
文件转为 Parquet
文件:
//读取 CSV
Dataset<Row> csv = spark.read()
.schema("id int, name string, age int, email string")
.option("header", "true") //第一行为标题
.csv("D:/test/input1/test.csv");
// 转化为 Parquet 文件
csv.write().mode(SaveMode.Overwrite).parquet("D:/test/output3");
将该文件名修改为 test.parquet
方便下面测试:
读取 Parquet 格式文件:
- Scala:
object SQLParquet {
def main(args: Array[String]): Unit = {
//声明 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate()
//读取 parquet
val parquet = spark.read.parquet("D:/test/output3/test.parquet")
parquet.printSchema()
parquet.show()
// SQL 操作
parquet.createOrReplaceTempView("parquet")
spark.sql("select * from parquet where age >= 30").show()
//写入 Parquet 的时候指定分区
parquet.write.mode(SaveMode.Overwrite).partitionBy("age").csv("D:/test/output5")
spark.stop()
}
}
- Java
public class SQLParquetJava {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
//读取 parquet
Dataset<Row> parquet = spark.read().parquet("D:/test/output3/test.parquet");
parquet.printSchema();
parquet.show();
// SQL 操作
parquet.createOrReplaceTempView("parquet");
spark.sql("select * from parquet where age >= 30").show();
//写入 Parquet 的时候指定分区
parquet.write().mode(SaveMode.Overwrite).partitionBy("age").parquet("D:/test/output5");
spark.close();
}
}
- Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
# 读取 parquet
parquet = spark.read.parquet("D:/test/output3/test.parquet")
parquet.printSchema()
parquet.show()
# SQL操作
parquet.createOrReplaceTempView("parquet")
spark.sql("select * from parquet where age >= 30").show()
# 写入Parquet的时候指定分区
parquet.write.mode("overwrite").partitionBy("age").csv("D:/test/output5")
#关闭资源
spark.stop()
SQL 查询结果:
输出目录:
3. 读写 JSON 格式文件
将上面CSV
数据转化为 JSON
:
//读取 CSV
Dataset<Row> csv = spark.read()
.schema("id int, name string, age int, email string")
.option("header", "true") //第一行为标题
.csv("D:/test/input1/test.csv");
// 转化为 JSON 文件
csv.write().mode(SaveMode.Overwrite).json("D:/test/output6");
读写 JSON 格式文件:
- Scala:
object SQLJson {
def main(args: Array[String]): Unit = {
//声明 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate()
//读取 JSON
val json = spark.read.json("D:/test/output6/test.json")
json.printSchema()
json.show()
// SQL 操作
json.createOrReplaceTempView("parquet")
spark.sql("select * from parquet where age >= 30").show()
//写入 JSON
json.filter("age < 30 ").write.json("D:/test/output7")
spark.stop()
}
}
- Java:
public class SQLJsonJava {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
//读取 JSON
Dataset<Row> json = spark.read().json("D:/test/output6/test.json");
json.printSchema();
json.show();
// SQL 操作
json.createOrReplaceTempView("parquet");
spark.sql("select * from parquet where age >= 30").show();
//写入 JSON
json.filter("age < 30 ").write().json("D:/test/output7");
spark.close();
}
}
- Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
# 读取JSON
json = spark.read.json("D:/test/output6/test.json")
json.printSchema()
json.show()
# SQL操作
json.createOrReplaceTempView("parquet")
spark.sql("select * from parquet where age >= 30").show()
# 写入JSON
json.filter("age < 30 ").write.json("D:/test/output7")
#关闭资源
spark.stop()
4. 读写 MySQL 格式文件
Scala
和 Scala
项目需要引入 MySQL
的依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
创建表:
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`email` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
写入测试数据:
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (1, '小明', 20, '110.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (2, '小红', 29, '120.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (3, '李四', 25, '130.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (4, '张三', 30, '140.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (5, '王五', 35, '150.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '赵六', 40, '160.@qq.com');
读写 MySQL 格式文件:
- Scala:
object SQLMySql {
def main(args: Array[String]): Unit = {
//声明 SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate()
//读取 mysql
val prop = new Properties
prop.setProperty("user", "root")
prop.setProperty("password", "root")
val user = spark.read.jdbc(
"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
"user",
prop)
user.printSchema()
user.show()
// SQL 操作
user.createOrReplaceTempView("user")
spark.sql("select * from user where age >= 30").show()
//写入表信息,没有表自动创建
user.filter("age < 30 ")
.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
"user2",
prop)
spark.stop()
}
}
- Java:
public class SQLMySqlJava {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
//读取 mysql
Properties prop = new Properties();
prop.setProperty("user","root");
prop.setProperty("password","root");
Dataset<Row> user = spark.read().jdbc(
"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
"user",
prop);
user.printSchema();
user.show();
// SQL 操作
user.createOrReplaceTempView("user");
spark.sql("select * from user where age >= 30").show();
//写入表信息,没有表自动创建
user.filter("age < 30 ").write().mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
"user2",
prop
);
spark.close();
}
}
- Python
使用Python
读写MySql
需要将MySql
的驱动放到java
安装目录的jre\lib\ext
目录下:
from pyspark.sql import SparkSession, SQLContext
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
properties = {'user': 'root', 'password': 'root'}
url = "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
user = spark.read.jdbc(url=url, table="user", properties=properties)
user.printSchema()
user.show()
# SQL操作
user.createOrReplaceTempView("user")
spark.sql("select * from user where age >= 30").show()
# 写入表信息,没有表自动创建
user.filter("age < 30 ").write.mode("overwrite").jdbc(url=url, table="user2", properties=properties)
# 关闭资源
spark.stop()
SQL 查询结果:
MySQL 表信息:
四、SparkOnHive
在 Hive
中可以将运算引擎改为 Spark
,也就是 HiveONSpark
不过这种方式严重依赖 Hive
,已经淘汰,而 SparkOnHvie
是在 SparkSQL
诞生之后提出的,仅仅使用 Hive
的元数据(库、表、字段、位置等),剩下的全部由 Spark
进行语法解析、物理执行计划、SQL
优化等。
由于远程模式下 Hive
的元数据是由 metastore
服务控制,因此确保metastore
服务正常启动,如果对此不了解,可以参考下面文章:
https://xiaobichao.blog.csdn.net/article/details/127717080
注意:spark3.0.1
整合hive
要求hive
版本>=2.3.7
Scala
和 Java
项目需要引入 spark-hive
的依赖:
<!--SparkSQL+ Hive依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.1</version>
</dependency>
- Scala:
object SparkOnHive {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder
.appName("sparksql")
.master("local[*]")
// 实际开发中可以根据集群规模调整大小,默认200
.config("spark.sql.shuffle.partitions", "8")
// 指定 Hive 数据库在 HDFS 上的位置
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
// hive metastore 的地址
.config("hive.metastore.uris", "thrift://node1:9083")
// 开启对hive语法的支持
.enableHiveSupport
.getOrCreate
// 查询全部数据库
spark.sql("show databases").show()
// 使用 bxc 库
spark.sql("use bxc").show()
// 查询全部表
spark.sql("show tables").show()
// 创建表
spark.sql("create table if not exists `user2`(" +
" id int comment 'ID'," +
" name string comment '名称'," +
" age int comment '年龄'," +
" email string comment '邮箱'" +
") comment '用户表'" +
"row format delimited " +
"fields terminated by ',' " +
"lines terminated by '\n' ").show()
spark.sql("show tables").show()
//查询数据
spark.sql("select * from `user`").show()
spark.stop()
}
}
- Java
public class SparkOnHiveJava {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME","root");
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
// 实际开发中可以根据集群规模调整大小,默认200
.config("spark.sql.shuffle.partitions", "8")
// 指定 Hive 数据库在 HDFS 上的位置
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
// hive metastore 的地址
.config("hive.metastore.uris", "thrift://node1:9083")
// 开启对hive语法的支持
.enableHiveSupport()
.getOrCreate();
// 查询全部数据库
spark.sql("show databases").show();
// 使用 bxc 库
spark.sql("use bxc").show();
// 查询全部表
spark.sql("show tables").show();
// 创建表
spark.sql(
"create table if not exists `user2`(" +
" id int comment 'ID'," +
" name string comment '名称'," +
" age int comment '年龄'," +
" email string comment '邮箱'" +
") comment '用户表'" +
"row format delimited " +
"fields terminated by ',' " +
"lines terminated by '\n' ").show();
// 查询全部表
spark.sql("show tables").show();
//查询数据
spark.sql("select * from `user`").show();
spark.stop();
}
}
- Python:
from pyspark.sql import SparkSession, SQLContext
import findspark
if __name__ == '__main__':
findspark.init()
# 声明 SparkSession
spark = SparkSession.builder\
.appName('sparksql')\
.master("local[*]") \
.config("spark.sql.shuffle.partitions", "8") \
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://node1:9083") \
.enableHiveSupport() \
.getOrCreate()
# 查询全部数据库
spark.sql("show databases").show()
# 使用bxc库
spark.sql("use bxc").show()
# 查询全部表
spark.sql("show tables").show()
# 创建表
spark.sql("create table if not exists `user2`(" +
" id int comment 'ID'," +
" name string comment '名称'," +
" age int comment '年龄'," +
" email string comment '邮箱'" +
") comment '用户表'" +
"row format delimited " +
"fields terminated by ',' " +
"lines terminated by '\n' ").show()
spark.sql("show tables").show()
# 查询数据
spark.sql("select * from `user`").show()
spark.stop()
查看所有库:
查看全部表:
查询表信息: