大数据技术架构(组件)34——Spark:Spark SQL--Optimize

news2025/1/23 11:56:42

2.2.3、Optimize

2.2.3.1、SQL

3.3.1.1、RB

1、Join选择

在Hadoop中,MR使用DistributedCache来实现mapJoin。即将小文件存放到DistributedCache中,然后分发到各个Task上,并加载到内存中,类似于Map结构,然后借助于Mapper的迭代机制,遍历大表中的每一条记录,并查找是否在小表中,如果不在则省略。

而Spark是使用广播变量的方式来实现MapJoin.

2、谓词下推

3、列裁剪

4、常量替换

5、分区剪枝

3.3.1.2、CBO

开启cbo之后(通过配置spark.sql.cbo.enabled),有以下几个优化点:

1、Build选择

2、优化Join类型

3、优化多Join顺序

3.3.1.3、AE

3.3.1.3.1、Auto Setting The Shuffle Partition Number

Property Name

Default

Meaning

spark.sql.adaptive.enabled

false

设置为true,开启自适应机制

spark.sql.adaptive.minNumPostShufflePartitions

1

自适应机制下最小的分区数,可以用来控制最小并行度

spark.sql.adaptive.maxNumPostShufflePartitions

500

自适应机制下最大的分区数,可以用来控制最大并行度

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

67108864

动态reducer端每个Task最少处理的数据量. 默认为 64 MB.

spark.sql.adaptive.shuffle.targetPostShuffleRowCount

20000000

动态调整每个task最小处理

20000000条数据。该参数只有在行统计数据收集功能开启后才有作用

3.3.1.3.2、Optimizing Join Strategy at Runtime

Property Name

Default

Meaning

spark.sql.adaptive.join.enabled

true

运行过程是否动态调整join策略的开关

spark.sql.adaptiveBroadcastJoinThreshold

equals to spark.sql.autoBroadcastJoinThreshold

运行过程中用于判断是否满足BroadcastJoin条件。如果不设置,则该值等于

spark.sql.autoBroadcastJoinThreshold.

3.3.1.3.3、Handling Skewed Join

Property Name

Default

Meaning

spark.sql.adaptive.skewedJoin.enabled

false

运行期间自动处理倾斜问题的开关

spark.sql.adaptive.skewedPartitionFactor

10

如果一个分区的大小大于所有分区大小的中位数而且大于spark.sql.adaptive.skewedPartitionSizeThreshold,或者分区条数大于所有分区条数的中位数且大于spark.sql.adaptive.skewedPartitionRowCountThreshold那么就会被当成倾斜问题来处理

spark.sql.adaptive.skewedPartitionSizeThreshold

67108864

倾斜分区大小不能小于该值

spark.sql.adaptive.skewedPartitionRowCountThreshold

10000000

倾斜分区条数不能小于该值

spark.shuffle.statistics.verbose

false

启用后MapStatus会采集每个分区条数信息,用来判断是否倾斜并进行相应的处理

2.2.3.2、Compute

2.2.3.2.1、Dynamic Executor Allocation

2.2.3.2.2、Paralliesm

2.2.3.2.3、Data Skew/Shuffle

其除了手段和Spark文章中提到的倾斜一样,这里不再叙述

2.2.3.2.4、Properties

更多配置见

Property Name

Default

Meaning

spark.sql.inMemorycolumnarStorage.compressed

true

内存中列存储压缩

spark.sql.codegen

false

设置为true,可以为大型查询快速编辑创建字节码

spark.sql.inMemoryColumnarStorage.batchSize

10000

默认列缓存大小为10000,增大该值可以提高内存利用率,但要避免OOM问题

spark.sql.files.maxPartitionBytes

134217728 (128 MB)

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

spark.sql.files.openCostInBytes

4194304 (4 MB)

The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

spark.sql.files.minPartitionNum

Default Parallelism

The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is `spark.default.parallelism`. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

spark.sql.broadcastTimeout

300

Timeout in seconds for the broadcast wait time in broadcast joins

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations

spark.sql.sources.parallelPartitionDiscovery.threshold

32

Configures the threshold to enable parallel listing for job input paths. If the number of input paths is larger than this threshold, Spark will list the files by using Spark distributed job. Otherwise, it will fallback to sequential listing. This configuration is only effective when using file-based data sources such as Parquet, ORC and JSON.

spark.sql.sources.parallelPartitionDiscovery.parallelism

10000

Configures the maximum listing parallelism for job input paths. In case the number of input paths is larger than this value, it will be throttled down to use this value. Same as above, this configuration is only effective when using file-based data sources such as Parquet, ORC and JSON.

