spark 数据的加载和保存(Parquet、JSON、CSV、MySql)

news2024/11/26 15:22:37

spark数据的加载和保存
SparkSQL 默认读取和保存的文件格式为 parquet
1.加载数据
spark.read.load 是加载数据的通用方法

scala> spark.read.
csv format jdbc json load option options orc parquet schema 
table text textFile
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")

  format("…"):指定加载的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"。

  load("…"):在"csv""jdbc""json""orc""parquet""textFile"格式下需要传入加载
数据的路径。

  option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,也可以直接在文件上进行查询: 文件格式.`文件路径`

scala>spark.sql("select * from json.`/opt/tmp/data/test.json`").show

2.保存数据
df.write.save 是保存数据的通用方法

scala>df.write.
csv jdbc json orc parquet textFile…
scala>df.write.format("…")[.option("…")].save("…")

format("…"):指定保存的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"等

save ("…"):在"csv""orc""parquet""textFile"格式下需要传入保存数据的路径。

option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。

SaveMode 是一个枚举类,其中的常量包括:
在这里插入图片描述
保存:

df.write.mode("append").json("/opt/tmp/data/output")

Parquet
Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式
存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default,可修改默认数据源格式。
1.加载数据

scala> val df = spark.read.load("tmp/resources/test.parquet")
scala> df.show

2.保存数据

scala> var df = spark.read.json("/opt/tmp/data/input/test.json")
//保存为 parquet 格式
scala> df.write.mode("append").save("/opt/tmp/data/output")

JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。格式如下:

{"name":"zhangsan"}
{"name":"lisi""age":21}
[{"name":"wangwu""age":21},{"name":"xiaohua""age":22}]

1.导入隐式转换

import spark.implicits.

2.加载 JSON 文件

val path = "/opt/tmp/spark-test/test.json"
val peopleDF = spark.read.json(path)

3.创建临时表

peopleDF.createOrReplaceTempView("people")

4.数据查询

val teenagerNamesDF = spark.sql("SELECT name FROM test WHERE age BETWEEN 20 AND 22")teenagerNamesDF.show()

CSV
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/test.csv")

MySQL
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对
DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操
作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类
路径下

bin/spark-shell 
--jars mysql-connector-java-5.1.27-bin.jar

Idea 中通过 JDBC 对 Mysql 进行操作
1.导入依赖

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>

2.读取数据(mysql)

package SparkTest.sparkmysql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkMysqlRead {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    
    //方式 1:通用的 load 方法读取
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123")
      .option("dbtable", "user_test")
      .load().show()

   //方式 2:通用的 load 方法读取 参数另一种形式
  spark.read.format("jdbc")
    .options(Map("url"->"jdbc:mysql://192.168.58.203:3306/testdb?user=root&password=123", "dbtable"->"user_test","driver"->"com.mysql.jdbc.Driver")).load().show()

  //方式 3:使用 jdbc 方法读取
  val props: Properties = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "123")
  val df: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user_test", props)
  df.show()
  
  //释放资源
    spark.stop()
  }
}

3.写入数据(mysql)

package SparkTest.sparkmysql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object SparkMysqlWrite {
  case class User(name: String, age: Long)
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("zhangsan", 20), User("lisi", 21)))
    val ds: Dataset[User] = rdd.toDS()


    //方式 1:通用的方式 format 指定写出类型
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
      .option("user", "root")
      .option("password", "123")
      .option("dbtable", "user1")
      .mode(SaveMode.Append)
      .save()

    //方式 2:通过 jdbc 方法
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123")
    ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user2", props)
    
    //释放资源
    spark.stop()

  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/480044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker安装fastdfs

1 拉取镜像 docker pull morunchang/fastdfs如果网速下载慢&#xff0c;可以参考资料文件夹中给大家导出的镜像包上传到 Linux服务器上&#xff0c;通过docker load -i my_fdfs.tar 加载镜像。 使用 docker images查看是否成功 2 运行tracker docker run -d --name tracker -…

