Spark学习(7)-SparkSQL函数定义

news2024/12/23 0:03:39

1 SparkSQL 定义UDF函数

在这里插入图片描述

目前在SparkSQL中,仅仅支持UDF和UDAF函数,python仅支持UDF

1.1 定义方式

定义方式有两种:

  1. sparksession.udf.register()
    注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内的名字用于SQL风格。

    udf对象 = sparksession.udf.register(参数1,参数2,参数3

    参数1:UDF名称,可用于SQL风格
    参数2:被注册成UDF的方法名
    参数3:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

  2. pyspark.sql.functions.udf
    仅能用于DSL风格

    udf对象 = F.udf(参数1, 参数2)
    

    参数1:被注册成UDF的方法名
    参数2:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
    其中F是:

    from pyspark.sql import functions as F
    

    其中,被注册成UDF的方法名是指具体的计算方法,如:

    #add就是将要被注册成UDF的方法名
    def add(x, y): x + y
    

1.2 构建一个Interger返回值类型的UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :01_create_integer_udf.py
@Date      :2022/11/29 16:51
@Author    :wuk
@Description  : 构建一个Integer返回值类型的的UDF
"""
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.master("local[*]")\
        .appName("test")\
        .config("spark.sql.shuffle.partitions", 2)\
        .getOrCreate()
    sc = spark.sparkContext

    # 构建一个rdd
    rdd_map = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
    df = rdd_map.toDF(["num"])

    # TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10

    # 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
    # 参数2: UDF的处理逻辑, 是一个单独的方法
    # 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
    # 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
    # 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
    udf1 = spark.udf.register("udf1",num_ride_10,IntegerType())
    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
    df.selectExpr("udf1(num)").show()

    # DSL 风格中使用
    # 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
    df.select(udf1(df['num'])).show()

    # TODO 2: 方式2注册, 仅能用于DSL风格
    udf = functions.udf(num_ride_10, IntegerType())
    df.select(udf(df['num'])).show()

1.3 注册一个ArrayType(数字\list)类型的返回值UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :02_create_array_udf.py
@Date      :2022/11/29 17:21
@Author    :wuk
@Description  : 注册一个ArrayType(数字\list)类型的返回值UDF
"""

from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StringType, ArrayType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册UDF, UDF的执行函数定义
    def split_line(data):
        return data.split(" ")  # 返回值是一个Array对象

    # TODO1 方式1 构建UDF
    udf2 = spark.udf.register("udf1",split_line,ArrayType(StringType()))

    # DLS风格
    df.select(udf2(df['line'])).show(truncate=False)

    # SQL风格
    df.createTempView("lines")
    spark.sql("select udf1(line) from lines").show(truncate=False)

    # TODO 2 方式2的形式构建UDF
    udf3 = functions.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

注意:
数组或者list类型,可以使用ArrayType来描述,同时需要传入数组内类型。

1.4 注册一个字典类型的返回值UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :03_create_dict_udf.py
@Date      :2022/11/29 18:15
@Author    :wuk
@Description  : 注册一个字典类型的返回值UDF
"""
import string

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder. \
        appName("test"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", 2). \
        getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入1 我们返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])


    # 注册UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}


    """
    UDF的返回值是字典的话, 需要用StructType来接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True). \
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)


注意: 字典类型返回值, 可以用StructType来进行描述,StructType是一个普通的Spark支持的结构化类型
只是可以用在:

  • DF中用于描述Schema
  • UDF中用于描述返回值是字典的数据。

2 SparkSQL 使用窗口函数

2.1 介绍

2.2 语法

在这里插入图片描述

2.3 开窗函数的使用

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder. \
        appName("create df"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", "2"). \
        getOrCreate()
    sc = spark.sparkContext
    rdd = sc.parallelize([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)
    ])
    schema = StructType().add("name", StringType()). \
        add("class", StringType()). \
        add("score", IntegerType())
    df = rdd.toDF(schema)
    # 窗口函数只用于SQL风格, 所以注册表先
    df.createTempView("stu")
    # TODO 聚合窗口
    spark.sql("""
    SELECT *, AVG(score) OVER() AS avg_score FROM stu
    """).show()
    # SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
    # SELECT * FROM stu
    # SELECT AVG(score) FROM stu
    # 两个SQL的结果集进行整合而来
    spark.sql("""
    SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
    """).show()
    # SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
    # SELECT * FROM stu
    # SELECT AVG(score) FROM stu GROUP BY class
    # 两个SQL的结果集进行整合而来
    # TODO 排序窗口
    spark.sql("""
    SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
    DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
    RANK() OVER(ORDER BY score) AS rank
    FROM stu
    """).show()
    # TODO NTILE
    spark.sql("""
    SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    """).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/46702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在Odoo中添加水印?

