Spark高级用法-自定义函数

news2024/11/23 16:22:01

用户可以根据需求自己封装计算的逻辑,对字段数据进行计算

内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和

实际业务中又能内置函数不满足计算需求,此时就需要自定义行数,完成字段数据的业务处

 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

UDF函数

对每一行数据以此进行计算,返回每一行的结果 

1)不带装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *

ss = SparkSession.builder.getOrCreate()

# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')

df.show()

# 自定义字符串长度计算函数
# @F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):
    """
        自定义函数,函数名可以自己指定
    :param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数
    :return: 返回处理后的数据
    """
    if field is None:
        return 0
    else:
        data = len(field)
        return data

# 将自定义的函数注册到spark中使用
# 参数一 指定spark中使用函数的名
# 参数二  指定自定义函数的名
# 参数三  指定函数的返回值类型
# 接收参数  定义和函数名一样的值
len_func = ss.udf.register('len_func',len_func,returnType = IntegerType())

# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()


# 使用sql语句
df.createTempView('stu')

df3 = ss.sql('select * ,len_FUNC(name) from stu')
df3.show()

2)带有装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *

ss = SparkSession.builder.getOrCreate()

# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')

df.show()

# 自定义字符串长度计算函数
@F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):
    """
        自定义函数,函数名可以自己指定
    :param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数
    :return: 返回处理后的数据
    """
    if field is None:
        return 0
    else:
        data = len(field)
        return data

# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()

装饰器注册

  • 只能在DSL方法中使用,在sql语句中无法使用

UDAF函数

多进一出 主要是聚合

使用pandas中的series实现,可以读取一列数据存储在pandas的seriess中进行数据的聚合

# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id int,name string,gender string,age int,cls string')

df.show()

# 自定义udaf函数
# 装饰器注册
@F.pandas_udf(returnType=IntegerType())
# 自定义udaf函数
# fileds:pd.Series 给数据字段指定一个类型
#  -> float 指定返回值类型
# udaf函数注册需要两步,第一步现指定装饰器
def sub(filed:pd.Series) -> int:
    """
        自定义udaf函数,实现累减
    :param field: 接收的字段列数据  pd.Series声明字段数据的类型,接收一列数据可以使用pandas的series类型
    :return:
    """
    # field是series,就按照series方式操作
    n = filed[0] # 取出第一个值作为初始值

    for i in filed[1::]:
        n-=i
    return n

# regidter方法注册
sub = ss.udf.register('sub',sub)

# 使用udaf函数  缺少  PyArrow  pandas中series类型交个spark程序无法识别,spark是有scala实现,scala中没有对应的series类型
# 可以使用 PyArrow框架将series转为scale能识别的数据类型
df2 = df.select(sub('age'))
df2.show()

  • arrow框架 pyarrow
    • Apache Arrow 是一种内存中的列式数据格式,用于Spark中,以在JVM和Python进程之间有效地传输数据。目前这对使用 Pandas/NumPy 数据的 Python 用户最有益,提升传输速度。

    • 在线安装 三台机器安装

      • 进入虚拟环境 conda activate base

      • 在线安装 pip install pyspark[sql] -i Verifying - USTC Mirrors

    • 离线安装 三台机器安装

      • pip install pyarrow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

 

 安装pyarrow

conda activate base
pip install pyspark[sql] -i  https://pypi.mirrors.ustc.edu.cn/simple/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2211407.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI如何对产品设计带来更多的可能性?

AI(人工智能)对产品设计带来了广泛而深远的可能性,这些可能性主要体现在以下几个方面 1.创新设计的激发 创意生成:AI能够学习和模仿人类设计师的创作过程,通过深度学习等技术生成全新的、独特的设计概念。这些概念可能源于对大量设计案例的学习和分析&am…

AMD在Advancing AI发布会上发布三大核心硬件产品,挑战英伟达AI芯片市场

美国时间10月10日,AMD在旧金山召开了Advancing AI发布会。 这是老对手英伟达2024 AI Summit 结束后的第三天。与英伟达大会的不同之处在于,英伟达专注于软件方面的更新,而AMD主打“硬”牌。 这一次,他们带来了三款核心硬件产品&…

Python 批量转换 Shapefile 为 GeoJSON

批量转换 Shapefile (.shp) 为 GeoJSON 文件的脚本详解 🗺️🔄 在地理信息系统(GIS)和遥感领域,Shapefile(.shp)格式与GeoJSON格式是两种常用的数据格式。Shapefile 作为矢量数据的标准格式之一…

使用scss生成旋转圆圈

图片 html代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title>…

直播相关04-录制麦克风声音, 通过编程录音

一 通过编程录音 开发录音功能的主要步骤是&#xff1a; 注册设备获取输入格式对象打开设备采集数据释放资源 需要用到的FFmpeg库有4个。 extern "C" { // 设备相关API #include <libavdevice/avdevice.h> // 格式相关API&#xff0c;也就是说&#xff0c;win…

Mysql(2)—SQL语法详解(通俗易懂)

一、关于SQL 1.1 简介 SQL&#xff08;Structured Query Language&#xff0c;结构化查询语言&#xff09;是一种用于管理关系型数据库的标准编程语言。它主要用于数据的查询、插入、更新和删除等操作。SQL最初在1970年代由IBM的研究人员开发&#xff0c;旨在处理关系数据模型…

