SparkSQL - 介绍及使用 Scala、Java、Python 三种语言演示

news2024/11/15 21:26:16

一、SparkSQL

前面的文章中使用 RDD 进行数据的处理,优点是非常的灵活,但需要了解各个算子的场景,需要有一定的学习成本,而 SQL 语言是一个大家十分熟悉的语言,如果可以通过编写 SQL 而操作RDD,学习的成本便会大大降低,在大数据领域 SQL 已经是数一个非常重要的范式,在 Hadoop 生态圈中,我们可以通过 Hive 进而转换成 MapReduces 进行数据分析,在后起之秀的 Flink 中也有 FlinkSQL 来简化数据的操作。

SparkSQL 可以理解成是将 SQL 解析成:RDD + 优化 再执行。

SparkSQL 对比 Hive

SparkSQLHive
计算方式基于 RDD 在内存计算转化为 MapReduces 需要磁盘IO读写
计算引擎SparkMR、Spark、Tez
性能
元数据无自身的元数据,可以与Hive metastore连接Hive metastore
缓存表支持不支持
视图支持支持
ACID不支持支持(hive 0.14)
分区支持支持
分桶支持支持

SparkSQL 的适用场景

数据类型说明
结构化数据有固定的 Schema ,例如:关系型数据库的表
半结构化数据没有固定的 Schema,但是有结构,数据一般是自描述的,例如:JSON 数据

理解 DataFrame 和 DataSet

SparkSQL的数据抽象是 DataFrameDataSet ,底层都是RDD

DataFrame 可以理解为是一个分布式表,包括:RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化

DataSetDataFrame 的基础上增加了泛型的概念。

例如:有文本数据,读取为 RDD 后,可以拥有如下数据:

1小明110@qq.com
2小张120@qq.com
3小王130@qq.com

如果转化为 DataFrame ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String
1小明110@qq.com
2小张120@qq.com
3小王130@qq.com

如果转化为 DataSet ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String泛型
1小明110@qq.comuser
2小张120@qq.comuser
3小王130@qq.comuser

DataSetDataFrame还是有挺大区别的,DataFrame开发都是写SQL,但是DataSet可以使用类似RDDAPI。也可以理解成DataSet就是存了个数据类型的RDD

二、通过 RDD 使用SparkSQL

如果是使用 ScalaJava 语言开发,需要引入 SparkSQL 的依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

假如现在有如下文本文件,分别对应含义为:ID、名称、年龄、邮箱

1 小明 20 110.@qq.com
2 小红 29 120.@qq.com
3 李四 25 130.@qq.com
4 张三 30 140.@qq.com
5 王五 35 150.@qq.com
6 赵六 40 160.@qq.com

下面还是使用前面文章的方式读取文本为 RDD ,不过不同的是,我们将 RDD 转为 DataFrame 使用 SQL 的方式处理:

  • Scala:
object SQLRddScala {

  case class User(id: Int, name: String, age: Int, email: String)

  def main(args: Array[String]): Unit = {
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()
    //通过 SparkSession 获取 SparkContext
    val sc = spark.sparkContext
    //读取文件为 RDD
    val text = sc.textFile("D://test/input1/")
    //根据空格拆分字段
    val rdd = text.map(_.split(" ")).map(s => User(s(0).toInt, s(1), s(2).toInt, s(3)))
    //转化为 DataFrame,并指定 Schema
    val dataFrame = spark.createDataFrame(rdd)
    //打印 Schema
    dataFrame.printSchema()
    //查看数据
    dataFrame.show()
    //DSL 风格查询
    dataFrame.select("id","name").filter("age >= 30").show()
    //SQL 风格
    //注册表
    dataFrame.createOrReplaceTempView("user")
    //执行 SQL 语言
    spark.sql("select * from user where age >= 30").show()
    //关闭资源
    spark.stop()
  }

}
  • Java:
