SparkSQL学习03-数据读取与存储

news2025/1/11 22:54:02

文章目录

    • 1 数据的加载
      • 1.1 方式一:spark.read.format
        • 1.1.1读取json数据
        • 1.1.2 读取jdbc数据
      • 1.2 方式二:spark.read.xxx
        • 1.2.1 读取json数据
        • 1.2.2 读取csv数据
        • 1.2.3 读取txt数据
        • 1.2.4 读取parquet数据
        • 1.2.5 读取orc数据
        • 1.2.6 读取jdbc数据
    • 2 数据的保存
      • 2.1 方式一:spark.write.format
        • 2.1.1 读取orc数据
      • 2.2 方式二:spark.write.xxx
        • 2.2.1 写入到jdbc数据库中

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不用格式的数据。SparkSQL默认读取和保存的文件格式为parquet,parquet是一种能够有效存储嵌套数据的列式存储格式。

1 数据的加载

SparkSQL提供了两种方式可以加载数据

1.1 方式一:spark.read.format

  • spark.read.format读取数据文件格式.load加载数据路径”
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 需要注意:在读取jdbc时需要在format和load之间添加多个option进行相应的JDBC参数设置【url、user、password.tablename】load中不用传递路经空参数即可
  • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
1.1.1读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()

    //使用第一种范式加载数据
    var frame: DataFrame = session.read.format("json")
      .load("data/people.json")
    frame.printSchema()
    /**
     * 运行结果:
     root
      |-- age: long (nullable = true)
      |-- height: double (nullable = true)
      |-- name: string (nullable = true)
      |-- province: string (nullable = true)
     */
    frame.show()
    /**
     * 运行结果:
      +---+------+-------+--------+
      |age|height|   name|province|
      +---+------+-------+--------+
      | 10| 168.8|Michael|    广东|
      | 30| 168.8|   Andy|    福建|
      | 19| 169.8| Justin|    浙江|
      | 32| 188.8| 王启峰|    广东|
      | 10| 168.8|   John|    河南|
      | 19| 179.8|   Domu|    浙江|
      +---+------+-------+--------+
     * */
  }
}
1.1.2 读取jdbc数据

读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    // 如果读取的JDBC操作(即读取mysql中的数据)
    val frame = session.read.format("jdbc")
            .option("url","jdbc:mysql://localhost:3306/mydb1")
            .option("dbtable","location_info")
            .option("user","root")
            .option("password","123456")
            .load()
    frame.printSchema()
  }
}

1.2 方式二:spark.read.xxx

  • 上述的书写方式太过项,所以SparksQL推出了更加便捷的方式spark.read.xxx加载数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在读取jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
1.2.1 读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    
    //【推荐使用】第二种方式进行读取操作
    val frame = session.read.json("data/people.json")
    frame.printSchema()
    /**
    root
     |-- age: long (nullable = true)
     |-- height: double (nullable = true)
     |-- name: string (nullable = true)
     |-- province: string (nullable = true)
     */
    frame.show()
    /**
    +---+------+-------+--------+
    |age|height|   name|province|
    +---+------+-------+--------+
    | 10| 168.8|Michael|    广东|
    | 30| 168.8|   Andy|    福建|
    | 19| 169.8| Justin|    浙江|
    | 32| 188.8| 王启峰|    广东|
    | 10| 168.8|   John|    河南|
    | 19| 179.8|   Domu|    浙江|
    +---+------+-------+--------+ 
     */
   }
 }
1.2.2 读取csv数据

csv数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    val frame = session.read.csv("data/country.csv")
    frame.printSchema()
    /**
    root
     |-- _c0: string (nullable = true)
     |-- _c1: string (nullable = true)
     |-- _c2: string (nullable = true)
     */
    frame.show()
    /**
    +---+----------------+---+
    |_c0|             _c1|_c2|
    +---+----------------+---+
    |  1|            中国|  1|
    |  2|      阿尔巴尼亚|ALB|
    |  3|      阿尔及利亚|DZA|
    |  4|          阿富汗|AFG|
    |  5|          阿根廷|ARG|
    |  6|阿拉伯联合酋长国|ARE|
    |  7|          阿鲁巴|ABW|
    |  8|            阿曼|OMN|
    |  9|        阿塞拜疆|AZE|
    | 10|        阿森松岛|ASC|
    | 11|            埃及|EGY|
    | 12|      埃塞俄比亚|ETH|
    | 13|          爱尔兰|IRL|
    | 14|        爱沙尼亚|EST|
    | 15|          安道尔|AND|
    | 16|          安哥拉|AGO|
    | 17|          安圭拉|AIA|
    | 18|安提瓜岛和巴布达|ATG|
    | 19|        澳大利亚|AUS|
    | 20|          奥地利|AUT|
    +---+----------------+---+
     */
   }
 }