Python基础常见面试题总结

文章目录 1.深拷贝与浅拷贝2.迭代器3.生成器4.装饰器5.进程、线程、协程6.高阶函数7.魔法方法8.python垃圾回收机制 1.深拷贝与浅拷贝 浅拷贝是对地址的拷贝&#xff0c;只拷贝第一层&#xff0c;第一层改变的时候不会改变&#xff0c;内层改变才会改变。深拷贝是对值的拷贝&a…

【第十六周】回顾线性回归与逻辑回归以及它们的详细推导过程

目录 摘要Abstract1.线性回归1.1.一元线性回归1.1.1.函数凹凸性判断 1.2.多元线性回归1.3.进一步理解梯度下降法 2.逻辑回归2.1.信息论角度推导交叉熵损失函数2.2.概率论角度推导交叉熵损失函数 3.额外阅读&#xff1a;Label Smoothing3.1.One-hot 和 Label Smoothing 的优缺点…

解决报错:Invalid number of channels [PaErrorCode -9998]

继昨天重装了树莓派系统后&#xff0c;今天开始重新安装语音助手。在测试录音代码时遇到了报错“Invalid number of channels [PaErrorCode -9998]”&#xff0c;这是怎么回事&#xff1f; 有人说这是因为pyaudio没有安装成功造成的。于是&#xff0c;我pip3 install –upgrad…

利用python创建接口

目录 1. 创建一个简单的接口1.1 具体过程1.2 代码解读1. **导入 Flask**2. **创建 Flask 应用**3. **定义一个路由**4. **运行应用** 1.3 遗留问题 2. 创建一个复杂接口2.2 具体过程 1. 创建一个简单的接口 1.1 具体过程 from flask import Flaskapp Flask(__name__)app.rou…

pip安装指定版本的tensorflow

安装CPU版本&#xff1a;(以2.9.0版本为例) pip install tensorflow2.9.0安装GPU版本&#xff1a;(以2.9.0版本为例) pip install tensorflow-gpu2.9.0若下载缓慢&#xff0c;使用阿里国内镜像源加速下载&#xff1a;(以2.9.0版本为例) pip install -i https://mirrors.aliy…

一些硬件知识【20241013】

3C认证要花很多钱&#xff1a; X电容可以滤除差模信号干扰&#xff0c;当火线上有高频干扰信号时候&#xff0c;X电容利用两端压差将干扰送到N: Y电容针对于零火线上有相位相同的共模干扰信号的时候&#xff0c;将干扰导向大地&#xff1a; 电阻上并联一个电容有什么作用&#…

mac安装homebrew和git

简介 由于把自己的新mac拿来撸代码&#xff0c;开始环境搭建&#xff0c;安装各种工具和依赖&#xff0c;安装 git 需要先安装 homebrew&#xff0c;然后就遇到了 homebrew 安装失败的问题。 curl: (7) Failed to connect to raw.githubusercontent.com port 443: Connection…

多字节字符集MFC使用 Windows Visual Styles

新建一个记事本&#xff0c;然后添加以下代码 <?xml version"1.0" encoding"UTF-8" standalone"yes"?> <assembly xmlns"urn:schemas-microsoft-com:asm.v1" manifestVersion"1.0"><trustInfo xmlns"…

STM32 | STM32F4OTA_ESP8266_Bootloader为引导程序远程更新的代码(APP)

更新。点击上方"蓝字"关注我们 01、思路 >>> STM32F4OTA_ESP8266_Bootloader为引导程序 远程更新的代码&#xff08;APP&#xff09;:远程更新的APP Ymoden_server&#xff1a;为运行在Linux的TCP服务器 备注&#xff1a;STM32 OTA远程更新需要连接热点 电…

地级市-国内旅游收入、国内旅游人数数据(2000-2023年)

国内旅游收入是指国内游客在旅行过程中的全部花费&#xff0c;包括交通、参观游览、住宿、餐饮、购物和娱乐等。这一指标不包括国际游客在国内的消费&#xff0c;主要反映国内旅游市场的经济规模和发展水平&#xff0c;是评估旅游行业对国民经济贡献的重要参数。 地级市-国内旅…

安全可靠测评结果公告(2024年第2号)

大家可以选择对应的数据库&#xff0c;中央处理器&#xff0c;供参考

【C++】--内存管理

&#x1f47e;个人主页: 起名字真南 &#x1f47b;个人专栏:【数据结构初阶】 【C语言】 【C】 目录 1 C/C内存分布2 C语言中动态内存管理方式 &#xff1a;3 C内存管理方式3.1 new/delete操作内置类型3.2 new和delete操作自定义类型 4 operator new与operator delete4.1 opera…

Cortex-M 内核的 OS 特性

目录 一、通用堆栈知识二、双堆栈用法三、PendSV 中断介绍和用法四、SVC 软中断介绍和用法五、特权级和非特权级使用方法 一、通用堆栈知识 在前面讲解 STM32 启动文件的时候就已经提到过&#xff0c;有关堆栈大小的设置是在启动文件中设置的&#xff1a; Heap 主要用于 Mal…

学习Redisson实现分布式锁

官网&#xff1a;https://redisson.org/ 官方文档&#xff1a;https://redisson.org/docs/getting-started/ 官方中文文档&#xff1a;https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95 1、引入依赖 <!--redisson--> <dependency><groupId>or…