聊一聊UDF/UDTF/UDAF是什么,开发要点及如何使用?

news2024/11/23 0:04:20

背景介绍

UDF来源于Hive,Hive可以允许用户编写自己定义的函数UDF,然后在查询中进行使用。星环Inceptor中的UDF开发规范与Hive相同,目前有3种UDF:

A. UDF--以单个数据行为参数,输出单个数据行;

UDF(User Defined Function),即用户自定义函数,能结合SQL语句一起使用,更好地表达复杂的业务逻辑,一般以单个数据行为参数,输出单个数据行;比如数学函数、字符串函数、时间函数、拼接函数

B. UDTF: 以一个数据行为参数,输出多个数据行为一个表作为输出;

UDTF(User Defined Table Function),即用户自定义表函数,它与UDF类似。区别在于UDF只能实现一对一,而它用来实现多(行/列)对多(行/列)数据的处理逻辑。一般以一个数据行为参数,输出多个数据行为一个表作为输出,如lateral、view、explore;

C. UDAF: 以多个数据行为参数,输出一个数据行;

UDAF(User Defined Aggregate Function)用户自定义聚合函数,是由用户自主定义的,用法同如MAX、MIN和SUM已定义的聚合函数一样的处理函数。而且,不同于只能处理标量数据的系统定义的聚合函数,UDAF的可以接受并处理更广泛的数据类型,如用对象类型、隐式类型或者LOB存储的多媒体数据。由于UDAF也属于聚合函数中的一种,同样也需要与GROUPBY结合使用。

一般UDAF以多个数据行为参数,接收多个数据行,并输出一个数据行,比如COUNT、MAX;

UDF、UDTF、UDAF的开发要点及使用DEMO

星环Quark计算引擎中内置了很多函数,同时支持用户自行扩展,按规则添加后即可在sql执行过程中使用,目前支持UDF、UDTF、UDAF三种类型,一般UDF应用场景较多,后面将着重介绍UDF的开发与使用。UDAF及UDTF将主要介绍开发要点以及Demo示例。

Quark的UDF接口兼容开源Hive的UDF接口,用户可以参考开源Hive的UDF手册,或者直接把开源Hive的UDF迁移到Quark上。

UDF

Quark数据类型

Quark类型

Java原始类型

Java包装类

hadoop.hive.ioWritable

tinyintbyteByteByteWritable
smallintshortShortShortWritable
intintIntegerIntWritable
bigintlongLongLongWritable
string-StringText
charcharCharacterHiveCharWritable
booleanbooleanBooleanBooleanWritable
floatfloatFloatFloatWritable
double doubleDoubleDoubleWritable
decimal-BigDecimalHiveDecimalWritable
date-DateDateWritable
array-ListArrayListWritable
Map<K,V>-Map<K.V>HashMapWritable

UDF函数

Quark 提供了两个实现 UDF 的方式:

第一种:继承 UDF 类
  • 优点:实现简单;支持Quark的基本类型、数组和Map;支持函数重载。
  • 缺点:逻辑较为简单,只适合用于实现简单的函数
第二种:继承 GenericUDF 类
  • 优点:支持任意长度、任意类型的参数;可以根据参数个数和类型实现不同的逻辑;资源消耗更低;可以实现初始化和关闭资源的逻辑(initialize、close)。
  • 缺点:实现比继承UDF要复杂一些