MySQL高阶——索引设计的推演

前言 MySQL在我们工作中都会用到&#xff0c;那么我们最常接触的就是增删改查&#xff0c;而对于增删改查来说&#xff0c;我们更多的是查询。但是面试中&#xff0c;面试官又不会问你什么查询是怎么写的&#xff0c;都是问一些索引啊&#xff0c;事务啊&#xff0c; 底层结构…

C. Multiplicity(DP + 分解因数)

Problem - C - Codeforces 给定一个整数数组a1&#xff0c;a2&#xff0c;...&#xff0c;an。 如果可以从a中删除一些元素得到b&#xff0c;则称数组b为a的子序列。 当且仅当对于每个i&#xff08;1≤i≤k&#xff09;&#xff0c;bi是i的倍数时&#xff0c;数组b1&#xff…

Spring Data JPA 快速上手

一、JPA的概述 JPA的全称是Java Persisitence API&#xff0c;即JAVA持久化API&#xff0c;是sum公司退出的一套基于ORM的规范&#xff0c;内部是由一些列的接口和抽象类构成。JPA通过JDK5.0注解描述对象-关系表的映射关系&#xff0c;并将运行期的实体对象持久化到数据库中。…

Mysql 分库分表 Mycat

0 课程视频 https://www.bilibili.com/video/BV1Kr4y1i7ru?p163&spm_id_frompageDriver&vd_sourceff8b7f852278821525f11666b36f180a 1 单库问题 1.1 热点数据多 -> 缓冲区不足 ->内存不足 1.2 数据多 -> 磁盘不足 1.3 请求数据量多 -> 带宽不足 1…

C/C++的命名空间和调用函数的详细讲解

目录 空函数 调用函数 调用 执行流程 命名空间 在创建函数时&#xff0c;必须编写其定义。所有函数定义包括以下组成部分&#xff1a; 名称&#xff1a;每个函数都必须有一个名称。通常&#xff0c;适用于变量名称的规则同样也适用于函数名称。形参列表&#xff1a;调用函…

算法记录lday4 LinkedList链表交换 删除倒数N个点 环形链表

今日任务 ● 24. 两两交换链表中的节点 ● 19.删除链表的倒数第N个节点 ● 面试题 02.07. 链表相交 ● 142.环形链表II 两两交换链表中的节点 题目描述 Given a linked list, swap every two adjacent nodes and return its head. You must solve the problem without modi…

一天吃透Redis面试八股文

Redis连环40问&#xff0c;绝对够全&#xff01; Redis是什么&#xff1f; Redis&#xff08;Remote Dictionary Server&#xff09;是一个使用 C 语言编写的&#xff0c;高性能非关系型的键值对数据库。与传统数据库不同的是&#xff0c;Redis 的数据是存在内存中的&#xf…

Golang每日一练(leetDay0052)

153. 寻找旋转排序数组中的最小值 Find Minimum In Rotated Sorted Array 已知一个长度为 n 的数组&#xff0c;预先按照升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,2,4,5,6,7] 在变化后可能得到&#xff1a; …

纸模图纸的发展历程+纸模图纸免费下载-手工985

纸模图纸是一种用于制作模型的图纸&#xff0c;通常用于制作卡通纸模图纸、飞机纸模图纸、船只纸模图纸、汽车纸模图纸、建筑纸模图纸等模型。 纸模图纸的历史 纸模图纸的历史可以追溯到19世纪末期。当时&#xff0c;一些欧洲的制图师开始使用纸板和纸张来制作建筑模型&#xf…

【MATLAB数据处理实用案例详解(21)】——利用SOM自组织映射网络实现癌症样本数据分类和利用SOM自组织映射网络实现柴油机故障分类详解