为了防止信息的泄露,水印作为一种防泄密的方式,被使用的频率越来越高。 那么在Odoo中,如何添加水印呢?其实添加的方法有很多,如利用svg生成背景图,重复的dom元素覆盖等等。 本文主要讲解利用canvas输出背…

不懂单链表? 这篇文章就帮你搞明白

坚持看完,结尾有思维导图总结 链表对指针的操作要求不低链表的概念链表的特性链表的功能(最重要)定义和初始化头插头删细节说明尾插尾删寻找链表元素与打印链表在 某位置后插入删除在某位置的插入删除销毁链表链表的概念 什么是链表 官方概念:链表是一种…

链表(1)

我们以前学过的线性数据结构底层原理都是依托静态数组来实现的,今天我们讲学习一个最简单的动态数据结构---->链表! 掌握链表有助于学习更加复杂的数据结构,例如:二叉树、trie 链表的优点是不需要处理固定的问题,…

mavon-editor的使用

vue3vitets下安装mavon-editor 3.0.0-beta版本&#xff0c;效果如下&#xff1a; 安装 //引入样式 import mavon-editor/dist/css/index.css; import mavonEditor from mavon-editor; app.use(router).use(mavonEditor).mount(#app);<template><div class"rich…

zabbix主动监控和被动监控

目录 一、环境准备 1、搭建zabbix基础环境 二、主动监控与被动监控介绍 三、设置客户端为主动监控 1、给web2主机安装zabbix_agent 2、修改主动监控配置 四、设置zabbix管理端主动监控 1、克隆模板 2、给目标主机绑定主动监控模板 3、查看主动监控的数据 一、环境准备…

【HIT-OSLAB-实验中的碎碎念】

文章目录应该养成的好习惯删除 替换 修改 内容时 记得留备份遇到问题要通过文字 图片 等多种途径去记录不同的项目应该在不同的文件夹进行处理代码文档 记得添加一些注释用于说明功能多输出有区别度的提示信息s找bug 先定位错误 再改当一份代码有不同版本的时候 记得说明每份代…

109376-05-8,Boc-QRR-AMC, Hepsin substrate

Boc-QRR-AMC是跨膜丝氨酸蛋白酶hepsin的底物&#xff0c;也用于测定酿酒酵母中的可辛(Kex2内蛋白酶)。Boc-QRR-AMC的库存解决方案最好在DMSO中准备。 编号: 187545中文名称: Hepsin substrate&#xff1a;Boc-Gln-Arg-Arg-7-氨基-4-甲基香豆素英文名: Boc-Gln-Arg-Arg-AMCCAS号…

全球No.1集装箱人工智能企业CIMCAI中集飞瞳,集装箱信息识别铅封号识别API免费,集装箱识别率99.98%高泛化性,全球两千+企业用户使用

全球No.1集装箱人工智能企业CIMCAI中集飞瞳&#xff0c;先进人工智能AI科技打造飞瞳引擎™AI集装箱检测云服务&#xff0c;集装箱信息识别铅封号识别API免费&#xff0c;集装箱识别率99.98%高泛化性&#xff0c;全球两千企业用户使用。CIMCAI中集飞瞳成熟港航人工智能核心技术及…

3年功能测试拿8K,被刚入职的应届生反超,其实你在假装努力

最近朋友给我分享了一个他公司发生的事 大概的内容呢&#xff1a;公司一位工作3年的测试工资还没有新人高&#xff0c;对此怨气不小&#xff0c;她来公司辛辛苦苦三年&#xff0c;三年内迟到次数都不超过5次&#xff0c;每天都是按时上下班&#xff0c;工作也按量完成&#xf…

PyQT6关联信号槽 (六) 百篇文章学PyQT6

本文章是百篇文章学PyQT6的第六篇&#xff0c;本文讲述如何使用PySide创建UI界面&#xff0c;并且关联入PyCharm 新建的项目中成功运行第一个PyQT程序&#xff0c;并且使用 信号槽 connect 到函数&#xff0c;在写博客和学习的过程中会遇到很多问题&#xff0c;例如&#xff1a…

