7.spark sql编程

news2025/1/12 10:37:53

概述

spark 版本为 3.2.4,注意 RDDDataFrame 的代码出现的问题及解决方案

本文目标如下:

  • RDD ,Datasets,DataFrames 之间的区别
  • 入门
    • SparkSession
    • 创建 DataFrames
    • DataFrame 操作
    • 编程方式运行 sql 查询
    • 创建 Datasets
    • DataFramesRDDs 互相转换
      • 使用反射推断模式
      • 编程指定 Schema

参考 Spark 官网

相关文章链接如下

文章链接
spark standalone环境安装地址
Spark的工作与架构原理地址
使用spark开发第一个程序WordCount程序及多方式运行代码地址
RDD编程指南地址
RDD持久化地址

RDD ,Datasets,DataFrames 之间的区别

Datasets , DataFrames和 RDD

Dataset 是一个分布式的数据集合,DatasetSpark 1.6 中添加的一个新接口,它增益了 RDD (强类型,可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建,然后使用函数转换(map、flatMap、filter等)进行操作。数据集API有Scala和Java版本。Python不支持数据集API。

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的DataFrame APIScalaJavaPythonR中可用。在Scala API中,DataFrame只是Dataset[Row]的一个类型别名。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

DataFrame=RDD+SchemaRDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

入门

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互,包括SQLDataset API。计算结果时,使用相同的执行引擎,与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作

people.json

people.json文件准备
在这里插入图片描述

SparkSession

Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

创建 DataFrames

使用 SparkSession,通过存在的RDDhive 表,或其它的Spark data sources 程序创建 DataFrames

val df = spark.read.json("/tmp/people.json")
df.show()

执行如下图
在这里插入图片描述

DataFrame 操作

使用数据集进行结构化数据处理的基本示例如下

// 需要引入 spark.implicits._ 才可使用 $
// This import is needed to use the $-notation
import spark.implicits._
// 打印schema 以树格式
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 仅显示 name 列
// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 显示所有,age 加1
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 过滤 人的 age 大于 21
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按 age 分组统计
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

spark-shell 执行如下图
在这里插入图片描述
在这里插入图片描述

编程方式运行 sql 查询

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

执行如下:

scala> df.createOrReplaceTempView("people")

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建 Datasets

Datasets类似于RDD,不是使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化为对象。

case class Person(name: String, age: Long)

// 为 case classes 创建编码器
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// 为能用类型创建编码器,并提供 spark.implicits._ 引入 
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// 通过定义类,将按照名称映射,DataFrames 能被转成 Dataset 
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "/tmp/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

执行如下:

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+


scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)

scala> val path = "/tmp/people.json"
path: String = /tmp/people.json

scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DataFrames 与 RDDs 互相转换

Spark SQL支持两种不同的方法将现有RDD转换为Datasets

  • 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当知道 schema 结构的时间,会有更好的效果。
  • 第二种方法是通过编程接口,构造 schema,然后将其应用于现有的RDD。虽然此方法更详细,直至运行时,才能知道他们的字段和类型,用于构造 Datasets

使用反射推断模式

代码如下:

object RddToDataFrameByReflect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RddToDataFrameByReflect")
      .master("local")
      .getOrCreate()

    // 用于从RDD到DataFrames的隐式转换
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._

    // Create an RDD of Person objects from a text file, convert it to a Dataframe
    val peopleDF = spark.sparkContext
      .textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()

    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
  }

  case class Person(name: String, age: Long)
}

执行如下图:
在这里插入图片描述

编码问题

关于 Spark 官网 上复杂类型编码问题,直接加下面一句代码

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))

报以下图片错误
在这里插入图片描述
将原有代码改变如下:

 // 没有为 Dataset[Map[K,V]] 预先定义编码器,需要自己定义
 // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
 implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
 // 也可以如下操作
 // Primitive types and case classes can be also defined as
 // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

 // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
 teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))
 // Array(Map("name" -> "Justin", "age" -> 19))

在这里插入图片描述
通过这一波操作,就可以理解什么情况下,需要编码器,以及编码器的作用

编程指定 Schema

代码如下:

object RddToDataFrameByProgram {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    import org.apache.spark.sql.Row

