主要控制优化使用哪种join算法。
1 Common Join
Common Join是Hive中最稳定的join算法也是默认的join算法,其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
sql语句中的join操作和执行计划中的Common Join任务并非一对一的关系,一个sql语句中的相邻的且关联字段相同的多个join操作可以合并为一个Common Join任务,关联字段不同时只能分2个Common Join任务。
例如:
--两个join操作的关联字段均为b表的key1字段,
--则该语句的两个join操作可以由一个common join任务实现,即通过一个MapReduce任务实现
select
a.val,
b.val,
c.val
from a
join b on a.key=b.key1
join c on c.key=b.key1;
--两个join操作的关联字段都不同,
--则该语句的两个join操作需要各自通过一个common join任务实现,即通过2个MapReduce任务实现
select
a.val,
b.val,
c.val
from a
join b on a.key=b.key1
join c on c.key=b.key2;
2 Map Join
2.1 Map Join算法概述
Map Join算法可以通过两个只有map阶段的Job完成一个join操作。适用于大表join小表。
第一个Job会读取小表数据,将其制作为hash table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。
第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,在map端即可完成关联操作。
2.2 优化说明
Map Join有两种触发方式,一种是用户在SQL语句中增加hint提示,另外一种是Hive优化器根据参与join表的数据量大小,自动触发。
(1)Hint提示
通过指定map join算法,且将ta作为map join的小表。(方法已过时,不推荐使用)
select /*+ mapjoin(ta) */
ta.id,
tb.id
from table_a ta
join table_b tb
on ta.id=tb.id;
(2)自动触发
Hive在编译SQL语句阶段,开始所有的join操作均采用Common Join算法实现。
在之后的物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务;若满足要求,就把Common Join任务自动转换为Map Join任务。
存在问题:
在SQL的编译阶段,有些Common Join任务所需的表的大小是未知的(例如对子查询进行join操作),所以在编译阶是无法确定这种Common Join任务是否能转换成Map Join任务。
解决:
Hive会在编译阶段生成一个条件任务(Conditional Task(按顺序执行计划)),包含一个计划列表,计划列表中包含转换后所有可能的Map Join任务以及原有的Common Join任务(作为后备任务,无论如何都会跑完的最稳定的任务)。在运行时决定最终具体采用哪个计划。
当使用的表的大小都知道,可以不使用条件任务(Conditional Task);当使用子查询等不知道表的大小时就在编译阶段生成一个条件任务。
大致思路:
优化前的执行计划按顺序从TaskA到CommonJoinTask到TaskC执行;优化后的执行计划按顺序执行到条件任务(Conditional Task),如果有Map Join任务就先执行,如果所有Map Join任务都没有执行成功就执行CommonJoinTask任务。
Map join自动转换的具体判断逻辑(针对执行计划里的CommonJoinTask能不能转为MapJoinTask):
参数说明:
--启动Map Join自动转换,默认开启
set hive.auto.convert.join=true;
--一个Common Join operator转为Map Join operator的判断条件,
--若该Common Join相关的表中,存在n-1张(即除大表之外的表)表的已知大小总和<=该值,
--则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,
--则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,
--实际运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。
--即控制内存中能放下小表的大小的总阈值
set hive.mapjoin.smalltable.filesize=250000;
--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;
--无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中,存在n-1张表的大小总和<=该值,
--此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。
--而是只生成一个最优的Map Join计划。
set hive.auto.convert.join.noconditionaltask.size=10000000;
2.3 案例
1、示例SQL
select
*
from order_detailod
join product_infoproduct on od.product_id = product.id
joinprovince_info province on od.province_id = province.id;
2、优化前
该SQL语句共有三张表进行两次join操作,且两次join操作的关联字段不同。故优化前的执行计划应该包含两个Common Join operator,也就是由两个MapReduce任务实现。
执行计划:
3、优化思路
参与join的三张表的数据量:
--获取表或分区的大小:
desc formatted table_name partition(partition_col='partition_value');
可将product_info和province_info作为小表进行Map Join优化。
(1)方案一、
使用条件任务进展自动转换,保证将两个Common Join operator均可转为Map Join operator,并保留Common Join作为后备计划,保证计算任务的稳定。
--启用Map Join自动转换
set hive.auto.convert.join=true;
--不使用无条件转换Map Join,即使用条件任务
set hive.auto.convert.join.noconditionaltask=false;
--调整hive.mapjoin.smalltable.filesize参数,使其大于等于product_info
set hive.mapjoin.smalltable.filesize=25285707;
执行计划:
因为hive.mapjoin.smalltable.filesize参数给的足够大,所以并没有执行到后备任务。
使用条件任务,虽然两个join都是走的MapJoin,但是并没有做合并,所以执行慢。
(2)方案二、
直接将两个Common Join operator转为两个Map Join operator,并且由于两个Map Join operator的小表大小之和小于等于hive.auto.convert.join.noconditionaltask.size,故两个Map Join operator任务可合并为同一个。这个方案计算效率最高,但需要的内存也是最多的。
--启用Map Join自动转换
set hive.auto.convert.join=true;
--使用无条件转换Map Join,即使不用条件任务,因为已经知道表的大小
set hive.auto.convert.join.noconditionaltask=true;
--调整hive.auto.convert.join.noconditionaltask.size参数,
--使其大于等于product_info和province_info之和
--即两小表先合并
set hive.auto.convert.join.noconditionaltask.size=25286076;
执行计划:
当不使用条件任务时,只要设置内存大于小表之和,MapJoinTask是可以合并的,加速的执行。
(3)方案三、
直接将两个Common Join operator转为Map Join operator,但不会将两个Map Join的任务合并。该方案计算效率比方案二低,但需要的内存也更少。
--启用Map Join自动转换
set hive.auto.convert.join=true;
--使用无条件转换Map Join,即使不用条件任务
set hive.auto.convert.join.noconditionaltask=true;
--调整hive.auto.convert.join.noconditionaltask.size参数,使其等于product_info
--即调整大小为小表里较大的表的大小
set hive.auto.convert.join.noconditionaltask.size=25285707;
执行计划:
将两个Common Join operator转为Map Join operator,但不会将两个Map Join的任务合并。该方案计算效率比方案二低,但需要的内存也更少。
3 Bucket Map Join
3.1 Bucket Map Join 算法概述
Bucket Map Join是对Map Join算法的改进,其打破了Map Join只适用于大表join小表的限制,可用于大表join大表的场景。
核心思想:
两表的分桶间进行Map Join操作的条件:
(1)参与join的表均为分桶表;
(2)关联字段为分桶字段;
(3)其中一张表的分桶数量是另外一张表分桶数量的整数倍(保证参与join的两张表的分桶之间具有明确的关联关系)。
第二个Job的Map端只需缓存其所需的分桶即可,不需再缓存小表的全表数据。
原理:
对于小表B来说,比如有哈希值0,1,2,3,4,5,6,7,8。哈希值模以2(因为表B分两个个桶),分到B表0号桶的哈希值有0、2、4、6、8;分到B表1号桶的有1、3、5、7。
同样的哈希值对于大表A来说,哈希值模以4(因为表A分4个桶),分到A表0、1、2、3号桶的哈希值分别为0、4、8,1、5,2、6,3、7。
所以大表A的0号和2号桶与小表B的0号桶关联(也即该3个分桶表进行join),大表A的1号和3号桶与小表B的1号桶关联。
Bucket Map Join时分两个阶段完成。
第一个阶段:由Map本地读取小一点表的分桶表,分别对分桶做hash table,然后该小一点的表的室友hash table都上传到HDFS内存分布式缓存中。
第二个阶段:读取大表数据与小表数据相关联。使用Bucket Map Join在第二个阶段使用BucketInputFormat(专门用来读取分桶表),切片策略使用一个桶一个切片(即大表有多少个分桶就有多少个Mapper),每一个Mapper就负责一个Bucket。由于大表中的每个Buckct与小表中的Bucket有关联关系,所以每个Mapper只需要缓存自己所需要的较小的表的关联的Bucket。然后每个Mapper扫描大表里的一个Bucket进行join。
可以理解为把大表拆分为多个小表,再对小表进行join。
3.2 优化说明
Bucket Map Join不支持自动转换,必须通过用户在SQL语句中提供如下Hint提示,并配置相关参数。
(1)Hint提示
select /*+ mapjoin(ta) */
ta.id,
tb.id
from table_a ta
join table_b tb onta.id=tb.id;
(2)相关参数
--关闭cbo优化,cbo会导致hint信息被忽略
set hive.cbo.enable=false;
--map join hint默认会被忽略(因为已经过时),需将如下参数设置为false
set hive.ignore.mapjoin.hint=false;
--启用bucket map join优化功能
set hive.optimize.bucketmapjoin = true;
3.3 案例
1、示例SQL
select
*
from(
select
*
from order_detail
where dt='2020-06-14'
)od
join(
select
*
from payment_detail
where dt='2020-06-14'
)pd
onod.id=pd.order_detail_id;
2、优化前
上述SQL有两张表进行一次join操作,优化前执行计划包含一个common join task,通过一个MapReduce Job实现。
3、优化思路
查看参与join两张表的大小:
存在问题:
两张表相对较大,若采用普通的Map Join算法,则Map端需要较多的内存来缓存数据。但Map端的内存不可能无上限的分配。
解决:
当参与Join的表数据量均过大时,可以考虑采用Bucket Map Join算法。
使用Bucket Map Join:
(1)根据源表创建两个分桶表:
order_detail建议分16个bucket,payment_detail建议分8个bucket,注意分桶个数的倍数关系以及分桶字段(必须按关联字段分桶)。
--订单表
drop table ifexists order_detail_bucketed;
create tableorder_detail_bucketed(
id string comment '订单id',
user_id string comment '用户id',
product_id string comment '商品id',
province_id string comment '省份id',
create_time string comment '下单时间',
product_num int comment '商品件数',
total_amount decimal(16, 2) comment '下单金额'
)
clustered by (id)into 16 buckets
row formatdelimited fields terminated by '\t';
--支付表
drop table ifexists payment_detail_bucketed;
create tablepayment_detail_bucketed(
id string comment '支付id',
order_detail_id string comment '订单明细id',
user_id string comment '用户id',
payment_time string comment '支付时间',
total_amount decimal(16, 2) comment '支付金额'
)
clustered by(order_detail_id) into 8 buckets
row formatdelimited fields terminated by '\t';
(2)向两个桶导入数据
--订单表
insert overwritetable order_detail_bucketed
select
id,
user_id,
product_id,
province_id,
create_time,
product_num,
total_amount
from order_detail
wheredt='2020-06-14';
--分桶表
insert overwritetable payment_detail_bucketed
select
id,
order_detail_id,
user_id,
payment_time,
total_amount
frompayment_detail
wheredt='2020-06-14';
当数据量大时,数据存储直接用分桶存,就不用重新创建分桶再导入数据。
(3)设置参数
--关闭cbo优化,cbo会导致hint信息被忽略,需将如下参数修改为false
set hive.cbo.enable=false;
--map join hint默认会被忽略(因为已经过时),需将如下参数修改为false
set hive.ignore.mapjoin.hint=false;
--启用bucket map join优化功能,默认不启用,需将如下参数修改为true
set hive.optimize.bucketmapjoin = true;
(4)重写SQL语句
select /*+ mapjoin(pd)*/
*
from order_detail_bucketed od
join payment_detail_bucketed pd on od.id =pd.order_detail_id;
执行计划:
Bucket Map Join的执行计划的基本信息和普通的Map Join无异,若想看到差异,可查看执行计划的详细信息。
explain extended select /*+ mapjoin(pd) */
*
fromorder_detail_bucketed od
joinpayment_detail_bucketed pd on od.id = pd.order_detail_id;
详细执行计划中,如在Map Join Operator中看到 “BucketMapJoin: true”,则表明使用的Join算法为Bucket Map Join。
4 Sort Merge Bucket Map Join
4.1 Sort Merge Bucket Map Join算法概述
Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。
两表的分桶间进行SMB Map Join操作的条件:
(1)参与join的表均为分桶表;
(2)分桶内的数据是有序的;
(3)分桶字段、排序字段和关联字段为相同字段;
(4)其中一张表的分桶数量是另外一张表分桶数量的整数倍(保证参与join的两张表的分桶之间具有明确的关联关系)。
Sort Merge Bucket Map Join和Bucket Map Join的异同点:
(1)相同点:利用两表各分桶之间的关联关系,在分桶之间进行join操作。
(2)不同点:分桶之间的join操作的实现原理:
(i)Bucket Map Join两个分桶之间的join实现原理为Hash Join算法:对参与join的一张表构建hash table,然后扫描另外一张表,然后进行逐行匹配。
(ii)SMB Map Join,两个分桶之间的join实现原理为Sort Merge Join算法:需要在两张按照关联字段排好序的表中进行。
两表的关联字段已经排好序,先拿到大表里的第一个key,对于重复的key,当查到下一个key与当前key不同时则说明key相同的数据已经拿完。采用同样的办法拿小表的数据。在读数据时不需要把整个分桶表的数据都拿到内存,因为只需要按顺序读每行数据,只要读到下一行key不同时则说明数据已经拿完。
当两表的数据已经拿完之后就可以进行join操作。
SMB Map Join在进行Join操作时,Map端是无需对整个Bucket构建hash table,也无需在Map端缓存整个Bucket数据的;每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可,所以SMB Map Join对内存没有要求。
4.2 优化说明
Sort Merge Bucket Map Join有两种触发方式,包括Hint提示和自动转换。Hint提示已过时,不推荐使用。
Sort Merge Bucket Map Join不需要考虑内存的大小,只要是按关联字段分桶的,且桶的数量成倍数关系,且桶内按分桶字段有序即可使用。
相关参数:
--启动Sort Merge BucketMap Join优化(总开关)
sethive.optimize.bucketmapjoin.sortedmerge=true;
--使用自动转换SMB Join
sethive.auto.convert.sortmerge.join=true;
4.3 案例
1、示例SQL
select
*
from(
select
*
from order_detail
where dt='2020-06-14'
)od
join(
select
*
from payment_detail
where dt='2020-06-14'
)pd
onod.id=pd.order_detail_id;
2、优化前
上述SQL有两张表进行一次join操作,优化前执行计划包含一个common join task,通过一个MapReduce Job实现。
3、优化思路
查看参与join两张表的大小:
存在问题:
两张表相对较大,若采用普通的Map Join算法,则Map端需要较多的内存来缓存数据。但Map端的内存不可能无上限的分配。
解决:
当参与Join的表数据量均过大时,除了可以考虑采用Bucket Map Join算法,还可以考虑SMB Join。相较于Bucket Map Join,SMB Map Join对分桶大小是没有要求的。
使用SMB Map Join:
(1)根据源表创建两个分桶表:
order_detail建议分16个bucket,payment_detail建议分8个bucket,注意分桶个数的倍数关系以及分桶字段(必须按关联字段分桶)和排序字段。
--订单表
drop table ifexists order_detail_sorted_bucketed;
create table order_detail_sorted_bucketed(
id string comment '订单id',
user_id string comment '用户id',
product_id string comment '商品id',
province_id string comment '省份id',
create_time string comment '下单时间',
product_num int comment '商品件数',
total_amount decimal(16, 2) comment '下单金额'
)
clustered by (id) sorted by(id) into 16 buckets
row formatdelimited fields terminated by '\t';
--支付表
drop table ifexists payment_detail_sorted_bucketed;
create tablepayment_detail_sorted_bucketed(
id string comment '支付id',
order_detail_id string comment '订单明细id',
user_id string comment '用户id',
payment_time string comment '支付时间',
total_amount decimal(16, 2) comment '支付金额'
)
clustered by (order_detail_id) sortedby(order_detail_id) into 8 buckets
row formatdelimited fields terminated by '\t';
(2)向两个桶导入数据
--订单表
insert overwrite table order_detail_sorted_bucketed
select
id,
user_id,
product_id,
province_id,
create_time,
product_num,
total_amount
from order_detail
where dt='2020-06-14';
--分桶表
insert overwrite table payment_detail_sorted_bucketed
select
id,
order_detail_id,
user_id,
payment_time,
total_amount
from payment_detail
where dt='2020-06-14';
当数据量大时,数据存储直接用分桶存,就不用重新创建分桶再导入数据。
(3)设置参数
--启动Sort Merge Bucket Map Join优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
--使用自动转换SMB Join
set hive.auto.convert.sortmerge.join=true;
(4)重写SQL语句
select
*
from order_detail_sorted_bucketed od
join payment_detail_sorted_bucketed pd
on od.id = pd.order_detail_id;
执行计划:
5 Join总结
5.1 Map Join
适用于大表join小表,对于SQL来说,能通过Map Join就走Map Join,不能通过就走Common Join。所以需要在hive-site.xml中配置全局参数:
<!--启动Map Join自动转换,默认开启,全局设置为true-->
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<!-- 取决于map端能缓存多少东西,一般为map端总内存的1\2~2\3作为缓存 设置时为文件大小,所以需要再除以10-->
<property>
<name>hive.mapjoin.smalltable.filesize</name>
<value>250000</value>
</property>
<!--开启无条件转Map Join,全局设置为true-->
<property>
<name>hive.auto.convert.join.noconditionaltask</name>
<value>true</value>
</property>
<!-- 取决于map端能缓存多少东西,一般为map端总内存的1\2~2\3作为缓存 设置时为文件大小,所以需要再除以10-->
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>100000</value>
</property>
当只有一个SQL跑特别慢时,可以用set进行单独调优根据上述的分析方法。
5.2 Bucket Map Join
适用于两个大表进行Map Join操作。需要考虑每个表要分几个桶(取决于Map端又多大内存),每个文件大小最好不超过500M。且不同的表中分桶个数要成倍数关系。
5.3 Sort Merge Bucket Map Join
优化大表进行Map Join,不对内存大小有要求,只要分桶中分桶字段有序。