【博学谷学习记录】超强总结,用心分享|狂野大数据课程【Spark SQL函数定义】的总结分析

news2025/1/22 17:05:01

5.1 如何使用窗口函数

回顾:

窗口函数格式:
	分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

学习的相关分析函数有那些? 
	第一类: row_number() rank() dense_rank() ntile()
	第二类: 和聚合函数组合使用  sum() avg() max() min() count()
	第三类: lag() lead() first_value() last_value()

如何在Spark SQL中使用呢?

  • SQL中: 与HIVE中应用基本没啥区别, 更多关注的是DSL写法
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示: 如何在Spark SQL中使用窗口函数...")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()

    # 2-读取外部文件的数据
    df = spark.read.csv(
        path='file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/pv.csv',
        header=True,
        inferSchema=True
    )
    df.createTempView('t1')
    # 3- 执行相关的操作
    # 需要: 统计每个cookie中, pv数量排名前二内容是哪些? (分组TOPN 问题)
    # SQL
    spark.sql("""
        with t2 as(
            select
                *,
                row_number() over (partition by cookieid order by pv desc) as rank1
            from t1 
        )
        select  * from  t2 where rank1 <=2
    """).show()

    # DSL:
    df.select(
        '*',
        F.row_number().over(win.partitionBy('cookieid').orderBy(F.desc('pv'))).alias('rank1')
    ).where('rank1 <= 2').show()

2 SQL函数的分类说明

整个SQL函数, 主要是分为以下三大类:

  • UDF函数: 用户自定义函数
    • 表示: 一进一出
    • 整个函数中, 大多数的函数都是属于一进一出的函数: split() substr()
  • UDAF函数: 用户自定义聚合函数
    • 表示: 多进一出
    • 例如: sum() avg() count() ….
  • UDTF函数: 用户自定义表生成函数
    • 表示: 一进多出
    • 指的: 进去一行数据, 最终产生多行 或者多列的数据
    • 例如: explode

在SQL中提供的内置函数, 都是属于以上三类中某一类函数

思考: 提供了那么多的函数, 为啥还需要自定义函数呢?

扩充函数. 在实际使用中, 并不能保证所有的操作可能用的函数都已经提前的内置好, 尤其是很多具有特殊业务处理功能, 其实并没有相对应函数 , 提供的函数更多是以公共的功能为主函数, 此时需要进行自定义, 从而扩充新的功能

​ 在Spark SQL中, 对于自定义函数, 原生支持的粒度并不是特别好, 目前原生的PY方案仅支持自定义UDF函数, 无法自定义UDAF函数和UDTF函数, 在1.6版本之后, Java 和scala语言支持了自定义UDAF函数,但是Python并不支持,Spark官方提供了解决的方案: 基于pandas来自定义UDF 和 UDAF函数. 但是对于UDTF函数, Spark是不支持自定义,但是Spark支持HIVE的函数定义, 所以可以通过HIVE自定义函数来解决

在这里插入图片描述

	虽然Python支持自定义UDF函数, 但是其效率并不是特别的高效, 因为在使用的时候, 传递一行处理一行, 返回一行的操作, 这样会带来非常大的序列化开销问题, 以及网络开销问题, 导致原生UDF函数效率不好
	
	早期解决方案: 基于 scala/Java来编写自定义UDF函数, 然后基于Python使用即可
	
	目前主要解决方案: 引入Arrow框架, 可以基于内存来完成数据传输工作, 可以大大降低了序列化开销问题, 提供传输的效率, 解决了原生问题, 同时还可以基于pandas的自定义函数, 利用pandas函数优势, 完成各种处理操作
	
	所以后期主推的方案: 基于pandas 自定义函数, 然后底层基于arrow完成数据传输工作

3 Spark SQL原生自定义函数

如何自定义原生函数流程(非常重要):

第一步: 在Python中创建一个python的函数, 在这个函数中书写自定义函数的功能逻辑代码即可

第二步: 将Python函数注册到Spark SQL中, 成为Spark SQL的函数
	注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
		参数1: UDF函数的名称, 此名称用于后续在SQL语法中使用 , 可以任意定义名称, 但是要符合定义名称规范
		参数2: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数
		参数3: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型
		udf对象: 此对象主要是用于DSL中
	
	注册方式二:  udf对象 = F.udf(参数1,参数2)
		参数1: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数
		参数2: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型
		udf对象: 此对象主要是用于DSL中
		
		说明: 此种方式还支持语法糖写法:  @F.udf(returnType=返回值类型) 需要放置到对应函数上面
第三步: 在Spark SQL的DSL/SQL中进行使用即可

