大数据Hive中的UDF:自定义数据处理的利器(下)

news2025/1/18 10:05:45

在上一篇文章中,我们对第一种用户定义函数(UDF)进行了基础介绍。接下来,本文将带您深入了解剩余的两种UDF函数类型。

文章目录

    • 1. UDAF
      • 1.1 简单UDAF
      • 1.2 通用UDAF
    • 2. UDTF
    • 3. 总结

1. UDAF

1.1 简单UDAF

第一种方式是 Simple(简单) 方式,即继承 org.apache.hadoop.hive.ql.exec.UDAF 类,并在派生类中以静态内部类的方式实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口。这个计算类将负责执行具体的聚合逻辑,具体步骤如下:

a)初始化(init):首先,我们需要实现UDAFEvaluator接口的init方法,用于初始化聚合过程中所需的任何资源或状态。

b)迭代(iterate):接下来,iterate方法将被用来处理传入的数据。此方法将逐个接收数据项,并更新聚合状态。它返回一个布尔值,指示是否继续迭代或停止。

c)部分终止(terminatePartial):在迭代完成后,terminatePartial方法将被调用。它的作用类似于Hadoop中的Combiner,用于返回一个中间聚合结果,以便在多个任务之间进行合并。

d)合并(merge):merge方法用于接收来自terminatePartial的中间结果,并将其合并以形成更接近最终结果的聚合状态。此方法同样返回一个布尔值,指示合并操作是否成功。

e)最终终止(terminate):最后,terminate方法将被用来生成并返回聚合操作的最终结果。



import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;



// 自定义的UDAF类,用于计算最大值
public class MyMaxUDAF extends UDAF {

    // 实现UDAFEvaluator接口的静态内部类
    static public class MaxIntEvaluator implements UDAFEvaluator {
        // 存放当前聚合操作过程中的最大值
        private int mMax;
        // 用于标记聚合数据集是否为空
        private boolean mEmpty;

        // 构造方法,用于执行初始化操作
        public MaxIntEvaluator() {
            super();
            init();
        }

        // 初始化方法,用于重置聚合状态
        public void init() {
            // 初始化最大值为0
            mMax = 0;
            // 初始化聚合数据集为空
            mEmpty = true;
        }

        // 迭代处理每一行数据。每次调用处理一行记录
        public boolean iterate(IntWritable o) {
            // 检查传入的数据是否为null
            if (o != null) {
                // 如果当前聚合数据集为空,则直接将当前值设置为最大值
                if (mEmpty) {
                    mMax = o.get();
                    mEmpty = false; // 更新状态,标记聚合数据集不再为空
                } else {
                    // 聚合数据集不为空时,用当前值和之前的最大值比较,保留较大的那个
                    mMax = Math.max(mMax, o.get());
                }
            }
            return true;
        }

        // 输出Map阶段处理结果的方法,返回当前的最大值
        public IntWritable terminatePartial() {
            // 如果聚合数据集为空,则返回null;否则,返回当前的最大值
            return mEmpty ? null : new IntWritable(mMax);
        }

        // Combine/Reduce阶段,合并处理结果
        public boolean merge(IntWritable o) {
            // 通过调用iterate方法进行合并操作
            return iterate(o);
        }

        // 返回最终的聚集函数结果
        public IntWritable terminate() {
            // 如果聚合数据集为空,则返回null;否则,返回最终的最大值
            return mEmpty ? null : new IntWritable(mMax);
        }
    }
}


1.2 通用UDAF

编写简单的UDAF(用户定义聚合函数)相对容易,但这种方法由于依赖Java的反射机制,可能会牺牲一些性能,并且它不支持变长参数等高级特性。相比之下,通用UDAF(Generic UDAF)提供了这些高级特性的支持,虽然它的编写可能不如简单UDAF那样直接明了。
Hive社区推崇使用通用UDAF作为最佳实践,建议采用新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver来替代旧的UDAF接口,并推荐使用org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类来替换旧的UDAFEvaluator接口。这种新方法不仅提升了性能,还增加了灵活性,使得UDAF的功能更加强大和多样化。

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.IntWritable;