一般在以下几种场景下考虑使用GenericUDF:

  • 传参情况复杂,比如某UDF要传参数有多种数量或多种类型的情况,在UDF中支持这种场景我们需要实现N个不同的evaluate()方法分别对应N种场景的传参,在GenericUDF我们只需在一个方法内加上判断逻辑,对不同的输入路由到不同的处理逻辑上即可。还有比如某UDF参数既要支持String list参数,也要支持Integer list参数。你可能认为我们只要继续多重载方法就好了,但是Java不支持同一个方法重载参数只有泛型类型不一样,所以该场景只能用GenericUDF。
  • 需要传非Writable的或复杂数据类型作为参数。比如嵌套数据结构,传入Map的key-value中的value为list数据类型,或者比如数据域数量不确定的Struct结构,都更适合使用GenericUDF在运行时捕获数据的内部构造。
  • 该UDF被大量、高频地使用,所以从收益上考虑,会尽可能地优化一切可以优化的地方,则GenericUDF相比UDF在operator中避免了多次反射转化的资源消耗(后面会细讲),更适合被考虑。
  • 该UDF函数功能未来预期的重构、扩展场景较多,需要做得足够可扩展,则GenericUDF在这方面更优秀。

pom文件的依赖导入

UDF开发依赖

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>inceptor-exec</artifactId>
    <version>xxx</version>
</dependency>

继承示例

1.继承 UDF 类

该方式实现简单,只需新建一个类继承org.apache.hadoop.hive.ql.exec.UDF;

继承UDF类必须实现evaluate方法且返回值类型不能为 void,支持定义多个evaluate方法不同参数列表用于处理不同类型数据;

可通过完善@Description展示UDF用法 UDF样例。

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
 
 
@Description(
    name="my_plus",
    value="my_plus() - if string, do concat; if integer, do plus",
    extended = "Example : \n    >select my_plus('a', 'b');\n    >ab\n    >select my_plus(3, 5);\n    >8"
)
/**
 * 实现UDF函数,若字符串执行拼接,int类型执行加法运算。
 */
public class AddUDF extends UDF {
    /**
     * 编写一个函数,要求如下:
     * 1. 函数名必须为 evaluate
     * 2. 参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map
     * 3. 函数一定要有返回值,不能为 void
     */
    public String evaluate(String... parameters) {
        if (parameters == null || parameters.length == 0) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (String param : parameters) {
            sb.append(param);
        }
        return sb.toString();
    }
    /**
     * 支持函数重载
     */
    public int evaluate(IntWritable... parameters) {
        if (parameters == null || parameters.length == 0) {
            return 0;
        }
        long sum = 0;
        for (IntWritable currentNum : parameters) {
            sum = Math.addExact(sum, currentNum.get());
        }
        return (int) sum;
    }
}
2.继承 GenericUDF 类

GenericUDF相比与UDF功能更丰富,支持所有参数类型,实现起来也更加复杂。org.apache.hadoop.hive.ql.udf.generic.GenericUDF API提供了一个通用的接口将任何数据类型的对象当作泛型Object去调用和输出,参数类型由ObjectInspector封装;参数Writable类由DeferredObject封装,使用时简单类型可直接从Writable获取,复杂类型可由ObjectInspector解析。

Java的ObjectInspector类,用于帮助Quark了解复杂对象的内部架构,通过创建特定的ObjectInspector对象替代创建具体类对象,在内存中储存某类对象的信息。在UDF中,ObjectInspector用于帮助Hive引擎将HQL转成MR Job时确定输入和输出的数据类型。Hive语句会生成MapReduce Job执行,所以使用的是Hadoop数据格式,不是编写UDF的Java的数据类型,比如Java的int在Hadoop为IntWritable,String在Hadoop为Text格式,所以我们需要将UDF内的Java数据类型转成正确的Hadoop数据类型以支持Hive将HQL生成MapReduce Job。

继承 GenericUDF 后,必须实现以下三个方法:

public class MyCountUDF extends GenericUDF {
        private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
        private transient ObjectInspectorConverters.Converter intConverter;
        private transient ObjectInspectorConverters.Converter longConverter;
        // 初始化
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
              
        }
        // DeferredObject封装实际参数的对应Writable类
        @Override
        public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
         
        }
        // 函数信息
        @Override
        public String getDisplayString(String[] strings) {
          
        }
}

