Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)

news2024/11/24 19:54:29

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;
但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;
最后想说一句君子不隐其短,不知则问,不能则学。
如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

文章目录

  • 一、SparkSQL概述
    • 1.1 什么是SparkSQL
    • 1.2 为什么要有SparkSQL
    • 1.3 SparkSQL的发展
    • 1.4 SparkSQL的特点
  • 二、SparkSQL 编程
    • 2.1 SparkSession 新的起始点
    • 2.2 常用方式
      • 2.2.1 方法调用案例实操
      • 2.2.2 SQL使用方式
      • 2.2.3 DSL特殊语法
    • 2.3 SQL语法的用户自定义函数
      • 2.3.1 UDF
      • 2.3.2 UDAF
      • 2.3.3 UDTF(没有)
  • 3、SparkSQL数据的加载与保存
    • 3.1 读取和保持文件
      • 3.1.1 CSV文件
      • 3.1.2 JSON文件
      • 3.1.3 Parquet文件
    • 3.2 与MySQL交互(前提是自己的数据要有相关的表)
    • 3.3 与Hive交互
      • 3.3.1 Linux中的交互
      • 3.3.2 IDEA中的交互


一、SparkSQL概述

1.1 什么是SparkSQL

Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。

1.2 为什么要有SparkSQL

在这里插入图片描述

1.3 SparkSQL的发展

1、发展历史
RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。
不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。

2、三者的共性
(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。
(3)三者有许多共同的函数,例如filter,sortby等
(4)三者都会根据Spark的内存情况自动缓存运算。
(5)三者都有分区的概念

1.4 SparkSQL的特点

1、易整合:无缝的整合了SQL查询和Spark编程
2、统一的数据访问方式:使用相同的方式连接不同的数据源
3、兼容Hive:在已有的仓库上直接运行SQL或者HQL
4、标准的数据连接:通过JDBC或者ODBC来连接

二、SparkSQL 编程

2.1 SparkSession 新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:
(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;
(2)一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
在这里插入图片描述

2.2 常用方式

2.2.1 方法调用案例实操

1、创建一个maven工程SparkSQL
2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}

4、需求
统计人次和统计每人最大年龄
5、在pom.xml文件中添加spark-sql的依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>

6、代码实现
(1)添加javaBean的User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:45
 * @Version 1.0
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private Long age;

}

(2)统计每个人的最大年龄



import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;

/**
 * @ClassName Test01_MaxAge
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:48
 * @Version 1.0
 */
public class Test01_MaxAge {
   public static void main(String[] args) {
       //1、创建配置对象
       SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");

       //2、创建SparkSession
       SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

       //3、获取DS
       Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

       //4、将json中的行转换为User对象
       Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
           @Override
           public User call(Row value) throws Exception {
               return new User(value.getString(1), value.getLong(0));
           }
       }, Encoders.bean(User.class));
       System.out.println("--------由jsonDS转换为UserDS后输出--------");
       mapDS.show();

        //5、分组
       KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
           @Override
           public String call(User value) throws Exception {
               return value.getName();
           }
       }, Encoders.STRING());

       //6、统计,取最大值
       Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
           @Override
           public User call(User v1, User v2) throws Exception {
               return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
           }
       });

       System.out.println("--------------输出结果-------------");
       resultDS.show();

       //x、关闭资源
       spark.close();
   }
}




运行结果:

在这里插入图片描述
(3)统计人次代码编写

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

/**
 * @ClassName Test02_UserVisitsCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 21:18
 * @Version 1.0
 */
public class Test02_UserVisitsCount {

    /**
     * 在sparkSql中DS直接支持的转换算子有:
     * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
     * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
     */
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");

         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

         //3、获取DS(从文件中按行读取数据文件)
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

         //4、将jsonDS中每行数据转换为对象
         Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("-------------由jsonDS转换为UserDS输出-------------");
         UserDS.show();

         //5、根据字段分组
         RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));

         System.out.println("--------由UserDS根据字段Name分组后输出----------");

         System.out.println(groupByDS.toString());

         //6、统计次数
         Dataset<Row> countDS = groupByDS.count();
         System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
         countDS.show();


         //x、关闭资源
         spark.close();
     }
}

运行结果:
在这里插入图片描述
在SparkSQL中DS直接支持的转换算子有:
map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。

2.2.2 SQL使用方式

1、需求:查询年龄大于18岁的用户信息
2、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * @ClassName Test03_AgeGt18
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:24
 * @Version 1.0
 */