// 通过继承AbstractGenericUDAFResolver并使用Description注解来定义一个新的UDAF。
@Description(name = "max_int", value = "_FUNC_(int) - Returns the maximum value of the column")
public class MyMaxUDAF2 extends AbstractGenericUDAFResolver {

    // 聚合函数的求值器内部类,继承自GenericUDAFEvaluator。
    public static class MaxIntEvaluator extends GenericUDAFEvaluator {

        // 用于存储输入参数的ObjectInspector。
        private PrimitiveObjectInspector inputOI;
        // 用于存储聚合结果。
        private IntWritable result;

        // 初始化方法,用于设置聚合函数的参数和返回类型。
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // 确认参数是原始类型并初始化inputOI。
            inputOI = (PrimitiveObjectInspector) parameters[0];
            // 设置聚合函数的返回类型为可写的整型。
            return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
        }

        // 创建聚合缓冲区对象的方法。
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            MaxAggBuffer buffer = new MaxAggBuffer();
            reset(buffer);
            return buffer;
        }

        // 重置聚合缓冲区对象的方法。
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((MaxAggBuffer) agg).setValue(Integer.MIN_VALUE);
        }

        // 迭代方法,用于处理每一行数据。
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters[0] != null) {
                MaxAggBuffer myagg = (MaxAggBuffer) agg;
                // 从参数中获取整数值并更新聚合缓冲区中的最大值。
                int value = PrimitiveObjectInspectorUtils.getInt(parameters[0], inputOI);
                if (value > myagg.value) {
                    myagg.setValue(value);
                }
            }
        }

        // 终止部分聚合的方法,通常返回最终聚合结果。
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        // 合并部分聚合结果的方法。
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                MaxAggBuffer myagg = (MaxAggBuffer) agg;
                // 从部分聚合结果中获取整数值并更新聚合缓冲区中的最大值。
                int partialValue = PrimitiveObjectInspectorUtils.getInt(partial, inputOI);
                if (partialValue > myagg.value) {
                    myagg.setValue(partialValue);
                }
            }
        }

        // 终止方法,用于返回最终聚合结果。
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            MaxAggBuffer myagg = (MaxAggBuffer) agg;
            // 创建IntWritable对象并设置聚合结果,然后返回。
            result = new IntWritable(myagg.value);
            return result;
        }

        // 聚合缓冲区对象的内部类定义,用于存储聚合过程中的中间状态。
        static class MaxAggBuffer implements AggregationBuffer {
            int value; // 聚合缓冲区中的值
            // 设置聚合缓冲区中的值
            void setValue(int val) { value = val; }
        }
    }
}

特性/UDAF类型简单UDAF通用UDAF
性能依赖反射,性能较低不依赖反射,性能较高
参数灵活性不支持变长参数支持变长参数
易用性编写简单直观编写复杂,功能强大
推荐使用适合简单聚合操作适合复杂聚合逻辑和高性能需求
接口和抽象类旧的UDAF接口和UDAFEvaluator新的AbstractGenericUDAFResolverGenericUDAFEvaluator
功能特性功能有限,实现常见聚合支持复杂迭代逻辑和自定义终止逻辑
应用场景- 快速开发和原型设计
- 实现基本聚合操作,如求和、平均值
- 对性能要求不高的小型项目
- 实现复杂的数据分析和处理
- 大数据量处理,需要高性能
- 需要变长参数支持的复杂查询
- 高级功能实现,如窗口函数、复杂的分组聚合

选择UDAF类型时应根据实际需求和上述特性来决定,以确保既能满足功能需求,又能获得较好的性能表现。

