Spark SQL实战(07)-Data Sources

news2025/1/11 21:59:15

1 概述

Spark SQL通过DataFrame接口支持对多种数据源进行操作。

DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。

数据源关键操作:

  • load
  • save

2 大数据作业基本流程

input 业务逻辑 output
不管是使用MR/Hive/Spark/Flink/Storm。

Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:

  • file/HDFS/S3/OSS/COS/RDBMS
  • json/ORC/Parquet/JDBC
object DataSourceApp {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local").getOrCreate()
    
    text(spark)
    // json(spark)
    // common(spark)
    // parquet(spark)

    // convert(spark)

    // jdbc(spark)
    jdbc2(spark)
    spark.stop()
  }
}

3 text数据源读写

读取文本文件的 API,SparkSession.read.text()

参数:

  • path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。
  • wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。
  • lineSep:如果指定,则使用指定的字符串作为行分隔符。
  • pathGlobFilter:用于筛选文件的通配符模式。
  • recursiveFileLookup:是否递归查找子目录中的文件。
  • allowNonExistingFiles:是否允许读取不存在的文件。
  • allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。

def text(spark: SparkSession): Unit = {
  import spark.implicits._

  val textDF: DataFrame = spark.read.text(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

  val result: Dataset[(String, String)] = textDF.map(x => {
    val splits: Array[String] = x.getString(0).split(",")
    (splits(0).trim, splits(1).trim)
  })

编译无问题,运行时报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;

思考下,如何使用text方式,输出多列的值?

修正后

val result: Dataset[String] = textDF.map(x => {
  val splits: Array[String] = x.getString(0).split(",")
  splits(0).trim
})

result.write.text("out")

继续报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;

回想Hadoop中MapReduce的输出:

  • 第一次0K
  • 第二次也会报错输出目录已存在

这关系到 Spark 中的 mode

SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。

SaveMode有四种取值:

  1. SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常
  2. SaveMode.Append:将数据追加到现有数据
  3. SaveMode.Overwrite:覆盖现有数据
  4. SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:

result.write.mode(SaveMode.overwrite).text("out")

4 JSON 数据源

// JSON
def json(spark: SparkSession): Unit = {
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.json(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")

  jsonDF.show()

  // 只要age>20的数据
  jsonDF.filter("age > 20")
    .select("name")
    .write.mode(SaveMode.Overwrite).json("out")
  
output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

嵌套 JSON

// 嵌套 JSON
val jsonDF2: DataFrame = spark.read.json(
  "/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()

jsonDF2.select($"name",
  $"age",
  $"info.work".as("work"),
  $"info.home".as("home"))
  .write.mode("overwrite")
  .json("out")

output:
+---+-------------------+----+
|age|               info|name|
+---+-------------------+----+
| 30|[shenzhen, beijing]|  PK|
+---+-------------------+----+

5 标准写法

// 标准API写法
private def common(spark: SparkSession): Unit = {
  import spark.implicits._

  val textDF: DataFrame = spark.read.format("text").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")
  val jsonDF: DataFrame = spark.read.format("json").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")
  textDF.show()
  println("~~~~~~~~")
  jsonDF.show()

  jsonDF.write.format("json").mode("overwrite").save("out")

}

output:
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

~~~~~~~~
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6 Parquet数据源

6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。

6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。

Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。

6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。

private def parquet(spark: SparkSession): Unit = {
  import spark.implicits._

  val parquetDF: DataFrame = spark.read.parquet(
    "/Users/javaedge/Downloads/sparksql-train/data/users.parquet")
  parquetDF.printSchema()
  parquetDF.show()

  parquetDF.select("name", "favorite_numbers")
    .write.mode("overwrite")
    .option("compression", "none")
    .parquet("out")
  
output:
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

7convert

方便从一种数据源写到另一种数据源。

存储类型转换:JSON==>Parquet

def convert(spark: SparkSession): Unit = {
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.format("json")
    .load("/Users/javaedge/Downloads/sparksql-train/data/people.json")
  jsonDF.show()

  jsonDF.filter("age>20")
    .write.format("parquet").mode(SaveMode.Overwrite).save("out")

8 JDBC

有些数据是在MySQL,使用Spark处理,肯定要通过Spark读出MySQL的数据。
数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。

查 DB

写法一

def jdbc(spark: SparkSession): Unit = {
  import spark.implicits._

  val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306")
    .option("dbtable", "smartrm_monolith.order")
    .option("user", "root")
    .option("password", "root")
    .load()

  jdbcDF.filter($"order_id" > 150).show(100)
}

写法二

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")

val jdbcDF2: DataFrame = spark.read
  .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id" > 100)

写 DB

val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "root")

val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id" > 100)
  .write.jdbc(url, "smartrm_monolith.order_bak", connProps)

若 目标表不存在,会自动帮你创建:

统一配置管理

如何将那么多数据源配置参数统一管理呢?

先引入依赖:

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>

配置文件:

读配置的程序:

package com.javaedge.bigdata.chapter05

import com.typesafe.config.{Config, ConfigFactory}

object ParamsApp {

  def main(args: Array[String]): Unit = {

    val config: Config = ConfigFactory.load()
    val url: String = config.getString("db.default.url")
    println(url)

  }

}
private def jdbcConfig(spark: SparkSession): Unit = {
  import spark.implicits._

  val config = ConfigFactory.load()
  val url = config.getString("db.default.url")
  val user = config.getString("db.default.user")
  val password = config.getString("db.default.password")
  val driver = config.getString("db.default.driver")
  val database = config.getString("db.default.database")
  val table = config.getString("db.default.table")
  val sinkTable = config.getString("db.default.sink.table")

  val connectionProperties = new Properties()
  connectionProperties.put("user", user)
  connectionProperties.put("password", password)

  val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

  jdbcDF.filter($"order_id" > 100).show()

写到新表:

jdbcDF.filter($"order_id" > 158)
.write.jdbc(url, s"$database.$sinkTable", connectionProperties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/430606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

node安装

一、下载nodejs的安装包&#xff1a; 下载地址&#xff1a;https://nodejs.org/zh-cn/download 根据自己电脑系统及位数选择&#xff0c;一般都选择windows64位.msi格式安装包 二、改变nodejs的下载依赖包路径 安装完nodejs后&#xff0c;也同时安装了npm&#xff0c; npm是…

半监督语义分割_paper reading part1

Assignment 要解决的问题思路方法结果自己的想法 01 A Survey on Semi-Supervised Semantic Segmentation University of Granada, 18071, Granada, Spain 2023.02出版 problem to solve ss先前的&#xff08;19年&#xff09;不适用先前的调研包含弱监督&#xff0c;ss不…

Docker Desktop使用PostgreSql配合PGAdmin的使用

在看此教程之前&#xff0c;请先下载安装Docker Desktop 安装成功可以查看版本 然后拉取postgresql的镜像&#xff1a;docker pull postgres:14.2 版本可以网上找一个版本&#xff0c;我的不是最新的 发现会报一个问题 no matching manifest for windows/amd64 10.0.19045 i…

小心,丢失的消息!RocketMQ投递策略帮你解决问题!博学谷狂野架构师

RocketMQ消息投递策略 作者: 博学谷狂野架构师GitHub&#xff1a;GitHub地址 &#xff08;有我精心准备的130本电子书PDF&#xff09;只分享干货、不吹水&#xff0c;让我们一起加油&#xff01;&#x1f604; 前言 RocketMQ的消息投递分分为两种&#xff1a;一种是生产者往MQ …

java中级面试题

1.假如有两个线程共同操作数据库&#xff0c;以乐观锁的角度考虑&#xff0c;怎么确保不会发生并发问题&#xff1f; PS&#xff1a;考点是CAS&#xff0c;比较并替换。CAS中有三个值&#xff0c;内存中的值&#xff0c;新值&#xff0c;旧值。 假如内存中的值是2000&#xf…

[C++]string类的模拟实现和相关函数的详解

目录string总体架构具体实现默认成员函数构造函数构造拷贝函数析构函数赋值重载[]相关操作函数c_str() && size()reserve() && resize()push_back() && append()find()inserterase() && clear其余操作符重载< 、 <、 >、 >、 !<…

【系统集成项目管理工程师】项目整体管理

&#x1f4a5;十大知识领域&#xff1a;项目整体管理 项目整体管理包括以下 6 个过程: 制定项目章程定项目管理计划指导与管理项目工作监控项目工作实施整体变更控制结束项目或阶段过程 一、制定项目章程 制定项目章程。编写一份正式文件的过程&#xff0c;这份文件就是项目章程…

某程序员哀叹:月薪四五万,却每天极度焦虑痛苦,已有生理性不适,又不敢裸辞,怎么办?

高薪能买来快乐吗&#xff1f; 来看看这位程序员的哀叹&#xff1a; 实在是扛不住了&#xff0c;每天都在极度焦虑和痛苦中度过&#xff0c;早上起来要挣扎着做心理建设去上班&#xff0c;已经产生生理性的头晕恶心食欲不振。有工作本身的原因&#xff0c;更多是自己心态的问…

OpenCV+FFmpeg 实现人脸检测Rtmp直播推流(Python快速实现)

实现效果 windows平台笔记本摄像头视频采集、人脸识别&#xff0c;识别后将视频推流到RTMP流媒体服务器&#xff0c;在任意客户端可以进行RTMP拉流播放。 效果如图&#xff1a; 使用VLC播放器进行拉流。 准备工作 需要先安装OpenCV的python包以及FFmpeg。 对于ffmpeg有两…

Java——删除链表中重复的节点

题目链接 牛客在线oj题——删除链表中重复的节点 题目描述 在一个排序的链表中&#xff0c;存在重复的结点&#xff0c;请删除该链表中重复的结点&#xff0c;重复的结点不保留&#xff0c;返回链表头指针。 例如&#xff0c;链表 1->2->3->3->4->4->5 处…

【Vue】学习笔记-数据代理

数据代理 Object.defineproperty方法 <script type"text/javascript">let number18let person{name:张三,sex:男,}//age属性 不参与遍历Object.defineProperty(person,age,{//value:18,//enumerable:true, //控制属性是否可以枚举&#xff0c;默认值是false//…

科技成果评价最新攻略,你确定不来看看?

一、什么是科技成果评价&#xff1f; 是指按照委托者的要求&#xff0c;由具有评价资质的第三方专业机构聘请专家&#xff0c;坚持实事求是、科学民主、客观公正、注重质量、讲求实效的原则&#xff0c;依照规定的程序和标准&#xff0c;对被评价科技成果进行审查与辨别&#…

[Java Web]VUE | vue:一项Java Web开发中不可或缺的前端技术

⭐作者介绍&#xff1a;大二本科网络工程专业在读&#xff0c;持续学习Java&#xff0c;努力输出优质文章 ⭐作者主页&#xff1a;逐梦苍穹 ⭐所属专栏&#xff1a;Java Web ⭐如果觉得文章写的不错&#xff0c;欢迎点个关注一键三连&#x1f609;有写的不好的地方也欢迎指正&a…

AD19 基础应用技巧(快速定义PCB板框,CAD中DWG转DXF格式导入)

【B站一个假的攻城狮】导入CAD图纸到PCB&#xff0c;Altium Designer 21教程&#xff0c;第九节。 http://www.keyboard-layout-editor.com/ http://builder.swillkb.com/ 1、打开中望CAD&#xff0c;并打开一张图纸文件&#xff0c;为了能把孔表达清楚&#xff0c;开孔断面图…

React(六) —— redux

&#x1f9c1;个人主页&#xff1a;个人主页 ✌支持我 &#xff1a;点赞&#x1f44d;收藏&#x1f33c;关注&#x1f9e1; 文章目录⛳Redux&#x1f346;redux定义&#x1f490;redux使用原则&#x1f370;redux使用场景&#x1f9ca;redux工作流程&#x1f96b;redux基本创建…

14.创建组件

组件可以理解为页面的拼图块&#xff0c;一个完整的页面是由若干个组件拼成的 在vue中规定&#xff0c;组件的后缀名为vue&#xff0c;每一个vue文件中应该包含三个大标签 template 组件的模板结构&#xff0c;可以理解为htmlscript 组件的JS&#xff0c;控制组件要执行什么动…

区域检验管理系统(云LIS)源码

1、区域检验管理系统&#xff08;云LIS&#xff09;概述 云LIS是为区域医疗提供临床实验室信息服务的计算机应用程序&#xff0c;可协助区域内所有临床实验室相互协调并完成日常检验工作&#xff0c;对区域内的检验数据进行集中管理和共享&#xff0c;通过对质量控制的管理&am…

Java每日一练(20230418)

目录 1. N皇后 II &#x1f31f;&#x1f31f;&#x1f31f; 2. 字符串相乘 &#x1f31f;&#x1f31f; 3. 买卖股票的最佳时机 &#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一…

“Natural Earth II“ === “Natural Earth II“是false?你知道空 格的四种写法吗?

前言 有一回对我说道&#xff0c;“你学过前端么&#xff1f;”我略略点一点头。他说&#xff0c;“学过前端&#xff1f;……我便考你一考。html 里面的空格&#xff0c;怎样 coding 的&#xff1f;”我想&#xff0c;讨饭一样的人&#xff0c;也配考我么&#xff1f;便回过脸…

计算机网络 - 网络中的基本概念

前言 本篇介绍网络的一些基本概念&#xff0c;认识IP地址&#xff0c;端口号&#xff0c;协议&#xff1b;了解常用的网络协议模型&#xff0c;知道数据如何封装与分用的&#xff1b;为以后学习计算机网络其它知识做铺垫&#xff0c;如有错误&#xff0c;请在评论区指正&#…