Hive/MaxCompute SQL性能优化(三):数据倾斜优化实战

news2024/11/24 17:23:32

SQL性能优化系列:

Hive/MaxCompute SQL性能优化(一):什么是数据倾斜

Hive/MaxCompute SQL性能优化(二):如何定位数据倾斜


前面介绍了如何定位数据倾斜,本文介绍如果遇到各种数据倾斜的情况该怎样优化代码。

Map长尾优化

一、Map读取数据量不均匀

小文件多,数据分布不均匀,使用下面的参数设置小文件合并,让每个mapper实例读取数据量大致相同。

set odps.sql.mapper.merge.limit.size=64; -- 小于阈值的文件将会合并,默认64mb
set odps.sql.mapper.split.size=256; --map最大输入数据量,默认256mb,影响mapper数量

当出现文件过大,mapper很少,导致map端读取数据很慢的时候,可以适量缩小split.size的值,以达到提高map实例数量的效果,提高数据读取效果。

二、Map实例读取的块中的数据分布不均匀

出现这种情况的话,可以使用distribute by rand()打乱数据,将数据随机分发。

需要注意,此参数不可随意使用,只有出现块中数据分布不均时候才建议使用,否则会适得其反。

使用此参数后,map端不要再做复杂join、聚合,避免map端长尾,并且会带来reduce端的资源紧张,可以适度增加reducer的个数。

set odps.sql.reducer.instances=500; -- 增大reducer个数
select id,count(*) cnt
from (
    select id,name
    from tbl
    distribute by rand()
)
group by id
;

Join端长尾优化

一、关联键存在大量空值引发长尾

首先定位是哪个关联键中存在空值,然后使用rand()随机值替换join字段空值,打乱空值的分布,提高计算并行度,并且由于空值本身关联无意义,因此不会影响数据结果。

select ...
from a
left join b
on coalesce(a.id,rand()*9999) =  coalesce(b.id,rand()*9999) --空值替换成随机值,不影响计算结果,可以将空值打散到多个joinner中

二、大表关联小表,使用mapjoin优化避免长尾

大表关联小表时,可以使用mapjoin hint,将小表广播到map端,转换成hash table加载到内存中,在mapper端和大表的分散数据做笛卡尔积,省去大表的shuffle时间,避免出现长尾。

注意,小表的定义有阈值限制,并且小表只能作为从表,不能作为主表。

且map join没有reduce任务,所以map直接输出结果,即有多少个map任务就会产生多少个结果文件

set odps.sql.mapjoin.memory.max=512; -- 最大2048MB,小表过大会影响性能
select /*+ mapjoin(b,c)*/  --mapjoin hint 定义小表,多个用逗号分隔
...
from t0 a
left join t1 b 
on a.id = b.id
left join t2 c
on a.id = c.id

map join原理图:

三、大表关联大表,使用skewjoin优化避免长尾

大表关联大表出现数据倾斜时,可以将大表数据拆分,对热点数据使用map join,非热点数据使用merge join,然后将结果合并。

如何拆分热点数据:引入一个aggregate,提取topK热点key拆分数据,可以通过参数调整top key个数:set odps.optimizer.skew.join.topk.num=20;

若执行计划中出现topk_agg关键字,则表示skewjoin生效。

skew join原理图:

下面三个级别的hint,指定热点信息越详细,效率越高。

-- 性能效率 方法1<方法2<方法3
-- 方法1:hint表名
select /*+ skewjoin(a)*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code;
-- 方法2:hint表名和认为可能产生倾斜的列,下面认为a的id和code列存在倾斜
select /*+ skewjoin(a(id,code))*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code;
-- 方法3:hint表名和列,并提供产生倾斜的列的值,如果是string类型需要加引号
-- 下面case认为(a.id=1 and a.code='xxx')和(a.id=3 and a.code='yyy')的值出现倾斜
select /*+ skewjoin(a(id,code)((1,'xxx'),(3,'yyy')))*/ ... from t0 a join t1 b on a.id = b.id and a.code = b.code;