2. UDTF

  • 继承GenericUDTF类的步骤:
    开发自定义的表生成函数(UDTF)时,首先需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象类,它为UDTF提供了一个通用的实现框架。

  • 实现initialize()、process()和close()方法:
    为了完成自定义UDTF的功能,需要实现三个核心方法:initialize()用于初始化UDTF,process()用于处理输入数据并生成输出,close()用于执行清理操作。

    - initialize()方法的调用与作用:在UDTF的执行过程中,initialize()方法是首先被调用的。它负责初始化UDTF的状态,并返回关于UDTF返回行的信息,包括返回行的个数和类型。

    • process()方法的执行:initialize()方法执行完成后,接下来会调用process()方法。该方法是UDTF的核心,负责对输入参数进行处理。在process()方法中,可以通过调用forward()方法将处理结果逐行返回。
    • close()方法的清理作用:在UDTF的所有处理工作完成后,最终会调用close()方法。这个方法用于执行必要的清理工作,如释放资源或关闭文件句柄等,确保UDTF在结束时不会留下任何未处理的事务。

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义一个UDTF,实现将一个由任意分割符分隔的字符串切割成独立的单词。
 **/
public class LineToWordUDTF extends GenericUDTF {

    // 用于存储输出单词的集合
    private ArrayList<String> outList = new ArrayList<String>();

    /**
     * initialize方法:当GenericUDTF函数初始化时被调用一次,用于执行一些初始化操作。
     * 包括:
     *      1. 判断函数参数个数
     *      2. 判断函数参数类型
     *      3. 确定函数返回值类型
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. 定义输出数据的列名和类型
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        // 2. 添加输出数据的列名和类型
        fieldNames.add("lineToWord"); // 输出列名
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // 输出列类型

        // 返回输出数据的ObjectInspector
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * process方法:自定义UDTF的核心逻辑实现方法。
     * 代码实现步骤可以分为三部分:
     *  1. 参数接收
     *  2. 自定义UDTF核心逻辑
     *  3. 输出结果
     */
    @Override
    public void process(Object[] objects) throws HiveException {
        // 1. 获取原始数据
        String arg = objects[0].toString(); // 假设第一个参数为要分割的字符串

        // 2. 获取数据传入的第二个参数,此处为分隔符
        String splitKey = objects[1].toString(); // 假设第二个参数为分隔符

        // 3. 将原始数据按照传入的分隔符进行切分
        String[] fields = arg.split(splitKey); // 分割字符串
        // 4. 遍历切分后的结果,并写出
        for(String field : fields) {
            // 集合为复用的,首先清空集合
            outList.clear();

            // 将每个单词添加至集合
            outList.add(field);

            // 将集合内容通过forward方法写出,这里假设forward方法可以处理集合
            forward(outList);
        }
    }

    /**
     * close方法:当没有其他输入行时,调用该函数。
     * 可以进行一些资源关闭处理等最终处理。
     */
    @Override
    public void close() throws HiveException {
        // 资源清理逻辑,当前示例中无具体实现
    }

}


3. 总结

本文我们详细解析了UDAF和UDTF在Hive中的应用。通过实际代码示例,我们展示了UDAF如何帮助我们深入分析数据,以及UDTF如何简化复杂的数据转换任务。

感谢您的阅读和支持。如果您对UDAF、UDTF或Hive的其他高级功能有疑问,或者想要更深入地讨论,欢迎在文章下留言或直接联系我们。期待我们的下一次分享,一起在大数据的世界里探索新知。

再次感谢,希望您喜欢这次的分享。我们下次见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1690580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

叶面积指数(LAI)数据、NPP数据、GPP数据、植被覆盖度数据获取

引言 多种卫星遥感数据反演叶面积指数&#xff08;LAI&#xff09;产品是地理遥感生态网推出的生态环境类数据产品之一。产品包括2000-2009年逐8天数据&#xff0c;值域是-100-689之间&#xff0c;数据类型为32bit整型。该产品经过遥感数据获取、计算归一化植被指数、解译植被类…