initialize()方法只在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:参数个数检查;参数类型检查与转换;确定返回值类型。

a. 参数个数检查;

可通过 arguments 数组的长度来判断函数参数的个数:

//  检查该记录是否传过来正确的参数数量,arguments的长度不为2时,则抛出异常
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List<T>, T");
    }
b. 参数类型检查与转换;

针对该UDF的每个参数,initialize()方法都会收到一个对应的ObjectInspector参数,通过遍历ObjectInspector数组检查每个参数类型,根据参数类型构造ObjectInspectorConverters.Converter,用于将Hive传递的参数类型转换为对应的Writable封装对象ObjectInspector,供后续统一处理。

ObjectInspector内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型。

public interface ObjectInspector extends Cloneable {
  public static enum Category {
    PRIMITIVE, // Hive原始类型
    LIST, // Hive数组
    MAP, // Hive Map
    STRUCT, // 结构体
    UNION // 联合体
  };
}

Quark原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的Hive原始类型。

public interface PrimitiveObjectInspector extends ObjectInspector {
 
  /**
   * The primitive types supported by Quark.
   */
  public static enum PrimitiveCategory {
    VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
    DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
    UNKNOWN
  };
}

参数类型检查与转换示例:

for (int i = 0; i < length; i++) {       // 遍历每个参数
    ObjectInspector currentOI = arguments[i];
    ObjectInspector.Category type = currentOI.getCategory();     // 获取参数类型
    if (type != ObjectInspector.Category.PRIMITIVE) {         // 检查参数类型
        throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
    }
    PrimitiveObjectInspector.PrimitiveCategory primitiveType =
        ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
    inputType[i] = primitiveType;
    switch (primitiveType) {        // 参数类型转换
        case INT:
            if (intConverter == null) {
                ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
            }
            break;
        case LONG:
            if (longConverter == null) {
                ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
            }
            break;
        default:
            throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
    }
}
c. 确定函数返回值类型

initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义UDF返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型。创建ObjectInspector时,不要用new的方式创建,应该用工厂模式去创建以保证相同类型的ObjectInspector只有一个实例,且同一个ObjectInspector可以在代码中多处被使用。

// 自定义UDF返回值类型为Long
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
完整的 initialize() 函数
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        int length = arguments.length;
        inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
        for (int i = 0; i < length; i++) {
            ObjectInspector currentOI = arguments[i];
            ObjectInspector.Category type = currentOI.getCategory();
            if (type != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
            }
            PrimitiveObjectInspector.PrimitiveCategory primitiveType =
                ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
            inputType[i] = primitiveType;
            switch (primitiveType) {
                case INT:
                    if (intConverter == null) {
                        ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                        intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
                    }
                    break;
                case LONG:
                    if (longConverter == null) {
                        ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                        longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
                    }
                    break;
                default:
                    throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
            }
        }
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

evaluate()方法是GenericUDF的核心方法,自定义UDF的实现逻辑。代码实现步骤可以分为三部分:参数接收;自定义UDF核心逻辑;返回处理结果。

第一步:参数接收

evaluate() 的参数就是 自定义UDF 的参数。

/**
 * Evaluate the GenericUDF with the arguments.
 *
 * @param arguments
 *          The arguments as DeferedObject, use DeferedObject.get() to get the
 *          actual argument Object. The Objects can be inspected by the
 *          ObjectInspectors passed in the initialize call.
 * @return The
 */
public abstract Object evaluate(DeferredObject[] arguments)
  throws HiveException;

通过源码注释可知,DeferedObject.get() 可以获取参数的值。

/**
 * A Defered Object allows us to do lazy-evaluation and short-circuiting.
 * GenericUDF use DeferedObject to pass arguments.
 */
public static interface DeferredObject {
  void prepare(int version) throws HiveException;
  Object get() throws HiveException;
};

再看看 DeferredObject 的源码,DeferedObject.get() 返回的是 Object,传入的参数不同,会是不同的Java类型。

