大数据技术之SparkSQL——数据的读取和保存

news2024/10/5 19:19:20

一、通用的加载和保存方式

        SparkSQL提供了通用的保存数据和数据加载的方式。根据不同的参数读取,并保存不同格式的数据。SparkSQL默认读取和保存的文件格式为Parquet。

1.1 加载数据

spark.read.load 是加载数据的通用方式。

 如果读取不同格式的数据,可以对不同的数据格式进行设定,如:

spark.read.format("json").load("data/user.json")

// 可以简化为如下:
spark.read.json("data/user.json")

 

 1.2 保存数据

spark.write.save 是保存数据的通用方式。

          如果保存不同格式的数据,可以对不同的数据格式进行设定,如:

df.write.format("json").save("data/user.json")
df.write.format("orc").saveAsTable("dws_events.DF_user_friend_count")

// 可以简化为如下:
spark.write.json("data/user.json")

        保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。SaveMode是一个枚举类,其中常量包括:

Scala/ JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)如果文件已经存在则抛出异常
SaveMode.Append"append"如果文件已经存在则追加
SaveMode.Overwrite"overwrite"如果文件已经存在则覆盖
SaveMode.Ignore"ignore"如果文件已经存在则忽略

如:

df.write.mode("append").json("data/user.json")

二、Parquet

        SparkSQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。

        数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可以修改默认数据源格式。

三、JSON

        SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]。可以通过SparkSession.read.json()去加载JSON文件。

1)导入隐式转换

import spark.implicits._

2)加载JSON文件

val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)

3)创建临时表

peopleDF.createOrReplaceTempView("people")

4)数据查询

val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 and 19")

四、CSV

        SparkSQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列。

spark.read.format("csv")
    .option("sep",";")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("data/user.csv")

五、MySQL

        SparkSQL可以通过JDBC从关系型数据库中读取数据的方式来创建DataFrame。通过对DataFrame进行一系列的计算后,再将数据写回到关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径,或将相关的数据库驱动放到spark的类路径下。

spark-shell

bin/spark-shell
--jars mysql-connector-java-8.0.30.jar

IDEA中通过JDBC对MySQL进行操作

1)导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>

2)读取数据

val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")

// 创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._

// 方式1:通过load方式读取
val df = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql")    // 数据库名字
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "xsqone")
    .option("pasword", "root")
    .option("dbtable", "user")    // 表名
    .load()

df.show

// 方式2:通用的load方法读取(参数另一种形式)
spark.read.format("jdbc")
    .options(
        Map(
            "url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
            "dbtable"->"user",
            "driver"->"com.mysql.jdbc.Driver"))
    .load().show


// 方式3:使用JDBC读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")

val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)

df.show


// 释放资源
spark.stop()

3)写入数据

//方式1:通用的方式 format 指定写出类型
val df = spark.write.format("jdbc")
    .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql")    // 数据库名字
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "xsqone")
    .option("pasword", "root")
    .option("dbtable", "user")    // 表名
    .mode(SaveMode.Append)
    .save()

// 方式2:通过JDBC方法
val props: Properties = new Properties()

props.setProperty("user", "root")
props.setProperty("password", "123123")

ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)

六、Hive

        Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。

1)内嵌的HIVE(使用较少)

        如果使用spark内嵌的hive,那么可以直接使用。hive的元数据存储在derby中,默认仓库地址:$SPARK_HOME/spark-warehouse

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> spark.sql("create table aa(id int)")

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+

向表中加载本地数据

scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")

scala> spark.sql("select * from aa").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

 

2)外部的HIVE

        想要连接外部已经部署好的HIVE,需要下面几个步骤:

1、spark 要接管hive需要把 hive-site.xml 拷贝到 conf/ 目录下

2、把 MySQL 的驱动拷贝到 jars/ 目录下

3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下

4、重启 spark-shell

scala> spark.sql("show tables").show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
| default| staff_hive| false|
| default| ttt| false|
| default| user_visit_action| false|
+--------+--------------------+-----------+

 

3)运行SparkSQL CLI

        SparkSQL CLI可以很方便的在本地运行HIVE元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动SparkSQL CLI,直接执行SQL语句。类似hive窗口。

bin/spark-sql

4)运行Spark beeline

        Spark Thrift Server 是 spark 基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容 HiveServer2 。由于 Spark Thrift Server 的接口和协议都与 HiveServer2 完全一致,因此部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 spark Thrift Serve r执行相关语句。

        如果想连接 Thrift Server ,需要通过以下几个步骤:

1、spark 要接管 hive 需要把 hive-site 拷贝到 conf/ 目录下