public class Test03_AgeGt18_sql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");

         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

         //3、获取DS
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

         System.out.println("------------读取数据之后输出jsonDS------------");
         jsonDS.show();

         //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
         jsonDS.createOrReplaceTempView("user_info");

         //5、查询数据中年龄大于18的用户
         Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");

         //6、输出结果
         System.out.println("---------------输出年龄大于18的结果---------------");
         sql.show();

         //x、关闭资源
         spark.close();
     }
}

运行结果:
在这里插入图片描述

2.2.3 DSL特殊语法

1、需求:查询年龄大于18的用户信息
2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;
3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * @ClassName Test04_AgeGt18_DSL
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:31
 * @Version 1.0
 */
public class Test04_AgeGt18_DSL {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL");

        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        //3、读取文件内容,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

        System.out.println("------------读取数据之后输出jsonDS------------");
        jsonDS.show();

        //4、查询年龄大于18的用户信息
        Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18));
        System.out.println("-------------------输出whereDS的结果-----------------");
        whereDS.show();

        Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18));
        System.out.println("-----------------输出filterDS的结果------------------");
        filterDS.show();

        //5、关闭sparkSession
        sparkSession.close();
    }
}


运行结果:
在这里插入图片描述

2.3 SQL语法的用户自定义函数

2.3.1 UDF

1、UDF:一进一出
2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;
3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;


/**
 * @ClassName Test05_UDF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:41
 * @Version 1.0
 */
public class Test05_UDF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF");

        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        //3、读取文件,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("input/user.json");

        //4、创建UDF函数addName:为输出参数拼接 is numberOne
        UserDefinedFunction addName = udf(new UDF1<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + " is numberone";
            }
        }, DataTypes.StringType);

        //5、为创建的函数进行注册
        sparkSession.udf().register("addName",addName);

        //6、根据jsonDS创建视图user_info
        jsonDS.createOrReplaceTempView("user_info");

        //7、在SQL中使用函数
        Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info");

        //8、输出结果
        System.out.println("------------------输出结果--------------");
        sqlDS.show();
    }
}

运行结果
在这里插入图片描述

2.3.2 UDAF

1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。
2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf
3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。
4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame
5、代码实现
(1)创建Buffer类

package code;

import bean.Buffer;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;


/**
 * @ClassName Test06_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:51
 * @Version 1.0
 */
public class Test06_Buffer extends Aggregator<Long, Buffer,Double> {
    public Test06_Buffer() {
    }

    //初始化
    @Override
    public Buffer zero() {
        return new Buffer(0L,0L);
    }

    //单分区聚合
    @Override
    public Buffer reduce(Buffer b, Long a) {
        b.setSum(b.getSum()+a);
        b.setCount(b.getCount()+1);
        return b;
    }

    //多分区聚合
    @Override
    public Buffer merge(Buffer b1, Buffer b2) {
        b1.setSum(b1.getSum()+ b2.getSum());
        b1.setCount(b1.getCount()+ b2.getCount());
        return b1;
    }

    //最后逻辑运算
    @Override
    public Double finish(Buffer reduction) {
        return reduction.getSum().doubleValue()/ reduction.getCount();
    }

    //设置Buffer的编码格式
    @Override
    public Encoder<Buffer> bufferEncoder() {
        return Encoders.kryo(Buffer.class);
    }

    //设置返回值的编码格式
    @Override
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}


(2)创建自定义函数Test06_UDAF继承 Aggregator

package code;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.udaf;


/**
 * @ClassName Test_07_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:11
 * @Version 1.0
 */
public class Test06_UDAF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF");

        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        //3、读取文件并且转换为视图user_info
        sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info");

        //4、查询表中所有数据
        Dataset<Row> sqlDS = sparkSession.sql("select * from user_info");
        System.out.println("------------查询全表-----------");
        sqlDS.show();

        //5、注册函数
        sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG()));

        //6、查询全表平均年龄
        Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info");
        System.out.println("--------------查询全表平均年龄----------------");
        sqlDS1.show();

        //7、查询每个人的平均年龄
        Dataset<Row> sqlDS2 = sparkSession.sql("select  name ,MyAvg(age) as avg_age from user_info group by name");
        System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------");
        sqlDS2.show();

        //8、关闭sparkSession
        sparkSession.close();
    }
}


运行结果:
在这里插入图片描述

2.3.3 UDTF(没有)

1、UDTF:输入一行,返回多行(Hive)
SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分

3、SparkSQL数据的加载与保存