public class SQLRddJava {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Integer age;
        private String email;
    }

    public static void main(String[] args) {
        // 声明 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //  通过 SparkSession 获取 SparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // 读取文件为 RDD
        JavaRDD<String> text = sc.textFile("D://test/input1/");
        //根据空格拆分字段
        JavaRDD<User> rdd = text.map(s -> s.split(" ")).map(s -> new User(Integer.parseInt(s[0]), s[1], Integer.parseInt(s[2]), s[3]));
        //转化为 DataFrame,并指定 Schema
        Dataset<Row> dataFrame = spark.createDataFrame(rdd, User.class);
        //打印 Schema
        dataFrame.printSchema();
        // 查看数据
        dataFrame.show();
        //DSL 风格查询
        dataFrame.select("id","name").filter("age >= 30").show();
        // SQL 风格
        dataFrame.createOrReplaceTempView("user");
        // 注册表
        spark.sql("select * from user where age >= 30").show();
        // 执行 SQL 语言
        spark.stop();
    }

}
  • Python:
from pyspark.sql import SparkSession
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    # 通过 SparkSession 获取 SparkContext
    sc = spark.sparkContext
    # 读取文件为 RDD
    text = sc.textFile("D:/test/input1/")
    # 根据空格拆分字段
    rdd = text.map(lambda s: s.split(" "))
    # 转化为 DataFrame,并指定 Schema
    dataFrame = spark.createDataFrame(rdd, ["id", "name", "age", "email"])
    # 打印 Schema
    dataFrame.printSchema()
    # 查看数据
    dataFrame.show()
    # DSL 风格查询
    dataFrame.select(["id","name"]).filter("age >= 30").show()
    # SQL 风格
    # 注册表
    dataFrame.createOrReplaceTempView("user")
    # 执行 SQL 语言
    spark.sql("select * from user where age >= 30").show()

    #关闭资源
    spark.stop()

打印的 Schema 信息:
在这里插入图片描述
全部数据内容:

在这里插入图片描述
DSL 查询结果:

在这里插入图片描述
SQL 查询结果:

在这里插入图片描述

三、多数据源交互

SparkSession 中可以通过: spark.read.格式(路径) 的方式, 获取 SparkSQL 中的外部数据源访问框架 DataFrameReaderDataFrameReader 有两种访问方式,一种是使用 load 方法加载,使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc 等,这两种方式本质上一样,都是 load 的封装。

注意:如果使用 load 方法加载数据, 但是没有指定 format 的话, 默认是按照 Parquet 文件格式读取。

对于写数据SparkSQL 中增加了一个新的数据写入框架 DataFrameWriter ,同样也有两种使用方式,一种是使用 format 配合 save,还有一种是使用封装方法,例如 csv, json, saveAsTable 等,参数如下:

组件说明
source写入目标, 文件格式等, 通过 format 方法设定
mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定
extraOptions外部参数, 例如 JDBC 的 URL, 通过 options, option 设定
partitioningColumns类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定
bucketColumnNames类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定
sortColumnNames用于排序的列, 通过 sortBy 设定

其中一个很重要的参数叫做 mode,表示指定的写入模式,可以传入Scala 对象表示或字符串表示,有如下几种方式:

Scala 对象表示字符串表示说明
SaveMode.ErrorIfExists“error”将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错
SaveMode.Append“append”将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中
SaveMode.Overwrite“overwrite”将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标
SaveMode.Ignore“ignore”将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

注意:如果没有指定 format, 默认的 formatParquet

1. 读写 CSV 格式

准备 CSV 文件:

在这里插入图片描述

  • Scala:
object SQLCSV {
  def main(args: Array[String]): Unit = {
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 CSV
    val csv = spark
      .read
      .schema("id int, name string, age int, email string")
      .option("header", "true") //第一行为标题
      .csv("D:/test/input1/test.csv")
    csv.printSchema()
    csv.show()
    // SQL 操作
    csv.createOrReplaceTempView("csv")
    spark.sql("select * from csv where age >= 30").show()
    //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
    val csv1 = spark
      .read
      .schema("id int, name string, age int, email string")
      .option("delimiter", " ")
      .csv("D:/test/input1/test.txt")
    csv1.printSchema()
    csv1.show()
    //写出CSV文件
    csv1.write.mode(SaveMode.Overwrite).json("D:/test/output")
    //写出查询结果
    spark.sql("select * from csv where age <= 30")
      .write.mode(SaveMode.Overwrite).csv("D:/test/output1")
    spark.stop()
  }
}
  • Java:
public class SQLCSVJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //读取 CSV
        Dataset<Row> csv = spark.read()
                .schema("id int, name string, age int, email string")
                .option("header", "true") //第一行为标题
                .csv("D:/test/input1/test.csv");
        csv.printSchema();
        csv.show();
        // SQL 操作
        csv.createOrReplaceTempView("csv");
        spark.sql("select * from csv where age >= 30").show();
        //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
        Dataset<Row> csv1 = spark.read()
                .schema("id int, name string, age int, email string")
                .option("delimiter", " ") //第一行为标题
                .csv("D:/test/input1/test.txt");
        csv1.printSchema();
        csv1.show();
        //写出CSV文件
        csv1.write().mode(SaveMode.Overwrite).json("D:/test/output");
        //写出查询结果
        spark.sql("select * from csv where age <= 30")
                .write().mode(SaveMode.Overwrite).csv("D:/test/output1");

        spark.close();
    }
}
  • Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    # 读取 CSV
    csv = spark.read \
      .schema("id int, name string, age int, email string") \
      .option("header", "true") \
      .csv("D:/test/input1/test.csv")
    csv.printSchema()
    csv.show()
    #  SQL 操作
    csv.createOrReplaceTempView("csv")
    spark.sql("select * from csv where age >= 30").show()
    # 如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
    csv1 = spark.read \
      .schema("id int, name string, age int, email string") \
      .option("delimiter", " ") \
      .csv("D:/test/input1/test.txt")
    csv1.printSchema()
    csv1.show()
    # 写出CSV文件
    csv1.write.mode("overwrite").json("D:/test/output")
    # 写出查询结果
    spark.sql("select * from csv where age <= 30").write.mode("overwrite").csv("D:/test/output1")
    #关闭资源
    spark.stop()

存储的 csv :
在这里插入图片描述

在这里插入图片描述

2. 读写Parquet 格式文件

先将上面 csv 文件转为 Parquet 文件:

//读取 CSV
Dataset<Row> csv = spark.read()
        .schema("id int, name string, age int, email string")
        .option("header", "true") //第一行为标题
        .csv("D:/test/input1/test.csv");
// 转化为 Parquet 文件
csv.write().mode(SaveMode.Overwrite).parquet("D:/test/output3");

在这里插入图片描述
将该文件名修改为 test.parquet 方便下面测试:

读取 Parquet 格式文件

  • Scala:
object SQLParquet {
  def main(args: Array[String]): Unit = {
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 parquet
    val parquet = spark.read.parquet("D:/test/output3/test.parquet")
    parquet.printSchema()
    parquet.show()
    // SQL 操作
    parquet.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    //写入 Parquet 的时候指定分区
    parquet.write.mode(SaveMode.Overwrite).partitionBy("age").csv("D:/test/output5")

    spark.stop()
  }
}
  • Java
public class SQLParquetJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //读取 parquet 
        Dataset<Row> parquet = spark.read().parquet("D:/test/output3/test.parquet");
        parquet.printSchema();
        parquet.show();
        // SQL 操作
        parquet.createOrReplaceTempView("parquet");
        spark.sql("select * from parquet where age >= 30").show();
        //写入 Parquet 的时候指定分区
        parquet.write().mode(SaveMode.Overwrite).partitionBy("age").parquet("D:/test/output5");

        spark.close();
    }
}
  • Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

    #  读取 parquet
    parquet = spark.read.parquet("D:/test/output3/test.parquet")
    parquet.printSchema()
    parquet.show()
    #  SQL操作
    parquet.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    # 写入Parquet的时候指定分区
    parquet.write.mode("overwrite").partitionBy("age").csv("D:/test/output5")
   
    #关闭资源
    spark.stop()

