Spark SQL的高级用法

news2025/1/12 20:52:29

一. 快速生成多行的序列

需求:请生成一列数据, 内容为 1 , 2 , 3 , 4 ,5

-- 快速生成多行的序列
-- 方式一
select explode(split("1,2,3,4,5",","));
--方式二
/*
 序列函数sequence(start,stop,step):生成指定返回的列表数据
 [start,stop]必须传入,step步长可传可不传,默认为1,也可以传入负数,传入负数的时候,大数要在前,小数
 */
select explode(sequence(1,5));
select explode(sequence(1,5,1));
select explode(sequence(1,5,2));
select explode(sequence(5,1,-1));
select explode(sequence(5,1,-2));

二. 快速生成表数据

需求: 生成一个两行两列的数据, 第一行放置 男 M 第二行放置 女 F

-- 快速生成表数据
/*
 stack(n,expr1, ..., exprk),n代表要分为n行,expr1, ..., exprk是放入每一行每一列的元素
 如果不传入列名,则默认使用col0,col1等作为列名
 */
select stack(2,"男","M","女","F");
select stack(2,"男","M","女","F") as (n,v);

三. 如何将一个SQL的结果给到另外一个SQL进行使用

3.1 视图

临时视图关键字:temporary

  1. 分为永久视图临时视图
  2. 相同点:都不会真正的存储数据。主要是用来简化SQL语句
  3. 不同点:永久试图会创建元数据,在多个会话(Session)中都有效;临时视图只在当前会话有效

3.2 视图和表的区别

视图不会真正的存储数据,而表会真正的存储数据。
但是视图和表在使用的时候区别不大

-- 如何将一个SQL的结果给到另外一个SQL进行使用
-- 方式一:子查询
select
    *
from (select stack(2,"男","M","女","F"));

-- 方式二:子查询
with tmp as (
    select stack(2,"男","M","女","F")
) select * from tmp;

-- 方式三:永久视图
create view forever_view as
select stack(2,"男","M","女","F");

select * from forever_view;

-- 方式四:临时视图
create temporary view tmp_view as
select stack(2,"男","M","女","F");

select * from tmp_view;

-- 方式五:创建表
create table tb as
select stack(2,"男","M","女","F");

select * from tb;

-- 缓存表:类似Spark Core中的缓存,提高数据分析效率
cache table cache_tb as
select stack(2,"男","M","女","F");

-- 查询缓存表
select * from cache_tb;

-- 清理指定缓存
uncache table cache_tb;

select * from cache_tb;

-- 清空所有的缓存
clear cache;

四. 窗口函数

格式:
分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

分析函数的分类:
1- 第一类: 排序函数。row_number() rank() dense_rank() ntile()

1、都是用来编号的
2、如果出现了重复(针对order by中的字段内容)数据
2.1- row_number:不管有没有重复,从1开始依次递增进行编号
2.2- rank():如果数据重复,编号相同,并且会占用后续的编号
2.3- dense_rank():如果数据重复,编号相同,但是不会占用后续的编号
2.4- ntile(n):将数据分为n个桶,不传入参数默认为1

2- 第二类: 聚合函数。sum() avg() count() max() min()…

1、可以通过窗口函数实现级联求各种值的操作。当后续遇到需要在计算的时候,将当前行或者之前之后的数据关联起来计算的情况,可以使用窗口函数。
2、如果没有排序字段,也就是没有order by语句,直接将窗口打开到最大,整个窗口内的数据全部被计算,不管执行到哪一行,都是针对整个窗口内的数据进行计算。
3、如果有排序字段,并且还存在重复数据的情况,默认会将重复范围内的数据放到一个窗口中计算
4、可以通过rows between xxx and xxx来限定窗口的统计数据范围
4.1- unbounded preceding: 从窗口的最开始
4.2- N preceding: 当前行的前N行,例如1 preceding、2 preceding
4.3- current row: 当前行
4.4- unbounded following: 到窗口的最末尾
4.5- N following: 当前行的后N行,例如1 following、2 following

3- 第三类: 取值函数。lead() lag() first_value() last_value()

-- 准备数据
create temporary view t1 (cookie,datestr,pv) as
values
           ('cookie1','2018-04-10',1),
            ('cookie1','2018-04-11',5),
            ('cookie1','2018-04-12',7),
            ('cookie1','2018-04-13',3),
            ('cookie1','2018-04-14',2),
            ('cookie1','2018-04-15',4),
            ('cookie1','2018-04-16',4),
            ('cookie2','2018-04-10',2),
            ('cookie2','2018-04-11',3),
            ('cookie2','2018-04-12',5),
            ('cookie2','2018-04-13',6),
            ('cookie2','2018-04-14',3),
            ('cookie2','2018-04-15',9),
            ('cookie2','2018-04-16',7);