2、把 MySQL 的驱动拷贝到 jars/ 目录下

3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下

4、启动Thrift Server

sbin/start-thriftserver.sh

5、使用 beeline 连接 Thrift Server

bin/beeline -u jdbc:hive2://linux1:10000 -n root

5)代码操作HIVE:enableHiveSupport()

1> 导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>

2> 将hive-site.xml文件拷贝到项目的resource目录中

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <!--配置数据库连接-->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/hive2?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <!--配置数据库连接驱动-->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <!--配置数据库连接用户名-->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>admin</value>
        <description>username to use against metastore database</description>
    </property>

    <!--配置数据库连接密码-->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>admin</value>
        <description>password to use against metastore database</description>
    </property>

    <!--配置使用hive查询数据时,显示所查询字段的头信息-->
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
        <description>Whether to print the names of the columns in query output.</description>
    </property>
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
        <description>Whether to include the current database in the Hive prompt.</description>
    </property>
</configuration>

3> 启用hive支持

//创建 SparkSession
val spark: SparkSession = SparkSession
 .builder()
    // 添加对应主机IP
 .config("hive.metastore.uris", "thrift://192.168.153.139:9083")
 .enableHiveSupport()

 .master("local[*]")
 .appName("sql")
 .getOrCreate()

4> 增加对应的依赖关系(包含MySQL驱动)

spark.sql("show tables").show

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/503219.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何编译DPDK静态库

阅读前面文章https://blog.csdn.net/qq_36314864/article/details/130243348,知道了哪些dpdk文件可以在windows下生成。 打开vs,新建一个生成静态库工程,在生成的lib文件中找到D:\dpdk-21.07\build\lib D:\dpdk-21.07\build\drivers找到对应的文件,并按照路径,新建筛选项…

【Vue学习笔记7】Vue3中如何开发组件

重点学习&#xff1a;vue3.0之组件通信机制defineProps&#xff08;组件接收外部传来的参数&#xff09;、defineEmits&#xff08;向组件外部传递参数&#xff09;。 1. 评级组件第一版 简单的评级需求&#xff0c;只需要一行代码就可以实现&#xff1a; "★★★★★☆…

SLAM面试笔记(5) — ROS面试

目录 1 ROS概述 2 ROS通信机制 问题&#xff1a;服务通信概念 问题&#xff1a;服务通信理论模型 3 常见面试题 问题&#xff1a;roslaunch和rosrun区别&#xff1f; 问题&#xff1a;什么是ROS&#xff1f; 问题&#xff1a;ROS中的节点是什么&#xff1f; 问题&…

挠性航天器姿态机动动力学模型及PD鲁棒控制

挠性航天器姿态机动动力学模型及PD鲁棒控制 1挠性航天器姿态机动动力学模型2挠性航天器姿态机动PD鲁棒控制2.1 动力学模型及PD控制律2.2仿真模型2.3 控制程序2.4 被控对象程序2.5 绘图程序2.6 结果 1挠性航天器姿态机动动力学模型 2挠性航天器姿态机动PD鲁棒控制 2.1 动力学模…

【NLP开发】Python实现聊天机器人(ChatterBot,集成web服务)

&#x1f37a;NLP开发系列相关文章编写如下&#x1f37a;&#xff1a; &#x1f388;【NLP开发】Python实现词云图&#x1f388;&#x1f388;【NLP开发】Python实现图片文字识别&#x1f388;&#x1f388;【NLP开发】Python实现中文、英文分词&#x1f388;&#x1f388;【N…

澳大利亚兔灾和——栈?

一.背景 1859年&#xff0c;当一位叫托马斯奥斯汀的农民收到英国老家送来的24只野兔并将它们放归农场的时候&#xff0c;他绝对意想不到&#xff0c;这些看似人畜无害的小兔子&#xff0c;竟为古老的澳洲大陆带来一场巨大的生态破坏。到20世纪初&#xff0c;澳大利亚的兔子数量…

操作系统内存管理(上)——内存管理基础

一、内存的基本知识 1.什么是内存&#xff1f;有什么作用&#xff1f; 内存可存放数据。程序执行前先放到内存才能被CPU处理——缓和CPU和硬盘之间的速度矛盾。 给内存的存储单元编址。如果计算机按字节编址&#xff0c;则每个存储单元大小为1字节。即1B8b&#xff08;8个二进…

智能医院导航导诊系统,门诊地图导航怎么做?

现在很多医院都是综合化大型医院&#xff0c;有很多的科室&#xff0c;院区面积也逐渐扩大&#xff0c;一方面给病患提供了更为全面的医疗资源&#xff0c;另一方面&#xff0c;医院复杂的环境也给病患寻医问诊带来了一定的困扰。电子地图作为大家最喜闻乐见的高效应用形式&…