演示操作: 请自定义一个函数, 完成对数据统一添加一个后缀名的操作

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示原生的自定义函数:")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()

    # 2- 初始化一些数据
    df = spark.createDataFrame(
        data=[(1,'张三','北京'),(2,'李四','上海'),(3,'王五','广州'),(4,'赵六','深圳'),(5,'田七','杭州')],
        schema='id int,name string,address string'
    )
    df.createTempView('t1')
    # 3- 执行相关的操作:
    # 请自定义一个函数, 完成对数据统一添加一个后缀名的操作
    # 3.1 定义一个Python的函数, 接收一个数据, 给数据添加一个后缀返回
    @F.udf(returnType=StringType())
    def add_post(data):
        return data+'_boxuegu'

    # 3.2 对函数进行注册操作
    # 注册方式一
    # 当采用注解方式注册函数的使用, 如果依然想在SQL中使用, 可以再次使用方式一注册,但是不需要设置返回值类型了
    spark.udf.register('add_post_sql',add_post)

    # 注册方式二: 还有一种语法糖模式
    #add_post_dsl = F.udf(add_post,StringType())

    # 3.3 使用自定义函数
    # SQL
    spark.sql("""
        select
            *,
            add_post_sql(address) as r1
        from t1
    """).show()

    # DSL
    df.select(
        '*',
        add_post('address').alias('r1')
    ).show()

演示操作: 自定义一个函数, 让其返回值的类型为字典 列表 元组

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示原生的自定义函数:")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()

    # 2- 初始化一些数据
    df = spark.createDataFrame(
        data=[(1,'张三 北京'),(2,'李四 上海'),(3,'王五 广州'),(4,'赵六 深圳'),(5,'田七 杭州')],
        schema='id int,name_address string'
    )
    df.createTempView('t1')
    # 3- 执行相关的操作:
    # 需求: 自定义一个函数, 将姓名和地址拆分开
    schema = StructType().add('name',StringType()).add('address',StringType())

    @F.udf(returnType=schema)
    def split_data(data):
        res = data.split(' ')
        #return [res[0],res[1]]
        #return (res[0], res[1])
        return {'name':res[0],'address':res[1]}
    # 使用字典返回, key值 必须和schema中定义字段名称保持一致

    # 注册函数
    spark.udf.register('split_data',split_data)

    # 使用函数
    # SQL
    df1 = spark.sql("""
        select
            *,
            split_data(name_address)['name'] as name,
            split_data(name_address)['address'] as address
        from t1
    """)

    df1.printSchema()
    df1.show()

    # DSL
    df.select(
        '*',
        split_data('name_address')['name'].alias('name'),
        split_data('name_address')['address'].alias('address')
    ).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/390569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

西电软件体系结构核心考点汇总(期末真题+核心考点)

文章目录前言一、历年真题二、核心考点汇总2.1 什么是软件体系架构?(软件体系结构的定义)2.2 架构风格优缺点2.3 质量属性2.4 质量评估前言 主要针对西安电子科技大学《软件体系结构》的核心考点进行汇总。 【期末期间总结资料如下】 针对西电计科院软件工程专业大三《软件体…

【QT】使用QML构建一个简易的计算器界面(三)

前面两篇对计算器界面的布局和显示以及实现功能做了相关优化&#xff0c;但是对输入显示那一块还没有具体的处理步骤&#xff0c;包括对输入表达式的合法性检查&#xff0c;显示框的多行历史显示等功能还需要添加&#xff0c;接下来将从这几个方面对这些功能进行添加。 1、对输…

概率论 1.3 古典概型与几何概型

1.3.1 排列与组合排列从n个不同元素任取r(r<n)个元素排成一列(考虑元素出现的先后次序)&#xff0c;称此为一个排列&#xff0c;此种排列的总数为n(n-1)....(n-r1)n!/(n-r)&#xff01;&#xff0c;若rn,则称为全排列&#xff0c;2.重复排列从n个不同元素中每次取出一个,放回…

GPIO输入和输出以及八种工作模式

一.GPIO的简介 GPIO &#xff08;general purpose input output&#xff09;是通用输入输出端口的简称&#xff0c;简单来说就是软件可控制的引脚&#xff0c;STM32芯片的GPIO引脚与外部传感器连接起来&#xff0c;从而实现与外部通讯、控制以及数据采集的功能。 1.引脚全是GP…

[2.1.1]进程管理——进程的概念、组成、特征

文章目录第二章 进程管理进程的概念、组成、特征&#xff08;一&#xff09;进程的概念&#xff08;二&#xff09;进程的组成——PCB&#xff08;三&#xff09;进程的组成——程序段、数据段补充&#xff1a;程序是如何运行的&#xff1f;&#xff08;四&#xff09;进程的特…

vue3 插槽使用详解

目录1 前言2 插槽的使用2.1 基本使用2.2 具名插槽2.3 动态插槽名2.4 插槽传值3 总结1 前言 Vue 实现了一套内容分发的 API&#xff0c;将 <slot> 元素作为承载分发内容的出口&#xff0c;使用插槽使得vue组件的设计更加灵活。 在vue版本更迭中&#xff0c;尽管插槽的使…

常用的设计模式之一(创建型模式)

设计模式可分为三大类&#xff1a; 创建型模式 (Creational Patterns)结构性模式 (Structural Patterns)行为型模式 (Behavioral Patterns) 模式描述包括创建型模式工厂模式&#xff08;Factory Pattern&#xff09; 抽象工厂模式&#xff08;Abstract Factory Pattern&#…

并发编程——可见性与有序性