select * from t1;
-- 1- 第一类: 排序函数。row_number() rank() dense_rank() ntile()
select
    cookie,pv,
    row_number() over (partition by cookie order by pv desc) as rs1,
    rank() over (partition by cookie order by pv desc) as rs2,
    dense_rank() over (partition by cookie order by pv desc) as rs3,
    ntile() over (partition by cookie order by pv desc) as rs4
from t1;
-- 2- 第二类: 聚合函数。sum() avg() count() max() min()...
select
    cookie,pv,
    -- 一次性直接将窗口打开到最大
    sum(pv) over(partition by cookie) as rs1,
    -- 依次慢慢打开窗口,如果数据相同,直接放到同一个窗口中
    sum(pv) over(partition by cookie order by pv) as rs2,
    -- 依次慢慢打开窗口,限定窗口的统计范围从窗口的最开始到当前行
    sum(pv) over(partition by cookie order by pv rows between unbounded preceding and current row) as rs3,
    -- 以当前行为中心,往前推一行。也就是从上一行计算到当前行
    sum(pv) over(partition by cookie order by pv rows between 1 preceding and current row ) as rs4,
    -- 从窗口的最开始一直统计到窗口的最终结尾
    sum(pv) over(partition by cookie order by pv rows between unbounded preceding and unbounded following) as rs5,
    -- 从当前行统计到窗口的结尾
    sum(pv) over(partition by cookie order by pv rows between current row and unbounded following) as rs6,
    -- 以当前行为中心,统计上一行、当前行、下一行总共3行的数据
    sum(pv) over(partition by cookie order by pv rows between 1 preceding and 1 following) as rs7,
    sum(pv) over(partition by cookie order by pv rows between 2 preceding and 3 following) as rs8
from t1;

-- 3- 第三类: 取值函数。lead() lag() first_value() last_value()
select
    cookie,pv,
    -- 默认取下一行数据
    lead(pv) over(partition by cookie order by pv) as rs1,
    -- 默认取上一行数据
    lag(pv) over(partition by cookie order by pv) as rs2,
    -- 默认取窗口内的第一条数据
    first_value(pv) over(partition by cookie order by pv) as rs3,
    -- 默认取窗口内的最后一条数据
    last_value(pv) over(partition by cookie order by pv) as rs4
from t1;

五. 横向迭代

/*
需求: 已知 c1列数据, 计算出 c2 和 c3列数据
c2 = c1+2
c3=c1*(c2+3)
 */
-- 数据准备
select explode(sequence(1,3));
select stack(3,1,2,3);

-- 方式一:子查询
-- 计算c2
with t1 as (
    select explode(sequence(1,3)) as c1
)select c1,(c1+2) as c2 from t1;
-- 计算c3
with t1 as (
    select explode(sequence(1,3)) as c1
)
select c1,c2,c1*(c2+3) as c3 from
(select c1,(c1+2) as c2 from t1);

-- 方式二:视图方式
-- 准备数据
create temporary view view_t1 as
select explode(sequence(1,3)) as c1;

select * from view_t1;
-- 计算c2并创建视图
create temporary view view_t2 as
select c1,(c1+2) as c2 from view_t1;

select * from view_t2;
-- 计算c3并创建视图
create temporary view view_t3 as
select c1,c2,c1*(c2+3) as c3 from view_t2;

select * from view_t3;

六. 纵向迭代

需求: 计算 c4:

计算逻辑: 当c2=1 , 则 c4=1 ; 否则 c4 = (上一个c4 + 当前的c3)/2
在这里插入图片描述

-- 数据准备
create temporary view view_data (c1,c2,c3)
as values
(1,1,6),
(1,2,23),
(1,3,8),
(1,4,4),
(1,5,10),
(2,1,23),
(2,2,14),
(2,3,17),
(2,4,20);

select * from view_data;

方式一:创建临时视图继续计算c4的值,对于练习阶段数据量小还行,即使是数量小,也有很多重复代码,所以对于以后海量数据的计算,这种方法显然是不合理的。

--方式一:
-- 步骤一:当c2=1 , 则 c4=1
create temporary view col_tmp1 as
select c1,c2,c3,if(c2=1,1,null)as c4 from view_data;

select * from col_tmp1;

-- 步骤二:否则 c4 = (上一个c4 +  当前的c3)/2
create temporary view col_tmp2 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp1;

