1.调优概述
Hive 作为大数据领域常用的数据仓库组件,在设计和查询时要特别注意效率。影响 Hive 效率的几乎从不是数据量过大,而是数据倾斜、数据冗余、Job或I/O过多、MapReduce分配不合理等等。对 Hive 的调优既包含 Hive 的建表设计方面,对HQL 语句本身的优化,也包含 Hive 配置参数和底层引擎 MapReduce 方面的调整。
总之,Hive调优的作用:在保证业务结果不变的前提下,降低资源的使用量,减少任务的执行时间。
2.调优须知
- 对于大数据计算引擎来说:数据量大不是问题,数据倾斜是个问题。
- 复杂的HQL底层会转换成多个MapReduce Job并行或者串行执行,Job数比较多的作业运行效
率相对比较低,比如即使只有几百行数据的表,如果多次关联多次汇总,产生十几个Job,耗时很长。原因是 MapReduce 作业初始化的时间比较长。 - 在进行Hive大数据分析时,常见的聚合操作比如sum,count,max,min,UDAF等 ,不怕数据倾斜问题,MapReduce 在 Mapper阶段的预聚合操作,使数据倾斜不成问题。
- 合理的建表设计,可以达到事半功倍的效果。
- 设置合理的 MapReduce 的 Task 并行度,能有效提升性能。(比如,10w+数据量级别的计算,用100 个 reduceTask,那是相当的浪费,1个足够,但是如果是亿级别的数据量,那么1个Task又显得捉襟见肘)
- 了解数据分布,是解决数据倾斜问题的一个新思路。
- 数据量较大的情况下,慎用 count(distinct),group by 容易产生倾斜问题。
- 对小文件进行合并,是行之有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对任务的整体调度效率也会产生积极的正向影响。
- 优化时把握整体,单个作业最优不如整体最优。
3.调优细节
3.1 Hive建表设计层面
Hive的建表设计层面调优,主要考虑如何合理组织数据的,方便后续的高效计算。比如建表的类型文件存储格式,是否压缩等等。
3.1.1 利用分区表优化
分区表 是在某一个或者几个维度上对数据进行分类存储,一个分区对应一个目录。如果筛选条件里有分区字段,那么 Hive 只需要遍历对应分区目录下的文件即可,不需要遍历全部数据,使得处理的数据量大大减少,从而提高查询效率。
在进行HQL查询时,会根据某一个字段进行筛选,那么将该表改为分区表,该字段即为分区字段。
-- 根据city字段进行分区
select1: select .... where country = "china"
select2: select .... where country = "china"
select3: select .... where country = "china"
select4: select .... where country = "china"
1、创建分区表
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '1'
STORED AS TEXTFILE;
2、加载数据,并指定分区标志
load data local inpath '/home/bigdata/pv_2018-07-08_us.txt' into table page_view
partition(date='2018-07-08', country='US');
3、查询指定标志的分区内容
SELECT page_views.* FROM page_views
WHERE page_views.date >= '2008-03-01'
AND page_views.date <= '2008-03-31'
AND page_views.referrer_url like '%xyz.com';
总而言之:分区表的目的就是分散数据,避免查询的时候进行全表扫描。
3.1.2 利用分桶表优化
分桶表是指将数据以指定列的值为 key 进行hash, hash到指定数目的桶中,使得筛选时不用全局遍历所有的数据,只需要遍历所在桶即可,提高查询效率。分桶表的优点: 在join过程中做优化 , 支持高效采样。
例如:以 userid 这一列为 bucket 的依据,共设置 32 个 buckets
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;
--分桶的语法
-- 按照userid来分桶,按照viewtime进行桶内排序
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
两个表以相同方式(相同字段)划分桶,两个表的桶个数是倍数关系
create table order(cid int,price float) clustered by(cid) into 32 buckets;
create table customer(id int,first string) clustered by(id) into 32 buckets;
select price from order t join customer s on t.cid = s.id
通常情况下,Sampling 在全体数据上进行采样,效率自然就低,需要取访问所有数据。而如果一个表已经对某一列制作了 bucket,就可以采样所有桶中指定序号的某个桶,直接减少了访问量。
-- 采样了 page_view 中 32 个桶中的第三个桶的全部数据
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
-- 采样了 page_view 中 32 个桶中的第三个桶的一半数据
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);
--采样方式:分桶采样、随机采样、数据块采样
3.1.3 选择合适的文件存储方式
在 HiveSQL 的 create table 语句中,可以使用 stored as … 指定表的存储格式。Apache Hive
支持 Apache Hadoop 中使用的几种熟悉的文件格式,比如 TextFile、SequenceFile、RCFile、
Avro、ORC、ParquetFile等。
创建表时,特别是宽表,尽量使用 ORC、ParquetFile 这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起,Hive查询时会只遍历需要列数据,大大减少处理的数据量。
存储格式 | 备注 |
---|---|
TextFile | 行式存储,默认存储格式。每一行是一条记录,以换行符"\n"结尾。数据不做压缩,磁盘开销大。 |
Sequence File | 二进制文件,以key-value的形式序列化到文件中,存储方式为行式存储,使用方便、可分割、可压缩 |
RC File | 按行分块(保证同一个record在一个块上,避免读取多个block),每块按列存储。 |
ORC File(√) | 数据按行分块,块数据列式存储,每一块都存储一个索引,自身支持切片。RC的升级版,性能大幅提升。 |
Parquet File(√) | 面向列的二进制文件格式,列式存储,Parquet默认使用snappy压缩,支持Impala查询引擎 |
Hive表文件存储的建议
文件存储格式 | 应用场景 |
---|---|
TextFile | 数据贴源层ODS或者STG |
Parquet | 在Impala和Hive共享数据和元数据 |
ORC | 除文件加载数据到ORC和Hive表作为计算结果以外的其他场景 |
3.1.4 选择合适的文件压缩格式
Hive 语句最终是转化为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在与 网络IO 和 磁盘IO,要解决性能瓶颈,最主要的是 减少数据量,对数据进行压缩是个好方式。压缩虽然是减少了数据量,但是压缩操作要消耗 CPU,但是在 Hadoop 中,往往性能瓶颈不在于 CPU,CPU 压力并不大,所以压缩充分利用集群资源。
压缩方式 | 压缩比 | 压缩速度 | 解压缩速度 | 是否可分割 |
---|---|---|---|---|
gzip | 13.4% | 21 MB/s | 118 MB/s | 否 |
bzip2 | 13.2% | 2.4MB/s | 9.5MB/s | 是 |
lzo | 20.5% | 135 MB/s | 410 MB/s | 是 |
snappy | 22.2% | 172 MB/s | 409 MB/s | 否 |
选择压缩的场景?
任务:IO密集型[推荐压缩,减少网络IO] 计算密集型[不推荐,增加CPU负担]
如何选择压缩方式?
- 压缩比率 压缩后大小/压缩前大小
- 压缩速率 压缩100M的数据花费的时间
- 是否支持split 支持分割的文件可并行有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。
压缩使用:
# Map输出结果以Gzip压缩
# 启用map端输出压缩
set mapred.map.output.compress=true
# 默认值是org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
HQL在底层会转换为MR任务,如果该文件使用ORC存储,snappy压缩(不支持文件切分操作),所以压缩文件只会被一个任务所读取,如果该压缩文件很大,处理该任务的Map需要的时间远远高于读取普通文件的Map时间,出现数据倾斜的问题。为了避免该问题的发生, 需要在数据压缩的时候采用bzip2和Zip等支持文件分割的压缩算法。
在实际开发过程中,Hive表的数据存储格式一般选择: orc或parquet。压缩方式一般选择snappy、lzo等。
3.2 HQL语法和运行参数层面
HQL语法和运行参数层面,主要讨论的如何写出高效的HQL,以及如何利用一些控制参数来调优 HQL的执行。
3.2.1 查看Hive执行计划
Hive 的 SQL 语句在执行之前需将 SQL 语句转换成 MapReduce 任务,因此需要了解具体的转换过程,可以在 SQL 语句中输入如下命令查看具体的执行计划。
## 查看执行计划,添加extended关键字可以查看更加详细的执行计划
explain [extended] query
3.2.2 列裁剪
列裁剪就是在查询时只读取需要的列,分区裁剪只读取需要的分区。当列很多或者数据量很大时, 如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取数据的开销:中间表存储开销和数据整合开销。
## 列裁剪,取数只取查询中需要用到的列,默认是true
set hive.optimize.cp = true;
3.2.3 谓词下推
谓词下推(Predicate Pushdown(PPD):简言之,就是在不影响结果的情况下,将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是 PredicatePushDown。
## 默认是true
set hive.optimize.ppd=true;
--示例SQL
-- 优化前的SQL
select a.*, b.* from a join b on a.id = b.id where b.age > 20
-- 优化后的SQL
select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
3.2.4 分区裁剪
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时, 如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。 在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。
Hive 中与分区裁剪优化相关的则是:
## 默认是true
set hive.optimize.pruner=true;
3.2.5 合并小文件
Map输入合并
在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。
# Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
# Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Map/Reduce输出合并
大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。
# 设置map端输出进行合并, 默认值为true
set hive.merge.mapfiles=true;
# 设置reduce端输出进行合并,默认值为false
set hive.merge.mapredfiles=true;
# 合并文件的大小,默认值为256000000(256M)
set hive.merge.size.per.task=256000000;
# 每个Map最大输入大小(决定了合并后的文件数据量)
set mapred.max.split.size=256000000;
# 一个节点上split的最少值(决定了多个DataNode上的文件是否要合并)
set mapred.min.split.size.per.node=1; // 服务器节点
# 一个机架上split的最少值(决定多个机架上的文件是否需要合并)
set mapred.min.split.size.per.rack=1; // 服务器机架
3.2.6 合理设置MapTask并行度
Map数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的 MapTask 都很多,这时候可以考虑减少 MapTask 的数量。增大每个 MapTask 处理的数据量。而且 MapTask 过多,最终生成的结果文件数也太多。
Map数太小:当输入文件都很大,任务逻辑复杂,MapTask 执行非常慢的时候,可以考虑增加 MapTask 数,来使得每个 MapTask 处理的数据量减少,从而提高任务的执行效率。
一个MapReduce Job 的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask,而输入分片是由三个参数决定的:
参数 | 默认值 | 意义 |
---|---|---|
dfs.blocksize | 128M | HDFS默认数据块大小 |
mapreduce.input.fileinputformat.split.minsize | 1M | 最小分片大小(MR) |
mapreduce.input.fileinputformat.split.maxsize | 256M | 最大分片大小(MR) |
输入分片大小的计算:针对数据是原始数据的情况,最终调整的splitsize大小最好是blockSize的整数倍
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
两种经典的控制MapTask的个数方案:减少MapTask数 或者 增加MapTask数
- 1.减少 MapTask 数是通过合并小文件来实现,主要是针对数据源
- 2.增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
3.2.7 合理设置ReduceTask并行度
如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化 ReduceTask 需要耗费资源。
如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。 默认情况下,Hive 分配的 reducer 个数由下列参数决定:
# 每个reduceTask处理的最大数据量(参数1)
set hive.exec.reducers.bytes.per.reducer =256 M (default)
# reduceTask的数量上限(参数2)
set hive.exec.reducers.max = 1009 (default)
# reduceTask的个数 设置以下参数:上述两个参数失效 最高优先级(参数3)
set mapreduce.job.reduces = 1(default)
# 依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,NodeManage 和 DataNode 的个数是一样的。
3.2.8 Join优化
Join的优化整体原则:
-
优先过滤后再进行Join操作,最大限度的减少参与join的数据量
-
小表join大表,最好启动mapjoin,hive自动启用mapjoin, 小表不能超过25M,可以更改
-
Join on的条件相同的话,最好放入同一个job,并且join表的排列顺序从小到大:select a.*,
b.*, c.* from a join b on a.id = b.id join c on a.id = c.id
-
如果多张表做join, 如果多个链接条件都相同,会转换成一个Job。
-
大表join大表: 空key过滤与空key转换(加随机值)
3.2.9 启用 MapJoin
MapJoin 是将join双方比较小的表直接分发到各个map进程的内存中,在 map 进程中进行join 操作,这样就不用进行 reduce 操作,从而提高了速度。只有 join 操作才能启用 MapJoin。MapJoin是通过两个只有Map阶段的Job完成一个Join操作。第一个Job会读取小表的数据,将其制作为Hash Table,并上传至Hadoop分布式缓存(本质是HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在MapTask的内存中,然后扫描大表,在Map端完成相关操作。
## 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
## 对应逻辑优化器是MapJoinProcessor
set hive.auto.convert.join = true;
## 刷入内存表的大小(字节) 25M[join 小表的大小]
set hive.mapjoin.smalltable.filesize = 25000000;
## hive会基于表的size自动的将普通join转换成mapjoin
set hive.auto.convert.join.noconditionaltask=true;
## 多大的表可以自动触发放到内层LocalTask中,默认大小10M
set hive.auto.convert.join.noconditionaltask.size=10000000;
MapJoin 特别适合大小表 join的情况。在Hive join场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存。如果表顺序写反,probe table 在前面,引发 OOM 的风险就高了。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。
手动开启mapJoin
--SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)
--将小表放到内存中,省去shffle操作
-- 在没有开启mapjoin的情况下,执行的是reduceJoin
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM
smallTable JOIN bigTable ON smallTable.key = bigTable.key;
/*+mapjoin(smalltable)*/
3.2.10 Sort-Merge-Bucket(SMB) Map Join
Sort Merge Bucket Map Join是解决大表join大表的计算效率。使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。
## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;
## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;
## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;
## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;
3.2.11 Join数据倾斜优化
Hive解决Join引起的数据倾斜的参数设置
# join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
# 如果是join过程出现倾斜应该设置为true
set hive.optimize.skewjoin=false;
# 如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。
# 控制第二个job的mapper数量,默认10000
set hive.skewjoin.mapjoin.map.tasks=10000;
3.2.12 CBO优化
在join是表加载顺序:前面的表都会被加载到内存中。后面的表进行磁盘扫描。
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
CBO,成本优化器,代价最小的执行计划就是最好的执行计划。
Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成 的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连 接,并行度等等。
## 是否启动CBO优化
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
3.2.13 笛卡尔积
当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积,这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。
在大表和小表做笛卡尔积时,规避笛卡尔积的方法是, 给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。
精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的,大表的数据就被随机的分为了n 份。并且最后处理所用的reduce数量也是n,而且也不会出现数据倾斜。
3.2.14 Group By优化
默认情况下,Map 阶段同一个 Key 的数据会分发到一个 Reduce 上,当一个 Key 的数据过大时会产生数据倾斜。进行 group by 操作时可以从以下两个方面进行优化:
1.Map端部分聚合
## 开启Map端聚合参数设置
set hive.map.aggr=true;
# 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000
set hive.groupby.mapaggr.checkinterval=100000
2.负载均衡
策略就是把 MapReduce 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总
# 自动优化,有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata=false;
# 当该参数设置为true时:
# 1、在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的`group by key`有可能分发到不同的 reduce 中,从而达到负载均衡的目的;
# 2、第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中,最后完成最终的聚合操作。
3.2.15 Count Distinct优化
当要统计某一列去重时,如果数据量很大,count(distinct) 就会非常慢,原因与 order by 类似, count(distinct) 逻辑只会有很少的 reducer 来处理。这时可以用 group by 来改写
-- 优化前 (只有一个reduce,先去重再count负担比较大)
select count(distinct id) from tablename;
-- 优化后(启动两个job,一个job负责子查询(可以有多个reduce),另一个job负责count(1))
select count(1) from (select id from tablename group by id) tmp;
3.2.16 多重模式
如果一个 HQL 底层要执行 10 个 Job,那么能优化成 8 个一般来说,肯定能有所提高,多重插入就是一个非常实用的技能。一次读取,多次插入,有些场景是从一张表读取数据后,要多次利用,这时可以使 用 multi insert 语法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2019',
region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2020',
region='china' )
select shop_name, customer_id, total_price where .....;
-- 1、一般情况下,单个SQL中最多可以写128路输出,超过128路,则报语法错误。
-- 2、在一个multi insert中:
-- 对于分区表,同一个目标分区不允许出现多次。
-- 对于未分区表,该表不能出现多次。
-- 3、对于同一张分区表的不同分区,不能同时有insert overwrite和insert into操作,否则报错返回。
3.3 Hive架构层面
3.3.1 启用本地抓取
Hive 从 HDFS 中读取数据,有两种方式:启用MapReduce读取 和 直接抓取。
# 默认more
set hive.fetch.task.conversion=more;
# Expects one of [none, minimal, more]
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
3.3.2 本地执行优化
启动本地模式(理解就是跑单机)涉及到三个参数:
# 打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto=true;
# map任务数最大值,不启用本地模式的task最大个数
set hive.exec.mode.local.auto.input.files.max=4;
# map输入文件最大大小,不启动本地模式的最大输入文件大小
set hive.exec.mode.local.auto.inputbytes.max=134217728;
3.3.3 JVM重用
默认情况下,MapReduce 中一个 MapTask 或者 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后,JVM 进程就会退出。如果任务花费时间很短,又要多次启动JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗,这时,可以通过重用 JVM 来解决。
JVM重用有个缺点,开启JVM重用
# 一个jvm运行5个task
set mapred.job.reuse.jvm.num.tasks=5;
3.3.4 并行执行
默认情况下,一次只执行一个阶段。但是,如果某些阶段不是互相依赖,是可以并行执行的。多阶段并行是比较耗系统资源的。
## 可以开启并发执行。
set hive.exec.parallel=true;
## 同一个sql允许最大并行度,默认为8。
set hive.exec.parallel.thread.number=16;
3.3.5 推测执行
在分布式集群环境下,因为程序Bug,负载不均衡或者资源分布不均等原 因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。
Hadoop采用了推测执行机制,根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
# 启动mapper阶段的推测执行机制
set mapreduce.map.speculative=true;
# 启动reducer阶段的推测执行机制
set mapreduce.reduce.speculative=true;
3.3.6 Hive严格模式
所谓严格模式,就是强制不允许用户执行有风险的 HiveQL 语句,一旦执行会直接失败。但是Hive中为了提高SQL语句的执行效率,可以设置严格模式,充分利用Hive的某些特点。
## 设置Hive的严格模式
set hive.mapred.mode=strict;
# 动态分区使用非严格模式
set hive.exec.dynamic.partition.mode=nostrict;
# 当设置为严格模式时
1、对于分区表,必须添加where对于分区字段的条件过滤
select * from student_ptn where age > 25
2、order by语句必须包含limit输出限制
select * from student order by age limit 100;
3、限制执行笛卡尔积的查询
select a.*, b.* from a, b;
4、在hive的动态分区模式下,如果为严格模式,则必须需要一个分区列是静态分区
4.调优案例
4.1 日志表和用户表做连接
users表有600W+(假设有5G)的记录,将user表数据分发到所有的map上也是个不小的开销,而且MapJoin不支持这么大的小表。如果用普通的Join,会碰到数据倾斜问题。
-- 简言之: 事实表(大表)和维度表(小表)做连接
select * from log a left outer join users b on a.user_id = b.user_id;
-- 思路
-- 1. users表中,并不是所有的用户都会产生日志(百度所有的用户并不是都有搜索记录)
-- 2. 如果某个用户使用了百度,大概率今天的搜索记录不止一个
-- 1.给users表做过滤,但是需要过滤条件,先得到过滤层的user_id
select distinct user_id from log; ==> a
-- 2.获取所有日志记录的用户信息
select /*+mapjoin(a)*/ b.* from a join users b on a.user_id = b.user_id; ==> b
-- 3.让c表和log表做mapjoin
select /*+mapjoin(c)*/ c.*,d.* from c join log d on c.user_id=d.user_id;
-- 改进方案
select /*+mapjoin(x)*/ * from log a
left outer join (
select /*+mapjoin(c)*/ d.*
from ( select distinct user_id from log ) c join users d on c.user_id =
d.user_id
) x
on a.user_id = x.user_id;
4.2 求连续七天发朋友圈的用户
每天都要求 微信朋友圈 过去连续7天都发了朋友圈的小伙伴有哪些?
假设每个用户每发一次朋友圈都记录了一条日志。每一条朋友圈包含的内容:
日期,用户ID,朋友圈内容.....
dt, userid, content, .....
如果 微信朋友圈的 日志数据,按照日期做了分区。
2020-07-06 file1.log(可能会非常大)
2020-07-05 file2.log
.......
实现SQL:
-- 大表 join 大表 join 6次
-- 昨天和今天
select a.userid from table a join table b on a.userid = b.userid;
-- 上一次join的结果 和 前天 join
.....
-- 上一次join的结构 和 大前天 join
.....
-- 解决方案:位图法 BitMap
假设微信有10E用户,每天生成一个长度为10E的二进制数组,每个位置要么是0,要么是1,如果为1,代表该用户当天发了朋友圈。如果为0,代表没有发朋友圈。
然后每天:10E / 8 / 1024 / 1024 = 119M左右
求Join实现:两个数组做 求且、求或、异或、求反、求新增