SQL 查询结果:

在这里插入图片描述

输出目录:
在这里插入图片描述

3. 读写 JSON 格式文件

将上面CSV数据转化为 JSON

//读取 CSV
Dataset<Row> csv = spark.read()
        .schema("id int, name string, age int, email string")
        .option("header", "true") //第一行为标题
        .csv("D:/test/input1/test.csv");
// 转化为 JSON 文件
csv.write().mode(SaveMode.Overwrite).json("D:/test/output6");

在这里插入图片描述
在这里插入图片描述
读写 JSON 格式文件

  • Scala:
object SQLJson {
  def main(args: Array[String]): Unit = {
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 JSON
    val json = spark.read.json("D:/test/output6/test.json")
    json.printSchema()
    json.show()
    // SQL 操作
    json.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    //写入 JSON
    json.filter("age < 30 ").write.json("D:/test/output7")

    spark.stop()
  }
}
  • Java:
public class SQLJsonJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();

        //读取 JSON
        Dataset<Row> json = spark.read().json("D:/test/output6/test.json");
        json.printSchema();
        json.show();
        // SQL 操作
        json.createOrReplaceTempView("parquet");
        spark.sql("select * from parquet where age >= 30").show();
        //写入 JSON
        json.filter("age < 30 ").write().json("D:/test/output7");

        spark.close();
    }
}
  • Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

    # 读取JSON
    json = spark.read.json("D:/test/output6/test.json")
    json.printSchema()
    json.show()
    # SQL操作
    json.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    # 写入JSON
    json.filter("age < 30 ").write.json("D:/test/output7")
   
    #关闭资源
    spark.stop()

4. 读写 MySQL 格式文件

ScalaScala 项目需要引入 MySQL 的依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>

创建表:

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `email` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;

写入测试数据:

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (1, '小明', 20, '110.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (2, '小红', 29, '120.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (3, '李四', 25, '130.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (4, '张三', 30, '140.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (5, '王五', 35, '150.@qq.com');
INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '赵六', 40, '160.@qq.com');

读写 MySQL 格式文件

  • Scala:
object SQLMySql {
  def main(args: Array[String]): Unit = {
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 mysql
    val prop = new Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    val user = spark.read.jdbc(
      "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
      "user",
      prop)
    user.printSchema()
    user.show()
    // SQL 操作
    user.createOrReplaceTempView("user")
    spark.sql("select * from user where age >= 30").show()
    //写入表信息,没有表自动创建
    user.filter("age < 30 ")
      .write.mode(SaveMode.Overwrite).jdbc(
      "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
      "user2",
      prop)

    spark.stop()
  }
}
  • Java:
public class SQLMySqlJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();

        //读取 mysql
        Properties prop = new Properties();
        prop.setProperty("user","root");
        prop.setProperty("password","root");
        Dataset<Row> user = spark.read().jdbc(
                "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                "user",
                prop);
        user.printSchema();
        user.show();
        // SQL 操作
        user.createOrReplaceTempView("user");
        spark.sql("select * from user where age >= 30").show();
        //写入表信息,没有表自动创建
        user.filter("age < 30 ").write().mode(SaveMode.Overwrite).jdbc(
                "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                "user2",
                prop
        );

        spark.close();
    }
}
  • Python
    使用 Python 读写 MySql 需要将 MySql的驱动放到 java 安装目录的 jre\lib\ext目录下:

在这里插入图片描述

from pyspark.sql import SparkSession, SQLContext
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

    properties = {'user': 'root', 'password': 'root'}
    url = "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
    user = spark.read.jdbc(url=url, table="user", properties=properties)
    user.printSchema()
    user.show()
    #  SQL操作
    user.createOrReplaceTempView("user")
    spark.sql("select * from user where age >= 30").show()
    # 写入表信息,没有表自动创建
    user.filter("age < 30 ").write.mode("overwrite").jdbc(url=url, table="user2", properties=properties)

    # 关闭资源
    spark.stop()