3.1 读取和保持文件

SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。

3.1.1 CSV文件

0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:

name,age
zhangsan,16
lisi,18,
wangwu,20

1、SparkSQL读取和写出CSV文件

package inputandoutput;

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;

/**
 * @ClassName Test01_CSV
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:30
 * @Version 1.0
 */
public class Test01_CSV {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV");

        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        //3、获取Reader对象
        DataFrameReader reader = sparkSession.read();

        //4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符
        Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",")
                .csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv");

        //5、输出
        System.out.println("----------------查看csvDS-------------");
        csvDS.show();

        //6、可以将csvDS转换为user对象的集合
     //   Dataset<User> userDS = csvDS.as(Encoders.bean(User.class));
        /**
         上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint.
         原因:csv读进来都是string
         */


        //7、使用map转换为user之前先将String转换2为Long
        Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(value.getString(0), Long.valueOf(value.getString(1)));
            }
        }, Encoders.bean(User.class));


        //8、输出
        System.out.println("-------------查看mapDS---------------");
        mapDS.show();


        //TODO 写出为csv文件
        //9、获取写对象
        DataFrameWriter<User> writer = mapDS.write();

        //10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等
        writer.option("seq","\t").mode(SaveMode.Overwrite)
                .option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv");

        //11、关闭SparkSession
        sparkSession.close();
    }
}


运行结果
在这里插入图片描述
在这里插入图片描述

3.1.2 JSON文件

1、SparkSQL读取和写出JSON文件

package inputandoutput;

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

/**
 * @ClassName Test02_JSON
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:24
 * @Version 1.0
 */
public class Test02_JSON {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON");

         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

         //3、获取DS
         DataFrameReader reader = spark.read();

         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

         jsonDS.show();

         //将jsonDS转换为userDS
         Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("----------查看userDS的数据------------");
         userDS.show();

         //输出为JSON文件
         DataFrameWriter<User> write = userDS.write();
         write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json");

         //x、关闭资源
         spark.close();
     }
}


运行结果:
在这里插入图片描述
在这里插入图片描述

3.1.3 Parquet文件

1、sparkSQL读取和写出列式存储文件。

package inputandoutput;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

/**
 * @ClassName Test03_Parquet
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:29
 * @Version 1.0
 */
public class Test03_Parquet {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet");

         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

         //3、获取DS
         DataFrameReader reader = spark.read();

         //我们想读取Parquet,但是没有,没有关系,我们可以先创建
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");

         //ToDO 写出Parquet文件
         //获取写对象
         DataFrameWriter<Row> writer = jsonDS.write();

         //将jsonDS数据写出,以Parquet的格式写出
         writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");

         //Todo 读取Parquet文件
         Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");

         //输出
         outputParquetDS.show();



         //x、关闭资源
         spark.close();
     }
}


运行结果
在这里插入图片描述
在这里插入图片描述

3.2 与MySQL交互(前提是自己的数据要有相关的表)

使用SparkSQL对mysql进行读写
1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2、从MySQL读写数据

package inputandoutput;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

import java.util.Properties;

/**
 * @ClassName Test04_mysql
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:34
 * @Version 1.0
 */
public class Test04_mysql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql");

         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

         //3、获取reader对象
         DataFrameReader reader = spark.read();

         //4、TODO   读取Mysql中的表的数据

         Properties properties = new Properties();
         properties.put("user","root");
         properties.put("password","123456");
         Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties);

         //5、查看数据
         System.out.println("---------------查看base_provinceDS-------------------");
         base_provinceDS.show();

         //6、查看数据中的偶数
         base_provinceDS.createOrReplaceTempView("base_province");
         Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0");

         //7、查看数据
         System.out.println("----------------数据中的偶数---------------");
         sqlDS.show();

         //Todo 向mysql中写数据
         System.out.println("-------------输出sqlDS数据类型----------------");
         sqlDS.printSchema();

         //8、获取Writer对象
         DataFrameWriter<Row> writer = sqlDS.write();
          writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&amp;useUnicode=true" +
                  "&characterEncoding=UTF-8","test_base_province",properties);

         //x、关闭资源
         spark.close();
     }
}


运行结果:
在这里插入图片描述

3.3 与Hive交互

SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。

3.3.1 Linux中的交互

1、添加mysql连接驱动到spark-yarn的jars目录
2、添加hive-site.xml文件到spark-yarn的conf目录
3、启动spark-sql的客户端即可
在这里插入图片描述
在这里插入图片描述