select * from col_tmp2;

create temporary view col_tmp3 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp2;

select * from col_tmp3;

create temporary view col_tmp4 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp3;

select * from col_tmp4;

create temporary view col_tmp5 as
select
c1,c2,c3,
if(c2=1,1,((lag(c4) over (partition by c1 order by c2))+c3)/2) as c4
from col_tmp4;

select * from col_tmp5;

方式二:基于pandas进行自定义聚合函数(UDAF)操作

#!/usr/bin/env python
# @desc : 
__coding__ = "utf-8"
__author__ = "bytedance"

import pyspark.sql.functions as F
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config('spark.sql.shuffle.partitions',1)\
        .appName('sparksql_udaf')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    spark.sql("""
        create temporary view view_data (c1,c2,c3)
        as values
        (1,1,6),
        (1,2,23),
        (1,3,8),
        (1,4,4),
        (1,5,10),
        (2,1,23),
        (2,2,14),
        (2,3,17),
        (2,4,20)
    """)

    # 3- 数据处理
    # 3.1- 当c2=1 , 则 c4=1
    spark.sql("""
        create temporary view heng_tmp_1 as
        select
            c1,c2,c3,if(c2=1,1,null) as c4
        from view_data
    """)

    spark.sql("""
        select * from heng_tmp_1
    """).show()

    # 3.2- 否则 c4 = (上一个c4 +  当前的c3)/2
    # 3.2.1- 基于Pandas实现UDAF函数,创建自定义的Python函数
    # 3.2.2- 注册进SparkSQL中
    # @F.pandas_udf(returnType=FloatType())
    @F.pandas_udf(returnType="float")
    def c4_udaf_func(c3:pd.Series, c4:pd.Series) -> float:
        print(f"{c3}")
        print(f"{c4}")

        tmp_c4 = None

        for i in range(0,len(c3)):
            if i==0:
                tmp_c4 = c4[i] # c4[0]
            else:
                tmp_c4 = (tmp_c4 + c3[i]) / 2

        return tmp_c4

    spark.udf.register("c4_udaf",c4_udaf_func)

    spark.sql("""
        select 
            c1,c2,c3,
            c4_udaf(c3,c4) over(partition by c1 order by c2) as c4
        from heng_tmp_1
    """).show()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1423600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

充电桩项目实战:搞定多数据源!

你好,我是田哥 最近,我在对充电桩项目进行微服务升级中,既然是项目升级,难免会遇到各种各样的问题。比如:分布式事务问题、多数据源问题、分布式锁问题等。 项目技术栈: SpringSpring BootSpring Cloud Ali…

实战教程:使用Spring Boot和Vue.js开发社区团购管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流! ⚡⚡ Java实战 |…

【Docker】了解Docker Desktop桌面应用程序,TA是如何管理和运行Docker容器(1)

欢迎来到《小5讲堂》,大家好,我是全栈小5。 这是《Docker容器》序列文章,每篇文章将以博主理解的角度展开讲解, 特别是针对知识点的概念进行叙说,大部分文章将会对这些概念进行实际例子验证,以此达到加深对…

linux安装mongodb数据库启动报错? 都是冰红茶滴水儿

先展示报错信息 网上一大推说是关闭不正确导致的,然后给出的解决方法是 ./mongod -f mongodb.conf --repair吊用没有,还是报错: about to fork child process, waiting until server is ready for connections. forked process: 302226 ERROR: child process failed, exited…

资深Android逆袭、华为鸿蒙为安卓程序员开辟了一条新道路

本文章主要从以下5个方面来展开聊聊这个话题: 1.什么是鸿蒙 2.鸿蒙系统发展时间线 3.鸿蒙是套壳Android吗? 4.鸿蒙的生态(用户以及开发者) 5.一些建议 1月18日,在鸿蒙生态千帆启航仪式上,华为宣布了继鸿蒙4…

【原创】VMware创建子网,并使用软路由获得访问互联网的能力,并通过静态路由让上层网络访问位于虚拟机的子网

前言 一看标题就很离谱,确实内容也有点复杂,我的初衷是为后面搞软路由做准备,先通过VMware进行可行性验证,确定方案是否可行,再做下一步的计划。结论当然可以的,能通能访问,强的不行。 网络拓…

jdk17新特性—— 密封类(Sealed Classes)

目录 一、密封类(Sealed Classes)的概述1.1、概述1.2、特性1.3、注意事项 二、密封类(Sealed Classes)代码示例2.1、密封类(Sealed Classes)代码结构示例2.2、密封类(Sealed Classes)代码示例 三、密封类(Sealed Classes)接口代码示例3.1、密封类(Sealed Classes)接口代码结构示…