四、大表关联中表,使用distributed mapjoin优化

Distributed MapJoin是MapJoin的升级版,适用于小表Join大表的场景,二者的核心目的都是为了减少大表侧的Shuffle和排序。

注意事项:

  1. Join两侧的表数据量要求不同,大表侧数据在10 TB以上,小表侧数据在[1 GB, 100 GB]范围内。

  1. 小表侧的数据需要均匀分布,没有明显的长尾,否则单个分片会产生过多的数据,导致OOM(Out Of Memory)及RPC(Remote Procedure Call)超时问题。

  1. SQL任务运行时间在20分钟以上,建议使用Distributed MapJoin进行优化。

  1. 由于在执行任务时,需要占用较多的资源,请避免在较小的Quota组运行。

使用方式:

在select语句中使用Hint提示

/*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/

并发数=shard_count * replica_count

参数说明:

  • table_name:目标表名。

  • shard_count=<n>:设置小表数据的分片数,小表数据分片会分布至各个计算节点处理。n即为分片数,一般按奇数设置。

说明
shard_count值建议手动指定, shard_count值可以根据小表数据量来大致估算,预估一个分片节点处理的数据量范围是[200 MB, 500 MB]。
shard_count设置过大,性能和稳定性会受影响; shard_count设置过小,会因内存使用过多而报错。
  • replica_count=<m>:设置小表数据的副本数。m即为副本数,默认为1。

说明 为了减少访问压力以及避免单个节点失效导致整个任务失败,同一个分片的数据,可以有多个副本。当并发过多,或者环境不稳定导致运行节点频繁重启,可以适当提高 replica_count,一般建议为2或3。

语法示例

-- 推荐,指定shard_count(replica_count默认为1)
/*+distmapjoin(a(shard_count=5))*/

-- 推荐,指定shard_count和replica_count
/*+distmapjoin(a(shard_count=5,replica_count=2))*/

-- distmapjoin多个小表
/*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */

-- distmapjoin和mapjoin混用
/*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/

五、动态过滤器 Dynamic Filter

适用于大表和中小表关联,且小表过滤性高的情况,使用动态过滤器,可减少大表shuffle数据量,以此提高性能。

如图所示,两表join,其中A为小表,B为大表,依据A表的key生成filter,推送至shuffle/join之前,提前过滤B表(大表)的数据量,可以减少B表shuffle的数据量从而提高性能。

使用hint开启动态过滤功能:

-- HINT格式为/*+dynamicfilter(Producer, Consumer1[, Consumer2,...])*/,允许一个生产者过滤多个消费者
select /*+dynamicfilter(A, B)*/ * from (table1) A join (table2) B on A.a= B.b;

注意,当关连键为分区字段时,可以开启动态分区裁剪,在读取完整数据前将无用分区裁剪掉。

下面的例子,如果不使用动态分区裁剪,则会将B表中的全部分区读取后再和A表关联,即使此时开启了动态过滤器,也在前期浪费了大量的读取时间。

而打开动态分区裁剪功能后,由于A表中的a列值只有20200701,B表中的20200702和20200703分区会被裁剪掉,既节省了资源,也降低了作业运行时长。

-- 开启动态分区裁剪功能
set odps.optimizer.dynamic.filter.dpp.enable=true;

--A为非分区表,表中a列的值为20200701。
--B为分区表,表中ds列的值包含3个分区20200701、20200702、20200703。
select * from (table1) A join (table2) B on A.a= B.ds;

开启动态过滤器或动态分区裁剪后,若Logview中出现类似DynamicFilterConsumer1的算子,说明动态过滤器已生效。

Reduce长尾

一、Count Distinct引发长尾

count distinct使得map端不做预聚合,group by 和distinct 字段的数据全量shuffle,容易引发reduce端长尾,若一段脚本中出现多个count distinct,则数据膨胀N倍,长尾也是N倍。

优化方案:

去重字段组合到group by 字段中,打散分组聚合去重,然后统计group by 字段的数量。

