Spark_SQL函数定义(定义UDF函数、使用窗口函数)

news2025/1/11 11:10:53

                    一、UDF函数定义

        (1)函数定义

        (2)Spark支持定义函数

        (3)定义UDF函数

                (4)定义返回Array类型的UDF

        (5)定义返回字典类型的UDF

二、窗口函数

        (1)开窗函数简述

        (2)窗口函数的语法


一、UDF函数定义

        (1)函数定义

        无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
        Hive中自定义函数有三种类型:

        第一种:UDF(User-Defined_-function)函数

                · 一对一的关系,输入一个值经过函数以后输出一个值;

                · 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

        第二种:UDAF(User-Defined Aggregation Function)聚合函数

                · 多对一的关系,输入多个值输出一个值,通常于groupBy联合使用;

        第三种:UDTF(User-Defined Table-Generating Functions)函数

                · 一对多的关系,输入一个值输出多个值(一行变多为行);

                · 用户自定义生成函数,有点像flatMap;

        (2)Spark支持定义函数

        目前来说Spark框架各个版本及各种语言对自定义函数的支持:在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF。

Spark版本及支持函数定义
Apache Spark VersionSpark SQL UDF(Python,Java,Scala)Spark SQL UDAF(Java,Scala)Spark SQL UDF(R)Hive UDF,UDAF,UDTF
1.1-1.4
1.5experimental
1.6
2.0
        (3)定义UDF函数

        ①sparksession.udf.register()

        注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内给的名字用于SQL风格。

        ②pyspark.sql.functions.udf

        仅能用于DSL风格

        其中F是:from pyspark.sql import functions as F。其中,被注册为UDF的方法名是指具体的计算方法,如:def add(x, y): x + y  。 add就是将要被注册成UDF的方法名

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(['num'])

    # TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1:注册的UDF的名称,这个UDF名称,仅可以用于SQL风格
    # 参数2:UDF的处理逻辑,是一个单独定义的方法
    # 参数3:声明UDF的返回值类型,注意:UDF注册时候,必要声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
    # 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用户的DSL风格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行,表达式SQL风格的表达式(字符串)
    # select方法,接受普通的字符串字段名,或者返回值时Column对象的计算
    df.selectExpr('udf1(num)').show()

    # DSL 风格使用
    # 返回值UDF对象,如果作为方法使用,传入的参数一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2:方式2注册,仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

        方式1结果:

        方式2结果:

                (4)定义返回Array类型的UDF

        注意:数组或者list类型,可以使用spark的ArrayType来描述即可。

        注意:声明ArrayType要类似这样::ArrayType(StringType()),在ArrayType中传入数组内的数据类型。

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # 注册UDF,UDF的执行函数定义
    def split_line(data):
        return data.split(' ')

    # TODO 1:方式1 后见UDF
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DLS 风格
    df.select(udf2(df['line'])).show()

    # SQL风格
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2:方式的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

        

        (5)定义返回字典类型的UDF

        注意:字典类型返回值,可以用StructType来进行描述,StructType是—个普通的Spark支持的结构化类型.
        只是可以用在:
                · DF中用于描述Schema
                · UDF中用于描述返回值是字典的数据

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字: 1 2 3 在传入数字,返回数字所在序号对应的 字母 然后和数字结合组成dict返回
    # 例:传入1 返回{'num':1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # 注册UDF
    def process(data):
        return {'num': data, 'letters': string.ascii_letters[data]}

    '''
    UDF返回值是字典的话,需要用StructType来接收
    '''
    udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
                              add('letters', StringType(), nullable=True))
    # SQL风格
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL风格
    df.select(udf1(df['num'])).show(truncate=False)

        (6)通过RDD构建UDAF函数

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 方法:使用RDD的mapPartitions 算子来完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合,一定要单分区
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list,因为mapPartitions方法要求返回值是list对象

    print(single_partition_rdd.mapPartitions(process).collect())