测量模拟量的优选模块:新型设备M-SENS3 8

| 具有8路自由选择通道的新型设备M-SENS3 8 IPETRONIK推出的模拟量测量设备——M-SENS3 8是新一代设备的新成员。该模块具有8个通道&#xff0c;能够自由选择测量模式&#xff0c;不仅支持高精度电压和电流的测量&#xff0c;还新增了频率测量模式。各通道分辨率高达18位&…

Selenium常用命令(python版)

日升时奋斗&#xff0c;日落时自省 目录 1、Selenium 2、常见问题 1、Selenium 安装Python和配置环境没有涉及 注&#xff1a;如有侵权&#xff0c;立即删除 首先安装selenium包&#xff0c;安装方式很简单 pip install selenium 注:我这里已经安装好了&#xff0c;所以…

spring boot集成Knife4j

文章目录 一、Knife4j是什么&#xff1f;二、使用步骤1.引入依赖2.新增相关的配置类3.添加配置信息4.新建测试类5. 启动项目 三、其他版本集成时常见异常1. Failed to start bean ‘documentationPluginsBootstrapper2.访问地址后报404 一、Knife4j是什么&#xff1f; 前言&…

弘君资本股市行情:股指预计保持震荡上扬格局 关注汽车、银行等板块

弘君资本指出&#xff0c;近期商场体现全体分化&#xff0c;指数层面上看&#xff0c;沪指一路震动上行&#xff0c;创出年内新高&#xff0c;创业板指和科创50指数体现相对较弱&#xff0c;依然是底部震动走势。从盘面体现上看&#xff0c;轮动依然是当时商场的主基调&#xf…

逻辑分析仪 - 采样率/采样深度

采样深度&#xff08;Sampling Depth&#xff09; 采样深度指的是逻辑分析仪在一次捕获过程中可以记录的最大样本数量。简单来说&#xff0c;采样深度越大&#xff0c;逻辑分析仪可以记录的数据量就越多。这对于分析长时间的信号变化或复杂的信号序列非常重要。 采样率&#…

WEB攻防【2】——ASPX/.NET项目/DLL反编译/未授权访问/配置调试报错

ASP&#xff1a;windowsiisaspaccess .net&#xff1a;windowsiisaspxsqlserver IIS上的安全问题也会影响到 WEB漏洞&#xff1a;本身源码上的问题 服务漏洞&#xff1a;1、中间件 2、数据库 3、第三方软件 #知识点: 1、.NET:配置调试-信息泄绵 2、.NET:源码反编译-DLL…

使用Flask ORM进行数据库操作的技术指南

文章目录 安装Flask SQLAlchemy配置数据库连接创建模型类数据库操作插入数据查询数据更新数据删除数据 总结 Flask是一个轻量级的Python Web框架&#xff0c;其灵活性和易用性使其成为开发人员喜爱的选择。而ORM&#xff08;对象关系映射&#xff09;则是一种将数据库中的表与面…

HCIP-Datacom-ARST自选题库__OSPF单选【80道题】

1.OSPFV2是运行在IPV4网络的IGP&#xff0c;OSPFV3是运行在IPV6网络的ICP&#xff0c;OSPFV3与OSPFv2的报文类型相同&#xff0c;包括Hello报文、DD报文、LSR报文、LSU报文和LSAck报文。关于OSPFv3报文&#xff0c;以下哪个说法是正确的 OSPFv3使用报文头部的认证字段完成报文…

揭秘齿轮加工工艺的选用原则:精准打造高效传动的秘密武器

在机械制造领域&#xff0c;齿轮作为传动系统中的重要组成部分&#xff0c;其加工工艺的选择至关重要。不同的齿轮加工工艺会影响齿轮的精度、耐用性和效率。本文将通过递进式结构&#xff0c;深入探讨齿轮加工工艺的选用原则&#xff0c;带您了解如何精准打造高效传动的秘密武…