select  group_id
        ,app_id
        ,sum(case when 7d_cnt>0 then 1 else 0 end) AS 7d_uv, -- 7日内UV
        ,sum(case when 14d_cnt>0 then 1 else 0 end) AS 14d_uv --14日内UV
from    (
        select   
            group_id,
            app_id,
            user_id, --按user_id去重
            count(case when dt>='${7d_before}' then user_id else null end) as 7d_cnt, -- 7日内各用户的点击量
            count(case when dt>='${14d_before}' then user_id else null end) as 14d_cnt --14日内各用户的点击量
        from tbl
        where dt>='${14d_before}'
        group by 
            group_id,
            app_id,
            user_id
        ) a
group by group_id,
         app_id

前面的内容中有过详细示例:https://blog.csdn.net/wsdc0521/article/details/117885554

二、动态分区写入数据倾斜

动态分区是指在往分区表里插入数据时,可以在分区中指定分区列,但不指定具体分区值,sql执行完才确定分区值,运行时动态批量写入。

当有K个Map Instance,N个目标分区,极端情况下会产生K* N个小文件。

MaxCompute对动态分区的处理是引入额外的一级Reduce Task,将同一个分区的数据,尽可能交给一个instance来处理写入,如有热点分区则发生长尾,优点是产生小文件少,但额外引入一级Reduce操作也耗费计算资源,因此如何保持着两者的平衡,需要认真的权衡。

若目标分区少,则没有小文件的情况存在,开启此功能不仅会增加资源浪费,还会降低性能,可以选择关闭此功能,反而可以提升性能。

-- 当reduce和动态分区数少的时候可以关闭,否则会生成大量小文件
-- 关闭此功能
set odps.sql.reshuffle.dynamicpt=false;

三、GroupBy数据倾斜

数据倾斜引发的GroupBy聚合长尾,可采用groupby.skewindata参数优化:

set odps.sql.groupby.skewindata=true;

优化原理为引入两级reduce

  1. 第一次shuffle key是group key和一个随机数(取模),将数据打散,多个reduce并发做部分聚合;

  1. 第二次shuffle key是group key,相同的key分发到同一个reduce做最终聚合;

四、ROW_NUMBER(计算TopN)

常用于取TopN的场景中,当数据量巨大时候,可以通过两阶段聚合,增加随机列或拼接随机数,将其作为分组中一参数,提升查询性能。

示例:

为使Map阶段中各分组数据尽可能均匀,增加随机列,将其作为分组中一参数。

SELECT  main_id
        ,type
  FROM  (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM (SELECT  main_id
                          ,type
                    FROM(SELECT  main_id
                                 ,type
                                 ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                           FROM  (SELECT  main_id
                                          ,type
                                          ,ceil(10 * rand()) AS src_pt
                                          FROM    data_demo2
                                  )
                                ) B
                        WHERE B.rn <= 10
                    )
        ) A
WHERE   A.rn <= 10;

参数调优

一、Map端

--调整cpu数量,默认100,在[50,800]之间调整,一般无需改动
set odps.sql.mapper.cpu=100;

-- 当map阶段instance有wirte dumps,可适当增加内存
set odps.sql.mapper.memory=1024;

-- 设定一个map的最大数据输入量,调小该值可以增加mapper数量
set odps.sql.mapper.split.size=256;

二、Join端

-- 调整cpu数量,默认100,在[50,800]之间调整,一般无需改动
set odps.sql.joiner.cpu=100;

-- 当join阶段instance有wirte dumps,可适当增加内存
set odps.sql.joiner.memory=1024;

-- 调整instance数量,默认-1,在[0,2000]之间调整
set odps.sql.joiner.instances=-1;

三、Reduce端

-- 调整cpu数量,默认100,在[50,800]之间调整,一般无需改动
set odps.sql.reducer.cpu=100;

-- 当reduce阶段instance有wirte dumps,可适当增加内存
set odps.sql.reducer.memory=1024;