项目解决方案:4G/5G看交通数字化视频服务平台技术方案

目 录 1.总体描述 2.系统结构图 3.系统功能 3.1 信息交互 3.2 语音对讲 3.3 实时码流转换 3.4 流媒体集群和扩容 3.5 负载均衡 3.6 流媒体分发 3.7 流媒体点播 4.系统标准 4.1 流媒体传输 4.2 视频格式 4.3 质量标准 5.设备清单 1.总体描述 视频监控平…

LabVIEW潜油电泵数据采集系统

LabVIEW潜油电泵数据采集系统 介绍一个基于LabVIEW的潜油电泵数据采集系统。该系统目的是通过高效的数据采集和处理,提高潜油电泵的性能监控和故障诊断能力。 系统由硬件和软件两部分组成。硬件部分主要包括数据采集卡、传感器和电泵等,而软件部分则是…

STM32实时时钟(RTC)的配置和使用方法详解

实时时钟(RTC)是STM32系列微控制器上的一个重要模块,用于提供准确的时间和日期信息。在本文中,我们将详细介绍STM32实时时钟的配置和使用方法。 ✅作者简介:热爱科研的嵌入式开发者,修心和技术同步精进 ❤欢…

如何恢复已删除的照片?

在这篇综合文章中发现恢复丢失照片的有效且免费的方法。无论您使用的是智能手机、iPhone、Windows 计算机、Mac、SD 卡还是数码相机,我们都提供有关如何恢复已删除照片的分步说明。此外,学习一些有价值的技巧,以防止将来意外删除照片。 意外…

scienceplots绘图浅尝

前言 科研写作中,黑压压的文字里面如果能有一些优美的图片无疑会给论文增色不少,绘图的工具有很多,常用的有Excel、Python、Matlab等,Matlab在绘图方面相较于Python有一种更加原生的科研风,而且可视化编辑图例、坐标轴…

【数据结构与算法】之哈希表系列-20240130

这里写目录标题 一、383. 赎金信二、387. 字符串中的第一个唯一字符三、389. 找不同四、409. 最长回文串五、448. 找到所有数组中消失的数字六、594. 最长和谐子序列 一、383. 赎金信 简单 给你两个字符串:ransomNote 和 magazine ,判断 ransomNote 能不…

三分钟教你入门规则引擎Drools

Drools是一款基于Java语言的开源的规则引擎,可以将复杂且多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件或者特定的存储介质中(eg:数据库表),使得业务规则的变更不需要修正项目代码,重启服务器就可以在线上环境立即生效…

正则表达式 与文本三剑客(sed grep awk)

一,正则表达式 (一)正则表达式相关定义 1,正则表达式含义 REGEXP: Regular Expressions,由一类特殊字符及文本字符所编写的模式,其中有些字符(元字符)不表示字符字面意…

【学网攻】 第(17)节 -- 命名ACL访问控制列表

系列文章目录 目录 前言 一、ACL(访问控制列表)是什么? 二、实验 1.引入 总结 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan【学网攻】 第…

Packet tracer-实现VLAN内部通信

案例一: 要求PC1和PC2,PC3和PC4之间能够实现互访 两个VLAN,一个VLAN对应一个子网 以S2为例: 步骤 1:在 S2 上创建并命令 VLAN,把VLAN划分给活动的端口。 步骤 2:在 S3 上创建并命令 VLAN&…

LeetCode Hot100 回顾(二)

子串 560.和为K的子数组 使用前缀和预处理一下题目给的数组, 然后用二重循环遍历一遍就可以了。 239.滑动窗口最大值 看题面比较容易想到的是用优先级队列来解决, 但是STL中的priority_queue不支持随机删除, 如果要用优先级队列来解决这道题的话比较复杂。这道题的一种正确…

QT + opengl 环境搭建(glfw, glad),创建一个简单窗口

一.下载glfw,glad并编译 1.glfw个人理解就是对底层opengl的一些基本接口的封装,提供了一些渲染物体所需的最低限度的接口。它允许用户创建OpenGL上下文、定义窗口参数以及处理用户输入。glfw的下载地址:Download | GLFW,下载完成后…

SpringBoot 结合 liteflow 规则引擎使用

1、前言 在日常的开发过程中,经常会遇到一些串行或者并行的业务流程问题,而业务之间不必存在相关性。 在这样的场景下,使用策略和模板模式的结合可以很好的解决这个问题,但是使用编码的方式会使得文件太多,在业务的部分环节可以…