第二步:自定义UDF核心逻辑

这一部分根据实际项目需求自行编写。

第三步:返回处理结果

这一步和 initialize() 的返回值一一对应,基本类型返回值有两种:Writable类型 和 Java包装类型:

  • 在 initialize 指定的返回值类型为 Writable类型 时,在 evaluate() 中 return 的就应该是对应的 Writable实例。
  • 在 initialize 指定的返回值类型为 Java包装类型 时,在 evaluate() 中 return 的就应该是对应的 Java包装类实例。

evalute()示例

@Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        LongWritable out = new LongWritable();
        for (int i = 0; i < deferredObjects.length; i++) {
            PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
            Object param = deferredObjects[i].get();
            switch (type) {
                case INT:
                    Object intObject = intConverter.convert(param);
                    out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
                    break;
                case LONG:
                    Object longObject = longConverter.convert(param);
                    out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
                    break;
                default:
                    throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
            }
        }
        return out;
    }

getDisplayString() 返回的是 explain 时展示的信息。这里不能return null,否则可能在运行时抛出空指针异常。

@Override
public String getDisplayString(String[] strings) {
    return "my_count(" + Joiner.on(", ").join(strings) + ")";
}
自定义GenericUDF完整示例
@Description(
    name="my_count",
    value="my_count(...) - count int or long type numbers",
    extended = "Example :\n    >select my_count(3, 5);\n    >8\n    >select my_count(3, 5, 25);\n    >33"
)
public class MyCountUDF extends GenericUDF {
    private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
    private transient ObjectInspectorConverters.Converter intConverter;
    private transient ObjectInspectorConverters.Converter longConverter;
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        int length = objectInspectors.length;
        inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
        for (int i = 0; i < length; i++) {
            ObjectInspector currentOI = objectInspectors[i];
            ObjectInspector.Category type = currentOI.getCategory();
            if (type != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
            }
            PrimitiveObjectInspector.PrimitiveCategory primitiveType =
                ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
            inputType[i] = primitiveType;
            switch (primitiveType) {
                case INT:
                    if (intConverter == null) {
                        ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                        intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
                    }
                    break;
                case LONG:
                    if (longConverter == null) {
                        ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                        longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
                    }
                    break;
                default:
                    throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
            }
        }
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        LongWritable out = new LongWritable();
        for (int i = 0; i < deferredObjects.length; i++) {
            PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
            Object param = deferredObjects[i].get();
            switch (type) {
                case INT:
                    Object intObject = intConverter.convert(param);
                    out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
                    break;
                case LONG:
                    Object longObject = longConverter.convert(param);
                    out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
                    break;
                default:
                    throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
            }
        }
        return out;
    }
    @Override
    public String getDisplayString(String[] strings) {
        return "my_count(" + Joiner.on(", ").join(strings) + ")";
    }
}

UDTF

UDTF函数作用都是输入一行数据,将该行数据拆分、并返回多行数据。不同的UDTF函数只是拆分的原理不同、作用的数据格式不同而已。

适用场景

  1. 流应用中对数据处理,如:字符串解析,hyperbase数据删除,时间段去重,时间段统计
  2. 数仓数集应用中需要将单行转换为多行,inceptor内置多种UDTF,如:explode,inline,json_tuple等

注意:返回UDTF结果的同时查询其他对象,须引用关键字 LATERAL VIEW

UDTF开发要点

1. 实现UDTF函数需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

2. 然后重写/实现initialize, process, close三个方法

A. initialize初始化验证,返回字段名和字段类型

initialize初始化:UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型,名称)。initialize针对任务调一次, 作用是定义输出字段的列名、和输出字段的数据类型。

initialize方法示例
@Override
   /**
    * 返回数据类型:StructObjectInspector
    * 定义输出数据的列名、和数据类型。
    */
   public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
       List<String> fieldNames = new ArrayList<String>(); //fieldNames为输出的字段名
       fieldNames.add("world");
 
       List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //类型,列输出类型
       fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
 
       return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
   }
