头歌:SparkSQL简单使用

news2025/1/15 6:49:40

第1关:SparkSQL初识
 

任务描述


本关任务:编写一个sparksql基础程序。

相关知识


为了完成本关任务,你需要掌握:1. 什么是SparkSQL 2. 什么是SparkSession。  

什么是SparkSQL


Spark SQL是用来操作结构化和半结构化数据的接口。
当每条存储记录共用已知的字段集合,数据符合此条件时,Spark SQL就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL提供了以下三大功能:
(1) Spark SQL可以从各种结构化数据源(例如JSON、Parquet等)中读取数据。

(2) Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接sparkSQL进行查询。

(3) 当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。

什么是SparkSession


Spark中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder()。

import  org.apache.spark.sql.SparkSession;
SparkSession  spark  =  SparkSession 
                  .builder()
                  .appName("Java Spark SQL基本示例")
                  .master("local")
                  .config("spark.some.config.option" , "some-value")
                  .getOrCreate();
 //打印spark版本号
 System.out.println(spark.version());


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,具体任务如下:

打印spark的版本号。
测试说明
补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
 
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
 
 
public class Test1 {
	
 
	public static void main(String[] args) throws AnalysisException {
		/********* Begin *********/
		SparkSession  spark  =  SparkSession   
                  .builder()  
                  .appName("Java Spark SQL基本示例")  
                  .master("local")  
                  .config("spark.some.config.option" , "some-value")  
                  .getOrCreate();  
 //打印spark版本号  
       System.out.println(spark.version());  
		
		
		
		
		
		
		/********* End *********/
	}
 
	
 
}

第2关:Dataset创建及使用
 

任务描述


本关任务:创建Dataset并使用

相关知识


为了完成本关任务,你需要掌握:

什么是Dataset;
Dataset如何创建 ;
Dataset如何操作数据。
什么是Dataset
在Spark2.0版本以后,DataFrame API将会和Dataset  API合并,统一数据处理API。故实训中的Dateset和DataFrame可看成一个概念。
Dataset和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表,但在底层具有更丰富的优化。Dataset可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入Dataset呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作Dataset。这让熟悉关系型数据库SQL人员也能轻松掌握。

上图直观地体现了 Dataset 和 RDD 的区别。左侧的 RDD[Person] 虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 Dataset 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。Dataset 除了提供了比 RDD 更丰富的算子以外,更重要的特点能提升执行效率、减少数据读取以及执行计划的优化等。

Dataset 上可用的操作分为转换和操作。转换是产生新 Dataset 的转换,动作是触发计算和返回结果的转换。示例转换包括map,filter,select和aggregate(groupBy)。示例操作将数据计数,显示或写入文件系统。

Dataset 是“懒惰的”,即只有在调用动作时才会触发计算。在内部,Dataset 表示描述生成数据所需计算的逻辑计划。调用操作时,Spark 的查询优化器会优化逻辑计划,并以并行和分布式方式生成有效执行的物理计划。要探索逻辑计划以及优化的物理计划,请使用explain函数。

要有效地支持特定于域的对象,需要使用编码器。编码器将域特定类型T映射到 Spark 的内部类型系统。例如,给定一个具有两个字段的  Person,name(string)和age(int),编码器用于告诉 Spark 在运行时生成代码以将 Person 对象序列化为二进制结构。该二进制结构通常具有低得多的存储器占用面积以及针对数据处理(例如,柱状格式)的效率进行优化。要了解数据的内部二进制表示,请使用模式函数。

Dataset如何创建
通常有两种方法来创建Dataset。最常见的方法是使用SparkSession上提供的读取功能将Spark指向存储系统上的某些文件。

//创建泛型的Dataset
Dataset<Row> df = spark.read().json("people.json");
//创建Person类型的Dataset
   Dataset<Person> people = spark.read().json("people.json").as(Encoders.bean(Person.class));
//以表格形式显示前20行Dataset
df.show();
people.show();


也可以通过现有数据集上的转换来创建Dataset。 例如,以下内容通过对现有数据集应用过滤器来创建新Dataset:

   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING));


Dataset如何操作数据
Dataset操作数据有两种方式:API方式处理数据和以编程方式处理数据。

API方式处理数据


Dataset操作也可以通过以下定义的各种特定于域的语言(DSL)函数进行无类型操作:Dataset(类),列和函数。 这些操作与R或Python中的数据框抽象中可用的操作非常相似。
要从数据集中选择列,请在在Java中使用col 。

  

 Column ageCol = people.col("age");


请注意,Column类型也可以通过其各种功能进行操作。

