大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive

news2025/1/16 7:44:25

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • SparkSQL 核心操作
  • Action操作 详细解释+测试案例
  • Transformation操作 详细解释+测试案例

在这里插入图片描述

SQL 语句

总体而言:SparkSQL语HQL兼容;与HQL相比,SparkSQL更简洁。
SparkSQL是Apache Spark框架中的一个模块,专门用于处理结构化和半结构化数据。它提供了对数据进行查询、处理和分析的高级接口。

SparkSQL的核心特点包括:

  • DataFrame API:SparkSQL提供了DataFrame API,它是一种以行和列为结构的数据集,与关系数据库中的表非常相似。DataFrame支持多种数据源,如Hive、Parquet、JSON、JDBC等,可以轻松地将数据导入并进行操作。
  • SQL查询:SparkSQL允许用户通过标准的SQL语法查询DataFrame,这使得数据分析师和工程师可以使用他们熟悉的SQL语言来处理大数据。SparkSQL会自动将SQL查询转换为底层的RDD操作,从而在分布式环境中执行。
  • 与Hive集成:SparkSQL可以与Hive无缝集成,使用Hive的元数据和查询引擎。它支持HiveQL(Hive Query Language)语法,并且能够直接访问Hive中的数据。
  • 性能优化:SparkSQL采用了多种优化技术,如Catalyst查询优化器和Tungsten物理执行引擎。这些优化技术能够自动生成高效的执行计划,提高查询的执行速度。

数据样例

// 数据
1 1,2,3
2 2,3
3 1,2

// 需要实现如下的效果
1 1
1 2
1 3
2 2
2 3
3 1
3 2

编写代码

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoders


case class Info(id: String, tags: String)

object SparkSql01 {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLDemo")
      .master("local[*]")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
    val rdd: RDD[Info] = sc
      .makeRDD(arr)
      .map{
        line => val fields: Array[String] = line.split("\\s+")
          Info(fields(0), fields(1))
      }

    import sparkSession.implicits._
    implicit val infoEncoder = Encoders.product[Info]

    val ds: Dataset[Info] = sparkSession.createDataset(rdd)
    ds.createOrReplaceTempView("t1")

    sparkSession.sql(
      """
        | select id, tag
        | from t1
        | lateral view explode(split(tags, ",")) t2 as tag
        |""".stripMargin
    ).show
    sparkSession.sql(
      """
        | select id, explode(split(tags, ","))
        | from t1
        |""".stripMargin
    ).show

    sparkSession.close()
  }

}

运行测试

控制台输出结果为:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

+---+---+
| id|col|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

运行结果

运行结果如下图所示:
在这里插入图片描述

输入与输出

SparkSQL 内建支持的数据源包括:

  • Parquet (默认数据源)
  • JSON
  • CSV
  • Avro
  • Images
  • BinaryFiles(Spark 3.0)

简单介绍一下,Parquet 是一种列式存储格式,专门为大数据处理和分析而设计。

  • 列式存储:Parquet 采用列式存储格式,这意味着同一列的数据存储在一起。这样可以极大地提高查询性能,尤其是当查询只涉及少量列时。
  • 高效压缩:由于同一列的数据具有相似性,Parquet 能够更高效地进行压缩,节省存储空间。
  • 支持复杂数据类型:Parquet 支持嵌套的数据结构,包括嵌套列表、映射和结构体,这使得它非常适合处理复杂的、半结构化的数据。
  • 跨平台:Parquet 是一种开放标准,支持多种编程语言和数据处理引擎,包括 Apache Spark、Hadoop、Impala 等。

在这里插入图片描述

Parquet

特点:Parquet是一种列式存储格式,特别适合大规模数据的存储和处理。它支持压缩和嵌套数据结构,因此在存储效率和读取性能方面表现优异。

使用方式:spark.read.parquet(“path/to/data”) 读取Parquet文件;df.write.parquet(“path/to/output”) 将DataFrame保存为Parquet格式。

JSON

特点:JSON是一种轻量级的数据交换格式,广泛用于Web应用程序和NoSQL数据库中。SparkSQL能够解析和生成JSON格式的数据,并支持嵌套结构。