B. 初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回

process:初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果写出。process传入一行数据写出去多次,传入一行数据输出多行数据,如:mapreduce单词计数。process针对每行数据调用一次该方法。在initialize初始化的时候,定义输出字段的数据类型是集合,调用forward()将数据写入到一个缓冲区,写入缓冲区的数据也要是集合。

process方法示例
//数据的集合
 private List<String> dataList = new ArrayList<String>();
 
  /**
  * process(Object[] objects) 参数是一个数组,但是hive中的explode函数接受的是一个,一进多出
  * @param args
  * @throws HiveException
  */
 public void process(Object[] args) throws HiveException {
     //我们现在的需求是传入一个数据,在传入一个分割符
 
     //1.获取数据
     String data = args[0].toString();
 
     //2.获取分割符
     String splitKey = args[1].toString();
 
     //3.切分数据,得到一个数组
     String[] words = data.split(splitKey);
 
     //4.想把words里面的数据全部写出去。类似在map方法中,通过context.write方法
     // 定义是集合、写出去是一个string,类型不匹配,写出也要写出一个集合
     for (String word : words) {
         //5.将数据放置集合,EG:传入"hello,world,hdfs"---->写出需要写n次,hello\world
         dataList.clear();//清空数据集合
 
         dataList.add(word);
 
         //5.写出数据的操作
         forward(dataList);
     }
 }
C. 最后调用close()方法进行清理工作

最后close()方法调用,对需要清理的方法进行清理,close()方法针对整个任务调一次

UDTF DEMO

下面UDTF 实现的是字符串的分拆,多行输出

