Spark 7:Spark SQL 函数定义

news2025/1/16 8:19:36

SparkSQL 定义UDF函数

8e9d7ddcc2f04a4fa3d0b7e78921b4f8.png

21cc4f7f05aa4c4d8e9db15c1d271cb1.png

方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名

8dee25a8da254eec809eb0c8a5d95baf.png  

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(["num"])

    # TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
    # 参数2: UDF的处理逻辑, 是一个单独的方法
    # 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
    # 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
    # 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
    udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
    df.selectExpr("udf1(num)").show()

    # DSL 风格中使用
    # 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2: 方式2注册, 仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

    df.selectExpr("udf3(num)").show()

2562d46c19ee47c3b20f47fea545fb36.png

af9b0504fc474fd8896cb20f93094922.png 

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册UDF, UDF的执行函数定义
    def split_line(data):
        return data.split(" ")  # 返回值是一个Array对象
    # TODO1 方式1 构建UDF
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # DLS风格
    df.select(udf2(df['line'])).show()
    # SQL风格
    df.createTempView("lines")
    spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

    # TODO 2 方式2的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

0e2c268c421848cfbc639a80fc067838.png

fe0861021c4d4be5a8b0566697bf5905.png 

# coding:utf8
import string
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入1 我们返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    # 注册UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}

    """
    UDF的返回值是字典的话, 需要用StructType来接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

2034e0d567664867a1c76c153620c99e.png  

c5a05c06e0c24009abe4b20cdc81ed7e.png

 SparkSQL 使用窗口函数

f3cf9802e8084fc4bf2518765b500397.png

7d43ea9fba3c479094ff69bb62cd99da.png

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder. \
        appName("create df"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", "2"). \
        getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
    ('张三', 'class_1', 99),
    ('王五', 'class_2', 35),
    ('王三', 'class_3', 57),
    ('王久', 'class_4', 12),
    ('王丽', 'class_5', 99),
    ('王娟', 'class_1', 90),
    ('王军', 'class_2', 91),
    ('王俊', 'class_3', 33),
    ('王君', 'class_4', 55),
    ('王珺', 'class_5', 66),
    ('郑颖', 'class_1', 11),
    ('郑辉', 'class_2', 33),
    ('张丽', 'class_3', 36),
    ('张张', 'class_4', 79),
    ('黄凯', 'class_5', 90),
    ('黄开', 'class_1', 90),
    ('黄恺', 'class_2', 90),
    ('王凯', 'class_3', 11),
    ('王凯杰', 'class_1', 11),
    ('王开杰', 'class_2', 3),
    ('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \
    add("class", StringType()). \
    add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/930498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

部分调试记录

Ubuntu16.04纯命令行安装VMwareTools hudahuahudahua-virtual-machine:~$ sudo apt-get install open-vm-tools -yhudahuahudahua-virtual-machine:~$ sudo apt-get install open-vm-tools-desktop无法加载so文件,版本问题 [rootdragonboard /]# ./Qserial -qws .…

工厂生产作业流程合规检测

工厂生产作业流程合规检测系统通过yolov7网络模型算法,工厂生产作业流程合规检测对作业人员的操作行为进行全面监测,通过图像识别算法和数据分析,对人员的操作动作、工具使用、安全防护等方面进行检测和评估,能够实时监测工人的操…

10行Python代码能做出哪些酷炫的事情?

Python凭借其简洁的代码,赢得了许多开发者的喜爱。因此也就促使了更多开发者用Python开发新的模块,从而形成良性循环,Python可以凭借更加简短的代码实现许多有趣的操作。下面我们来看看,我们用不超过10行代码能实现些什么有趣的功…

【Linux】【驱动】驱动挂载的时候给驱动传递参数

【Linux】【驱动】驱动挂载的时候给驱动传递参数 绪论1.什么是驱动传参驱动传参就是传递参数给我们的驱动举例:2.驱动传参数有什么作用呢?3. 传递单个参数使用如下的数组4. 传递数组使用以下函数: 传递数字值代码指令 传递数组代码传递数组指令 绪论 1.什么是驱动…

拒绝摆烂!C语言练习打卡第六天

🔥博客主页:小王又困了 📚系列专栏:每日一练 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、选择题 📝1.第一题 📝2.第二题 &#x1f4d…

hbuilderx小程序基于Android的个人健身运动记录生活APP_45n2x

近些年来,随着科技的飞速发展,互联网的普及逐渐延伸到各行各业中,给人们生活带来了十分的便利,记录生活信息利用计算机网络实现信息化管理,使整个记录生活管理的发展和服务水平有显著提升。 本文拟采用Android平台进行…

玩转 PI 系列-看起来像服务器的 ARM 开发板矩阵-Firefly Cluster Server

前言 基于我个人的工作内容和兴趣,想要在家里搞一套服务器集群,用于容器/K8s 等方案的测试验证。 考虑过使用二手服务器,比如 Dell R730, 还搞了一套配置清单,如下: Dell R7303.5 尺寸规格硬盘CPU: 2686v4*2 内存&a…

【Linux网络】TCP UDP socket HTTP webSocket之间的区别

目录 一、OSI & TCP/IP模型 二、几者之间的关系 三、HTTP 四、Socket 五、WebSocket 5.1、WebSocket 优点 一、OSI & TCP/IP模型 首先我们要了解OSI七层模型,和预支对应的TCP/IP 四层的模型。 用下面的图可以看出,TCP UDP 工作在传输层&…

JavaScript:基本语法(变量与函数的定义与使用)

文章目录 script 标签srcdefer 延迟加载 基本语法定义变量 与 使用变量基本类型typeof 查看变量类型复合类型数组类型定义对象类型定义 函数定义函数使用函数 script 标签 src 和scc一样可以内嵌也可以外src外引。 一般是推荐外引。 <script src"idx.js">&l…

机器视觉应用开发四大软件,那一个对我们从0到1建设你的机器视觉知识体系更好

我们首先要理解什么是知识体系&#xff1a; 我们身处一个知识爆炸时代&#xff0c;我们面对各种课程&#xff0c;各种知识&#xff0c;“自身学习”&#xff0c;“高效记忆”&#xff0c;“批判性思维”&#xff0c;“解决问题的能力”。各种平台课程太多&#xff0c;各种买买…

《华为认证》配置vlan聚合

1、实验环境&#xff1a;如图所示&#xff0c;配置vlan 10、20、100 &#xff0c;vlan 100作为聚合vlan&#xff0c;vlan 10、vlan20作为子vlan&#xff0c;vlan 10和vlan 20配置成相同网段的ip地址。Vlanif 100 作为vlan 10 和vlan20的网关&#xff0c;在vlanif100上配置arp代…

linux查看正在运行的nginx在哪个文件夹当中

1、查出Nginx进程PID ps -ef|grep nginx2、查看Nginx进程启动时的工作目录 ls -la /proc/<PID>/cwd将<PID>替换为第一步中列出的Nginx进程的PID。该命令会显示Nginx进程在启动时所在的工作目录&#xff08;当前工作目录&#xff09;

基于Nodejs+vue的学习笔记分享系统设计与开发

本系统结合现今主流管理系统的功能模块以及设计方式进行分析&#xff0c;使用nodejs语言和vue.js框架进行开发设计&#xff0c;具体研究内容如下&#xff1a; (1) 管理员主要对个人中心&#xff0c;用户管理&#xff0c;笔记本管理&#xff0c;笔记分享管理&#xff0c;分享…

流量洪峰?不惧!手把手教你应对高并发挑战!

大家好&#xff0c;我是你们的小米&#xff01;今天我要和大家聊一个充满挑战和创意的话题&#xff1a;如何解决瞬时大流量高并发&#xff1f;想必很多小伙伴们在开发过程中都遇到过这个让人头疼的问题吧。别担心&#xff0c;我在这里和你分享我的经验&#xff0c;让你轻松驾驭…

网站巡检的重要性及其后果分析

在信息化、电子化的今天&#xff0c;网站已经成为企业、组织和个人展示形象、传播信息、提供服务的重要窗口。与此同时&#xff0c;为了确保网站的正常运行、用户体验和数据安全&#xff0c;定期的网站巡检就显得尤为重要。以下对网站巡检的重要性和可能的后果进行深入分析&…

部署 ssm 项目到云服务器上(购买云服务器 + 操作远程云服务器 + 服务器中的环境搭建 + 部署项目到服务器)

部署 Web 项目 1、获取 Linux 环境1.1、如何去买一个云服务器1.2、远程操作云服务器1.3、在 Linux 系统中搭建 Java Web 的运行环境。1&#xff09;安装 JDK&#xff08;使用包管理器 yum 来安装&#xff09;2&#xff09; 安装Tomcat3&#xff09;安装 MySQL。 1.4、在云服务器…

南卡开放式耳机再添新品,南卡OE CC会不会成为行业搅局者?

Nank南卡官方于8月25日宣布&#xff0c;将要上线一款百元级性价比神机-南卡OE CC&#xff0c;该新款开放式耳机以“年度开放式耳机百元标杆”为宣传口号&#xff0c;Nank南卡一直以来坚持产品力优先&#xff0c;在研发上一直都很激进&#xff0c;上一代的OE Pro首创了EAA悬停舒…

【无需公网IP】在树莓派上搭建Web站点

目录 1.概述 2.使用 Raspberry Pi Imager 安装 Raspberry Pi OS 3.设置 Apache Web 服务器 3.1测试 web 站点 3.2安装静态样例站点 3.3将web站点发布到公网 3.4安装 Cpolar 3.5cpolar进行token认证 3.6生成cpolar随机域名网址 3.7生成cpolar二级子域名 3.8将参数保存…

使用通信顺序进程(CSP)模型的 Go 语言通道

在并发编程中&#xff0c;许多编程语言采用共享内存/状态模型。然而&#xff0c;Go 通过实现 通信顺序进程&#xff08;CSP&#xff09;模型来区别于众多。在CSP中&#xff0c;程序由不共享状态的并行进程组成&#xff1b;相反&#xff0c;它们通过通道进行通信和同步操作。因此…

多线程常见面试题

常见的锁策略 这里讨论的锁策略,不仅仅局限于 Java 乐观锁 vs 悲观锁 锁冲突: 两个线程尝试获取一把锁&#xff0c;一个线程能获取成功,另一个线程阻塞等待。 乐观锁: 预该场景中,不太会出现锁冲突的情况。后续做的工作会更少。 悲观锁: 预测该场景,非常容易出现锁冲突。后…