使用方式:spark.read.json(“path/to/data”) 读取JSON文件;df.write.json(“path/to/output”) 将DataFrame保存为JSON格式。

CSV

特点:CSV(逗号分隔值)是最常见的平面文本格式之一,简单易用,但不支持嵌套结构。SparkSQL支持读取和写入CSV文件,并提供了处理缺失值、指定分隔符等功能。

使用方式:spark.read.csv(“path/to/data”) 读取CSV文件;df.write.csv(“path/to/output”) 将DataFrame保存为CSV格式。

Avro

特点:Avro是一种行式存储格式,适合大规模数据的序列化。它支持丰富的数据结构和模式演化,通常用于Hadoop生态系统中的数据存储和传输。

使用方式:spark.read.format(“avro”).load(“path/to/data”) 读取Avro文件;df.write.format(“avro”).save(“path/to/output”) 将DataFrame保存为Avro格式。

ORC

特点:ORC(Optimized Row Columnar)是一种高效的列式存储格式,专为大数据处理而设计,支持高压缩率和快速读取性能。它在存储空间和I/O性能方面表现优越。

使用方式:spark.read.orc(“path/to/data”) 读取ORC文件;df.write.orc(“path/to/output”) 将DataFrame保存为ORC格式。

Hive Tables

特点:SparkSQL能够无缝集成Hive,直接访问Hive元数据,并对Hive表进行查询。它支持HiveQL语法,并能够利用Hive的存储格式和结构。

使用方式:通过spark.sql(“SELECT * FROM hive_table”)查询Hive表;也可以使用saveAsTable将DataFrame写入Hive表。

JDBC/ODBC

特点:SparkSQL支持通过JDBC/ODBC接口连接关系型数据库,如MySQL、PostgreSQL、Oracle等。它允许从数据库读取数据并将结果写回数据库。

使用方式:spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).load() 读取数据库表;df.write.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).save() 将DataFrame写入数据库。

Text Files

特点:SparkSQL可以处理简单的文本文件,每一行被读取为一个字符串。适合用于处理纯文本数据。

使用方式:spark.read.text(“path/to/data”) 读取文本文件;df.write.text(“path/to/output”) 将DataFrame保存为文本格式。

Delta Lake (外部插件)

特点:Delta Lake是一种开源存储层,构建在Parquet格式之上,支持ACID事务、可扩展元数据处理和流批一体的实时数据处理。尽管不是内建的数据源,但它在Spark生态系统中得到了广泛支持。

使用方式:spark.read.format(“delta”).load(“path/to/delta-table”) 读取Delta表;df.write.format(“delta”).save(“path/to/delta-table”) 将DataFrame保存为Delta格式。

测试案例

val df1 =
spark.read.format("parquet").load("data/users.parquet")
// Use Parquet; you can omit format("parquet") if you wish as
it's the default
val df2 = spark.read.load("data/users.parquet")

// Use CSV
val df3 = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("data/people1.csv")

// Use JSON
val df4 = spark.read.format("json")
.load("data/emp.json")

此外还支持 JDBC 的方式:

val jdbcDF = sparkSession
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://h122.wzk.icu/spark_test?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "hive")
  .option("password", "hive@wzk.icu")
  .load()
jdbcDF.show()

访问Hive

在这里插入图片描述

导入依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

hive-site

需要在项目的 Resource 目录下,新增一个 hive-site.xml
备注:最好使用 metastore service连接Hive,使用直接metastore的方式时,SparkSQL程序会修改Hive的版本信息

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://h122.wzk.icu:9083</value>
    </property>
</configuration>

编写代码

object AccessHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Demo1")
      .master("local[*]")
      .enableHiveSupport()
      // 设为true时,Spark使用与Hive相同的约定来编写Parquet数据
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("warn")

    spark.sql("show databases").show
    spark.sql("select * from ods.ods_trade_product_info").show

    val df: DataFrame = spark.table("ods.ods_trade_product_info")
    df.show()

    df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back")
    spark.table("ods.ods_trade_product_info_back").show

    spark.close()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2075108.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

