Hive的UDF开发之向量化表达式(VectorizedExpressions)

news2024/11/16 16:45:58

1. 背景

笔者的大数据平台XSailboat的SailWorks模块包含离线分析功能。离线分析的后台实现,包含调度引擎、执行引擎、计算引擎和存储引擎。计算和存储引擎由Hive提供,调度引擎和执行引擎由我们自己实现。调度引擎根据DAG图和调度计划,安排执行顺序,监控执行过程。执行引擎接收调度引擎安排的任务,向Yarn申请容器,在容器中执行具体的任务。

我们的离线分析支持编写Hive的UDF函数,打包上传,并声明使用函数。
在这里插入图片描述
我们通常会通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF来自定义自己的UDF函数,再参考Hive实现的内置UDF函数时,经常会看到在它的类名上,有@VectorizedExpressions注解,翻译过来即“向量化表达式”。在此记录一下自己学习到的知识和理解。

官方文档《Vectorized Query Execution》
有以下应该至少知道的点:

  1. 向量化查询缺省是关闭的;
  2. 要能支持向量化查询,数据存储格式必需是ORC格式(我们主要是用CSV格式)。

通常所说的向量化计算主要是从以下几个方面提升效率:

  1. 利用CPU底册指令对向量的运算
  2. 利用多核/多线程的能力进行并发计算

而Hive的向量化执行,主要是代码逻辑聚合并充分利用上下文,减少判断次数,减少对象的访问处理和序列化次数,数据切块并行。

2. 实践

package com.cimstech.udf.date;

import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;

import com.cimstech.xfront.common.excep.WrapException;
import com.cimstech.xfront.common.text.XString;

public class VectorUDFStringToTimstamp extends VectorExpression
{

	private static final long serialVersionUID = 1L;
	
	/**
	 * 列序号
	 */
	int mColNum0 ;
	/**
	 * 时间格式
	 */
	String mDateFmt ;
	
	transient SimpleDateFormat mSdf ;
	
	/**
	 * 必需得有1个无参的构造函数.		<br />
	 * hive会先通过无参构造函数创建一个实例,然后调用getDescriptor()方法,取得描述。
	 * 通过描述知道有哪几列,分别是什么格式的,才知道怎么调用有参构造函数。
	 */
	public VectorUDFStringToTimstamp()
	{
		super() ;
	}
	
	/**
	 * 有参构造函数的参数要和getDescriptor中取得的描述相对应。
	 * Column类型的输入,在此用int类型列序号表示			<br />
	 * 标量列直接是相应类型即可。						
	 * @param aColNum0
	 * @param aDateFmt
	 * @param aOutputColumnNum
	 */
	public VectorUDFStringToTimstamp(int aColNum0 , String aDateFmt, int aOutputColumnNum)
	{
		super(aOutputColumnNum) ;
		mColNum0 = aColNum0 ;
		mDateFmt = aDateFmt ;
	}
	
	@Override
	public String vectorExpressionParameters()
	{
		return getColumnParamString(0 , mColNum0)
				+ " , val " + mDateFmt ;
	}
	
	private void setDatetime(TimestampColumnVector aTimestampColVector, byte[][] aVector, int aElementNum) throws HiveException
	{
		if(mSdf == null)
			mSdf = new SimpleDateFormat(mDateFmt) ;
		String dateStr = null ;
		try
		{
			dateStr = new String(aVector[aElementNum] , "UTF-8") ;
			aTimestampColVector.getScratchTimestamp().setTime(mSdf.parse(dateStr).getTime()) ;
		}
		catch (UnsupportedEncodingException e)
		{
			WrapException.wrapThrow(e) ;
			return ;		// dead code
		}
		catch(ParseException e)
		{
			throw new HiveException(XString.msgFmt("时间字符串[{}]无法按模式[{}]解析!" , dateStr , mDateFmt)) ;
		}
		aTimestampColVector.setFromScratchTimestamp(aElementNum);
	}