spark.sql.adaptive.coalescePartitions.enabled

true

When true and spark.sql.adaptive.enabled is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by spark.sql.adaptive.advisoryPartitionSizeInBytes), to avoid too many small tasks

spark.sql.adaptive.coalescePartitions.minPartitionNum

Default Parallelism

The minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark cluster. This configuration only has an effect when spark.sql.adaptive.enabled

and spark.sql.adaptive.coalescePartitions.enabled

are both enabled.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

(none)

The initial number of shuffle partitions before coalescing. If not set, it equals to spark.sql.shuffle.partitions

. This configuration only has an effect when spark.sql.adaptive.enabled

and spark.sql.adaptive.coalescePartitions.enabled

are both enabled.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

The advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled

is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.

spark.sql.adaptive.localShuffleReader.enabled

true

开启自适应执行后,spark会使用本地的shuffle reader读取shuffle数据。这种情况只会发生在没有shuffle重分区的情况

spark.sql.adaptive.skewJoin.enabled

true

When true and spark.sql.adaptive.enabled is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.

spark.sql.adaptive.skewJoin.skewedPartitionFactor

5

A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

256MB

A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor

multiplying the median partition size. Ideally this config should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes

、.

spark.sql.optimizer.maxIterations

100

The max number of iterations the optimizer and analyzer runs

spark.sql.optimizer.inSetConversionThreshold

10

The threshold of set size for InSet conversion

spark.sql.inMemoryColumnarStorage.partitionPruning

true

When true,enable partition pruning for in-memory columnar tables

spark.sql.inMemoryColumnarStorage.enableVectorizedReader

true

Enables vectorized reader for columnar caching

spark.sql.columnVector.offheap.enabled

true

When true, use OffHeapColumnVector in ColumnarBatch.

spark.sql.join.preferSortMergeJoin

true

When true, prefer sort merge join over shuffle hash join

spark.sql.sort.enableRadixSort

true

When true, enable use of radix sort when possible. Radix sort is much faster but requires additional memory to be reserved up-front. The memory overhead may be significant when sorting very small rows (up to 50% more in this case)

spark.sql.limit.scaleUpFactor

4

Minimal increase rate in number of partitions between attempts when executing a take on a query. Higher values lead to more partitions read. Lower values might lead to longer execution times as more jobs will be run

spark.sql.hive.advancedPartitionPredicatePushdown.enabled

true

When true, advanced partition predicate pushdown into Hive metastore is enabled

spark.sql.subexpressionElimination.enabled

true

When true, common subexpressions will be eliminated

spark.sql.caseSensitive

false

Whether the query analyzer should be case sensitive or not. Default to case insensitive. It is highly discouraged to turn on case sensitive mode

spark.sql.crossJoin.enabled

false

When false, we will throw an error if a query contains a cartesian product without explicit CROSS JOIN syntax.

spark.sql.files.ignoreCorruptFiles

false

Whether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.

spark.sql.files.ignoreMissingFiles

false

Whether to ignore missing files. If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned.

spark.sql.files.maxRecordsPerFile

0

Maximum number of records to write out to a single file.If this value is zero or negative, there is no limit.

spark.sql.cbo.enabled

false

Enables CBO for estimation of plan statistics when set true.

spark.sql.cbo.joinReorder.enabled

false

Enables join reorder in CBO

spark.sql.cbo.joinReorder.dp.threshold

12

The maximum number of joined nodes allowed in the dynamic programming algorithm

spark.sql.cbo.joinReorder.card.weight

0.7

The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight).

spark.sql.cbo.joinReorder.dp.star.filter

false

Applies star-join filter heuristics to cost based join enumeration

spark.sql.cbo.starSchemaDetection

false

When true, it enables join reordering based on star schema detection

spark.sql.cbo.starJoinFTRatio

0.9

Specifies the upper limit of the ratio between the largest fact tables for a star join to be considered

spark.sql.windowExec.buffer.in.memory.threshold

4096

Threshold for number of rows guaranteed to be held in memory by the window operator

2.2.3.3、Storage

2.2.3.3.1、Small File

小文件的危害就不再叙述了,这个时候就要思考什么时候会产生小文件。其产生的地方有:

1、源头:如果原始文件就存在小文件,那么就需要先进行合并,然后再计算,避免产生大量的task造成资源浪费

2、计算过程中:这个时候就要结合实际的数据量大小和分布,以及分区数进行调整。

3、写入:写入文件的数量跟reduce/分区的个数有关系,可以根据实际的数据量进行调整并行度或者配置自动合并