活动|华院计算主办数智人系列沙龙,探讨人工智能如何赋能内容营销

在科技日新月异的今天&#xff0c;人工智能正以前所未有的速度重塑着我们的世界&#xff0c;它不仅深刻改变了传统行业的运作模式&#xff0c;更在电商零售、客户服务、保险金融等多个领域开辟了全新的生态与可能。其中&#xff0c;数智人作为大语言模型、音视频生成等人工智能…

【开端】开发团队如何应对突发的技术故障和危机

开发团队如何应对突发的技术故障和危机&#xff1f; 在数字化时代&#xff0c;软件服务的稳定性至关重要。然而&#xff0c;即便是像网易云音乐这样的大型平台&#xff0c;也难免遇到突发的技术故障。8月19日下午&#xff0c;网易云音乐疑似出现服务器故障&#xff0c;网页端出…

选择护眼台灯的标准是什么?2024值得入手的护眼台灯推荐

2022年3月1日起&#xff0c;正式实施的《儿童青少年学习用品近视防控卫生要求》&#xff08;GB 40070-2021&#xff09;规定了与近视防控相关的读写作业台灯卫生要求。要求从照度、均匀度、显色指数、色温、防蓝光等方面去完善护眼台灯&#xff0c;可见国家多这方面多么的重视&…

黑神话悟空爆火,有人靠它赚翻了!

黑神话悟空&#xff0c;这个游戏最近爆火&#xff0c;相信很多人都知道。 这样的热点事件&#xff0c;对于大多数人来说&#xff0c;那就是图个热闹&#xff0c;吃个瓜&#xff1b; 但对于那些混在互联网副业圈里的&#xff0c;那闻到的都是钱味。 热点事件&#xff0c;意味…

2025长江流域跨境电商展:Temu在丹麦的惊人崛起,跨境电商的新风向标

Temu在丹麦的惊人崛起&#xff1a;跨境电商的新风向标 在全球化电商竞争日益激烈的今天&#xff0c;一个新兴的电商平台能够在短短时间内超越行业巨头亚马逊&#xff0c;成为丹麦消费者的首选&#xff0c;无疑是一个值得关注的现象。拼多多海外分支Temu正是这样一个平台&#…

支付宝开放平台-开发者社区——AI 日报「8 月 26 日」

1 国产机器人黑马首次登场&#xff0c;打螺丝堪比擎天柱&#xff01;国家队全栈自主研发 新智元丨阅读原文 浙江人形机器人创新中心研发的领航者2号 NAVIA1&#xff0c; 在2024 世界机器人大会上首次亮相&#xff0c;展示了其类人外观和高智能作业能力。这款1.65 米高、60公斤…

蓝牙耳机什么价位的性价比高?2024百元性价比品牌机型推荐

随着科技的不断进步&#xff0c;蓝牙耳机已成为现代人日常生活中不可或缺的配件之一&#xff0c;市场上的蓝牙耳机品牌和型号繁多&#xff0c;价格也从几十元到几千元不等&#xff0c;使得消费者在选择时往往感到眼花缭乱&#xff0c;那么蓝牙耳机什么价位的性价比高&#xff1…

安科瑞AEW100电力改造智能电力仪表,体积小巧

AEW100电力改造用智能电力仪表主要用于计量低压网络的三相有功电能&#xff0c;具有RS485通讯和470MHz无线通讯功能&#xff0c;方便用户进行用电监测、集抄和管理。 功能&#xff1a; AEW100电力改造用智能电力仪表主要用于计量低压网络的三相有功电能&#xff0c;具有RS485…

ssm动漫展示系统-计算机毕业设计源码12113

目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据新增流程 3.2.2 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分析 2.5本章小结 3 系统总体设…

如何根据不同的场景选择合适的报表格式?一文详细解答

在处理数据的过程中&#xff0c;许多人常感困扰于报表格式选择的多样性&#xff0c;这源于面对纷繁复杂的数据集时&#xff0c;难以迅速锁定最适合的呈现方式。这种迷茫感源于报表设计的灵活性&#xff0c;每种格式都针对特定情境和数据特性精心打造。 不必为选择何种报表格式…