    import org.apache.spark.sql.types._

    // 加上此解决报错问题
    import spark.implicits._

    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
  }
}

执行如下图
在这里插入图片描述

官方文档的代码不全问题

Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
results.map(attributes => "Name: " + attributes(0)).show()

在这里插入图片描述
加下以下代码

// 加上此解决报错问题
import spark.implicits._

如下图解决
在这里插入图片描述

结束

spark sql 至此结束,如有问题,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1173197.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Navicat连接mysql 8.0.35 2059错误解决办法

这2天在家重装电脑,顺便把mysql升级8.0,安装完成后,用Navicat连接,报错2059,如下 网上查了一下, 【报错原因】mysql8.0 之前的版本中加密规则是 mysql_native_password,而 mysql8.0 之后的版本…

超越 GLIP! | RegionSpot: 识别一切区域,多模态融合的开放世界物体识别新方法

本文的主题是多模态融合和图文理解,文中提出了一种名为RegionSpot的新颖区域识别架构,旨在解决计算机视觉中的一个关键问题:理解无约束图像中的各个区域或patch的语义。这在开放世界目标检测等领域是一个具有挑战性的任务。 关于这一块&…

Vim快速插入常用代码模板

1 修改home目录下.vimrc 家目录中ls -a找到隐藏文件.vimrc 2 编辑.vimrc 输入i编辑&#xff0c;在尾巴插入代码&#xff0c;按:wq保存并退出。 noremap io i#include <stdio.h><Esc>o<Esc> noremap im iint main(int argc, char *argv[])<Esc> map …

使用自定义函数拟合辨识HPPC工况下的电池数据(适用于一阶RC、二阶RC等电池模型)

该程序可以离线辨识HPPC工况下的电池数据&#xff0c;只需要批量导入不同SOC所对应的脉冲电流电压数据&#xff0c;就可以瞬间获得SOC为[100% 90% 80% 70% 60% 50% 40% 30% 20% 10% 0%]的所有电池参数,迅速得到参数辨识的结果并具有更高的精度&#xff0c;可以很大程度上降低参…

第7章-使用统计方法进行变量有效性测试-7.1-假设检验

目录 女士品茶 假设检验 样本与总体 原假设与备择假设 检验法、拒绝域与检验统计量 显著性水平 决策方法——临界值法和p值&#xff08;p-value&#xff09;法 假设检验步骤 参考文献 假设检验&#xff0c;我们从女士品茶这个故事开始说起。希望这篇文章能给您带来极大…

三、操作系统

&#xff08;一&#xff09;概述 操作系统是管理整个系统的软、硬件资源的系统&#xff0c;既是人和硬件之间的一种接口&#xff0c;也是应用软件与硬件之间的接口。 &#xff08;二&#xff09;进程管理 1.进程的状态 进程的状态是操作系统对进程进行管理的时候设置的几种状…

CLion2022安装

1. CLion下载 地址&#xff1a;https://www.jetbrains.com.cn/clion/download/other.html 下载你需要的版本&#xff0c;这里以2022.2.4为例 之后获取到对应的安装包 2. 安装 1、双击运行安装包&#xff0c;next 2、选择安装路径&#xff0c;建议非系统盘&#xff0c;nex…

Jetpack:029-Jetpack中的网格布局

文章目录 1. 概念介绍2. 使用方法3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 我们在上一章回中介绍了Jetpack中Card相关的内容&#xff0c;本章回中主要介 网格布局。闲话休提&#xff0c;让我们一起Talk Android Jetpack吧&#xff01; 1. 概念介绍 我们在本章回中…

第九章《搞懂算法:决策树是怎么回事》笔记

决策树算法是机器学习中很经典的一个算法&#xff0c;它既可以作为分类算法&#xff0c;也可以作为回归算法。 9.1 典型的决策树是什么样的 决策树算法是依据“分而治之”的思想&#xff0c;每次根据某属性的值对样本进行分类&#xff0c;然后传递给下个属性继续进行分类判断…

【CMU15445】Fall 2019, Project 3: Query Execution 实验记录