package io.transwarp.udtf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class SplitUDF extends GenericUDTF{
    @Override
    public void close() throws HiveException {
        // TODO Auto-generated method stub
    }
    @Override
    public StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        // TODO Auto-generated method stub
        if(arg0.length != 1){
            throw new UDFArgumentLengthException("SplitString only takes one argument");
        }
         
        if(arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
            throw new UDFArgumentException("SplitString only takes string as a parameter");
        }
         
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
         
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
         
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    @Override
    public void process(Object[] arg0) throws HiveException {
        // TODO Auto-generated method stub
        String input = arg0[0].toString();
        String[] inputSplits = input.split("#");
        for (int i = 0; i < inputSplits.length; i++) {
            try {
                String[] result = inputSplits[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }
}

执行效果如下:

如何使用UDTF

将UDTF打包后,放在inceptor server 所在节点之上(建议不要放在/user/lib/hive/lib/下),之后在连接inceptor执行以下命令,生成临时函数(server有效,重启inceptor失效)

add jar /tmp/timestampUDF.jar
drop temporary function timestamp_ms;
create temporary function timestamp_ms as 'io.transwarp.udf.ToTimestamp';
  
select date, timestamp_ms(date) from table1;

 UDAF

正如前面所说,UDAF是由用户自主定义的,虽然UDAF的使用可以方便对数据的运算处理,但是使用它的数量建议不要过多,因为UDAF的数量增长和性能下降成线性关系。另外,如果存在大量的嵌套UDAF,系统的性能也会降低,建议用户在可能的情况下写一个没有嵌套或者嵌套较少的UDAF实现相同功能来提高性能。

UDAF开发要点

1. 用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;

2. 用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。

3. 一个计算函数必须实现的5个方法的具体含义如下:

  • - init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
  • - iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
  • - terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
  • - merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
  • - terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

UDAF DEMO

下面的UDAF DEMO目标是实现找到最大值功能,以表中某一字段为参数,返回最大值。

package udaf.transwarp.io;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
 
//UDAF是输入多个数据行,产生一个数据行
//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
public class MaxiNumber extends UDAF{
    public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
        //最终结果
        private IntWritable result;
        //负责初始化计算函数并设置它的内部状态,result是存放最终结果的
        @Override
        public void init() {
            result=null;
        }
        //每次对一个新值进行聚集计算都会调用iterate方法
        public boolean iterate(IntWritable value)
        {
            if(value==null)
                return false;
            if(result==null)
              result=new IntWritable(value.get());
            else
              result.set(Math.max(result.get(), value.get()));
            return true;
        }
                                                                                                                                  
        //Hive需要部分聚集结果的时候会调用该方法
        //会返回一个封装了聚集计算当前状态的对象
        public IntWritable terminatePartial()
        {
            return result;
        }
        //合并两个部分聚集值会调用这个方法
        public boolean merge(IntWritable other)
        {
            return iterate(other);
        }
        //Hive需要最终聚集结果时候会调用该方法
        public IntWritable terminate()
        {
            return result;
        }
    }
}

UDF 的打包与使用

操作前提

将开发好自定义UDF函数的项目打包成jar包,注意:jar 包中的自定义UDF 类名,不能和现有UDF 类,在包名+类名上,完全相同

部署方式

常见的UDF部署方式有以下三种:

  • 把UDF固化到image里,重新打image(推荐);
  • 其次是通过创建临时UDF(add jar + temporary function)的方式;
  • 创建永久UDF(hdfs jar+permanent function)的方式(可行,但不是很推荐);

方式一 固化UDF

  • 视频示例(仅作示范,详情查看下方文字)

此方式的核心逻辑是把UDF jar包放到image的/usr/lib/inceptor/下面,重新制作image。具体步骤如下:

以更换inceptor中的inceptor_2.10-1.1.0-transwarp-6.1.0.jar为例:

1. 进入inceptor image

docker run -it <inceptor_image_id> bash

2. 打开另一个terminal

3. 替换container中的jar包

docker cp <jar包名称> <container_id>:/usr/lib/inceptor/ <jar包名称>

image.png

4. commit修改记录

docker commit <container_id> REPOSITORY:TAG

5. 打开manager管理页面重新启动inceptor服务

6.重启完成后即可查看quark server的pod下/usr/lib/inceptor/是否有新增的jar包

方式二 创建临时UDF

  • 视频示例(仅作示范,详情查看下方文字)

1. 查看已存在jar包

LIST JAR;

2. 添加jar包

ADD JAR[S] <local_path>;
// Local_path是jar包所在Inceptor server节点的路径。

3. 创建临时UDF

CREATE TEMPORARY FUNCTION [<db_name>.]<function_name> AS <class_name>;

临时UDF在Inceptor重启后失效。如果需要更新临时UDF,需要重启Inceptor重新创建该临时UDF。

示例:

4. 验证临时UDF

SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

5. 删除临时UDF

DROP TEMPORARY FUNCTION <if exists> <function_name>;

方式三 创建永久UDF

建议优先选取前两种方式,此方式虽然可行但不推荐,故仅介绍基础命令,暂无视频提供。

1. 查看已存在jar包

LIST JAR;

2. 添加jar包

ADD JAR[S] <local_or_hdfs_path>;
//Local_path是Inceptor server节点的路径。保证hive用户对jar所在的目录有读权限。

3. 创建永久UDF

CREATE PERMANENT FUNCTION [<db_name>.]<function_name> AS <class_name>;

如果Inceptor不在local mode,那么资源的地址也必须是非本地URI,比如HDFS地址。

4. 验证永久UDF

SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

5. 删除永久UDF

DROP PERMANENT FUNCTION <if exists> <function_name>;

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1869426.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DM达梦数据库基本语法整理

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

生命在于学习——Python人工智能原理(2.6.1)

六 Python的文件系统 6.1 打开文件 在Python中&#xff0c;可以使用内置的open函数来打开文件&#xff0c;open函数的基本语法如下&#xff1a; file open(file_name, moder, buffering-1, encodingNone, errorsNone, newlineNone, closefdTrue, openerNone)参数说明&#…

“水刊”来了!专注发表水方向的SCI刊物,此“水刊”非彼水刊

【SciencePub学术】曾经的超级“水刊”《WATER SUPPLY》因为触碰红线&#xff0c;现在依旧被“on hold”中&#xff0c;解封日期恐怕也是遥遥无期。 来源&#xff1a;科睿唯安官网 后台有人私信小编有没有关于“水”类的期刊推荐&#xff0c;小编后台整理了一下&#xff0c;今…

一键生成AI动画视频?Animatediff 和 ComfyUI 更配哦!

大家好我是极客菌&#xff01; 之前我分享过 Animatediff 在 WebUI 中的应用&#xff0c;最近不是在分享 ComfyUI 嘛&#xff0c;那我们也来讲讲 Animatediff 在 ComfyUI 的应用。 如果从工作流和内存利用率的角度来说&#xff0c;Animatediff 和 ComfyUI 可能更配一些&#…

金融科技如何运用技术手段实现细颗粒度服务

随着金融科技的快速发展&#xff0c;金融机构正在通过采用各种技术手段来提供更加细颗粒度的服务&#xff0c;以满足客户日益增长的个性化需求。这些技术手段不仅提高了金融服务的效率和安全性&#xff0c;还显著提升了用户体验和满意度。 一、大数据分析与人工智能&#xff08…

iOS 其他应用的文件如何在分享中使用自己的应用打开

废话少说 一、第一步&#xff1a;先配置好plist文件 右击info.plist如下图文件打开 根据自己需要配置支持的文件类型&#xff0c;也可使用property List中配置&#xff0c;一样的 其他的文件可是参考文档&#xff1a;System-Declared Uniform Type Identifiers 可复制的代码&am…

基于Vue 3.x与TypeScript的PPTIST本地部署与无公网IP远程演示文稿

文章目录 前言1. 本地安装PPTist2. PPTist 使用介绍3. 安装Cpolar内网穿透4. 配置公网地址5. 配置固定公网地址 前言 本文主要介绍如何在Windows系统环境本地部署开源在线演示文稿应用PPTist&#xff0c;并结合cpolar内网穿透工具实现随时随地远程访问与使用该项目。 PPTist …

基于java + Springboot 的二手物品交易平台实现

目录 &#x1f4da; 前言 &#x1f4d1;摘要 &#x1f4d1;系统架构 &#x1f4da; 数据库设计 &#x1f4da; 系统功能的具体实现 &#x1f4ac; 登录模块 首页模块 二手商品轮播图添加 &#x1f4ac; 后台功能模块 二手商品商品列表 添加二手商品商品 添加购物车 &a…

CesiumJS【Basic】- #019 加载glb/gltf文件(Entity方式)

文章目录 加载glb/gltf文件(Entity方式)1 目标2 代码实现3 资源文件加载glb/gltf文件(Entity方式) 1 目标 使用Entity方式加载glb/gltf文件 2 代码实现 import * as Cesium from "cesium";const viewer = new Cesium.Viewer

langchain中的召回增强生成(RAG)一览

什么是RAG&#xff1f;[3] RAG是一种通过额外的、通常是私有或实时的数据来增强LLM知识的技术。LLM能够推理各种广泛的主题&#xff0c;但它们的知识仅限于它们训练时的公共数据&#xff0c;到达其特定时间节点为止。如果你想构建可以推理私人数据或在模型截止日期之后引入的数…

linux 安装腾讯会议和解决ubuntu打开腾讯会议提示:不兼容 wayland 协议

一. 下载腾讯会议安装包 腾讯会议下载链接 二. 命令行安装 cd [安装包路径] sudo dpkg -i TencentMeeting_0300000000_3.19.1.400_x86_64_default.publish.deb 三. 打开腾讯会议提示无法支持wayland 协议 解决方法: 打开终端 sudo vi /etc/gdm3/custom.conf打开 #Wayland…

vue3 【提效】全局布局 vite-plugin-vue-layouts 实用教程

一个常见的需求是&#xff0c;同模块的若干页面需要使用同一种布局&#xff0c;比如俱乐部相关的页面的顶部需要展示俱乐部的名称&#xff0c;其他页面顶部需要展示网站名称。 通常实现的方法是&#xff0c;将俱乐部的名称和网站名称定义成公共组件&#xff0c;在每个页面都书…

pycharm terminal终端不能激活 conda 虚拟环境,解决方法

# 1. 确保执行策略已更改 Set-ExecutionPolicy RemoteSigned -Scope CurrentUser# 2. 初始化Conda conda init powershell# 3. 重启PowerShell# 4. 验证Conda初始化 conda --version# 5. 激活Conda环境 conda activate shi_labelme关闭所有的终端&#xff0c;然后重新打开新的终…

算法力扣刷题记录六【203移除链表元素】

前言 链表篇&#xff0c;开始。 记录六&#xff1a;力扣【203移除链表元素】 一、数据结构——链表 来源【代码随想录】&#xff0c;总结&#xff1a; &#xff08;1&#xff09;线性结构。内存地址不连续&#xff0c;通过指针指向串联一起。 &#xff08;2&#xff09;链表类…

互联网企业出海不得不面对的安全问题

在出海的互联网企业中&#xff0c;为什么游戏、电商企业总是被“D”&#xff1f;究其内因&#xff0c;这或与游戏和电商等业务的商业模式和技术应用有较大的关系。 首先&#xff0c;对于游戏和电商等行业而言&#xff0c;良好的用户体验是业务增长的关键点。对于普通用户而言&a…

当大模型开始「考上」一本

参加 2024 河南高考&#xff0c;豆包和文心 4.0 过了一本线&#xff0c;但比 GPT-4o 还差点。 今天的大模型&#xff0c;智力水平到底如何&#xff1f; 2024 年高考陆续出分&#xff0c;我们想要解开这个过去一年普罗大众一直争论不休的话题。高考是衡量人类智力和学识水平的…

鸿蒙:自定义组件、自定义函数、自定义样式

一、自定义组件 1.新建组件文件夹&#xff0c;新建自定义组件文件 . 2.编辑自定义组件&#xff0c;并通过 Component //声明组件 export struct PageHeader { //结构体private title: stringbuild() { //uiRow() {Image($rawfile(左返回.png)).width(15%)Text(this.title)…

C++实现一个简单的Qt信号槽机制

昨天写这个文章《深入探讨C的高级反射机制&#xff08;2&#xff09;&#xff1a;写个能用的反射库》的时候就在想&#xff0c;是不是也能在这套反射逻辑的基础上&#xff0c;实现一个类似Qt的信号槽机制&#xff1f; Qt信号槽机制简介 所谓的Qt的信号槽&#xff08;Signals …

吉时利 Keithley2461 数字源表

Keithley2461吉时利SMU高电流数字源表 2461 型图形化高电流数字 SourceMeter SMU 2461 高电流 SMU 凭借其 10A/1000W 脉冲电流和 7A/100W 直流电流能力以及双 18 位 1MS/s 数字转换器&#xff0c;优化用于检定和测试高功率材料、器件和模块&#xff0c;例如碳化硅 (SiC)、氮化…

WIFI各版本的带宽

带宽的定义&#xff1a; 带宽在网络领域通常指信道带宽&#xff0c;即信号在频谱中占用的频宽&#xff0c;单位是MHz&#xff08;兆赫&#xff09;。在无线通信中&#xff0c;带宽越宽&#xff0c;能够传输的数据量越大&#xff0c;因此信道带宽直接影响着数据传输速率。WiFi标…