Python的socket模块及示例

13.2 socket模块 socket由一些对象组成&#xff0c;这些对象提供网络应用程序的跨平台标准。 13.2.1 认识socket模块 socket又称“套接字”&#xff0c;应用程序通常通过“套接字”向网络发出请求或应答网络请求&#xff0c;使主机间或一台计算机上的进程间可以通信。sock…

Android 路由框架ARouter源码解析

作者&#xff1a;小马快跑 我们知道在使用ARouter时&#xff0c;需要在build.config里配置&#xff1a; annotationProcessor com.alibaba:arouter-compiler:1.2.2并且知道annotationProcessor用来声明注解解析器&#xff0c;arouter-compiler用来解析ARouter中的各个注解并自…

代码管理记录(一): 码云Gitee代码提交和维护

文章目录 Gitee介绍登录地址代码提交 Gitee介绍 Gitee 是一个类似于GitHub的代码托管平台&#xff0c;是中国的开源社区和开发者社区。它为开发者提供了基于Git的代码托管、协作、部署、代码质量检测、漏洞扫描、容器镜像等服务&#xff0c;同时也提供了一系列的个人资料和社交…

gitlab使用docker简单快速部署

文章目录 前言一、下载gitlab镜像二、安装步骤1.创建docker-compose文件2. 启动及登陆 三、配置页面总结 前言 GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的web服务。本文主要用来记录如何使用docker快速搭…

c#笔记-类成员

声明类 类可以使用帮助你管理一组相互依赖的数据&#xff0c;来完成某些职责。 类使用class关键字定义&#xff0c;并且必须在所有顶级语句之下。 类的成员只能有声明语句&#xff0c;不能有执行语句。 class Player1 {int Hp;int MaxHp;int Atk;int Def;int Overflow(){if (…

七大排序算法一文通(易懂图解+优化代码)

目录 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 6.1 递归实现——Hoare版 6.2 递归实现——挖坑法 6.3 非递归实现 6.4 优化 7.归并排序 7.1 归并排序——递归实现 7.2 归并排序——非递归实现 8.复杂度以及稳定性 1.直接插入排序 …

一列数到中位数的总距离最小

一列数到中位数的总距离最小 3554.二进制&#xff08;二进制数的加减法-转化为十进制运算再将结果转回二进制3565.完美矩阵1824.钻石收藏家&#xff08;经典双指针&#xff09; 3554.二进制&#xff08;二进制数的加减法-转化为十进制运算再将结果转回二进制 输入样例&#xff…

i春秋 Misc Web 爆破-1

打开链接是PHP源码 代码审计&#xff1a; include "flag.php"; 表示文件中包含flag.php文件&#xff0c;即根目录下存在flag.php $a $_REQUEST[hello]; 命名一个变量a来接收超全局变量$_REQUEST&#xff08;接收表单’hello’数据&#xff0c;请求一个为hello的参…

研发效能系列 - 质量与速度能否兼得?

作者&#xff1a;冬哥 引言 我们的时间&#xff0c;应该是用于提高软件质量&#xff0c;还是专注在发布更有价值的功能&#xff1f;这貌似是软件研发中永恒的话题。 到底什么是质量&#xff1f;质量有什么特质&#xff1f; 质量与速度是什么关系&#xff0c;两者是一个硬币的…

spring.factories 的作用是什么

spring.factories 文件用于在 Spring Boot 项目中配置自动配置项。它包含了一系列 key-value 对,key 是自动配置类的全限定名,value 是这些配置类对应的条件类。Spring Boot 会在启动时扫描 classpath 下的 META-INF/spring.factories 文件,并加载其中定义的自动配置类。这些自…

[IAR][CC2642R1] IDE安装和环境搭建,CC2642的环境配置

文章目录 一、IAR安装&#xff08;1&#xff09;压缩包下载&#xff08;2&#xff09;IAR安装(3) 注册(4) 补丁 二、在IAR中使用CC2642&#xff08;0&#xff09;打开IAR&#xff0c;配置环境。&#xff08;1&#xff09;例程位置&#xff08;2&#xff09;打开例程&#xff08…

4.Redis10大数据类型

Redis10大数据类型 Which 101.String&#xff08;字符串&#xff09;2.List&#xff08;列表&#xff09;3.hash &#xff08;哈希&#xff09;4.Set&#xff08;集合&#xff09;5.zset(sorted set&#xff1a;有序集合)6.Redis GEO &#xff08;地理空间&#xff09;7.HyperL…