SQL 查询结果:
在这里插入图片描述
MySQL 表信息:
在这里插入图片描述

四、SparkOnHive

Hive 中可以将运算引擎改为 Spark ,也就是 HiveONSpark 不过这种方式严重依赖 Hive ,已经淘汰,而 SparkOnHvie 是在 SparkSQL 诞生之后提出的,仅仅使用 Hive 的元数据(库、表、字段、位置等),剩下的全部由 Spark 进行语法解析、物理执行计划、SQL优化等。

由于远程模式下 Hive 的元数据是由 metastore 服务控制,因此确保metastore 服务正常启动,如果对此不了解,可以参考下面文章:

https://xiaobichao.blog.csdn.net/article/details/127717080

在这里插入图片描述

注意spark3.0.1整合hive要求hive版本>=2.3.7

ScalaJava 项目需要引入 spark-hive 的依赖:

<!--SparkSQL+ Hive依赖-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.1</version>
</dependency>
  • Scala:
object SparkOnHive {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val spark = SparkSession.builder
      .appName("sparksql")
      .master("local[*]")
      // 实际开发中可以根据集群规模调整大小,默认200
      .config("spark.sql.shuffle.partitions", "8")
      // 指定 Hive 数据库在 HDFS 上的位置
      .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
      // hive metastore 的地址
      .config("hive.metastore.uris", "thrift://node1:9083")
      // 开启对hive语法的支持
      .enableHiveSupport
      .getOrCreate

    // 查询全部数据库
    spark.sql("show databases").show()
    // 使用 bxc 库
    spark.sql("use bxc").show()
    // 查询全部表
    spark.sql("show tables").show()
    // 创建表
    spark.sql("create table if not exists `user2`(" +
      "    id int comment 'ID'," +
      "    name string comment '名称'," +
      "    age int comment '年龄'," +
      "    email string comment '邮箱'" +
      ") comment '用户表'" +
      "row format delimited " +
      "fields terminated by ',' " +
      "lines terminated by '\n' ").show()
    spark.sql("show tables").show()

    //查询数据
    spark.sql("select * from `user`").show()

    spark.stop()
  }
}
  • Java
public class SparkOnHiveJava {

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME","root");
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                // 实际开发中可以根据集群规模调整大小,默认200
                .config("spark.sql.shuffle.partitions", "8")
                // 指定 Hive 数据库在 HDFS 上的位置
                .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
                // hive metastore 的地址
                .config("hive.metastore.uris", "thrift://node1:9083")
                // 开启对hive语法的支持
                .enableHiveSupport()
                .getOrCreate();

        // 查询全部数据库
        spark.sql("show databases").show();
        // 使用 bxc 库
        spark.sql("use bxc").show();
        // 查询全部表
        spark.sql("show tables").show();
        // 创建表
        spark.sql(
                "create table if not exists `user2`(" +
                "    id int comment 'ID'," +
                "    name string comment '名称'," +
                "    age int comment '年龄'," +
                "    email string comment '邮箱'" +
                ") comment '用户表'" +
                "row format delimited " +
                "fields terminated by ',' " +
                "lines terminated by '\n' ").show();
        // 查询全部表
        spark.sql("show tables").show();
        
        //查询数据
        spark.sql("select * from `user`").show();

        spark.stop();
    }
}

  • Python:
from pyspark.sql import SparkSession, SQLContext
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder\
        .appName('sparksql')\
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", "8") \
        .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \
        .config("hive.metastore.uris", "thrift://node1:9083") \
        .enableHiveSupport() \
        .getOrCreate()

    #  查询全部数据库
    spark.sql("show databases").show()
    #  使用bxc库
    spark.sql("use bxc").show()
    #  查询全部表
    spark.sql("show tables").show()
    #  创建表
    spark.sql("create table if not exists `user2`(" +
              "    id int comment 'ID'," +
              "    name string comment '名称'," +
              "    age int comment '年龄'," +
              "    email string comment '邮箱'" +
              ") comment '用户表'" +
              "row format delimited " +
              "fields terminated by ',' " +
              "lines terminated by '\n' ").show()
    spark.sql("show tables").show()

    #  查询数据
    spark.sql("select * from `user`").show()

    spark.stop()

查看所有库:
在这里插入图片描述
查看全部表:
在这里插入图片描述
查询表信息:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/61438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ARM汇编之程序状态寄存器传输指令

ARM汇编之程序状态寄存器传输指令前言 首先&#xff0c;请问大家几个小小问题&#xff0c;你清楚&#xff1a; CLZ指令的常见使用场景&#xff1b;状态寄存器访问指令有哪些&#xff1f; 今天&#xff0c;我们来一起探索并回答这些问题。为了便于大家理解&#xff0c;以下是…

[附源码]Python计算机毕业设计SSM金牛社区疫情防控系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[附源码]JAVA毕业设计老年人健康饮食管理系统(系统+LW)

[附源码]JAVA毕业设计老年人健康饮食管理系统&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项…

LeetCode 0542. 01 矩阵

【LetMeFly】542.01 矩阵 力扣题目链接&#xff1a;https://leetcode.cn/problems/01-matrix/ 给定一个由 0 和 1 组成的矩阵 mat &#xff0c;请输出一个大小相同的矩阵&#xff0c;其中每一个格子是 mat 中对应位置元素到最近的 0 的距离。 两个相邻元素间的距离为 1 。 示…

MySQL数据库之存储引擎

MySQL数据库之存储引擎数据存储引擎介绍MyISAM数据引擎概述MyISAM的特点介绍及数据引擎对应文件MyISAM的存储格式分类MyISAM适用的生产场景举例InnoDB数据引擎概述InnoDB特点介绍及数据引擎对应文件InnoDB适用生产场景分析企业选择存储引擎的依据如何配置存储引擎查看系统支持的…

c<8>指针

目录 2&#xff0c;指针的赋值 2.1C语言允许指针赋值为0&#xff08;初始化&#xff09; 2.2指针赋值例 2.3输出指针的值 3&#xff0c;用指针引用数组 3.1利用指针输入数组 3.2优先级问题 4.多维数组 5.字符串 5.1通过指针引用字符串 4.函数中对指针的应用 4.1将指针变…

[附源码]计算机毕业设计车源后台管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Acer W700废物利用- 第一章 - 安装Linux系统Debian 11.5

前言 收拾房子时在犄角旮旯发现了一台N年前的Windows平板&#xff0c;也就是今天的主角&#xff1a;Acer W700 &#xff0c;机器配置是&#xff1a;CPU&#xff1a;I5-3337U&#xff1b;内存&#xff1a;4G&#xff1b;硬盘&#xff1a;128G固态&#xff1b; 插上充电线&…

YOLOv5图像分割--SegmentationModel类代码详解

目录 ​编辑 SegmentationModel类 DetectionModel类 推理阶段 DetectionModel--forward() BaseModel--forward() Segment类 Detect--forward SegmentationModel类 定义model将会调用models/yolo.py中的类SegmentationModel。该类是继承父类--DetectionModel类。 cl…

数学基础从高一开始1、集合的概念

数学基础从高一开始1、集合的概念 目录 数学基础从高一开始1、集合的概念 一、课程引入 解析&#xff1a;方程​编辑2是否有解&#xff1f; 解析&#xff1a;所有到定点的距离等于定长的点组成何种图形&#xff1f; 结论&#xff1a; 二、课程讲解 问题1&#xff1a; 集…

1548_AURIX_TC275_锁步比较逻辑LCL

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 这可能是这段时间看过的最简单的一个章节了&#xff0c;所有的章节内容都可以放进这一份笔记也不显得多。 1. 首先明确LCL的意思&#xff0c;其实是锁步核比较器逻辑的意思&#xff0c;还不…