Dooring智图,一款开箱即用的图片海报编辑器

嗨, 大家好, 我是徐小夕. 之前一直在社区分享零代码&低代码的技术实践&#xff0c;也陆陆续续设计并开发了多款可视化搭建产品&#xff0c;比如&#xff1a; Nocode/Doc&#xff0c;可视化 零代码打造下一代文件编辑器爆肝1000小时, Dooring零代码搭建平台3.5正式上线可视化…

产品小白学习及求职的3个误区,看看自己中招了没?

产品经理是互联网行业中颇有“钱”途的岗位&#xff0c;学习的人也最多&#xff0c;很多小白在学习产品的过程中或多或少的会踩坑&#xff0c;进入误区&#xff0c;小编本文就总结了小白学习产品的3大误区&#xff0c;快来看看自己中招了没吧。 1、画出漂亮的高保真原型就能当产…

骨传导耳机最热门好用款推荐,保你不会踩雷!

耳机发展到现在已经经历了无数次的迭代更新。从有线耳机到如今的无线耳机以及骨传导耳机&#xff0c;功能也更加的全面&#xff0c;从当初的只是用来听音乐&#xff0c;到如今的追求音质、舒适、防水等功能&#xff0c;在无线耳机的市场中&#xff0c;骨传导耳机尤为受欢迎&…

docke进阶---镜像迁移、容器的ip地址、端口映射和持久化

1.镜像的迁移 1.镜像打包 #查看镜像有一个centos的镜像 [rootdocker0 ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE centos latest 5d0da3dc9764 2 years ago 231MB 3查看帮助文件 docker --help save Save one or more…

查找3(红黑树、B树)

一、红黑树 1&#xff09;红黑树的定义和性质 不包括根节点本身的那个黑 2&#xff09;红黑树的查找 3&#xff09;红黑树的插入 4)删除操作 二、B树 1&#xff09;概念B树的查找 2&#xff09;B树的插入 3)B树的删除 三、B树 B树 B树 和OS相关 读磁盘时间开销大

别再为App安装唤起烦恼!Xinstall帮你轻松搞定

在移动互联网时代&#xff0c;App的推广和运营成为了开发者们面临的一大挑战。尤其是当用户通过各种渠道下载并安装App后&#xff0c;如何能够便捷地唤起App&#xff0c;提高用户的使用频率和粘性&#xff0c;成为了摆在推广者面前的一大难题。今天&#xff0c;我们就来揭秘一款…

wangeditor编辑器自定义按钮和节点,上传word转换html,文本替换

vue3ts 需求&#xff1a;在编辑器插入图片和视频时下方会有一个输入框填写描述&#xff0c;上传word功能 wangeditor文档wangEditor开源 Web 富文本编辑器&#xff0c;开箱即用&#xff0c;配置简单https://www.wangeditor.com/ 安装&#xff1a;npm install wangeditor/edit…

将标准输入stdin转换成命令行参数——Unix中的xargs指令

xargs是Unix中的复合指令加工机&#xff0c;联合管道符“|”将制造更加强大的“复杂”指令组合。 (笔记模板由python脚本于2024年08月22日 18:13:51创建&#xff0c;本篇笔记适合喜欢Linux的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.…

【数据结构与算法】使用哈夫曼编码压缩文本

哈夫曼编码原理 哈夫曼编码属于一种基于字符出现频率的贪心算法&#xff0c;其通过构建哈夫曼树&#xff0c;为文本中的每一个字符赋予独一无二的二进制编码。频率较高的字符会被分配较短的编码&#xff0c;而频率较低的字符则会被分配较长的编码&#xff0c;以此达成压缩数据…

通过模板级知识蒸馏进行掩模不变人脸识别

Mask-invariant Face Recognition through Template-level Knowledge Distillation 创新点 1.提出了一种掩模不变人脸识别解决方案&#xff08;MaskInv&#xff09;&#xff0c;该解决方案在训练范式中利用模板级知识蒸馏&#xff0c;旨在生成与相同身份的非蒙面人脸相似的蒙面…