UDF:一对一的函数【User Defined Functions】
- substr、split、concat、instr、length、from_unixtime
UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数
- count、sum、max、min、avg、collect_set/list
UDTF:一对多的函数【User Defined Tabular Functions】
-
explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数
Spark中支持UDF和UDAF两种,支持直接使用Hive中的UDF、UDAF、UDTF. pyspark中自定义函数的三种写法:
register
import os
from pyspark.sql import SparkSession
def func(line):
list1 = line.split("/")
return str(list1[0]) + "斤/" + str(list1[1]) + "cm"
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
spark = SparkSession.builder \
.master("local[2]") \
.appName("第一个sparksql案例") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
df = spark.read.format("csv").option("sep", "\t").option("header", True) \
.load("../../resources/input/a.txt")
df.createOrReplaceTempView("a")
spark.sql("""
select id,name,concat(split(msg,"/")[0],"斤/",split(msg,"/")[1],"cm") msg from a
""").show()
# 将编写好的python方法变成spark的sql方法
spark.udf.register("sqlfunc", func)
spark.sql("""
select id,name,sqlfunc(msg) msg from a
""").show()
spark.stop()
udf注册方式定义UDF函数
不常用,只能用于DSL开发中
定义:
UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
"""
------------------------------------------
Description : TODO:
SourceFile : _04自定义函数
Author : 老闫
Date : 2024/11/6 星期三
-------------------------------------------
"""
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
spark = SparkSession.builder.master("local[2]").appName("").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()
df = spark.read.format("csv").option("sep","\t").option("header","true").load("../../datas/function/a.txt")
df.createOrReplaceTempView("a")
# 第二种方案:使用自定义函数
def myfunc(msg):
return msg.split("/")[0]+"斤/"+msg.split("/")[1]+"cm"
myfunc2= F.udf(myfunc, returnType=StringType())
# 假如自定义函数使用了register 的方式,不仅sql可以使用,DSL语法也可以使用
df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg")).alias("msgg")).show()
spark.stop()