目录 实验准备实验测试Task 1: CREATING A CATALOG TABLE SQL 执行是由数据库解析器转化为一个由多个 executor 组成的 Query Plan 来完成的&#xff0c;本实验选择了火山模型来完成 query execution&#xff0c;这一次的 project 就是实现各种 exeutor&#xff0c;从而可以通过…

2014年亚太杯APMCM数学建模大赛C题公共基础课教师专业化培养方式研究求解全过程文档及程序

2014年亚太杯APMCM数学建模大赛 C题 公共基础课教师专业化培养方式研究 原题再现 近年来&#xff0c;世界基础工业、信息产业、服务业的跨越式发展引发了大量人才需求&#xff0c;导致了职业教育的飞速发展&#xff0c;除原有专科层次高等职业教育院校外&#xff0c;大量普通…

行业安卓主板-基于RK3568/3288/3588的电子班牌/人脸识别门禁/室内对讲门禁方案解决方案(二)

电子班牌 智能电子班牌可在主页实时显示班级全面的基本信息&#xff0c;包括天气、班名、课程表、值日表等&#xff0c;并发布学校通知、班级通知。学生可刷卡自动登陆系统进行课堂反馈&#xff0c;教师和家长可及时了解教学反馈&#xff0c;打通学校、教师、学生之间的互动通…

逆向学习记录(2)windows常用基本操作及用环境变量配置上多个python版本

1、如何打开cmd 第一种方法&#xff1a;按下winr&#xff0c;运行cmd 第二种方法&#xff1a;进入一个目录&#xff0c;点击路径处&#xff08;显示蓝色背景&#xff09;&#xff0c;然后直接键盘输入cmd&#xff0c;回车&#xff0c;运行cmd并直接进入此目录。 2、命令dir&am…

OpenFeign 的超时重试机制以及底层实现原理

目录 1. 什么是 OpenFeign&#xff1f; 2. OpenFeign 的功能升级 3. OpenFeign 内置的超时重试机制 3.1 配置超时重试 3.2 覆盖 Retryer 对象 4. 自定义超时重试机制 4.1 为什么需要自定义超时重试机制 4.2 如何自定义超时重试机制 5. OpenFeign 超时重试的底层原理 5…

04-附注 三维空间中的线性变换

附注 三维空间中的线性变换 三维空间线性变换 这是关于3Blue1Brown "线性代数的本质"的学习笔记。 三维空间线性变换 图1 绕y轴旋转90 绕y轴旋转90后&#xff0c;各基向量所在的坐标如图1所示。用旋转后的各基向量作为矩阵的列&#xff0c;就得到变换矩阵。变换矩阵…

简单代理模式

代理模式 代理模式(Proxy)&#xff0c;为其他对象提供一种代理以控制对这个对象的访问。 结构图如下&#xff1a; ISubject接口&#xff0c;定义了RealSubject和Proxy的共用接口方法&#xff0c;这样就可以在任何使用RealSubject的地方使用Proxy代理。 ISubject接口 public…

心脏骤停急救赋能

文章目录 0. 背景知识1. 遇到有人突然倒地怎么办1.1 应急反应系统1.2 高质量CPR1.2.1 胸外按压1.2.2 人工呼吸 1.3 AED除颤1.3.1 AED用法 1.4 高级心肺复苏1.5 入院治疗1.6 康复 0. 背景知识 中国每30s就有人倒地&#xff0c;他们可能是工作压力大的年轻人&#xff08;工程师群…

用Java(C语言也可以看)实现冒泡排序和折半查找(详细过程图)+逆序数组

目录 一、冒泡排序 1.冒泡排序介绍 2.排序的思路 3.完整代码 二、折半查找 1.折半查找介绍 2.查找的思路 3.完整代码 三、逆序数组 1.逆序思路 2..完整代码 一、冒泡排序 冒泡排序是众多排序的一种&#xff0c;无论在C语言或者Java中都很常见&#xff0c;后续在数据…

CleanMyMac X2024试用版下载及使用教程

CleanMyMac X是一款颇受欢迎的专业清理软件&#xff0c;拥有十多项强大的功能&#xff0c;可以进行系统清理、清空废纸篓、清除大旧型文件、程序卸载、除恶意软件、系统维护等等&#xff0c;并且这款清理软件操作简易&#xff0c;非常好上手&#xff0c;特别适用于那些刚入手苹…