最简单的 UDP-RTP 协议解析程序

最简单的 UDP-RTP 协议解析程序 最简单的 UDP-RTP 协议解析程序原理源程序结果下载链接参考 最简单的 UDP-RTP 协议解析程序 本文介绍网络协议数据的处理程序。网络协议数据在视频播放器中的位置如下所示。 本文中的程序是一个 UDP/RTP 协议流媒体数据解析器。该程序可以分析 …

Java | Leetcode Java题解之第109题有序链表转换二叉搜索树

题目&#xff1a; 题解&#xff1a; class Solution {ListNode globalHead;public TreeNode sortedListToBST(ListNode head) {globalHead head;int length getLength(head);return buildTree(0, length - 1);}public int getLength(ListNode head) {int ret 0;while (head…

彩虹聚合二级域名DNS管理系统源码v1.3

聚合DNS管理系统可以实现在一个网站内管理多个平台的域名解析&#xff0c; 目前已支持的域名平台有&#xff1a;阿里云、腾讯云、华为云、西部数码、CloudFlare。 本系统支持多用户&#xff0c;每个用户可分配不同的域名解析权限&#xff1b;支持API接口&#xff0c; 支持获…

结合反序列化注入tomcat内存马

0x01 前提概述 通过前几个内存马的学习我们可以知道&#xff0c;将内存马写在jsp文件上传并不是传统意义上的内存马注入&#xff0c;jsp文件本质上就是一个servlet&#xff0c;servlet会编译成class文件&#xff0c;也会实现文件落地。借用木头师傅的一张图 结合反序列化注入内…

​​​【收录 Hello 算法】10.1 二分查找

目录 10.1 二分查找 10.1.1 区间表示方法 10.1.2 优点与局限性 10.1 二分查找 二分查找&#xff08;binary search&#xff09;是一种基于分治策略的高效搜索算法。它利用数据的有序性&#xff0c;每轮缩小一半搜索范围&#xff0c;直至找到目标元素或搜索区间为空…

Python | Leetcode Python题解之第110题平衡二叉树

题目&#xff1a; 题解&#xff1a; class Solution:def isBalanced(self, root: TreeNode) -> bool:def height(root: TreeNode) -> int:if not root:return 0leftHeight height(root.left)rightHeight height(root.right)if leftHeight -1 or rightHeight -1 or a…

【论文笔记】| 定制化生成PuLID

PuLID: Pure and Lightning ID Customization via Contrastive Alignment ByteDance, arXiv:2404.16022v1 Theme: Customized generation 原文链接&#xff1a;https://arxiv.org/pdf/2404.16022 Main Work 提出了 Pure 和 Lightning ID 定制 (PuLID)&#xff0c;这是一种用于…

Golang | Leetcode Golang题解之第109题有序链表转换二叉搜索树

题目&#xff1a; 题解&#xff1a; var globalHead *ListNodefunc sortedListToBST(head *ListNode) *TreeNode {globalHead headlength : getLength(head)return buildTree(0, length - 1) }func getLength(head *ListNode) int {ret : 0for ; head ! nil; head head.Next…

数据分析【方差分析】四

方差分析的核心 什么是方差分析:方差分析是假设检验的一种延续与扩展,主要用于多个总体均值(三组或三组以上均值)是否相等做出假设检验,研究分类型自变量对数值型因变量的影响。 它的零假设和设备假设分别为: 单因素方差分析的前提条件 独立性 组内独立(随机抽样、…

618购物节快递量激增,EasyCVR视频智能分析助力快递网点智能升级

随着网络618购物节的到来&#xff0c;物流仓储与快递行业也迎来业务量暴增的情况。驿站网点和快递门店作为物流体系的重要组成部分&#xff0c;其安全性和运营效率日益受到关注。为了提升这些场所的安全防范能力和服务水平&#xff0c;实施视频智能监控方案显得尤为重要。 一、…