3.3.2 IDEA中的交互

1、向idea项目中的pom.xml文件中添加新的依赖
(1)添加依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>

(2)拷贝配置文件到resources目录
需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml
(3)记得开启metastore和Hiveserver2的服务
(4)代码实现

package inputandoutput;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/**
 * @ClassName Test05_Hive
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 14:00
 * @Version 1.0
 */
public class Test05_Hive {
    public static void main(String[] args) {
        //1、设置系统用户名称为zhm
        System.setProperty("HADOOP_USER_NAME","zhm");

        //2、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive");

        //3、创建SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport().config(conf).getOrCreate();

        //4、显示所以表
        System.out.println("显示所有的表");
        sparkSession.sql("show tables;").show();

        //5、创建user_info表,字段包括name:string,age:bigint
        sparkSession.sql("create table user_info(name String,age bigint);");

        //6、向user_info表中插入数据,"张三",10
        sparkSession.sql("insert into  user_info values(\"zhangsan\",10);");

        //7、查询user_info表中数据
        System.out.println("------------输出user_info表中的内容--------------");
        sparkSession.sql("select * from user_info;").show();


        //8、关闭SparkSession
        sparkSession.close();

    }
}


运行结果:
在这里插入图片描述

恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/744017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

移动互联网应用程序(app)个人信息安全测试能力验证-流程介绍

ILONGYU 产品简介 为规范检验检测市场&#xff0c;提升检验检测机构技术能力&#xff0c;根据《检验检测机构资质认定管理办法》《实验室能力验证实施办法》等有关规定&#xff0c;市场监管总局决定在社会重点关注的部分检验检测领域&#xff0c;组织开展2020年国家级检验检测…

【IMX6ULL驱动开发学习】17.内核定时器(按键消抖)

1. 内核定时器初始化 setup_timer(struct timer_list *timer, void (*function)(unsigned long), unsigned long data);timer &#xff1a; 定时器结构体 struct timer_list function&#xff1a; 定时器处理函数 data&#xff1a; 参数 2. 设置定时器的超时时间 timer.exp…

数据备份与恢复

目录 数据备份 1、备份单个数据库中的所有表 2、备份数据库中的某些表 3、备份所有数据库 4、只备份emp表结构 数据库恢复 方法1:使用mysql 命令恢复 方法2:进入数据库&#xff0c;使用source加载备份文件恢复 MySQL表的导入导出 数据备份 MySQLdump备份数据库语句的…

报名开启 | DolphinDB 粉丝节,与你相约上海

作为量化爱好者&#xff0c;你是否在寻找更多志同道合的朋友&#xff1f; 作为技术达人&#xff0c;想探索因子挖掘、深度学习、AI领域的前沿技术&#xff1f; 7月22日 机会来了&#xff01; DolphinDB 首届线下粉丝节将于7月22日下午在上海举行&#xff01; 来现场&#xf…

VoxelNet End-to-End Learning for Point Cloud Based 3D Object Detection 论文学习

代码&#xff1a;VoxelNet: https://github.com/skyhehe123/VoxelNet-pytorch 论文&#xff1a;VoxelNet End-to-End Learning for Point Cloud Based 3D Object Detection 1. 解决了什么问题&#xff1f; 对点云做 3D 检测是许多应用得以落地的关键&#xff0c;如自动驾驶和…

想要避免项目延期,项目经理要关注这三点

在项目交付的过程中&#xff0c;出现项目进度与计划有较大的偏差是常见的现象。这种偏差的原因可能是多种多样的。 为了避免项目延期&#xff0c;项目经理需要认真分析引起进度延期的原因&#xff0c;以及采取相应的措施进行规避。 1、导致进度延期之计划不清晰 在项目开始…

基于灰色神经网络的订单需求预测代码

目录 1 概述 2 代码 3 结果 1 概述 BP(Back Propagation)神经网络模型是一种信息前向传播,误差反向传播的神经网络模型0,能够通过训练样本反向传播调节网络的阈值和权值,使误差平方最小。 BP神经网络是目前应用最广泛的神经网络模型之一。 灰色人工神经网络模型建模过程: (1)利…

快速排序的三种方法 hoare,挖坑法,前后指针法

文章目录 快速排序的整体介绍hoare思路代码实现 挖坑法思路代码实现 前后指针法思路代码实现 快速排序的整体介绍 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&#xff0c;其基本思想为&#xff1a;任取待排序元素序列中的某元素作为基准值&#xff0c;按照该排…

