在上一篇文章中,我们对第一种用户定义函数(UDF)进行了基础介绍。接下来,本文将带您深入了解剩余的两种UDF函数类型。
文章目录
- 1. UDAF
- 1.1 简单UDAF
- 1.2 通用UDAF
- 2. UDTF
- 3. 总结
1. UDAF
1.1 简单UDAF
第一种方式是 Simple(简单) 方式,即继承 org.apache.hadoop.hive.ql.exec.UDAF 类,并在派生类中以静态内部类的方式实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口。这个计算类将负责执行具体的聚合逻辑,具体步骤如下:
a)初始化(init):首先,我们需要实现UDAFEvaluator接口的init方法,用于初始化聚合过程中所需的任何资源或状态。
b)迭代(iterate):接下来,iterate方法将被用来处理传入的数据。此方法将逐个接收数据项,并更新聚合状态。它返回一个布尔值,指示是否继续迭代或停止。
c)部分终止(terminatePartial):在迭代完成后,terminatePartial方法将被调用。它的作用类似于Hadoop中的Combiner,用于返回一个中间聚合结果,以便在多个任务之间进行合并。
d)合并(merge):merge方法用于接收来自terminatePartial的中间结果,并将其合并以形成更接近最终结果的聚合状态。此方法同样返回一个布尔值,指示合并操作是否成功。
e)最终终止(terminate):最后,terminate方法将被用来生成并返回聚合操作的最终结果。
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
// 自定义的UDAF类,用于计算最大值
public class MyMaxUDAF extends UDAF {
// 实现UDAFEvaluator接口的静态内部类
static public class MaxIntEvaluator implements UDAFEvaluator {
// 存放当前聚合操作过程中的最大值
private int mMax;
// 用于标记聚合数据集是否为空
private boolean mEmpty;
// 构造方法,用于执行初始化操作
public MaxIntEvaluator() {
super();
init();
}
// 初始化方法,用于重置聚合状态
public void init() {
// 初始化最大值为0
mMax = 0;
// 初始化聚合数据集为空
mEmpty = true;
}
// 迭代处理每一行数据。每次调用处理一行记录
public boolean iterate(IntWritable o) {
// 检查传入的数据是否为null
if (o != null) {
// 如果当前聚合数据集为空,则直接将当前值设置为最大值
if (mEmpty) {
mMax = o.get();
mEmpty = false; // 更新状态,标记聚合数据集不再为空
} else {
// 聚合数据集不为空时,用当前值和之前的最大值比较,保留较大的那个
mMax = Math.max(mMax, o.get());
}
}
return true;
}
// 输出Map阶段处理结果的方法,返回当前的最大值
public IntWritable terminatePartial() {
// 如果聚合数据集为空,则返回null;否则,返回当前的最大值
return mEmpty ? null : new IntWritable(mMax);
}
// Combine/Reduce阶段,合并处理结果
public boolean merge(IntWritable o) {
// 通过调用iterate方法进行合并操作
return iterate(o);
}
// 返回最终的聚集函数结果
public IntWritable terminate() {
// 如果聚合数据集为空,则返回null;否则,返回最终的最大值
return mEmpty ? null : new IntWritable(mMax);
}
}
}
1.2 通用UDAF
编写简单的UDAF(用户定义聚合函数)相对容易,但这种方法由于依赖Java的反射机制,可能会牺牲一些性能,并且它不支持变长参数等高级特性。相比之下,通用UDAF(Generic UDAF)提供了这些高级特性的支持,虽然它的编写可能不如简单UDAF那样直接明了。
Hive社区推崇使用通用UDAF作为最佳实践,建议采用新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
来替代旧的UDAF接口,并推荐使用org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
抽象类来替换旧的UDAFEvaluator
接口。这种新方法不仅提升了性能,还增加了灵活性,使得UDAF的功能更加强大和多样化。
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.IntWritable;
// 通过继承AbstractGenericUDAFResolver并使用Description注解来定义一个新的UDAF。
@Description(name = "max_int", value = "_FUNC_(int) - Returns the maximum value of the column")
public class MyMaxUDAF2 extends AbstractGenericUDAFResolver {
// 聚合函数的求值器内部类,继承自GenericUDAFEvaluator。
public static class MaxIntEvaluator extends GenericUDAFEvaluator {
// 用于存储输入参数的ObjectInspector。
private PrimitiveObjectInspector inputOI;
// 用于存储聚合结果。
private IntWritable result;
// 初始化方法,用于设置聚合函数的参数和返回类型。
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
// 确认参数是原始类型并初始化inputOI。
inputOI = (PrimitiveObjectInspector) parameters[0];
// 设置聚合函数的返回类型为可写的整型。
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
// 创建聚合缓冲区对象的方法。
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
MaxAggBuffer buffer = new MaxAggBuffer();
reset(buffer);
return buffer;
}
// 重置聚合缓冲区对象的方法。
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((MaxAggBuffer) agg).setValue(Integer.MIN_VALUE);
}
// 迭代方法,用于处理每一行数据。
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
if (parameters[0] != null) {
MaxAggBuffer myagg = (MaxAggBuffer) agg;
// 从参数中获取整数值并更新聚合缓冲区中的最大值。
int value = PrimitiveObjectInspectorUtils.getInt(parameters[0], inputOI);
if (value > myagg.value) {
myagg.setValue(value);
}
}
}
// 终止部分聚合的方法,通常返回最终聚合结果。
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
return terminate(agg);
}
// 合并部分聚合结果的方法。
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
MaxAggBuffer myagg = (MaxAggBuffer) agg;
// 从部分聚合结果中获取整数值并更新聚合缓冲区中的最大值。
int partialValue = PrimitiveObjectInspectorUtils.getInt(partial, inputOI);
if (partialValue > myagg.value) {
myagg.setValue(partialValue);
}
}
}
// 终止方法,用于返回最终聚合结果。
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
MaxAggBuffer myagg = (MaxAggBuffer) agg;
// 创建IntWritable对象并设置聚合结果,然后返回。
result = new IntWritable(myagg.value);
return result;
}
// 聚合缓冲区对象的内部类定义,用于存储聚合过程中的中间状态。
static class MaxAggBuffer implements AggregationBuffer {
int value; // 聚合缓冲区中的值
// 设置聚合缓冲区中的值
void setValue(int val) { value = val; }
}
}
}
特性/UDAF类型 | 简单UDAF | 通用UDAF |
---|---|---|
性能 | 依赖反射,性能较低 | 不依赖反射,性能较高 |
参数灵活性 | 不支持变长参数 | 支持变长参数 |
易用性 | 编写简单直观 | 编写复杂,功能强大 |
推荐使用 | 适合简单聚合操作 | 适合复杂聚合逻辑和高性能需求 |
接口和抽象类 | 旧的UDAF接口和UDAFEvaluator | 新的AbstractGenericUDAFResolver 和GenericUDAFEvaluator |
功能特性 | 功能有限,实现常见聚合 | 支持复杂迭代逻辑和自定义终止逻辑 |
应用场景 | - 快速开发和原型设计 - 实现基本聚合操作,如求和、平均值 - 对性能要求不高的小型项目 | - 实现复杂的数据分析和处理 - 大数据量处理,需要高性能 - 需要变长参数支持的复杂查询 - 高级功能实现,如窗口函数、复杂的分组聚合 |
选择UDAF类型时应根据实际需求和上述特性来决定,以确保既能满足功能需求,又能获得较好的性能表现。
2. UDTF
-
继承GenericUDTF类的步骤:
开发自定义的表生成函数(UDTF)时,首先需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象类,它为UDTF提供了一个通用的实现框架。 -
实现
initialize()、process()和close()
方法:
为了完成自定义UDTF的功能,需要实现三个核心方法:initialize()用于初始化UDTF,process()用于处理输入数据并生成输出,close()用于执行清理操作。-
initialize()
方法的调用与作用:在UDTF的执行过程中,initialize()方法是首先被调用的。它负责初始化UDTF的状态,并返回关于UDTF返回行的信息,包括返回行的个数和类型。process()
方法的执行:initialize()方法执行完成后,接下来会调用process()方法。该方法是UDTF的核心,负责对输入参数进行处理。在process()方法中,可以通过调用forward()方法将处理结果逐行返回。close()
方法的清理作用:在UDTF的所有处理工作完成后,最终会调用close()方法。这个方法用于执行必要的清理工作,如释放资源或关闭文件句柄等,确保UDTF在结束时不会留下任何未处理的事务。
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
/**
* 自定义一个UDTF,实现将一个由任意分割符分隔的字符串切割成独立的单词。
**/
public class LineToWordUDTF extends GenericUDTF {
// 用于存储输出单词的集合
private ArrayList<String> outList = new ArrayList<String>();
/**
* initialize方法:当GenericUDTF函数初始化时被调用一次,用于执行一些初始化操作。
* 包括:
* 1. 判断函数参数个数
* 2. 判断函数参数类型
* 3. 确定函数返回值类型
*/
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1. 定义输出数据的列名和类型
List<String> fieldNames = new ArrayList<String>();
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
// 2. 添加输出数据的列名和类型
fieldNames.add("lineToWord"); // 输出列名
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // 输出列类型
// 返回输出数据的ObjectInspector
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
/**
* process方法:自定义UDTF的核心逻辑实现方法。
* 代码实现步骤可以分为三部分:
* 1. 参数接收
* 2. 自定义UDTF核心逻辑
* 3. 输出结果
*/
@Override
public void process(Object[] objects) throws HiveException {
// 1. 获取原始数据
String arg = objects[0].toString(); // 假设第一个参数为要分割的字符串
// 2. 获取数据传入的第二个参数,此处为分隔符
String splitKey = objects[1].toString(); // 假设第二个参数为分隔符
// 3. 将原始数据按照传入的分隔符进行切分
String[] fields = arg.split(splitKey); // 分割字符串
// 4. 遍历切分后的结果,并写出
for(String field : fields) {
// 集合为复用的,首先清空集合
outList.clear();
// 将每个单词添加至集合
outList.add(field);
// 将集合内容通过forward方法写出,这里假设forward方法可以处理集合
forward(outList);
}
}
/**
* close方法:当没有其他输入行时,调用该函数。
* 可以进行一些资源关闭处理等最终处理。
*/
@Override
public void close() throws HiveException {
// 资源清理逻辑,当前示例中无具体实现
}
}
3. 总结
本文我们详细解析了UDAF和UDTF在Hive中的应用。通过实际代码示例,我们展示了UDAF如何帮助我们深入分析数据,以及UDTF如何简化复杂的数据转换任务。
感谢您的阅读和支持。如果您对UDAF、UDTF或Hive的其他高级功能有疑问,或者想要更深入地讨论,欢迎在文章下留言或直接联系我们。期待我们的下一次分享,一起在大数据的世界里探索新知。
再次感谢,希望您喜欢这次的分享。我们下次见!