1. 背景
笔者的大数据平台XSailboat的SailWorks模块包含离线分析功能。离线分析的后台实现,包含调度引擎、执行引擎、计算引擎和存储引擎。计算和存储引擎由Hive提供,调度引擎和执行引擎由我们自己实现。调度引擎根据DAG图和调度计划,安排执行顺序,监控执行过程。执行引擎接收调度引擎安排的任务,向Yarn申请容器,在容器中执行具体的任务。
我们的离线分析支持编写Hive的UDF函数,打包上传,并声明使用函数。
我们通常会通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF来自定义自己的UDF函数,再参考Hive实现的内置UDF函数时,经常会看到在它的类名上,有@VectorizedExpressions注解,翻译过来即“向量化表达式”。在此记录一下自己学习到的知识和理解。
官方文档《Vectorized Query Execution》
有以下应该至少知道的点:
- 向量化查询缺省是关闭的;
- 要能支持向量化查询,数据存储格式必需是ORC格式(我们主要是用CSV格式)。
通常所说的向量化计算主要是从以下几个方面提升效率:
- 利用CPU底册指令对向量的运算
- 利用多核/多线程的能力进行并发计算
而Hive的向量化执行,主要是代码逻辑聚合并充分利用上下文,减少判断次数,减少对象的访问处理和序列化次数,数据切块并行。
2. 实践
package com.cimstech.udf.date;
import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import com.cimstech.xfront.common.excep.WrapException;
import com.cimstech.xfront.common.text.XString;
public class VectorUDFStringToTimstamp extends VectorExpression
{
private static final long serialVersionUID = 1L;
/**
* 列序号
*/
int mColNum0 ;
/**
* 时间格式
*/
String mDateFmt ;
transient SimpleDateFormat mSdf ;
/**
* 必需得有1个无参的构造函数. <br />
* hive会先通过无参构造函数创建一个实例,然后调用getDescriptor()方法,取得描述。
* 通过描述知道有哪几列,分别是什么格式的,才知道怎么调用有参构造函数。
*/
public VectorUDFStringToTimstamp()
{
super() ;
}
/**
* 有参构造函数的参数要和getDescriptor中取得的描述相对应。
* Column类型的输入,在此用int类型列序号表示 <br />
* 标量列直接是相应类型即可。
* @param aColNum0
* @param aDateFmt
* @param aOutputColumnNum
*/
public VectorUDFStringToTimstamp(int aColNum0 , String aDateFmt, int aOutputColumnNum)
{
super(aOutputColumnNum) ;
mColNum0 = aColNum0 ;
mDateFmt = aDateFmt ;
}
@Override
public String vectorExpressionParameters()
{
return getColumnParamString(0 , mColNum0)
+ " , val " + mDateFmt ;
}
private void setDatetime(TimestampColumnVector aTimestampColVector, byte[][] aVector, int aElementNum) throws HiveException
{
if(mSdf == null)
mSdf = new SimpleDateFormat(mDateFmt) ;
String dateStr = null ;
try
{
dateStr = new String(aVector[aElementNum] , "UTF-8") ;
aTimestampColVector.getScratchTimestamp().setTime(mSdf.parse(dateStr).getTime()) ;
}
catch (UnsupportedEncodingException e)
{
WrapException.wrapThrow(e) ;
return ; // dead code
}
catch(ParseException e)
{
throw new HiveException(XString.msgFmt("时间字符串[{}]无法按模式[{}]解析!" , dateStr , mDateFmt)) ;
}
aTimestampColVector.setFromScratchTimestamp(aElementNum);
}
@Override
public void evaluate(VectorizedRowBatch aBatch) throws HiveException
{
if (childExpressions != null)
{
evaluateChildren(aBatch);
}
int n = aBatch.size;
if (n == 0)
return;
BytesColumnVector inputColVector = (BytesColumnVector) aBatch.cols[mColNum0];
TimestampColumnVector outputColVector = (TimestampColumnVector) aBatch.cols[outputColumnNum];
boolean[] inputIsNull = inputColVector.isNull;
boolean[] outputIsNull = outputColVector.isNull;
byte[][] vector = inputColVector.vector;
if (inputColVector.isRepeating)
{
// 如果是重复的,那么只需要解析第1个就行
if (inputColVector.noNulls || !inputIsNull[0])
{
outputIsNull[0] = false;
setDatetime(outputColVector, vector, 0);
}
else
{
// 重复,且都是null,那么没有可解析的,如下设置即可
outputIsNull[0] = true;
outputColVector.noNulls = false;
}
outputColVector.isRepeating = true;
return;
}
else
outputColVector.isRepeating = false;
if (inputColVector.noNulls) // 没有为null的
{
// selectedInUse为true,表示选中输入中的指定行进行处理。
if (aBatch.selectedInUse)
{
int[] sel = aBatch.selected;
if (!outputColVector.noNulls) // 全局被标为了有null值,那么各个为止都需要单独设置是否为null
{
for (int j = 0; j != n; j++)
{
final int i = sel[j] ;
outputIsNull[i] = false; // 某一行,单独设置不为null
setDatetime(outputColVector, vector, i);
}
}
else
{
for (int j = 0; j != n; j++)
{
final int i = sel[j];
// 全局被标为了没有null值,那么无需一行行标注非null
setDatetime(outputColVector, vector, i);
}
}
}
else
{
// 输入是全局没有null值的,输出被全局标为了有null值,那么把输出改过来,改为全局没有null值
if (!outputColVector.noNulls)
{
Arrays.fill(outputIsNull, false); // 所有输出都非null
outputColVector.noNulls = true; // 改为全局没有null值
}
for (int i = 0; i != n; i++)
{
setDatetime(outputColVector, vector, i);
}
}
}
else // 输入数据是有null的
{
outputColVector.noNulls = false;
if (aBatch.selectedInUse)
{
int[] sel = aBatch.selected;
for (int j = 0; j != n; j++)
{
int i = sel[j] ;
outputIsNull[i] = inputIsNull[i] ;
if(!outputIsNull[i])
setDatetime(outputColVector, vector, i) ;
}
}
else
{
System.arraycopy(inputIsNull, 0, outputIsNull, 0, n);
for (int i = 0; i != n; i++)
{
if(!outputIsNull[i])
setDatetime(outputColVector, vector, i) ;
}
}
}
}
@Override
public Descriptor getDescriptor()
{
return (new VectorExpressionDescriptor.Builder())
// 不是过滤,都认为是投影(Projection)。投影是数据库理论中的专业术语
// 投影是根据输入,构造输出,填充输出列
// 过滤就是设置aBatch.selected
.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(VectorExpressionDescriptor.ArgumentType.STRING
, VectorExpressionDescriptor.ArgumentType.STRING)
.setInputExpressionTypes(VectorExpressionDescriptor.InputExpressionType.COLUMN
, VectorExpressionDescriptor.InputExpressionType.SCALAR) // 标量,指定的字符串常量,就是标量
.build();
}
}