如果有兴趣了解更多相关内容&#xff0c;欢迎来我的个人网站看看&#xff1a;耶瞳空间 JMM即Java Memory Model&#xff0c;它定义了主存、工作内存抽象概念&#xff0c;底层对应着CPU寄存器、缓存、硬件内存、CPU指令优化等。JMM体现在以下几个方面&#xff1a; 原子性&…

Web API

DOM API 1、选中页面元素 let elem document.querySelector(CSS选择器); console.log(elem); console.dir(elem); 2、事件 鼠标点击事件 onclick 鼠标移动事件 onmousemove 等等 事件源 .box&#xff0c;事件类型 onlick&#xff0c;事件处理方式 alert(hello) let d…

[Mybatis1]介绍与快速入门

文章目录 Mybatis概述 持久层 框架 Mybatis与JDBC对比 JDBC代码的缺陷 Mybatis简化JDBC Mybatis快速入门案例 整体案例项目结构 1.创建user表&#xff0c;添加数据 2.创建Maven项目&#xff0c;导入坐标 3.编写Mybatis核心配置文件 4.编写数据库返回对象的实体类 5. 编写S…

QML Button详解

1.Button简介 Button表示用户可以按下或单击的按钮控件。按钮通常用于执行一个动作&#xff0c;或回答一个问题。典型的按钮有确定、应用、取消、关闭、是、否和帮助。 Button继承自AbstractButton&#xff0c;提供了以下几种信号。 void canceled() //当按…

Python笔记 -- 列表

文章目录1、列表简介2、修改、添加、删除元素2.1、添加2.2、删除3、排序、倒序4、遍历列表5、创建数值列表6、列表切片7、列表复制8、元组1、列表简介 在Python中用方括号[]表示列表&#xff0c;用逗号隔开表示其元素 通过索引访问列表 names [aa,bb,cc,dd]print(names[0]) …

游戏项目中的程序化生成(PCG):算法之外的问题与问题

本篇讨论的是什么 从概念上讲&#xff0c;PCG&#xff08;程序化生成&#xff09;的含义很广&#xff1a;任何通过规则计算得到的内容&#xff0c;都可算作是PCG。但在很多游戏项目的资料&#xff0c;包括本篇&#xff0c;讨论PCG时特指是&#xff1a;用一些算法/工具(特别是H…

C语言-基础了解-13-C enum枚举

C enum枚举 一、C枚举 枚举是 C 语言中的一种基本数据类型&#xff0c;用于定义一组具有离散值的常量。&#xff0c;它可以让数据更简洁&#xff0c;更易读。 枚举类型通常用于为程序中的一组相关的常量取名字&#xff0c;以便于程序的可读性和维护性。 定义一个枚举类型&a…

3.2 LED闪烁流水灯蜂鸣器

LED闪烁1.1 电路连接示意图LED采用低电平点亮的方式&#xff0c;利用ST-Link的3.3V进行供电。1.2程序设计1.21知识储备GPIO配置步骤步骤&#xff1a;1. 第⼀步&#xff0c;使⽤RCC开启GPIO的时钟2. 第⼆步&#xff0c;使⽤GPIO_Init()函数初始化GPIO3. 第三步&#xff0c;使⽤输…

JavaWeb--会话技术

会话技术1 会话跟踪技术的概述2 Cookie2.1 Cookie的基本使用2.2 Cookie的原理分析2.3 Cookie的使用细节2.3.1 Cookie的存活时间2.3.2 Cookie存储中文3 Session3.1 Session的基本使用3.2 Session的原理分析3.3 Session的使用细节3.3.1 Session钝化与活化3.3.2 Session销毁目标 理…

java坦克大战(1.0)

坦克大战 后面开始学习怎么使用java制造一个坦克大战游戏 但是不是直接开始做&#xff0c;而是随着这个游戏程序的制造&#xff0c;一边学习新知识融入到游戏中。包括多线程&#xff0c;反射&#xff0c;IO流… Java坐标体系 在几乎所有的坐标中都有一个x轴和y轴&#xff0c…

大数据项目实战之数据仓库:用户行为采集平台——第1章 数据仓库概念

第1章 数据仓库概念 数据仓库&#xff08;Data Warehouse&#xff09;&#xff0c;是为企业制定决策&#xff0c;提供数据支持的。可以帮助企业改进业务流程、提高产品质量等。 数据仓库的输入数据通常包括&#xff1a;业务数据、用户行为数据和爬虫数据等 业务数据&#xf…

Java - 对象的比较

一、问题提出 前面讲了优先级队列&#xff0c;优先级队列在插入元素时有个要求&#xff1a;插入的元素不能是null或者元素之间必须要能够进行比较&#xff0c;为了简单起见&#xff0c;我们只是插入了Integer类型&#xff0c; 那优先级队列中能否插入自定义类型对象呢&#xf…

深入理解JDK动态代理原理,使用javassist动手写一个动态代理框架

文章目录一、动手实现一个动态代理框架1、初识javassist2、使用javassist实现一个动态代理框架二、JDK动态代理1、编码实现2、基本原理&#xff08;1&#xff09;getProxyClass0方法&#xff08;2&#xff09;总结写在后面一、动手实现一个动态代理框架 1、初识javassist Jav…