1.2.3 读取txt数据

txt数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    val frame = session.read.text("data/dailykey.txt")
    frame.printSchema()
    /**
    root
     |-- value: string (nullable = true)
     * */
    frame.show()
    /**
    +--------------------+
    |               value|
    +--------------------+
    |2018-11-13\ttom\t...|
    |2018-11-13\ttom\t...|
    |2018-11-13\tjohn\...|
    |2018-11-13\tlucy\...|
    |2018-11-13\tlucy\...|
    |2018-11-13\tjohn\...|
    |2018-11-13\tricha...|
    |2018-11-13\tricha...|
    |2018-11-13\tricha...|
    |2018-11-14\ttom\t...|
    |2018-11-14\ttom\t...|
    |2018-11-14\ttom\t...|
    +--------------------+
     * */
    }
  }

1.2.4 读取parquet数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    val frame = session.read.parquet("data/users.parquet")
    frame.printSchema()
    /**
    root
     |-- name: string (nullable = true)
     |-- favorite_color: string (nullable = true)
     |-- favorite_numbers: array (nullable = true)
     |    |-- element: integer (containsNull = true)
     */
    frame.show()

    /*
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
     */
  }
}
1.2.5 读取orc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    val frame = session.read.orc("data/student.orc")
    frame.printSchema()
    /**
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)
     |-- age: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- course: string (nullable = true)
     |-- score: string (nullable = true)
     */
    frame.show()

    /**
    +---+------+---+------+-------+-----+
    | id|  name|age|gender| course|score|
    +---+------+---+------+-------+-----+
    | 12|  张三| 25|    男|chinese|   50|
    | 12|  张三| 25|    男|   math|   60|
    | 12|  张三| 25|    男|english|   70|
    | 12|  李四| 20|    男|chinese|   50|
    | 12|  李四| 20|    男|   math|   50|
    | 12|  李四| 20|    男|english|   50|
    | 12|  王芳| 19|    女|chinese|   70|
    | 12|  王芳| 19|    女|   math|   70|
    | 12|  王芳| 19|    女|english|   70|
    | 13|张大三| 25|    男|chinese|   60|
    | 13|张大三| 25|    男|   math|   60|
    | 13|张大三| 25|    男|english|   70|
    | 13|李大四| 20|    男|chinese|   50|
    | 13|李大四| 20|    男|   math|   60|
    | 13|李大四| 20|    男|english|   50|
    | 13|王小芳| 19|    女|chinese|   70|
    | 13|王小芳| 19|    女|   math|   80|
    | 13|王小芳| 19|    女|english|   70|
    +---+------+---+------+-------+-----+
     */
  }
}

1.2.6 读取jdbc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object _06SparkReadData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SparkReadData")
      .master("local[*]").getOrCreate()
    // 读取jdbc文件
    val properties = new Properties()
    properties.put("user","root")
    properties.put("password","123456")
    val frame = session.read.jdbc("jdbc:mysql://localhost:3306/mydb1"
      ,"location-info",properties)
    frame.printSchema()
    frame.show()
  }
}

2 数据的保存

SparkSQL提供了两种方式可以保存数据

2.1 方式一:spark.write.format

  • spark.write.format(“保存数据格式”).mode(“存储格式”).save(“存储数据路径”)
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 保存数据可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置
  • SaveMode是一个枚举类,其中的常量包括:
scala/javaAny LanguageMeaning
SaveMode.ErrorifExists(default)“error”(default)如果文件已经存在,则抛出异常
SaveMode.Append“append”如果文件已经存在,则追加
SaveMode.Overwrite“overwrite”如果文件已经存在,则覆盖
SaveMode.Ignore“ignore”如果文件已经存在,则忽略