spring data jpa(概述、快速入门、内部原理剖析、查询使用方式)

一、概述 1.1 Spring Data JPA概述 Spring Data JPA 是 Spring 基于 ORM 框架、JPA 规范的基础上封装的一套JPA应用框架&#xff0c;可使开发者用极简的代码即可实现对数据库的访问和操作。 它提供了包括增删改查等在内的常用功能&#xff0c;且易于扩展&#xff01;学习并使…

Hexo博客部署腾讯云服务器

✅作者简介&#xff1a;大家好&#xff0c;我是Cisyam&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Cisyam-Shark的博客 &#x1f49e;当前专栏&#xff1a; 前端相关 ✨特色专栏&…

ModaHub魔搭社区:AI原生云向量数据库MIlvus Cloud的倒置文件索引?

目录 VF 总结 VF 平面索引很不错,但它无法扩展。这就是向量搜索的数据结构发挥作用的地方。通过牺牲准确性来减少运行时间,以便显著提高查询速度和吞吐量。现在有很多索引策略,其中最常用的是倒置文件索引(IVF)。 抛开花哨的名字,IVF 实际上是相当简单的。IVF 通过将…

用C语言进行学生成绩排序(选择排序)

一.选择排序 选择排序的基本思想是:每一趟(如第i趟)在后面n-i1 (i1,2…,n-1) 个待排序元素中选取关键字最小的元素&#xff0c;作为有序子序列的第i个元素&#xff0c;直到第n-1趟做完&#xff0c;待排序元素只剩下1个&#xff0c;就不用再选了。选择排序中的堆排序算法是历年考…

Vue数据项加圆点

目录 Html 样式 方法 Html <el-table-column prop"status" label"数据状态" header-align"center" width"200"><template slot-scope"scope"><div style"display: flex; justify-content: center; a…

六大组件助力大屏一键升级!老板当场拍案叫绝!

上个礼拜参加高中同学聚会&#xff0c;大家在饭桌上聊自己的工作&#xff0c;各自吐槽后发现大家真的是各有各的不容易&#xff01;有个和我一样做数据分析工作的兄弟&#xff0c;喝了点小酒后&#xff0c;情绪上头直接在饭桌上大吐苦水&#xff0c;疯狂diss他领导。 他说本来…

java项目之二手车交易网站(ssm+mysql+jsp)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的二手车交易网站。技术交流和部署相关看文章末尾&#xff01; 开发环境&#xff1a; 后端&#xff1a; 开发语言&#xff1a;Java 框架&a…

【图形学入门】概述(Overview)

本文基于GAMES 101课程进行记录和总结。 概念 计算机图形学&#xff08;Computer Graphics&#xff0c;俗称CG&#xff09;&#xff0c;是一种使用数学算法将二维或三维图形转化为计算机显示器的栅格形式的科学&#xff08;或使用计算机合成和操作视觉/图像图形的信息&#xf…

【Linux操作系统】多线程初步概念

文章目录 多线程初步概念线程的优点线程的缺点线程异常线程用途Linux进程VS线程 多线程初步概念 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;。更准确的定义是&#xff1a;线程是“一个进程内部的控制序列”。一个进程至少都有一个执行线程。线程是一…

pandas 笔记 style 调整DataFrame格式

1 format 1.0 数据 # Visual Python: Data Analysis > File vp_df pd.read_csv(https://raw.githubusercontent.com/visualpython/visualpython/main/visualpython/data/sample_csv/iris.csv) vp_dfvp_df[:5] vp_df.at[0,sepal_length]np.nan vp_df.at[2,sepal_length]10…

进程和线程的本质区别

前几天有个同学问我&#xff0c;为什么electron是多进程而不是多线程&#xff1f;今天总结一下&#xff0c;对这个问题做下解答。 首先我们先了解下进程的概念&#xff1a;进程是计算机分配资源的最小单位。 简单地说&#xff0c;进程是一个容器。比如一间漂亮的小别墅&#x…

asp.net core框架搭建1-搭建webapi,对数据增删改查接口模板(附源码)

文章目录 系列文章1.项目搭建1.1 新建Asp.net core webapi项目1.2 配置连接Mysql1.3 实现对mysql数据库的数据增删改查&#xff0c;接口1.3.1 根据id查询数据1.3.2 根据用户名模糊查询数据&#xff0c;并分页1.3.3 新增用户数据1.3.4 修改用户数据1.3.5 根据ID删除数据1.3.6 接…