目录 一、SOM原理分析二、SOM拓扑结构分析三、三种SOM拓扑结构3.1 Gridtop()网格拓扑结构3.2 Hextop()六角形拓扑结构3.3 Randtop()随机拓扑结构 四、利用SOM自组织映射网络实现癌症样本数据分类4.1 问题描述4.2 输入数据4.3 网络建立和训练4.4 验证4.5 预测4.6 运行结果 五、利…

wait/waitpid(重点)介绍

谢谢你的阅读&#xff0c;如有错误请大佬留言 目录 引子&#xff1a; waitpid 返回值介绍 参数介绍 pid status options: 引子&#xff1a; 当一个进程创建子进程后&#xff0c;如果子进程工作结束后会进入僵尸状态&#xff0c;等待父进程回收子进程资源&#xff08;退…

烟花智能直播助手,直播带货必备爆单工具【直播助手脚本+技术教程】

烟花智能直播助手软件教程介绍&#xff1a; 1.账号管理:可以登陆多个账号,一键切换 2.商品批量管理&#xff1a;可一键删除/添加直播商品,一键设置商品卖点 3.自动弹讲解:可设置指定,单个,列表循环自动弹商品讲解 4.智能文字客服:可设置指定关键词对公屏信息进行回复,不限添加条…

【C语言】C语言总结

声明&#xff0c;本文来自中国mooc中的翁凯C语言总结 第一章介绍 变量定义 变量定义的一般形式为&#xff1a;<类型名称><变量名称>变量需要一个名字&#xff0c;变量的名字是一种表示符&#xff0c;意思是用来识别不同的标识符标识符的基本构造规则为&#xff…

近世代数 笔记与题型连载 第十一章(正规子群与商群)

文章目录 基本概念1.正规子群2.商群 常见题型1.正规子群的判定和证明2.给定群和正规子群&#xff0c;求商群 基本概念 1.正规子群 正规子群的定义&#xff1a;设<G,※>是群&#xff0c;H是G的子群。如果对于G中的任意元素g&#xff0c;都有gHHg&#xff0c;则称H是G的正…

org.apache.poi 设置 Excel 单元格颜色 RGB

一、背景说明 在使用 org.apache.poi 导出 Excel 时&#xff0c;需要设置部分单元格的颜色。 可以使用方法&#xff1a;org.apache.poi.ss.usermodel.CellStyle.setFillForegroundColor() 和 org.apache.poi.ss.usermodel.CellStyle.setFillPattern() 来设置单元格的颜色和填…

代码随想录 LeetCode数组篇 二分查找

文章目录 &#xff08;简单&#xff09;35. 搜索插入位置&#xff08;*中等&#xff09;34. 在排序数组中查找元素的第一个和最后一个位置&#xff08;简单&#xff0c;常见面试题&#xff09;69. x的平方根&#xff08;简单&#xff09; 367. 有效的完全平方数 # &#xff08;…

三款自研AI应用引领未来,重塑行业新风尚

在这个科技日新月异的时代&#xff0c;AI技术已经渗透到我们生活的方方面面。今天&#xff0c;我们将向您推荐三款领域独具特色的AI应用&#xff0c;它们分别是AI律师、AI小红书文案提示词、以及AI Midjourney提示词。这些应用都具有独特的内涵&#xff0c;让我们一起走进这些智…

linux--进程程序替换

目录 一、什么是进程程序替换 二、原理&#xff1a; 三、为什么要进行程序替换 四、六种替换函数 命名理解 (1)函数execl (2)函数execv (3)execlp (4)execvp (5)execle (6)execve 一、什么是进程程序替换 所谓进程程序替换&#xff0c;顾名思义&#xff0c;就是使用一个…

‘cnpm‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件

文章目录 1.0 问题描述2.0 检查环境&#xff1a;2.0.1查看node 环境2.0.2 cnpm 要安装好 3.0 查看原因4.0 问题解决【配置环境变量】4.0.1 方式一4.0.2 方式二 5.0 测试成功 1.0 问题描述 ‘cnpm’ 不是内部或外部命令&#xff0c;也不是可运行的程序 或批处理文件。 2.0 检…