	@Override
	public void evaluate(VectorizedRowBatch aBatch) throws HiveException
	{
		if (childExpressions != null)
		{
			evaluateChildren(aBatch);
		}

		int n = aBatch.size;
		if (n == 0)
			return;

		BytesColumnVector inputColVector = (BytesColumnVector) aBatch.cols[mColNum0];
		TimestampColumnVector outputColVector = (TimestampColumnVector) aBatch.cols[outputColumnNum];
		boolean[] inputIsNull = inputColVector.isNull;
		boolean[] outputIsNull = outputColVector.isNull;

		byte[][] vector = inputColVector.vector;

		if (inputColVector.isRepeating)
		{
			// 如果是重复的,那么只需要解析第1个就行
			if (inputColVector.noNulls || !inputIsNull[0])
			{
				outputIsNull[0] = false;
				setDatetime(outputColVector, vector, 0);
			}
			else
			{
				// 重复,且都是null,那么没有可解析的,如下设置即可
				outputIsNull[0] = true;
				outputColVector.noNulls = false;
			}
			outputColVector.isRepeating = true;
			return;
		}
		else
			outputColVector.isRepeating = false;

		if (inputColVector.noNulls) 	// 没有为null的
		{
			// selectedInUse为true,表示选中输入中的指定行进行处理。
			if (aBatch.selectedInUse)
			{
				int[] sel = aBatch.selected;
				if (!outputColVector.noNulls)		// 全局被标为了有null值,那么各个为止都需要单独设置是否为null
				{
					for (int j = 0; j != n; j++)
					{
						final int i = sel[j] ;
						outputIsNull[i] = false;		// 某一行,单独设置不为null
						setDatetime(outputColVector, vector, i);
					}
				}
				else
				{
					for (int j = 0; j != n; j++)
					{
						final int i = sel[j];
						// 全局被标为了没有null值,那么无需一行行标注非null
						setDatetime(outputColVector, vector, i);
					}
				}
			}
			else
			{
				// 输入是全局没有null值的,输出被全局标为了有null值,那么把输出改过来,改为全局没有null值
				if (!outputColVector.noNulls)		
				{
					Arrays.fill(outputIsNull, false);		// 所有输出都非null
					outputColVector.noNulls = true;			// 改为全局没有null值
				}
				for (int i = 0; i != n; i++)
				{
					setDatetime(outputColVector, vector, i);
				}
			}
		}
		else	// 输入数据是有null的
		{
			outputColVector.noNulls = false;

			if (aBatch.selectedInUse)
			{
				int[] sel = aBatch.selected;
				for (int j = 0; j != n; j++)
				{
					int i = sel[j] ;
					outputIsNull[i] = inputIsNull[i] ;
					if(!outputIsNull[i])
						setDatetime(outputColVector, vector, i) ;
				}
			}
			else
			{
				System.arraycopy(inputIsNull, 0, outputIsNull, 0, n);
				for (int i = 0; i != n; i++)
				{
					if(!outputIsNull[i])
						setDatetime(outputColVector, vector, i) ;
				}
			}
		}
	}