二、窗口函数

        (1)开窗函数简述

        ●介绍

        开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

        ●聚合函数和开窗函数

        聚合函数是将多行变成一行,count,avg...

        开窗函数是将一行变成多行;

        聚合函数如果要显示其他的列必须将列加入到group by中,开窗函数可以不使用group by,直接将所有信息显示出来。

        ●开窗函数分类

        1.聚合开窗函数 聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句

        2.排序开窗函数 排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

        3.分区类型NTILE的窗口函数

        (2)窗口函数的语法

        窗口函数的语法:

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)])
    schema = StructType().add('name', StringType()).\
        add('class', StringType()).\
        add('score', IntegerType())
    df = rdd.toDF(schema)
    # 创建表
    df.createTempView('stu')

    # TODO 1:聚合窗口函数的演示
    spark.sql('''
        SELECT *, AVG(score) over() AS avg_socre FROM stu
    ''').show()

    # TODO 2: 排序相关的窗口函数计算
    # RANK over, DENSE_RANK over, ROW_NUMBER over
    spark.sql('''
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO NTILE
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()

        TODO1结果:

        TODO2结果展示:

        TODO3结果展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1135054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用VScode做PPT:marp插件

文章目录 初步认识指令设置图像设置布局设置 初步认识 marp是支持Markdown格式的PPT神器,有了这个就可以敲代码写PPT了。更绝的是,marp提供了VScode插件,故而可以愉快地在VScode中写PPT了。 在VScode扩展商店中搜索marp,安装Mar…

双向电平电压转换器TXS0102DCTR应用电路设计

1、TXS0102简介 TXS0102DCTR是一个2位双向电压电平转换器,主要用途是与数据I/O(例如I2C或1-wire)上的开漏驱动器连接(其中数据是双向的且无可用的控制信号),在混合电压系统之间建立数字开关兼容性。它使用…

Linux系统编程07

线程 为什么有了进程还需要线程 进程切换的时候会花费很大的代价 (1)上下文切换,CPU寄存器需要切换 (2)虚拟地址和物理地址的映射需要切换 进程间通信麻烦 线程是轻量级的进程 (1)线程是一个正…

【设计模式】第4节:创建型模式之“单例模式”

一、介绍 采取一定的方法保证在整个的软件系统中,对某个类只能存在一个对象实例,并且该类只提供一个取得其对象实例的方法。 不使用单例模式的UML类图: 使用单例模式的UML类图: 使用场景: 需要频繁创建或销毁的对象…

【密评】商用密码应用安全性评估从业人员考核题库(十二)

商用密码应用安全性评估从业人员考核题库(十二) 国密局给的参考题库5000道只是基础题,后续更新完5000还会继续更其他高质量题库,持续学习,共同进步。 2751 多项选择题 GM/T 0051《 密码设备管理 对称密钥管理技术规范…

倾斜摄影三维模型根节点合并效率提升的技术方法分析

倾斜摄影三维模型根节点合并效率提升的技术方法分析 提高倾斜摄影三维模型根节点合并效率是倾斜摄影领域的重要挑战之一。快速而准确地处理大量数据和复杂的场景需要使用高效的技术方法。本文将探讨几种可以提高倾斜摄影三维模型根节点合并效率的技术方法。 首先,使…

可视化工具Datart踩(避)坑指南(7)——下载的极限

作为目前国内开源版本最好用的可视化工具之一,Datart无疑是低成本高效率可供二开的可视化神兵利器。当然,免费的必然要付出一些踩坑的代价。 本篇我们来讲一讲可视化工具Datart踩(避)坑指南(7)之下载的极限…

大语言模型在天猫AI导购助理项目的实践!

本文主要介绍了Prompt设计、大语言模型SFT和LLM在手机天猫AI导购助理项目应用。 ChatGPT基本原理 “会说话的AI”,“智能体” 简单概括成以下几个步骤: 预处理文本:ChatGPT的输入文本需要进行预处理。 输入编码:ChatGPT将经过预…

Ubuntu22.04(非虚拟机)安装教程(2023最新最详细)

目录 简介 一.下载Ubuntu Server镜像,官方地址下载即可 ​二.安装Ubuntu镜像 简介 Linux是一种自由和开放源代码的操作系统内核,被广泛应用于各种计算机系统中。它以稳定性、安全性和灵活性而闻名,并成为服务器、嵌入式设备和个人计算机等…

国产手机性能再次飞升,H公司落后三代,但仍然比不过苹果

国产手机将采用全新的芯片,性能将进一步提升,这是国产手机的又一个重大进步,这次不再挤牙膏,真正为消费者带来性能跃升的手机,让消费者刷视频更流畅,玩游戏也更畅快。 据了解国产手机即将采用的新款芯片骁龙…

EMT4J—— Java 版本迁移检测工具

最近因为工作需要研究了emt4j,这里写一篇文章记录一下。 非专业Java er,有不同意见欢迎评论区分享。 目录 EMT4J是什么? 如何使用? Command-line Java Agent 简单的源码分析 目录分析 规则解析 参考资料 EMT4J是什么&am…

nginx只允许英文名的文件下载,中文名就是找不到文件

本文主要向大家介绍了Linux运维知识之linux下nginx不支持中文URL路径的解决方案,通过具体的内容向大家展现,希望对大家学习Linux运维知识有所帮助。 1、确定你的系统是UTF编码 [rootlocalhost ~]# echo $LAGN en_US.UTF-8 2、nginx配置文件里默认编码…

python爬虫之正则表达式实战----爬取图片

文章目录 1. 图片爬取流程分析2. 爬取家常菜图片 1. 图片爬取流程分析 先获取网址,URL:https://www.xiachufang.com/category/40076/ 定位想要爬取的内容使用正则表达式爬取导入模块指定URLUA伪装(模拟浏览器)发起请求&#xff0…

【springcloud-config】配置中心客户端导入依赖spring-cloud-config-server后,maven一直爆红问题解决

问题描述 配置中心客户端导入了 spring-cloud-config-server 后&#xff0c;导入依赖爆红&#xff1b; 解决办法&#xff1a; 参考官网中文文档&#xff1a;spring-cloud -config 配置中心 中文文档 补充导入 spring-config-starter-config 配置即可 <!--springcloud-c…

Transformer英语-法语机器翻译实例

依照Transformer结构来实例化编码器&#xff0d;解码器模型。在这里&#xff0c;指定Transformer编码器和解码器都是2层&#xff0c;都使用4头注意力。为了进行序列到序列的学习&#xff0c;我们在英语-法语机器翻译数据集上训练Transformer模型&#xff0c;如图11.2所示。 da…

【设计模式】第5节:创建型模式之“简单工厂、工厂方法和抽象工厂模式”

一、简单工厂模式 ProductFactory是创建商品的工厂&#xff0c;商品Product可以实现Product接口中的一些功能。 当需要根据入参的不同生成多种不同的产品时&#xff0c;可以将生成不同产品的逻辑剥离出来&#xff0c;使用产品工厂创建不同的产品。 二、工厂方法 ConcreteFact…

知识点滴 - Email地址不区分大小写

电子邮件地址本身对字符大小写不敏感。这意味着实际的电子邮件地址&#xff0c;如 "exampleemail.com"&#xff0c;并不区分字母的大小写。无论你输入的是大写字母还是小写字母&#xff0c;它仍然会到达同一个电子邮件账户。例如&#xff0c;如果您的电子邮件地址是 …

信创生态丨九州未来与超聚变完成兼容性互认证

近日&#xff0c;九州未来与超聚变积极开展了兼容性适配工作&#xff0c;并完成产品兼容性互认证。双方兼容性测试基于Intel64、鲲鹏架构完成&#xff0c;测试结果显示&#xff1a;九州未来Animbus IaaS V8可基于超聚变FusionOS 23服务器操作系统安全稳定运行&#xff0c;产品相…

并发编程-线程池ForkJoinPool工作原理分析

由一道算法题引发的思考 算法题&#xff1a; 如何充分利用多核CPU的性能&#xff0c;快速 对一个2千万大小的数组进行排序&#xff1f; 分解 求解 合并 这道算法题可以拆解来看&#xff1a; 1&#xff09;首先这是一道排序的算法题&#xff0c;而且是需要使用高效的排序算法…

[量化投资-学习笔记003]Python+TDengine从零开始搭建量化分析平台-Grafana画K线图

在前面两个笔记&#xff1a; PythonTDengine从零开始搭建量化分析平台-数据存储 PythonTDengine从零开始搭建量化分析平台-MA均线的多种实现方式 中有提到使用 Grafana 画图&#xff0c;不过画的都是均线。除了均线&#xff0c;Grafana 非常人性的提供离 K线图模块。 配置简单…