2.2.3.3.2、Cold And Hot Data

2.2.3.3.3、Compress And Serializable

1、文件采用合适的存储类型以及压缩格式

2、使用合适高效的序列化器,如kryo

Property Name

Default

Meaning

spark.sql.parquet.compression.codec

snappy

parquet存储类型文件的压缩格式,默认为snappy

spark.sql.sources.fileCompressionFactor

1.0

When estimating the output data size of a table scan, multiply the file size with this factor as the estimated data size, in case the data is compressed in the file and lead to a heavily underestimated result

spark.sql.parquet.mergeSchema

false

When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available

spark.sql.parquet.respectSummaryFiles

false

When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn't be enabled before knowing what it means exactly

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems

spark.sql.parquet.filterPushdown

true

Enables Parquet filter push-down optimization when set to true

spark.sql.parquet.columnarReaderBatchSize

4096

The number of rows to include in a parquet vectorized reader batch. The number should be carefully chosen to minimize overhead and avoid OOMs in reading data.

2.2.3.4、Other

2.2.3.4.1、Closed Loop FeedBack

2.2.3.4.1.1、实时运行信息分析

2.2.3.4.1.2、运行信息离线统计分析

高频表、列统计,错误信息汇总,策略生效情况记录等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/344212.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【半监督医学图像分割 2021 CVPR】CVRL

文章目录【半监督医学图像分割 2021 CVPR】CVRL摘要1. 介绍1.1 总览1.2 无监督对比学习2. 实验3. 总结【半监督医学图像分割 2021 CVPR】CVRL 论文题目&#xff1a;Momentum Contrastive Voxel-wise Representation Learning for Semi-supervised Volumetric Medical Image Seg…

ThinkPHP多语言模块文件包含RCE复现详细教程

免责声明 本文章只用于技术交流&#xff0c;若使用本文章提供的技术信息进行非法操作&#xff0c;后果均由使用者本人负责。 漏洞描述&#xff1a; ThinkPHP在开启多语言功能的情况下存在文件包含漏洞&#xff0c;攻击者可以通过get、header、cookie等位置传入参数&#xff…

Transformer机制学习笔记

学习自https://www.bilibili.com/video/BV1J441137V6 RNN&#xff0c;CNN网络的缺点 难以平行化处理&#xff0c;比如我们要算b4b^4b4&#xff0c;我们需要一次将a1a^1a1~a4a^4a4依次进行放入网络中进行计算。 于是有人提出用CNN代替RNN 三角形表示输入&#xff0c;b1b^1b1的…

【数据结构】算法的复杂度分析:让你拥有未卜先知的能力

&#x1f451;专栏内容&#xff1a;数据结构⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;日拱一卒&#xff0c;功不唐捐 文章目录一、前言二、时间复杂度1、定义2、大O的渐进表示法3、常见的时间复杂度三、空间复杂度1、定义2、常见的空间复杂度一、前…

微信小程序删除list指定列表项

一、删除效果展示&#xff1a; // 重要代码片段async deleteListItem(e) {const sureResult await wx.showModal({title: "提示",content: "确定要删除这项吗",});if (sureResult.confirm) {const { index } e.currentTarget.dataset;setTimeout(()>{…

Centos7安装kvm WEB管理工具kimchi

参考&#xff1a;虚拟化技术之kvm WEB管理工具kimchi一、kimchi安装1.1 下载wok安装包和kimchi安装包# 1. wok下载地址&#xff1a; wget https://github.com/kimchi-project/kimchi/releases/download/2.5.0/wok-2.5.0-0.el7.centos.noarch.rpm # 2. kimchi下载地址&#xff1…

Springmvc补充配置

Controller配置总结 控制器通常通过接口定义或注解定义两种方法实现 在用接口定义写控制器时&#xff0c;需要去Spring配置文件中注册请求的bean;name对应请求路径&#xff0c;class对应处理请求的类。 <bean id"/hello" class"com.demo.Controller.HelloCo…

【Spark分布式内存计算框架——Spark SQL】2. SparkSQL 概述(上)

第二章 SparkSQL 概述 Spark SQL允许开发人员直接处理RDD&#xff0c;同时可以查询在Hive上存储的外部数据。Spark SQL的一个重要特点就是能够统一处理关系表和RDD&#xff0c;使得开发人员可以轻松的使用SQL命令进行外部查询&#xff0c;同时进行更加复杂的数据分析。 2.1 前…

蓝桥杯模块学习16——PCF8591(深夜学习——单片机)

