这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;
但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;
最后想说一句君子不隐其短,不知则问,不能则学。
如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)
文章目录
- 一、SparkSQL概述
- 1.1 什么是SparkSQL
- 1.2 为什么要有SparkSQL
- 1.3 SparkSQL的发展
- 1.4 SparkSQL的特点
- 二、SparkSQL 编程
- 2.1 SparkSession 新的起始点
- 2.2 常用方式
- 2.2.1 方法调用案例实操
- 2.2.2 SQL使用方式
- 2.2.3 DSL特殊语法
- 2.3 SQL语法的用户自定义函数
- 2.3.1 UDF
- 2.3.2 UDAF
- 2.3.3 UDTF(没有)
- 3、SparkSQL数据的加载与保存
- 3.1 读取和保持文件
- 3.1.1 CSV文件
- 3.1.2 JSON文件
- 3.1.3 Parquet文件
- 3.2 与MySQL交互(前提是自己的数据要有相关的表)
- 3.3 与Hive交互
- 3.3.1 Linux中的交互
- 3.3.2 IDEA中的交互
一、SparkSQL概述
1.1 什么是SparkSQL
Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。
1.2 为什么要有SparkSQL
1.3 SparkSQL的发展
1、发展历史
RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。
不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。
2、三者的共性
(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。
(3)三者有许多共同的函数,例如filter,sortby等
(4)三者都会根据Spark的内存情况自动缓存运算。
(5)三者都有分区的概念
1.4 SparkSQL的特点
1、易整合:无缝的整合了SQL查询和Spark编程
2、统一的数据访问方式:使用相同的方式连接不同的数据源
3、兼容Hive:在已有的仓库上直接运行SQL或者HQL
4、标准的数据连接:通过JDBC或者ODBC来连接
二、SparkSQL 编程
2.1 SparkSession 新的起始点
在老的版本中,SparkSQL提供两种SQL查询起始点:
(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;
(2)一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
2.2 常用方式
2.2.1 方法调用案例实操
1、创建一个maven工程SparkSQL
2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:
{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}
4、需求
统计人次和统计每人最大年龄
5、在pom.xml文件中添加spark-sql的依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
6、代码实现
(1)添加javaBean的User
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
* @ClassName User
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/30 20:45
* @Version 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
private String name;
private Long age;
}
(2)统计每个人的最大年龄
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
/**
* @ClassName Test01_MaxAge
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/30 20:48
* @Version 1.0
*/
public class Test01_MaxAge {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取DS
Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
//4、将json中的行转换为User对象
Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
@Override
public User call(Row value) throws Exception {
return new User(value.getString(1), value.getLong(0));
}
}, Encoders.bean(User.class));
System.out.println("--------由jsonDS转换为UserDS后输出--------");
mapDS.show();
//5、分组
KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
@Override
public String call(User value) throws Exception {
return value.getName();
}
}, Encoders.STRING());
//6、统计,取最大值
Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
@Override
public User call(User v1, User v2) throws Exception {
return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
}
});
System.out.println("--------------输出结果-------------");
resultDS.show();
//x、关闭资源
spark.close();
}
}
运行结果:
(3)统计人次代码编写
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
* @ClassName Test02_UserVisitsCount
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/30 21:18
* @Version 1.0
*/
public class Test02_UserVisitsCount {
/**
* 在sparkSql中DS直接支持的转换算子有:
* map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
* filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
*/
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取DS(从文件中按行读取数据文件)
Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
//4、将jsonDS中每行数据转换为对象
Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
System.out.println("-------------由jsonDS转换为UserDS输出-------------");
UserDS.show();
//5、根据字段分组
RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));
System.out.println("--------由UserDS根据字段Name分组后输出----------");
System.out.println(groupByDS.toString());
//6、统计次数
Dataset<Row> countDS = groupByDS.count();
System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
countDS.show();
//x、关闭资源
spark.close();
}
}
运行结果:
在SparkSQL中DS直接支持的转换算子有:
map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。
2.2.2 SQL使用方式
1、需求:查询年龄大于18岁的用户信息
2、代码编写
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* @ClassName Test03_AgeGt18
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 9:24
* @Version 1.0
*/
public class Test03_AgeGt18_sql {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取DS
Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
System.out.println("------------读取数据之后输出jsonDS------------");
jsonDS.show();
//4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
jsonDS.createOrReplaceTempView("user_info");
//5、查询数据中年龄大于18的用户
Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");
//6、输出结果
System.out.println("---------------输出年龄大于18的结果---------------");
sql.show();
//x、关闭资源
spark.close();
}
}
运行结果:
2.2.3 DSL特殊语法
1、需求:查询年龄大于18的用户信息
2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;
3、代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* @ClassName Test04_AgeGt18_DSL
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 9:31
* @Version 1.0
*/
public class Test04_AgeGt18_DSL {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL");
//2、创建SparkSession
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
//3、读取文件内容,每行按照jsonLine解析
Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
System.out.println("------------读取数据之后输出jsonDS------------");
jsonDS.show();
//4、查询年龄大于18的用户信息
Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18));
System.out.println("-------------------输出whereDS的结果-----------------");
whereDS.show();
Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18));
System.out.println("-----------------输出filterDS的结果------------------");
filterDS.show();
//5、关闭sparkSession
sparkSession.close();
}
}
运行结果:
2.3 SQL语法的用户自定义函数
2.3.1 UDF
1、UDF:一进一出
2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;
3、代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.udf;
/**
* @ClassName Test05_UDF
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 9:41
* @Version 1.0
*/
public class Test05_UDF {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF");
//2、创建SparkSession
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
//3、读取文件,每行按照jsonLine解析
Dataset<Row> jsonDS = sparkSession.read().json("input/user.json");
//4、创建UDF函数addName:为输出参数拼接 is numberOne
UserDefinedFunction addName = udf(new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
return s + " is numberone";
}
}, DataTypes.StringType);
//5、为创建的函数进行注册
sparkSession.udf().register("addName",addName);
//6、根据jsonDS创建视图user_info
jsonDS.createOrReplaceTempView("user_info");
//7、在SQL中使用函数
Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info");
//8、输出结果
System.out.println("------------------输出结果--------------");
sqlDS.show();
}
}
运行结果
2.3.2 UDAF
1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。
2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf
3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。
4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame
5、代码实现
(1)创建Buffer类
package code;
import bean.Buffer;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
/**
* @ClassName Test06_UDAF
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 9:51
* @Version 1.0
*/
public class Test06_Buffer extends Aggregator<Long, Buffer,Double> {
public Test06_Buffer() {
}
//初始化
@Override
public Buffer zero() {
return new Buffer(0L,0L);
}
//单分区聚合
@Override
public Buffer reduce(Buffer b, Long a) {
b.setSum(b.getSum()+a);
b.setCount(b.getCount()+1);
return b;
}
//多分区聚合
@Override
public Buffer merge(Buffer b1, Buffer b2) {
b1.setSum(b1.getSum()+ b2.getSum());
b1.setCount(b1.getCount()+ b2.getCount());
return b1;
}
//最后逻辑运算
@Override
public Double finish(Buffer reduction) {
return reduction.getSum().doubleValue()/ reduction.getCount();
}
//设置Buffer的编码格式
@Override
public Encoder<Buffer> bufferEncoder() {
return Encoders.kryo(Buffer.class);
}
//设置返回值的编码格式
@Override
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
(2)创建自定义函数Test06_UDAF继承 Aggregator
package code;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.udaf;
/**
* @ClassName Test_07_UDAF
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 10:11
* @Version 1.0
*/
public class Test06_UDAF {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF");
//2、创建SparkSession
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
//3、读取文件并且转换为视图user_info
sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info");
//4、查询表中所有数据
Dataset<Row> sqlDS = sparkSession.sql("select * from user_info");
System.out.println("------------查询全表-----------");
sqlDS.show();
//5、注册函数
sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG()));
//6、查询全表平均年龄
Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info");
System.out.println("--------------查询全表平均年龄----------------");
sqlDS1.show();
//7、查询每个人的平均年龄
Dataset<Row> sqlDS2 = sparkSession.sql("select name ,MyAvg(age) as avg_age from user_info group by name");
System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------");
sqlDS2.show();
//8、关闭sparkSession
sparkSession.close();
}
}
运行结果:
2.3.3 UDTF(没有)
1、UDTF:输入一行,返回多行(Hive)
SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分
3、SparkSQL数据的加载与保存
3.1 读取和保持文件
SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。
3.1.1 CSV文件
0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:
name,age
zhangsan,16
lisi,18,
wangwu,20
1、SparkSQL读取和写出CSV文件
package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
/**
* @ClassName Test01_CSV
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 10:30
* @Version 1.0
*/
public class Test01_CSV {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV");
//2、创建SparkSession
SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
//3、获取Reader对象
DataFrameReader reader = sparkSession.read();
//4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符
Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",")
.csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv");
//5、输出
System.out.println("----------------查看csvDS-------------");
csvDS.show();
//6、可以将csvDS转换为user对象的集合
// Dataset<User> userDS = csvDS.as(Encoders.bean(User.class));
/**
上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint.
原因:csv读进来都是string
*/
//7、使用map转换为user之前先将String转换2为Long
Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() {
@Override
public User call(Row value) throws Exception {
return new User(value.getString(0), Long.valueOf(value.getString(1)));
}
}, Encoders.bean(User.class));
//8、输出
System.out.println("-------------查看mapDS---------------");
mapDS.show();
//TODO 写出为csv文件
//9、获取写对象
DataFrameWriter<User> writer = mapDS.write();
//10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等
writer.option("seq","\t").mode(SaveMode.Overwrite)
.option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv");
//11、关闭SparkSession
sparkSession.close();
}
}
运行结果
3.1.2 JSON文件
1、SparkSQL读取和写出JSON文件
package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
* @ClassName Test02_JSON
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 11:24
* @Version 1.0
*/
public class Test02_JSON {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取DS
DataFrameReader reader = spark.read();
Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
jsonDS.show();
//将jsonDS转换为userDS
Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class));
System.out.println("----------查看userDS的数据------------");
userDS.show();
//输出为JSON文件
DataFrameWriter<User> write = userDS.write();
write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json");
//x、关闭资源
spark.close();
}
}
运行结果:
3.1.3 Parquet文件
1、sparkSQL读取和写出列式存储文件。
package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
* @ClassName Test03_Parquet
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 11:29
* @Version 1.0
*/
public class Test03_Parquet {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取DS
DataFrameReader reader = spark.read();
//我们想读取Parquet,但是没有,没有关系,我们可以先创建
Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
//ToDO 写出Parquet文件
//获取写对象
DataFrameWriter<Row> writer = jsonDS.write();
//将jsonDS数据写出,以Parquet的格式写出
writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
//Todo 读取Parquet文件
Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
//输出
outputParquetDS.show();
//x、关闭资源
spark.close();
}
}
运行结果
3.2 与MySQL交互(前提是自己的数据要有相关的表)
使用SparkSQL对mysql进行读写
1、导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2、从MySQL读写数据
package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import java.util.Properties;
/**
* @ClassName Test04_mysql
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 11:34
* @Version 1.0
*/
public class Test04_mysql {
public static void main(String[] args) {
//1、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql");
//2、创建SparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3、获取reader对象
DataFrameReader reader = spark.read();
//4、TODO 读取Mysql中的表的数据
Properties properties = new Properties();
properties.put("user","root");
properties.put("password","123456");
Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties);
//5、查看数据
System.out.println("---------------查看base_provinceDS-------------------");
base_provinceDS.show();
//6、查看数据中的偶数
base_provinceDS.createOrReplaceTempView("base_province");
Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0");
//7、查看数据
System.out.println("----------------数据中的偶数---------------");
sqlDS.show();
//Todo 向mysql中写数据
System.out.println("-------------输出sqlDS数据类型----------------");
sqlDS.printSchema();
//8、获取Writer对象
DataFrameWriter<Row> writer = sqlDS.write();
writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true" +
"&characterEncoding=UTF-8","test_base_province",properties);
//x、关闭资源
spark.close();
}
}
运行结果:
3.3 与Hive交互
SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。
3.3.1 Linux中的交互
1、添加mysql连接驱动到spark-yarn的jars目录
2、添加hive-site.xml文件到spark-yarn的conf目录
3、启动spark-sql的客户端即可
3.3.2 IDEA中的交互
1、向idea项目中的pom.xml文件中添加新的依赖
(1)添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.0</version>
</dependency>
(2)拷贝配置文件到resources目录
需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml
(3)记得开启metastore和Hiveserver2的服务
(4)代码实现
package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
* @ClassName Test05_Hive
* @Description TODO
* @Author Zouhuiming
* @Date 2023/7/1 14:00
* @Version 1.0
*/
public class Test05_Hive {
public static void main(String[] args) {
//1、设置系统用户名称为zhm
System.setProperty("HADOOP_USER_NAME","zhm");
//2、创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive");
//3、创建SparkSession
SparkSession sparkSession = SparkSession.builder()
.enableHiveSupport().config(conf).getOrCreate();
//4、显示所以表
System.out.println("显示所有的表");
sparkSession.sql("show tables;").show();
//5、创建user_info表,字段包括name:string,age:bigint
sparkSession.sql("create table user_info(name String,age bigint);");
//6、向user_info表中插入数据,"张三",10
sparkSession.sql("insert into user_info values(\"zhangsan\",10);");
//7、查询user_info表中数据
System.out.println("------------输出user_info表中的内容--------------");
sparkSession.sql("select * from user_info;").show();
//8、关闭SparkSession
sparkSession.close();
}
}
运行结果:
恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)。