-- 调整instance数量,默认-1,在[0,2000]之间调整
set odps.sql.reducer.instances=-1;
  • 当日志看到dumps关键词,则说明出现了文件dump,若比较严重,可以适当调大内存参数,但不能太大,否则集群资源不足时,任务会一直排队等待资源。

  • 当reduce无长尾时,但耗时长,可适当增大reduce并发数。

  • 当map无长尾,但耗时长,可适当调小mapper.split.size的值,以增加mapper数量,提高并发。

希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/154515.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ArcGIS如何将Excel表格转换为SHP格式

概述数据的获取渠道是多种多样的&#xff0c;获取的数据格式也是多种多样&#xff0c;作为一名GISer&#xff0c;需要熟练掌握各种格式的数据之间的转换&#xff0c;例如本文要介绍的Excel格式的数据&#xff0c;经常会遇到&#xff0c;这里为大家介绍一下转换方法&#xff0c;…

区块链基础知识(二)

密码学与安全技术 参考书籍 《区块链原理、设计与应用》 Hash算法 加解密算法 混合加密机制 离散对数与Diffie-Hellman秘钥交换协议 消息认证码 数字签名 PKI体系 PKI基本组件 证书签发 证书的撤销 Merkle tree结构 默克尔树逐层记录哈希值的特点&#xff0c;让它具有了一些独特…

【我的渲染技术进阶之旅】关于C++轻量级界面开发框架Dear ImGui介绍

文章目录一、怎么知道ImGui的1.1 Filament中有使用ImGui1.2 其他很多渲染框架都有使用ImGui二、ImGui介绍2.1 ImGui风格2.2 Imgui介绍2.2.1 Imgui简介2.2.2 Imgui用法2.2.3 Demo示例2.2.4 集成2.2.5 更多案例2.3 查看Imgui实例源代码2.3.1 运行demo2.3.2 项目结构分析2.3.3 示例…

TCP/IP网络编程(2)——套接字类型与协议设置

文章目录二、套接字类型与协议设置2.1 套接字协议及数据传输特性2.1.1 创建套接字2.1.2 协议族&#xff08;Protocol Family&#xff09;2.1.3 套接字类型&#xff08;Type&#xff09;2.1.4 套接字类型1&#xff1a;面向连接的套接字&#xff08;SOCK_STREAM&#xff09;2.1.5…

RHCE学习笔记-133-2

rpm and kickstart The RPM Way 不会有互动事件 可以适用在所有软件,如kernerl和其他额外的软件都可以以rpm的形式 不需要安装前面的版本才能安装后面的版本 RPM Packge manager RPM components local database /var/lib/rpm rpm and related executables package files primar…

大数据NiFi(十三):NiFi监控

文章目录 NiFi监控 一、处理器状态指示有如下几种情况 二、对于每个组的监控情况如下

CMMI之客户验收

客户验收&#xff08;Customer Acceptance, CA&#xff09;是指客户依据合同对产品进行审查和测试&#xff0c;确保产品满足客户需求。客户验收过程域是SPP模型的重要组成部分。本规范阐述了客户验收的规程&#xff0c;该规程的“目标”、“角色与职责”、“启动准则”、“输入…

Spring 源码解析~13、Spring 中的钩子函数汇总

Spring 中的钩子函数汇总 一、生命周期总览 二、BeanDefinition 生成与注册阶段 钩子执行顺序与博文顺序一致&#xff0c;即 1->n 1、EmptyReaderEventListener#defaultsRegistered 触发点&#xff1a;创建 BeanDefinitionParserDelegate 委派类时触发解释&#xff1a;通知…

本立道生:必备的基础知识

通过前面两节课的内容&#xff0c;我带领大家熟悉了一下 Visual Studio C 开发环境的必备知识&#xff0c;虽然还有很多关于 Visual Studio 的重要知识没有介绍&#xff0c;但为了让你尽快进入 C 开发环节&#xff0c;及早获得开发程序的愉悦&#xff0c;我们暂时只介绍这些必备…

【数据结构】5.5 遍历二叉树和线索二叉树