Python实现点选验证码识别, B站模拟登陆

话不多说&#xff0c;今天就分享一下如何用Python实现点选验证码识别&#xff0c;小破站模拟登陆 开发环境 Python 3.8Pycharm 2021.2谷歌浏览器谷歌驱动 模块使用 selenium >>> pip install selenium3.141.0 指定版本安装time打码平台 模块安装问题: -如果安装…

Java注解(Annotation)

一、什么是注解 个人理解&#xff0c;注解就是代码中的特殊标记&#xff0c;这些标记可以在编译、类加载、运行时被读取&#xff0c;从而做相对应的处理。 注解跟注释很像&#xff0c;区别是注释是给人看的&#xff1b;而注解是给程序看的&#xff0c;它可以被编译器读取。 …

ERP软件定价策略与模型设计

ERP软件定价(价格)的高低是ERP厂商整体竞争力强弱的一个重要指针&#xff0c;也是影响客户购买行为的重要因素。客户购买某一ERP软件&#xff0c;总是面临不同的ERP厂商﹑不同渠道的多种选择&#xff0c;ERP软件价格往往成了除软件功能﹑售后服务态度、实施水平等因素外&#x…

web前端-Ajax基础学习

web前端-Ajax基础学习1. Ajax基础描述1.1 URL地址的概念1.2 客户端和服务器的通信过程1.3 Ajax1.3.1 $.get()函数1.3.2 $.post()1.3.3 $.ajax()1.4 接口1.4.1 GET、POST方式请求的过程1.4.2 接口文档2. form表单与模版引擎2.1 表单的基本介绍2.2 form表单同步提交的缺点2.3 通过…

stm32 笔记 外部中断以及HAL库应用

外部中断 由外部设备发起的中断请求&#xff0c;会使得设备暂停当前的主程序&#xff0c;保存标志位并把当前指令压入堆栈&#xff0c;转而去执行中断的子程序。执行完毕后再弹出执行堆栈&#xff0c;恢复标志位&#xff0c;继续执行主程序。 STM32 的外部中断线 STM32的每个…

嵌入式 C语言/C++ 常见笔试、面试题 难疑点汇总(经典100道)

#pragma comment。将一个注释记录放置到对象文件或可执行文件中。 #pragma pack。用来改变编译器的字节对齐方式。 #pragma code_seg。它能够设置程序中的函数在obj文件中所在的代码段。如果未指定参数&#xff0c;函数将放置在默认代码段.text中 #pragma once。保证所在文件只…

Pytest接口测试框架实战项目搭建(三)

一、前言 前面相当于已经讲完整体框架搭建了&#xff0c;本篇主要讲述在实际业务系统的接口请求中&#xff0c;如何运用好该接口自动化测试框架。 二、步骤演示 1、在conf/api_path.py新增需要测试的接口&#xff0c;标黄底色为新加 存放测试接口仅这一个文件就行&#xff0c…

吉时利2604B系列数字源表,双通道,3A直流/10A脉冲

作为2600B系列源表SMU系列产品的一部分&#xff0c;2602B源表SMU是全新改良版双通道SMU&#xff0c;具有紧密集成的4象限设计&#xff0c;能同步源和测量电压/电流以提高研发到自动生产测试等应用的生产率。除保留了2602A的全部产品特点外&#xff0c;2602B还具有6位半分辨率、…

Android 面试题收集:Handler+Binder+Activity+时间分发机制+View绘制流程+……等

一、Handler相关知识 一个线程只有一个Looper&#xff0c;一个Messagequeue&#xff0c;可以创建多个handler。 1、Handler与Looper的关联是怎样的? 实例化 Handler 的时候 Handler 会去检查当前线程的 Looper 是否存在&#xff0c;如果不存在则会报异常&#xff0c;也就是…

关于TreeView的简单使用(Qt6.4.1)

前言 TreeView是在Qt6.3中加入的&#xff0c;弥补了Qt中无官方树图。笔者上手尝试了下&#xff0c;虽然有点麻烦&#xff0c;但官方也做了不少简化。 本次教程&#xff0c;笔者创建一个简单的示例&#xff0c;以帮助读者使用TreeView。 一、创建模型类 当前模型需要使用C定义…