一、硬件电路&#xff1a;1、蓝桥杯板子上的电路&#xff1a;&#xff08;1&#xff09;AIN0-3&#xff1a;四种模拟量的输入口&#xff0c;AIN1为光敏电阻控制电压输入&#xff0c;AIN3为电位器控制电压输入&#xff08;2&#xff09;A0-2&#xff1a;决定设备的地址码&#x…

SSTI漏洞原理及渗透测试

模板引擎&#xff08;Web开发中&#xff09; 是为了使 用户界面 和 业务数据&#xff08;内容&#xff09;分离而产生的&#xff0c;它可以生成特定格式的文档&#xff0c; 利用模板引擎来生成前端的HTML代码&#xff0c;模板引擎会提供一套生成HTML代码的程序&#xff0c;之后…

TensorRT的Python接口解析

TensorRT的Python接口解析 文章目录TensorRT的Python接口解析4.1. The Build Phase4.1.1. Creating a Network Definition in Python4.1.2. Importing a Model using the ONNX Parser4.1.3. Building an Engine4.2. Deserializing a Plan4.3. Performing Inference点此链接加入…

科技巨头争相入局,卫星通信领域将迎来怎样的发展?

近年来&#xff0c;全球卫星通信产业进入了一个高速发展的阶段。与卫星通信相关的新技术和新应用不断出现&#xff0c;成为了媒体报道的热点&#xff0c;也引起了公众的广泛关注。尤其是刚刚过去的2022年&#xff0c;华为和苹果公司分别发布了搭载卫星通信技术的手机&#xff0…

万丈高楼平地起:Linux常用命令

目录 系统管理命令 man命令 ls命令 cd命令 useradd命令 passwd命令 free命令 whoami命令 ps命令 date命令 pwd命令 shutdown命令 文件目录管理命令 touch命令 cat命令 mkdir命令 rm命令 cp命令 mv命令 find命令 more指令 less指令 head指令 tail指令 …

基于HTML实现浪漫情人节表白代码(附源代码)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

【spark】第三章——SparkSQL

文章目录1. SparkSQL 概述1.1 SparkSQL 是什么1.2 Hive and SparkSQL1.3 SparkSQL 特点1.3.1 易整合1.3.2 统一的数据访问1.3.3 兼容 Hive1.3.4 标准数据连接1.4 DataFrame 是什么1.5 DataSet 是什么2. SparkSQL 核心编程2.1 新的起点2.2 DataFrame2.2.1 创建 DataFrame2.2.2 S…

MVC架构 —— 理解 Dao 层和 Service 层

MVC 框架念叨了千百遍&#xff0c;但是对于它的理解还是停留在概念上。 作为一种经典架构设计典范&#xff0c;MVC 在日新月异的软件行业却能常青数十年&#xff0c;一定有其独特的魅力。 一、Dao 层和 Service 层的概念 Dao 是 Data Access Object &#xff08;数据访问对象&…

FPGA纯verilog实现任意分辨率视频输出显示,高度贴近真实项目,提供工程源码和技术支持

目录1、前言2、视频显示的VESA协议3、VESA协议的bug4、FPGA实现任意分辨率视频输出显示5、FDMA实现数据缓存6、vivado工程详解7、上板调试验证并演示8、福利&#xff1a;工程代码的获取1、前言 本设计使用纯Verilog代码实现&#xff0c;重点在于基于AXI协议的DDR控制器的运用&…

SpringBoot 整合EasyExcel详解

一、概述 Java解析、生成Excel比较有名的框架有Apache poi、jxl。但他们都存在一个严重的问题就是非常的耗内存&#xff0c;poi有一套SAX模式的API可以一定程度的解决一些内存溢出的问题&#xff0c;但POI还是有一些缺陷&#xff0c;比如07版Excel解压缩以及解压后存储都是在内…

内网渗透-src挖掘-互联网打点到内网渗透-2023年2月

1、通过信息搜集&#xff0c;发现目标有一个互联网访问的骑士cms 2、发现该系统骑士cms版本为6.0.20&#xff0c;通过搜索&#xff0c;发现骑士cms < 6.0.48存在任意文件包含漏洞 /Application/Common/Controller/BaseController.class.php 该文件的assign_resume_tpl函数…

字节面试惨败,闭关修炼再战美团(Android 面经~)

作者&#xff1a;王旭 前言 本人从事Android 开发已经有5年了&#xff0c;受末日寒气影响&#xff0c;被迫在家休整&#xff0c;事后第一家选择字节跳动面试&#xff0c;无奈的被面试官虐得“体无完肤”&#xff0c;好在自己并未气馁&#xff0c;于是回家开始回家进行闭关修炼…