5.5.1 遍历二叉树 遍历定义 顺着某一条搜索路径巡访二叉树中的每个结点&#xff0c;使得每个结点均被访问依次&#xff0c;而且仅被访问一次&#xff08;又称周游&#xff09;。访问的含义很广&#xff0c;可以是对结点作各种处理&#xff0c;如&#xff1a;输出结点的信息&a…

Centos7开启SSH连接配置

1、查看是否已安装openssh-server&#xff1a; [rootlocalhost ~]# yum list installed | grep openssh-server 如果有信息说明已安装了openssh-server&#xff0c;如果输出没有任何结果&#xff0c;说明没有安装。 2、安装openssh-server&#xff08;如果已安装&#xff0c…

微信小程序(学习笔记篇)

基本项目结构 pages用来存放所有小程序的页面utils 用来存放工具性质的模块&#xff08;例如:格式化时间的自定义模块)app.js小程序项目的入口文件app.json 小程序项目的全局配置文件app.wXss小程序项目的全局样式文件project.config.json项目的配置文件sitemap.json用来配置小…

买卖股票的最佳时机 II -数学推导证明贪心思路 -leetcode122

问题说明来源leetcode 一、问题描述: 122. 买卖股票的最佳时机 II 难度中等1941 给你一个整数数组 prices &#xff0c;其中 prices[i] 表示某支股票第 i 天的价格。 在每一天&#xff0c;你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可…

Spark Core----RDD详解

为什么需要RDD 分布式计算需要&#xff1a; 分区控制&#xff08;多台机器并行计算&#xff0c;将一份数据分成多份&#xff0c;在不同机器上执行&#xff09;Shuffle控制&#xff08;不同分区数据肯定需要进行相关的关联&#xff0c;不同分区进行数据传输叫Shuffle控制&…

分享77个NET源码,总有一款适合您

NET源码 分享77个NET源码&#xff0c;总有一款适合您 NET源码下载链接&#xff1a;https://pan.baidu.com/s/1vhXwExVAye5YrB77Vxif8Q?pwdzktx 提取码&#xff1a;zktx 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xf…

Html 3D旋转相册制作

程序示例精选 Html 3D旋转相册制作 如需安装运行环境或远程调试&#xff0c;见文章底部微信名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<Html 3D旋转相册制作>>编写代码&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 学习…

zabbix监控主机

zabbix官网 zabbix分为zabbix server&#xff08;zabbix服务端&#xff0c;用来展示监控的&#xff09;和zabbix-agent&#xff08;zabbix客户端用来收集数据的&#xff09; zabbix-agent客户端有两种工作模式&#xff0c;被动模式&#xff08;由zabbix服务来采集数据&#xff…

二十二、Kubernetes中Pod调度第四篇污点(容忍)调度详解、实例

1、概述 在默认情况下&#xff0c;一个Pod在哪个Node节点上运行&#xff0c;是由Scheduler组件采用相应的算法计算出来的&#xff0c;这个过程是不受人工控制的。但是在实际使用中&#xff0c;这并不满足的需求&#xff0c;因为很多情况下&#xff0c;我们想控制某些Pod到达某…

魔方爱好者快来康康,困难的平面魔方来了!

前言和效果图我今天看到一个网站&#xff0c;就是关于魔方的&#xff0c;里面二阶魔方引起了我的兴趣。https://rubiks-cube-solver.com/2x2/进去后你们可以看到&#xff0c;二阶魔方的平面展开图&#xff0c;复原也更加困难。虽然是英文的&#xff0c;但我还是玩得不亦乐乎。好…

查看GPU使用情况和设置CUDA_VISIBLE_DEVICES

文章目录一、简介二、查看GPU状态和信息三、使用3.1临时设置&#xff08;临时设置方法一定要在第一次使用 cuda 之前进行设置&#xff09;3.2python 运行时设置3.3永久设置四、参考资料一、简介 服务器中有多个GPU&#xff0c;选择特定的GPU运行程序可在程序运行命令前使用&am…