文章目录
零、本讲学习目标 一、Spark SQL内置函数 (一)内置函数概述
(二)内置函数演示 1、通过编程方式使用内置函数upper() 2、通过SQL语句的方式使用内置函数upper() 3、演示其它内置函数的使用
二、自定义函数
三、自定义聚合函数 (一)自定义聚合函数概述 (二)演示自定义聚合函数 1、提出任务:实现求员工平均工资功能的UDAF 2、编写程序,完成任务
四、开窗函数 (一)开窗函数概述 (二)开窗函数使用格式 (三)开窗函数案例演示 1、提出任务:统计前3名 2、编写程序,实现功能,完成任务
零、本讲学习目标
了解Spark SQL内置函数 学会使用自定义函数 学会自定义聚合函数 学会使用开窗函数
一、Spark SQL内置函数
(一)内置函数概述
1、10类内置函数
Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。
2、两种使用方式
使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。
(二)内置函数演示
读取HDFS上的people.json
,得到数据帧,执行命令:val peopleDF = spark.read.json("hdfs://master:9000/input/people.json")
显示数据帧内容 导入Spark SQL内置函数,执行命令:import org.apache.spark.sql.functions._
1、通过编程方式使用内置函数upper()
利用upper()
函数将姓名转成大写,执行命令:peopleDF.select(upper(col("name")).as("name")).show()
上述代码中,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col("name")
指定要查询的列,也可以使用$"name"
代替,但是需要导入import spark.implicits._
,执行命令:peopleDF.select(upper($"name").as("name")).show()
对某列使用了内置函数,如果还要显示其它列,就会报错
2、通过SQL语句的方式使用内置函数upper()
定义临时视图,执行命令:peopleDF.createTempView("t_people")
执行命令:spark.sql("select upper(name) as name from t_people").show()
执行命令:spark.sql("select upper(name) as name, age from t_people").show()
3、演示其它内置函数的使用
打印Schema信息,执行命令:peopleDF.printSchema()
查询name
列,执行命令:peopleDF.select("name").show()
可用SQL语句方式来完成同样的任务 查询name
列和age
列,其中将age
列的值增加1
,执行命令:peopleDF.select($"name", $"age" + 1).show()
可用SQL语句方式来完成同样的任务 查询年龄大于21的记录,执行命令:peopleDF.filter($"age" > 21).show()
可用SQL语句方式来完成同样的任务 根据age
进行分组,并求每一组的数量,执行命令:peopleDF.groupBy("age").count().show()
可用SQL语句方式来完成同样的任务
二、自定义函数
(一)自定义函数概述
当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据自己的业务编写自定义函数(User Defined Functions,UDF),然后在Spark SQL中调用。
(二)演示自定义函数
1、提出任务:手机号保密
有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位用星号()代替,比如手机号158 ***1170。这时就可以写一个自定义函数来实现这个需求。
2、编写程序,完成任务
创建SparkSQLUDF
单例对象
package net. hw. sparksql
import org. apache. spark. rdd. RDD
import org. apache. spark. sql. types. { StringType, StructField, StructType}
import org. apache. spark. sql. { Row, SparkSession}
object SparkSQLUDF {
def main( args: Array[ String ] ) : Unit = {
val spark = SparkSession. builder( )
. appName( "SparkSQLUDF" )
. master( "local[*]" )
. getOrCreate( )
val arr = Array( "15892925678" , "13567892345" , "18034561290" , "13967678901" )
val rdd: RDD[ String ] = spark. sparkContext. makeRDD( arr)
val rowRDD: RDD[ Row] = rdd. map( line => Row( line) )
val schema = StructType(
List {
StructField( "phone" , StringType, true )
}
)
val df = spark. createDataFrame( rowRDD, schema)
val phoneUDF = ( phone: String ) => {
var result = "手机号码错误!"
if ( phone != null && phone. length == 11 ) {
val buffer = new StringBuffer( )
buffer. append( phone. substring( 0 , 3 ) )
buffer. append( "****" )
buffer. append( phone. substring( 7 ) )
result = buffer. toString
}
result
}
spark. udf. register( "phoneHide" , phoneUDF)
df. createTempView( "t_phone" )
spark. sql( "select phoneHide(phone) as phone from t_phone" ) . show( )
}
}
上述代码通过spark.udf.register()
方法注册一个自定义函数phoneHide
,然后使用spark.sql()
方法传入SQL
语句,在SQL
语句中调用自定义函数phoneHide
并传入指定的列,该列的每一个值将依次被自定义函数phoneHide
处理。 运行程序,查看结果
三、自定义聚合函数
(一)自定义聚合函数概述
Spark SQL提供了一些常用的聚合函数,如count()、countDistinct()、avg()、max()、min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined AggregateFunctions,UDAF)。 UDF主要是针对单个输入返回单个输出,而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。
(二)演示自定义聚合函数
1、提出任务:实现求员工平均工资功能的UDAF
员工工资数据存储于HDFS上/input
目录里的employees.json
文件中
2、编写程序,完成任务
创建MyAverage
类,继承UserDefinedAggregateFunction
类
package net. hw. sparksql
import org. apache. spark. sql. { Row, SparkSession}
import org. apache. spark. sql. expressions. { MutableAggregationBuffer, UserDefinedAggregateFunction}
import org. apache. spark. sql. types. { DataType, DoubleType, LongType, StructField, StructType}
class MyAverage extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType (
List( StructField( "inputColumn" , LongType) )
)
override def bufferSchema: StructType = StructType(
List(
StructField( "sum" , LongType) ,
StructField( "count" , LongType)
)
)
override def dataType: DataType = DoubleType
override def deterministic: Boolean = true
override def initialize( buffer: MutableAggregationBuffer) : Unit = {
buffer( 0 ) = 0L
buffer( 1 ) = 0L
}
override def update( buffer: MutableAggregationBuffer, input: Row) : Unit = {
if ( ! input. isNullAt( 0 ) ) {
buffer( 0 ) = buffer. getLong( 0 ) + input. getLong( 0 )
buffer( 1 ) = buffer. getLong( 1 ) + 1
}
}
override def merge( buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = {
buffer1( 0 ) = buffer1. getLong( 0 ) + buffer2. getLong( 0 )
buffer1( 1 ) = buffer1. getLong( 1 ) + buffer2. getLong( 1 )
}
override def evaluate( buffer: Row) : Double = buffer. getLong( 0 ) . toDouble / buffer. getLong( 1 )
}
object MyAverage {
def main( args: Array[ String ] ) : Unit = {
val spark = SparkSession. builder( )
. appName( "SparkSQLUDF" )
. master( "local[*]" )
. getOrCreate( )
spark. udf. register( "myAverage" , new MyAverage)
val df = spark. read. json( "hdfs://master:9000/input/employees.json" )
df. show( )
df. createOrReplaceTempView( "employees" )
val result = spark. sql( "select myAverage(salary) as average_salary from employees" )
result. show( )
spark. toString
}
}
运行程序,查看结果
四、开窗函数
(一)开窗函数概述
row_number()
开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TOPN)。
(二)开窗函数使用格式
row_number( ) over ( partition by 列名 order by 列名 desc ) 行号列别名
partition by:按照某一列进行分组 order by:分组后按照某一列进行组内排序 desc:降序,默认升序
(三)开窗函数案例演示
1、提出任务:统计前3名
统计每一个产品类别的销售额前3名(相当于分组求TOPN)
2、编写程序,实现功能,完成任务
创建SparkSQLWindowFunctionDemo
单例对象
package net. hw. sparksql
import org. apache. spark. sql. types. { IntegerType, StringType, StructField, StructType}
import org. apache. spark. sql. { Row, SparkSession}
object SparkSQLWindowFunctionDemo {
def main( args: Array[ String ] ) : Unit = {
val spark = SparkSession. builder( )
. appName( "SparkSQLUDF" )
. master( "local[*]" )
. getOrCreate( )
val arr = Array(
"2022-05-10,A,710" ,
"2022-05-10,B,530" ,
"2022-05-10,C,670" ,
"2022-05-11,A,520" ,
"2022-05-11,B,730" ,
"2022-05-11,C,610" ,
"2022-05-12,A,500" ,
"2022-05-12,B,700" ,
"2022-05-12,C,650" ,
"2022-05-13,A,620" ,
"2022-05-13,B,690" ,
"2022-05-13,C,700" ,
"2022-05-14,A,720" ,
"2022-05-14,B,680" ,
"2022-05-14,C,590"
)
val rowRDD = spark. sparkContext
. makeRDD( arr)
. map( line => Row(
line. split( "," ) ( 0 ) ,
line. split( "," ) ( 1 ) ,
line. split( "," ) ( 2 ) . toInt
) )
val structType = StructType(
List(
StructField( "date" , StringType, true ) ,
StructField( "type" , StringType, true ) ,
StructField( "money" , IntegerType, true )
) )
val df = spark. createDataFrame( rowRDD, structType)
df. createTempView( "t_sales" )
spark. sql(
"""
|select date, type, money,
| row_number() over (partition by type order by money desc) rank
| from t_sales
|""" . stripMargin
) . show( )
spark. sql(
"""
|select date, type, money, rank from
| (
| select date, type, money,
| row_number() over (partition by type order by money desc) rank
| from t_sales
| ) sale
|where sale.rank <= 3
|""" . stripMargin
) . show( )
}
}
运行程序,查看结果