import static org.apache.spark.sql.functions.col;
以树格式
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 仅选择“名称”列
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加1 
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// 选择年龄超过21 
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 计数按年龄的人
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


以编程方式处理数据
SparkSession支持让应用程序以编程方式运行SQL查询并返回结果。

//读取json,并将Dataset,并注册为SQL临时视图
sparkSession.read().json("people.json").createOrReplaceTempView("people");
//以表格形式显示前20行Dataset
sparkSession.sql("select * from people").show();
// + ---- + ------- + 
// | 年龄| 名称| 
// + ---- + ------- + 
// | null | Michael | 
// | 30 | 安迪| 
// | 19 | 贾斯汀| 
// + ---- + ------- +


编程要求


根据提示,在右侧编辑器补充代码,读取people.json文件,过滤age为23的数据,并以表格形式显示前20行Dataset。

people.json文件内容如下:

{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}

package com.educoder.bigData.sparksql;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;


public class Test2 {


	public static void main(String[] args) throws AnalysisException {
		
		SparkSession  spark  =  SparkSession 
				  .builder()
				  .appName("test1")
				  .master("local")
				  .config("spark.some.config.option" , "some-value")
				  .getOrCreate();
		/********* Begin *********/
		spark.read().json("people.json").createOrReplaceTempView("people");  

spark.sql("select * from people where age != '23'").show();
		
		
		
		
		
		/********* End *********/
	}

	

}

第3关:Dataset自定义函数

 

任务描述


本关任务:编写Dataset自定义函数。

相关知识


为了完成本关任务,你需要掌握:

UserDefinedAggregateFunction介绍;
如何使用。
UserDefinedAggregateFunction
UserDefinedAggregateFunction是实现用户定义的聚合函数基础类,用户实现自定义无类型聚合函数必须扩展UserDefinedAggregateFunction 抽象类,相关方法如下:

方法及方法返回    描述


StructType bufferSchema()    StructType表示聚合缓冲区中值的数据类型。
DataType dataType()    UserDefinedAggregateFunction的返回值的数据类型
boolean deterministic()    如果此函数是确定性的,则返回true
Object evaluate(Row buffer)    根据给定的聚合缓冲区计算此UserDefinedAggregateFunction的最终结果
void initialize(MutableAggregationBuffer buffer)    初始化给定的聚合缓冲区
StructType inputSchema()    StructType表示此聚合函数的输入参数的数据类型。
void merge(MutableAggregationBuffer buffer1, Row buffer2)    合并两个聚合缓冲区并将更新的缓冲区值存储回buffer1
void update(MutableAggregationBuffer buffer, Row input)    使用来自输入的新输入数据更新给定的聚合缓冲区
如何使用
我们以计算员工薪水平均值的例子来说:
首先在用户自定义函数的构造函数中,定义聚合函数的输入参数的数据类型和聚合缓冲区中值的数据类型。

//定义员工薪水的输入参数类型为LongType
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
//定义员工薪水总数、员工个数的参数类型
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);


对聚合缓冲区中值设置初始值。

@Override
    public void initialize(MutableAggregationBuffer buffer) {
        // TODO Auto-generated method stub
        buffer.update(0, 0L);
        buffer.update(1, 0L);
    }


把自定义函数的输入薪水数据转化为定义的聚合缓冲区的值(薪水总数、员工个数),并更新。

@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
        long updatedSum = buffer.getLong(0) + input.getLong(0);
        long updatedCount = buffer.getLong(1) + 1;
        buffer.update(0, updatedSum);
        buffer.update(1, updatedCount);
    }
}


把多个聚合缓冲区的值进行合并。

@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    // TODO Auto-generated method stub
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
}


最后通过聚合缓冲区的值计算输出结果。

@Override
public Object evaluate(Row buffer) {
    // TODO Auto-generated method stub
     return ((double) buffer.getLong(0)) / buffer.getLong(1);
}


就此自定义函数就开发完了,通过SparkSession的udf()方法会返回注册用户定义函数的方法集合UDFRegistration
通过UDFRegistration调用register方法进行自定义函数注册,使用如下:

// 注册自定义函数myAverage
spark.udf().register("myAverage", new MyAverage());
//读取json文件
spark.read().json("people.json").createOrReplaceTempView("people");
//使用自定义函数计算薪水平均值
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();
// +--------------+
// |average_salary|
// +--------------+
// |        5000|
// +--------------+


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,编写自定义函数类MyAverage,用来计算用户薪水平均值,平台已提供了最后的实现:

spark.udf().register("myAverage", new MyAverage());
spark.read().json("people.json").createOrReplaceTempView("people");
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();

测试说明


补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MyAverage extends UserDefinedAggregateFunction {
private static final long serialVersionUID = 1L;
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
@Override
public StructType bufferSchema() {
// TODO Auto-generated method stub
return bufferSchema;
}
@Override
public DataType dataType() {
// TODO Auto-generated method stub
return DataTypes.DoubleType;
}
@Override
public boolean deterministic() {
// TODO Auto-generated method stub
return true;
}
@Override
public Object evaluate(Row buffer) {
// TODO Auto-generated method stub
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
// TODO Auto-generated method stub
buffer.update(0, 0L);
buffer.update(1, 0L);
}
@Override
public StructType inputSchema() {
// TODO Auto-generated method stub
return inputSchema;
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
// TODO Auto-generated method stub
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1637311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Tuxera NTFS使用教程 轻松实现磁盘格式转换的教程分享 ntfsMac软件怎么用

NTFS for Mac是Mac电脑里非常重要的工具之一&#xff0c;因为它太实用了&#xff0c;解决了NTFS移动硬盘在Mac上的写入问题。但是&#xff0c;小伙伴在安装完软件之后&#xff0c;通常再也不会关注它&#xff0c;甚至时间长了&#xff0c;也就忘了Mac里还有这么一个软件。 在Tu…

GB32960解析工具

几年前搞了一个用Qt开发的国标32960报文解析工具。分享给大家&#xff0c;只用1积分便可以下载。 国标32960新能源车协议解析工具资源-CSDN文库

(附源码)超级简单的SSM图书交易系统,包含技术栈、架构图、设计图、教程

先看首页效果&#xff0c;包含买家、卖家、管理员三个端口。启动有问题可以联系我解决&#xff0c;微信&#xff1a;keepgoing4u 架构图&#xff1a; 用到软件 Jdk1.8 Mysql IntelliJ IDEA Maven 项目技术&#xff1a; Spring Boot SSM JSP mybatis Maven B/S模式 配置…

云服务器的主要用途有哪些,使用云服务器具有哪些方面的优势

随着科技的飞速发展&#xff0c;云计算已经成为现代企业和个人用户不可或缺的技术支持&#xff0c;云计算技术已经逐渐渗透到我们生活的方方面面。云服务器作为云计算的核心组成部分&#xff0c;正在逐步改变我们的数据存储和处理方式&#xff0c;成为各类互联网用户实现综合业…

前端vite+rollup前端监控初始化——封装基础fmp消耗时间的npm包并且发布npm beta版本

文章目录 ⭐前言&#x1f496;vue3系列文章 ⭐初始化npm项目&#x1f496;type为module&#x1f496;rollup.config.js ⭐封装fmp耗时计算的class&#x1f496;npm build打包class对象 ⭐发布npm的beta版本&#x1f496; npm发布beta版本 ⭐安装web-performance-tool的beta版本…

2024年第二十一届 五一杯 (B题)大学生数学建模挑战赛 | 最大流问题,深度学习分析 | 数学建模完整代码解析

DeepVisionary 每日深度学习前沿科技推送&顶会论文&数学建模与科技信息前沿资讯分享&#xff0c;与你一起了解前沿科技知识&#xff01; 本次DeepVisionary带来的是五一杯的详细解读&#xff1a; 完整内容可以在文章末尾全文免费领取&阅读&#xff01; 第一个问题…

算法效率的判断及一些典型例题的讲解

一.算法效率 1.用处&#xff1a;判断算法的好坏&#xff0c;好的算法应该是高效的 2算法效率取决于时间复杂度和空间复杂度 <1>时间复杂度 1.1概念&#xff1a;算法中基本操作的执行次数就是算法的时间复杂度 1.2表示&#xff1a;大O的渐进表示法&#xff0c;例如O(N)…

什么是场内期权,场内期权是如何操作的?

今天期权懂带你了解什么是场内期权,场内期权是如何操作的&#xff1f;场内期权是标准化、规范化且在公开市场交易的金融衍生品。相比场外期权&#xff0c;场内期权具有更高的流动性和透明度。 什么是场内期权&#xff1f; 场内期权&#xff0c;也称为交易所期权&#xff0c;是…

【C++航海王:追寻罗杰的编程之路】C++11(四)

目录 1 -> 相关文章 【C航海王&#xff1a;追寻罗杰的编程之路】C11(一) 【C航海王&#xff1a;追寻罗杰的编程之路】C11(二) 【C航海王&#xff1a;追寻罗杰的编程之路】C11(三) 2 -> lambda表达式 2.1 -> C98中的一个例子 2.2 -> lambda表达式 2.3 ->…

HCIA-题目解析1

0x00 前言 遇到这样一道题,这种题目对于我来说还是比较复杂的,所以记录一下。主要还是和熟练度有关系。 0x01 题目 路由器RouterID邻居关系如下,下列说法正确的是 A:本路由器和Router-lD为10.0.3.3的路由器不能直接交换链路状态信息 B:DR路由器的Router-lD为10.0.1.2 C:…

Flutter运行项目一直:running gradle task

大体原因就是访问国外的资源由于网络等原因导致访问失败&#xff0c;解决方法就是换成国内的源 修改项目的android/build.gradle 文件&#xff0c;将里面的 google() mavenCentral()替换为 maven {allowInsecureProtocol trueurl https://maven.aliyun.com/repository/googl…

前端 CSS

目录 选择器 复合选择器 伪类-超链接 结构伪装选择器 伪元素选择器 画盒子 字体属性 CSS三大属性 Emmet写法 背景属性 显示模式 盒子模型 盒子模型-组成 盒子模型-向外溢出 盒子模型-圆角 盒子模型-阴影 flex position定位 CSS小精灵 字体图标 垂直对齐方式…

Oracle集群-常用查询及操作(工作日常整理)

1.Oracle集群状态 select * from gv$instance; 示例结果&#xff1a; 2.Oracle集群-增大表空间 常见问题&#xff1a; 导入时或使用时&#xff0c;提示无法extend table ,增加表空间即可 常用操作&#xff1a; 1&#xff09;查询表空间 select * from dba_tablespaces; --…

Redisson分布式锁,重试锁和锁续命的原理

RedissonLock 锁重试原理 tryLock有三个三个参数&#xff0c;第一个是等待时间&#xff0c;第二个是锁失效后自动释放的时间,不填默认为-1&#xff0c;第三个是时间单位&#xff1b; 当设置了第一个参数&#xff0c;那这个锁就成了可重试锁&#xff1b;获取锁失败后&#xff0c…

【redis】Redis数据类型(三)List类型

目录 List类型介绍特点 List数据结构附&#xff1a;3.2以前的版本(介绍一下压缩列表和双向链表)压缩列表ZipList双向链表LinkedList 常用命令lpush示例 lpushx示例 rpush示例 rpushx示例 LPOP示例 RPOP示例 BLPOP非阻塞行为阻塞行为相同的 key 被多个客户端同时阻塞在 MULTI/EX…

vscode 使用code runner 运行代码输出乱码

vscode 使用code runner 运行代码输出乱码 先指出问题所在&#xff1a; 代码文件使用的编码格式和终端使用的编码格式不一致&#xff0c;查看代码文件右下角&#xff0c;会显示代码文件的编码格式。 测试代码如下&#xff1a; #include<iostream> using namespace std…

专注 APT 攻击与防御—工具介绍-the-backdoor-factory

工具介绍 the-backdoor-factory 项目地址&#xff1a;GitHub - secretsquirrel/the-backdoor-factory: Patch PE, ELF, Mach-O binaries with shellcode new version in development, available only to sponsors 原理 可执行二进制文件中有大量的 00&#xff0c;这些 00 是…

计算机毕业设计python_django宠物领养系统z6rfy

本宠物领养系统主要包括两大功能模块&#xff0c;即管理员模块、用户模块。下面将对这两个大功能进行具体功能需求分析。 &#xff08;1&#xff09;管理员&#xff1a;管理员登录后主要功能包括个人中心、用户管理、送养宠物管理、地区类型管理、失信黑名单管理、申请领养管理…

瑞芯微-I2S | ALSA基础-3

针对音频设备&#xff0c;linux内核中包含了两类音频设备驱动框架&#xff1b; OSS&#xff1a;开放声音系统 包含dsp和mixer字符设备接口&#xff0c;应用访问底层硬件是直接通过sound设备节点实现的&#xff1b; ALSA&#xff1a;先进linux声音架构&#xff08;Advanced Lin…

bun 换源 国内阿里源 npmmirror 加速下载

Github https://github.com/oven-sh/bun 版本号 bun 1.1.5 windows 安装 bun 如果本机有 nodejs 环境, 可以 npm install -g bun 安装 ( 官方把 exe 已经传到了 npm 仓库, 走的国内 npm 镜像, 下载速度会很快) 没有 nodejs, 可以用 powershell 脚本安装 具体操作 全局 …