需要注意:在读取jdbc时需要在format和save之间添加多个option进行相应的JDBC参数设置【url、user、password、tablename】save中不用传递路经空参数即可,可以不用设置mode

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format

2.1.1 读取orc数据
package _02SparkSQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object _07SparkWriteData {
  def main(args: Array[String]): Unit = {
    //提供SparkSession对象
    val session = SparkSession.builder()
    .appName("SparkWriteData")
    .master("local").getOrCreate()
    //先读取数据
    var frame: DataFrame = session.read.orc("data/student.orc")
    //保存到某个路径下,OWstudent为文件夹,不需要文件名
    frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWstudent")
    session.stop()
  }
}

最后结果为:
在这里插入图片描述

2.2 方式二:spark.write.xxx

上述的书写方式太过繁项,所以SparksQL推出了更加便捷的方式:

  • spark.write.xxx(“保存数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在保存jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
  • mode可以选择性设置
2.2.1 写入到jdbc数据库中
package _02SparkSQL
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object _07SparkWriteData {
  def main(args: Array[String]): Unit = {
    //提供SparkSession对象
    val session = SparkSession.builder()
    .appName("SparkWriteData")
    .master("local").getOrCreate()

    //先读取数据
    var frame: DataFrame = session.read.orc("data/student.orc")
    
    val properties = new Properties()
    properties.put("user","root")
    properties.put("password","123456")
    frame.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/mydb1","student",properties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RT-Thread-快速入门-2-时钟与定时器

时钟与定时器 阅读须知 定义与作用 定义 系统时钟 系统时钟在RT-Thread中用于管理时间,为系统运行提供时间基准。系统时钟由硬件计时器(通常是CPU的内部定时器或外部定时器)提供时钟节拍,这些时钟节拍通常以固定频率中断CPU&#…

opengl 学习纹理

一.纹理是什么? 纹理是一个2D图片(甚至也有1D和3D的纹理),它可以用来添加物体的细节;类似于图像一样,纹理也可以被用来储存大量的数据,这些数据可以发送到着色器上。 采样是指用纹理坐标来获取纹…

npm install 失败,需要node 切换到 对应版本号

npm install 失败 原本node 的版本号是16.9,就会报以上错误 node版本问题了,我切到这个版本,报同样的错。降一下node(14.18)版本就好了 具体的方法:(需要在项目根目录下切换) 1. …

微服务学习

一、服务注册发现 服务注册就是维护一个登记簿,它管理系统内所有的服务地址。当新的服务启动后,它会向登记簿交待自己的地址信息。服务的依赖方直接向登记簿要Service Provider地址就行了。当下用于服务注册的工具非常多ZooKeeper,Consul&am…

JavaScript从零写网站《一瞬》开发日志20240223

产品介绍 一个无需注册能随时发布图片并配一段文字介绍的app,有时间线,用户在主页面向下滑动,可以看到被发布的若干图片,并且能够在每一个发布处做基本互动——评论,点赞 编程语言 本产品使用htmlcssJavaScript开发…

【Docker】构建pytest-playwright镜像并验证

Dockerfile FROM ubuntu LABEL maintainer "langhuang521l63.com" ENV TZAsia/Shanghai #设置时区 #安装python3依赖与下载安装包 RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone \&& apt update \&&…

Linux——进程概念

目录 冯诺依曼体系结构 操作系统 管理 系统调用和库函数 进程的概念 进程控制块——PCB 查看进程 通过系统调用获取进程标示符 通过系统调用创建进程 进程状态 运行状态-R ​编辑 浅度睡眠状态-S 深度睡眠状态-D 暂停状态-T 死亡状态-X 僵尸状态-Z 僵尸进程…

Open CASCADE学习|绘制砂轮

今天绘制一个砂轮,其轮廓由两条直线段和两段圆弧构成,圆弧分别与直线相切,两条圆弧之间相交而非相切。建模思路是:先给定两条直线段的起始点及长度,画出直线段,然后给定其中一圆弧的半径及圆心角&#xff0…

Linux之ACL访问控制列表

一、ACL权限的介绍 1.1 什么是ACL 访问控制列表(ACL)是一种网络安全技术,它通过在网络设备(如路由器、交换机和防火墙)上定义一系列规则,对进出接口的数据包进行控制。这些规则可以包含“允许”&…

解决IDEA中Maven下载依赖包过慢或报错的问题

由于公司项目迭代,越来越多的项目开始转型新版本,由于我对Java一直不感冒,但要顺应公司项目要求,遂自己要逐步开始完善Java相关的知识层面,此篇是我在学习SpringBoot时对一些不懂地方及遇到问题时的记录。 学习视频链…

Day 1.进程的基本概念、相关命令、函数结口

进程基本概念 一、进程: 程序:存放在外存中的一段数据组成的文件 进程:是一个程序动态执行的过程,包括进程的创建、进程的调度、进程的消亡 二、进程相关的命令 1.top 动态查看当前系统中所有的进程信息(根据CPU…

基于PostGIS的慢查询引起的空间索引提升实践

目录 前言 一、问题定位 1、前端接口定位 2、后台应用定位 3、找到问题所在 二、空间索引优化 1、数据库查询 2、创建空间索引 3、geography索引 4、再看前端响应 总结 前言 这是一个真实的案例,也是一个新入门的工程师很容易忽略的点。往往在设计数据库的…

项目管理:如何成功完成一个项目

项目管理是一项重要的技能,它可以帮助你成功地完成一个项目。以下是一些关键的步骤,可以帮助你实现这一目标: 1. 明确项目目标:在开始项目之前,你需要明确项目的目标。这将有助于你制定一个明确的计划,并确…

最长公共前缀【简单】

题目 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀&#xff0c;返回空字符串 ""。 示例如下&#xff1a; 所给提示如下&#xff1a; 1 < strs.length < 2000 < strs[i].length < 200strs[i] 仅由小写英文字母组成 解题 根据…

iOS面试:4.多线程GCD

一、多线程基础知识 1.1 什么是进程&#xff1f; 进程是指在系统中正在运行的一个应用程序。对于电脑而已&#xff0c;你打开一个软件&#xff0c;就相当于开启了一个进程。对于手机而已&#xff0c;你打开了一个APP&#xff0c;就相当于开启了一个进程。 1.2 什么是线程&am…

反序列化字符串逃逸 [安洵杯 2019]easy_serialize_php1

打开题目 $_SESSION是访客与整个网站交互过程中一直存在的公有变量 然后看extract()函数的功能&#xff1a; extract($_POST)就是将post的内容作为这个函数的参数。 extract() 函数从数组中将变量导入到当前的符号表(本题的作用是将_SESSION的两个函数变为post传参) function…

MySql-DQL-条件查询

目录 条件查询修改数据 查询 姓名 为 Name10 的员工查询 id小于等于5 的员工信息查询 没有分配职位 的员工信息查询 有职位 的员工信息查询 密码不等于 password1 的员工信息查询 入职日期 在 2000-01-01 (包含) 到 2010-01-01(包含) 之间的员工信息查询 入职时间 在 2000-01-0…

linux之权限管理(实施必会!!!!)

文章目录 一、什么是ACL二、操作步骤2.1. 添加测试目录&#xff0c;用户&#xff0c;组&#xff0c;并将用户添加到组2.2.查看组是否正常建立 cat /etc/group2.3设定权限2.4此时需要为临时用户进行权限&#xff1a; r-x 三、ACL中mask修改最大权限四、ACL权限的删除五、ACL权限…

pclpy KD-Tree K近邻搜索

pclpy KD-Tree K近邻搜索 一、算法原理1.KD-Tree 介绍2.原理 二、代码三、结果1.原点云2.k近邻点搜索后的点云 四、相关数据 一、算法原理 1.KD-Tree 介绍 kd 树或 k 维树是计算机科学中使用的一种数据结构&#xff0c;用于在具有 k 维的空间中组织一定数量的点。它是一个二叉…

MybatisPlus--03--IService、ServiceImpl

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1. IService接口1.1 IService、ServiceImpl 接口的使用第一步&#xff1a;实现basemapper接口第二步&#xff1a;编写service类第三步&#xff1a;编写serviceImpl第…