	@Override
	public Descriptor getDescriptor()
	{
		return (new VectorExpressionDescriptor.Builder())
				// 不是过滤,都认为是投影(Projection)。投影是数据库理论中的专业术语
				// 投影是根据输入,构造输出,填充输出列
				// 过滤就是设置aBatch.selected
		        .setMode(VectorExpressionDescriptor.Mode.PROJECTION)		
		        .setNumArguments(2)
		        .setArgumentTypes(VectorExpressionDescriptor.ArgumentType.STRING
		        		, VectorExpressionDescriptor.ArgumentType.STRING)
		        .setInputExpressionTypes(VectorExpressionDescriptor.InputExpressionType.COLUMN
		        		, VectorExpressionDescriptor.InputExpressionType.SCALAR)		// 标量,指定的字符串常量,就是标量
		        .build();
	}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1461868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年天津高考数学备考:历年选择题真题练一练(2014~2023)

距离2024年高考还有不到四个月的时间&#xff0c;今天我们来看一下2014~2023年的天津市高考数学的选择题&#xff0c;从过去十年的真题中随机抽取5道题&#xff0c;并且提供解析。后附六分成长独家制作的在线练习集&#xff0c;科学、高效地反复刷这些真题&#xff0c;吃透真题…

K8S部署Java项目(Springboot项目)pod状态:CrashLoopBackOff

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

docker 镜像仓库实战

“面对脆弱的玩笑” 悉知Docker镜像仓库的命令后&#xff0c;我们总得将这些命令使用起来&#xff0c;在实践中深刻理解执行这些命令的实际效果。 综合实战1: 搭建一个Nginx服务 至于Nginx是什么&#xff0c;我想在这一篇中已经有过讲解: Nignx服务&#xff0c;也并非本篇要详…

基于springboot+vue的课程答疑系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

YOLOv5代码解读[02] models/yolov5l.yaml文件解析

文章目录 YOLOv5代码解读[02] models/yolov5l.yaml文件解析yolov5l.yaml文件检测头1--->耦合头检测头2--->解耦头检测头3--->ASFF检测头Model类解析parse_model函数 YOLOv5代码解读[02] models/yolov5l.yaml文件解析 yolov5l.yaml文件 # YOLOv5 &#x1f680; by Ult…

网络设备和网络软件

文章目录 网络设备和网络软件网卡交换机交换机的三个主要功能交换机的工作原理第二层交换和第三层交换交换机的堆叠和级联 路由器路由器工作原理 网关网关的分类 无线接入点(AP)调制解调器网络软件 网络设备和网络软件 网卡 网络接口卡又称网络适配器&#xff0c;简称网卡。网…

shell基础实验(1)

1、判断当前磁盘剩余空间是否有20G&#xff0c;如果小于20G&#xff0c;则将报警邮件发送给管理员&#xff0c;每天检查次磁盘剩余空间。 1.1.安装邮件服务,配置邮件服务 [rootserver ~]# yum install mailx -y[rootserver ~]# vim /etc/mail.rc set from1580540058qq.com …

抽象工厂模式 Abstract Factory

1.模式定义: 提供一个创建一系列相关或互相依赖对象的接口&#xff0c;而无需指定它们具体的类 2. 应用场景: 程序需要处理不同系列的相关产品&#xff0c;但是您不希望它依赖于这些产品的 具体类时&#xff0c; 可以使用抽象工厂 3.优点: 1.可以确信你从工厂得到的产品彼…

解决弹性布局父元素设置高自动换行,子元素均分高度问题(align-content: flex-start)

案例&#xff1a; <view class"abc"><view class"abc-item" v-for"(item,index) in 8" :key"index">看我</view> </view> <style lang"less">.abc{height: 100px;display: flex;flex-wrap: …

OD(8)之Mermaid流程图(flowcharts)使用详解

OD(8)之Mermaid流程图(flowcharts)使用详解 Author: Once Day Date: 2024年2月20日 漫漫长路才刚刚开始… 全系列文章可参考专栏: Linux实践记录_Once_day的博客-CSDN博客 参考文章: 关于 Mermaid | Mermaid 中文网 (nodejs.cn)Mermaid | Diagramming and charting tool‍…

数据结构—图

图是在数据结构中难度比较大&#xff0c;并且比较抽象一种数据结构。 图在地图&#xff0c;社交网络这方面有应用。 图的基本概念 图是由顶点集合及顶点间的关系组成的一种数据结构&#xff1a;G&#xff08;V&#xff0c;E&#xff09;。图标的英文&#xff1a;graph。 (x,…

Rust Vs Go:从头构建一个web服务

Go 和 Rust 之间的许多比较都强调它们在语法和初始学习曲线上的差异。然而&#xff0c;最终的决定性因素是重要项目的易用性。 “Rust 与 Go”争论 Rust vs Go 是一个不断出现的话题&#xff0c;并且已经有很多关于它的文章。部分原因是开发人员正在寻找信息来帮助他们决定下…

数据分析在企业培训系统中的关键作用与优势

数据分析在企业培训系统中扮演着关键的角色&#xff0c;它不仅能够帮助企业更好地了解员工培训的需求和效果&#xff0c;还能够提供有针对性的教育方案和提高培训效果。 数据分析可以帮助企业准确把握员工培训需求。通过收集和分析员工的培训需求调查和反馈信息&#xff0c;企…

浅谈WPF之利用RichTextBox实现富文本编辑器

在实际应用中&#xff0c;富文本随处可见&#xff0c;如留言板&#xff0c;聊天软件&#xff0c;文档编辑&#xff0c;特定格式内容等&#xff0c;在WPF开发中&#xff0c;如何实现富文本编辑呢&#xff1f;本文以一个简单的小例子&#xff0c;简述如何通过RichTextBox实现富文…

JavaCV之rtmp推流(FLV和M3U8)

JavaCV与FFmpeg FFmpeg是一款开源的多媒体处理工具集&#xff0c;它包含了一系列用于处理音频、视频、字幕等多媒体数据的库和工具。 JavaCV集成了FFmpeg库&#xff0c;使得Java开发者可以使用FFmpeg的功能&#xff0c;比如视频解码、编码、格式转换等。 除了FFmpeg&#xff…

01_02_mysql07_mysql8.0新特性

1.MySQL8新特性概述 MySQL从5.7版本直接跳跃发布了8.0版本 &#xff0c;可见这是一个令人兴奋的里程碑版本。MySQL 8版本在功能上做了显著的改进与增强&#xff0c;开发者对MySQL的源代码进行了重构&#xff0c;最突出的一点是多MySQL Optimizer优化器进行了改进。不仅在速度上…

在VsCode中通过Cookie登录LeetCode

在vscode中配置好leetcode之后&#xff0c;一般最常用的就是通过cookie登录leetcode ; 首先点击sign in &#xff0c; 然后选择最下面的 &#xff0c; LeetCode Cookie ! 然后输入username(也就是你的lc用户名) 或者 你leetcode绑定的邮箱 ; 输入完成之后 ; 就是要你输入你的l…

【Java EE初阶二十二】https的简单理解

1. 初识https 当前网络上,主要都是 HTTPS 了,很少能见到 HTTP.实际上 HTTPS 也是基于 HTTP.只不过 HTTPS 在 HTTP 的基础之上, 引入了"加密"机制&#xff1b;引入 HTTPS 防止你的数据被黑客篡改 &#xff1b; HTTPS 就是一个重要的保护措施.之所以能够安全, 最关键的…

C#知识点-14(索引器、foreach的循环原理、泛型、委托)

索引器 概念&#xff1a;索引器能够让我们的对象&#xff0c;以索引&#xff08;下标&#xff09;的形式&#xff0c;便捷地访问类中的集合&#xff08;数组、泛型集合、键值对&#xff09; 应用场景&#xff1a; 1、能够便捷地访问类中的集合 2、索引的数据类型、个数、顺序不…

从源码解析Kruise(K8S)原地升级原理

从源码解析Kruise原地升级原理 本文从源码的角度分析 Kruise 原地升级相关功能的实现。 本篇Kruise版本为v1.5.2。 Kruise项目地址: https://github.com/openkruise/kruise 更多云原生、K8S相关文章请点击【专栏】查看&#xff01; 原地升级的概念 当我们使用deployment等Wor…