知识点1--认识Docker

IT界2014年之前&#xff0c;对于服务器虚拟化的使用&#xff0c;有过一个流派&#xff0c;基于Windows server系统VMware组成服务器集群&#xff0c;但是后期由于这样的使用方式维护成本相当高&#xff0c;比如服务器的序列、服务器台账以及服务器与服务器之间的切换等等&#…

据说Linuxer都难忘的25个画面

虽然对 Linux 正式生日是哪天还有些争论&#xff0c;甚至 Linus Torvalds 认为在 1991 那一年有四个日子都可以算作 Linux 的生日。但是不管怎么说&#xff0c;Linux 已经 25 岁了&#xff0c;这里我们为您展示一下这 25 年来发生过的 25 件重大里程碑事件。 1991&#xff1a;L…

SpringMVC学习笔记二(获取Cookies、Session和Header、IDEA热部署)

目录 一、一些前置知识 二、SpringMVC获取cookies和session &#x1f351;获取cookies和header &#x1f351;获取session 三、SpringMVC热部署 &#x1f4dd;添加框架支持 &#x1f4dd;settings配置开启自动热部署 &#x1f4dd;开启运行中热部署&#xff1a; &…

【Autopsy数字取证篇】Autopsy案例创建与镜像分析详细教程

【Autopsy数字取证篇】Autopsy案例创建与镜像分析详细教程 Autopsy是一款非常优秀且功能强大的免费开源数字取证分析工具。—【蘇小沐】 文章目录【Autopsy数字取证篇】Autopsy案例创建与镜像分析详细教程1.实验环境2.Autopsy下载安装&#xff08;一&#xff09;创建案例1.软件…

【简单易操作】图漾TM460-E2深度网络相机在ROS-melodic环境下的配置过程

目录一、配置的环境二、下载内容及链接三、ubuntu环境配置下载 Camport3 SDK安装依赖编译运行四、安装OpenNI2套件下载 Camport3 OpenNI2 SDK安装 Camport3 OpenNI2 SDK五、ROS平台安装下载 Camport3 ROS SDK编译配置环境变量运行一、配置的环境 相机型号&#xff1a;TM460-E2…

OpenRASP agent源码分析

目录 前言 准备 源码分析 1. manifest 2. agent分析 3. agent卸载逻辑 总结 前言 笔者在很早前写了(231条消息) OpenRASP Java应用自我保护使用_fenglllle的博客-CSDN博客 实际上很多商业版的rasp工具都是基于OpenRASP的灵感来的&#xff0c;主要就是对核心的Java类通过…

堆(二叉堆)-优先队列-数据结构和算法(Java)

文章目录1 概述1.1 定义1.2 二叉堆表示法2 API3 堆相关算法3.1 上浮&#xff08;由下至上的堆有序化&#xff09;3.2 下沉&#xff08;由上至下的堆有序化&#xff09;3.3 插入元素3.4 删除最大元素4 实现5 性能和分析5.1 调整数组的大小5.2 元素的不可变性6 简单测试6 后记1 概…

2006-2020年全国31省人口老龄化水平

2006-2020年全国31省人口老龄化 1、时间为2006-2020年 2、来源&#xff1a;人口与就业年鉴 3、数据缺失情况说明&#xff1a; 其中2010年存在缺失&#xff0c;采用线性插值法进行填补&#xff0c;内含原始数据、线性插值 4、计算说明&#xff1a;以城镇地区老年抚养比衡量…

uImage的制作过程详解

1、uImage镜像介绍 参考博客&#xff1a;《vmlinuz/vmlinux、Image、zImage与uImage的区别》&#xff1b; 2、uImage镜像的制作 2.1、mkimage工具介绍 参考博客&#xff1a;《uImage的制作工具mkimage详解(源码编译、使用方法、添加的头解析、uImage的制作)》&#xff1b; 2.2…