带你彻底理解Spark的分区

news2025/4/6 11:54:54

前言

我:什么是RDD?
面试者:RDD是被分区的,由一系列分区组成…

我:你怎么理解分区?
面试者:…
我:Spark中有哪些可以实现分区的方法?分别使用的场景是什么?
面试者…
我:Spark默认分区数是多少?如何保证一个分区对应一个文件?
面试者…
我:…谢谢您的面试,回去等通知吧!

什么是分区

Spark分区是将大型数据集划分为较小的数据块,每个数据块称为分区,分区是一个逻辑数据块,对应相应的物理块Block。每个分区都可以在集群中的不同节点上并行处理,这样可以提高Spark的并行性和性能。分区的数量可以通过设置Spark的分区数来控制,分区数越多,Spark可以并行处理的数据块就越多,从而提高性能。分区的数量应根据数据的大小和集群的资源进行调整,以充分利用集群的并行处理能力。在处理大型数据集时,Spark分区是非常重要的,因为它可以帮助Spark充分利用集群的资源和并行处理能力,从而加快数据处理速度。

有哪些分区方法,使用场景是什么

分区方法使用场景
repartition(numPartitions : scala.Int)对数据集随机打散进行范围分区,每个分区中数据量大致相同
repartition(partitionExprs : Column*)对数据集中指定列进行哈希分区
repartition(numPartitions : scala.Int, partitionExprs : Column*)partition = hash(partitionExprs) % numPartitions,使用指定列的哈希值对指定分区数进行取模
coalesce(numPartitions : scala.Int)用来减少分区数量,可以避免shuffle
repartitionByRange(partitionExprs : Column*)使用范围分区,非常使用与数字列
partitionBy(colNames : root.scala.Predef.String*)用于将数据写入磁盘中对应的子文件夹,类似于hive的分区

注意:partitionBy()是DataFrameWriter类中的一个方法,其他所有方法都来自DataFrame。

分区的几个常识

  1. 默认情况下,Spark创建的分区等于服务器中CPU内核的数量。
  2. 每个分区的数据都存在一台服务器上(这里注意和HDFS上Block的区别)。
  3. Spark为每个分区创建一个Task。
  4. Spark Shuffle操作将数据从一个分区移动到其他分区。
  5. 分区是一项昂贵的操作,因为它会造成shuffle(数据可能在节点之间移动)。
  6. 默认情况下,DataFrame shuffle操作会创建200个分区。

内存分区和磁盘分区

Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。

  1. 内存分区
    您可以通过调用repartition() or coalesce()算子来对DataFrame进行分区或重分区。
  2. 磁盘分区
    在将DataFrame写回磁盘时,可以使用DataFrameWriter的partitionBy()来选择如何根据列对数据进行分区。这与Hive分区类似。

分区的优点

众所周知,Spark的处理大型数据集的速度是MapReduce处理速度的100倍,如果没有分区,这是不可能的。以下是在内存或磁盘上使用Spark分区的一些优点。

  1. 快速访问数据。
  2. 提供在较小数据集上执行算子的能力。

很多数据处理框架都在使用分区,因为它能够很快读取数据。

默认分区和配置

默认情况下,Spark会根据运行Job的模式对数据进行分区。

  1. Local模式
    在本地以独立模式运行时,Spark会将数据划分为系统上的CPU核数或创建SparkSession对象时指定的值。
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();

上面的例子提供了local[5]作为master()方法的参数,这意味着用5个分区在本地运行作业。就算你系统只有2个内核,它仍然会创建5个分区任务。

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Long> ds = spark.range(0, 20);
        System.out.println(ds.rdd().getNumPartitions());
        spark.stop();
    }
}

结果打印的是:5

  1. HDFS Cluster模式
    在Hadoop集群上运行Spark作业时,默认的分区数量需要考虑以下几个种情况:
  • 在HDFS集群上,默认情况下,Spark为文件的每个Block创建一个分区。
  • 在Hadoop1.X中,HDFS块大小为64MB,在Hadoop2.X中HDFS块大小为128MB。
  • 集群中所有executor核总数和2,哪个大取哪个。

例如,如果某个文件大小为640 MB,在Hadoop2.X上运行,则会创建5个分区,每个分区由128 MB的Block块组成(5个块*128 MB=640 MB)。如果将分区重新划分为10,那么它会为每个块创建2个分区。

  1. 通过配置设置分区
  • spark.default.parallelism
    配置默认值设置为集群中所有节点上的所有核数,在本地,它设置为系统上的核数。
  • spark.sql.shuffle.partitions
    配置默认值设置为200,当调用shuffle算子,如groupBy、join算子等,此属性仅在DataFrame API中可用,在RDD中不可用。
    在代码中设置配置属性的方式如下:

spark.conf.set("spark.sql.shuffle.partitions", "500")

也可在使用spark-submit命令提交spark应用程序的时候,增加配置属性:


./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500

动态修改分区数

创建RDD/DataFrame时,可以通过参数设定分区的数量,Spark也可以修改内存分区数量、磁盘分区数量。

  1. repartition()和coalesce()
    在处理分区数据时,可以通过reparation和coalesce来增加和减少分区。
package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Long> ds = spark.range(0, 10);
        System.out.println("默认分区数量:"+ds.rdd().getNumPartitions());
        Dataset<Long> repartition = ds.repartition(10);
        System.out.println("repartition增加分区数:"+repartition.rdd().getNumPartitions());
        Dataset<Long> coalesce = ds.coalesce(2);
        System.out.println("coalesce减少分区数:"+coalesce.rdd().getNumPartitions());
        spark.stop();
    }
}

默认分区数量:5
repartition增加分区数:10
coalesce减少分区数:2

注意:当你想减少分区的数量时,建议使用coalesce而不是repartition,因为能避免shuffle。

  1. partitionBy()
    Spark partitionBy()是DataFrameWriter类的一个方法,用于在将DataFrame写入磁盘/文件系统时基于一个或多个列值进行分区。
    当通过调用partitionBy()将Spark DataFrame写入磁盘时,Spark会根据分区列拆分记录,并将每个分区数据存储到一个子目录中。
    partitionBy可以对单个列或多个列进行分区。
    测试数据:
code,country
1,China
1,China
1,China
2,America
2,America
3,England
3,England
4,Japan
4,Japan
4,Japan
5,Korea
5,Korea

代码:

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> ds = spark.read().option("header", "true").csv("./data/country.csv");
        ds.write().option("header", "true").partitionBy("country")
                .mode("overwrite").csv("./data/country");
        spark.stop();
    }
}

DataFrame中共有5个不同的country,因此,它创建了5个目录,如下图所示。子目录的名称将是分区列及其值(分区列=值)。
在这里插入图片描述

  1. repartitionByRange()
    测试数据:
id,count
1,10
2,20
3,10
4,20
5,10
6,30
7,50
8,50
9,50
10,30
11,30
12,40
13,40
14,20
15,20

代码:

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> ds = spark.read().option("header", "true").csv("./data/num.csv");
        Dataset<Row> count = ds.repartitionByRange(5, new Column("count"));
        count.write().option("header", "true").mode("overwrite").csv("./data/range-partition");
        spark.stop();
    }
}

在这里插入图片描述
打开看其中一个分区:
在这里插入图片描述
所有count=50的在一个分区。

如何选择分区列

当使用partitionBy()时,必须非常小心它创建的分区数量,因为分区太多会在一个目录中创建太多子目录,这会给NameNode带来不必要的开销(如果你使用的是Hadoop),因为它必须将文件系统的所有元数据保存在内存中。因此,需要根据分区字段进行判断,比如使用country作为分区,就比使用city可能会更合适。

分区越多越好还是越少越少

分区可以提高作业的并行度从而提高spark的性能,那我们设置超级多分区岂不是美哉?
nonono,Spark必须为每个分区创建一个任务,如果大部分时间都用于创建、调度和管理任务,然后再执行,而不是大部分时间用来处理数据。同理,太少的分区根本不能充分利用集群资源,那简直就是对集群资源的一种浪费。因此,根据不同的数据量设置相应的分区才是最佳选择。

后记

相信看完这篇博客,你对spark的分区会有更加清晰的认识,无论在工作中,还是面试都将游刃有余。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/491313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nodejs-前端工程化环境-安装-webpack打包工具

文章目录 1.安装nodejs1.1.新建项目1.2.安装jQuery。1.3.查看全局模块安装目录 2.安装Vue2.1.安装2.2.创建vue项目 3.安装webpack4.安装 Grunt5.安装uglify-js > js代码压缩打包工具。6.因为在此系统上禁止运行脚本……解决办法 1.安装nodejs 从官网下载长期支持版本&#…

数值分析-埃尔米特插值的概念、实现与应用

目录 一、引言 二、埃尔米特插值的基本概念 2.1 埃尔米特插值的定义 2.2 埃尔米特插值的优点 三、埃尔米特插值的实现方法 3.1 基于拉格朗日插值的埃尔米特插值 2.2 基于牛顿插值的埃尔米特插值 四、埃尔米特插值的应用 4.1 基于埃尔米特插值的函数逼近 4.2 基于埃尔…

2分钟搞懂人工智能、机器学习和深度学习

不少高校的小伙伴找我聊入门人工智能该怎么起步&#xff0c;如何快速入门&#xff0c;多长时间能成长为中高级工程师&#xff08;聊下来感觉大多数学生党就是焦虑&#xff0c;毕业即失业&#xff0c;尤其现在就业环境这么差&#xff09;&#xff0c;但聊到最后&#xff0c;很多…

java遍历集合的方法

java中&#xff0c;集合的遍历是一项基本且重要的技能。我们不仅要知道集合中元素的个数&#xff0c;还要知道元素的值&#xff0c;以及它们之间的关系。 在 Java中&#xff0c;如果需要遍历集合中的某个元素&#xff0c;可以使用以下方法&#xff1a; 1.通过 return语句将集合…

工赋开发者社区 | 装备制造企业数字化转型总体框架

导读 当前&#xff0c;面对技术、市场以及供应链等多重挑战&#xff0c;在软件定义、数据驱动、数字孪生、大数据、人工智能及元宇宙等技术加持下&#xff0c;装备制造企业不断采用新工艺、新材料&#xff0c;以新模式推动产品快速创新。企业积极关注并探索数字化转型路径&…

ThingsBoard使用docker compose集群部署

1、概述 今天我将讲解官方文档说的使用docker compose集群部署ThingsBoard,这种部署方式也是目前企业中常用的形式,希望大家能够掌握,我不是直接使用官方的镜像,我是自己拉起代码,然后自己构建镜像,在传到服务器上,使用自己的镜像来部署。而且这种部署中间有个大坑,我…

雷达原理_有源干扰_间歇采样直接、重复、循环转发干扰_含MATLAB实现代码

间歇采样直接、重复、循环转发干扰 间歇采样转发干扰是在雷达脉冲周期内对雷达信号进行间歇采样&#xff0c;并通过干扰机将采样的信号进行处理和转发&#xff0c;从而生成相干的假目标信号。这种干扰方式的原理可分为直接转发、重复转发和逐次循环转发三种方式。直接转发是指…

这个档案室管理妙招,太有用了!

档案是人类文明发展到一定历史阶段的产物&#xff0c;是人类活动的真实记录&#xff0c;也是新的社会实践最可靠的凭证和依据。 借助档案&#xff0c;我们能够更好地了解过去、把握现在、预见未来&#xff0c;是一种宝贵的无形资产&#xff0c;也是一种不可再生资源。因此&…

Pandas + AI = PandasAI【Python】

Pandas AI 是一个 Python 库&#xff0c;它为流行的数据分析和操作工具 Pandas 添加了生成式AI能力。 PandasAI旨在与 Pandas 结合使用&#xff0c;而不是它的替代品。 推荐&#xff1a;用 NSDT场景设计器 快速搭建3D场景 1、安装PandasAI 使用如下命令安装pandas-ai&#xf…

linux中TF启动卡制作:磁盘分区文件同步

文章目录 前言&#xff1a;1. 连接TF卡2. 磁盘卸载载与分区2.1 磁盘卸载2.2 创建第一个分区2.3 创建第二个分区 3. 磁盘格式化4. 文件同步5. 检查与BOOT分区启动文件拷贝总结&#xff1a; 前言&#xff1a; TF卡在linux环境下配置好相关软件后&#xff0c;把配置好的系统以及软…

Neo4j图数据库的数据模型_包括节点_属性_数据_关系---Neo4j图数据库工作笔记0002

来看一下neo4j的特性 这个neo4j特点就是简单,这里用最快的速度学习 可以看到一个圈表示一个节点,然后两个节点直接可以有关系,关系可以是双向的

Python正则表达式详解,保姆式教学,0基础也能掌握正则

正则作为处理字符串的一个实用工具&#xff0c;在Python中经常会用到&#xff0c;比如爬虫爬取数据时常用正则来检索字符串等等。正则表达式已经内嵌在Python中&#xff0c;通过导入re模块就可以使用&#xff0c;作为刚学Python的新手大多数都听说”正则“这个术语。 今天来给…

学生成绩管理系统【纯控制台】(Java课设)

系统类型 纯控制台类型&#xff08;没有用到数据库&#xff09; 使用范围 适合作为Java课设&#xff01;&#xff01;&#xff01; 部署环境 jdk1.8Idea或eclipse 运行效果 本系统源码地址&#xff1a;https://download.csdn.net/download/qq_50954361/87753365 更多系统…

Python每日一练(20230505) 课程表 Course Schedule III/IV

目录 3. 课程表 Course Schedule III 4. 课程表 Course Schedule IV &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 3. 课程表 Course Schedule III 这里有 n 门不同的在线课程&#xff…

python并发编程:什么是并发编程?python对并发编程有哪些支持?

Python并发编程是指同时执行多个任务的编程模式。Python提供了多种实现并发编程的方式&#xff0c;包括多线程、多进程、协程、异步IO等。 为什么要引入并发编程 假设以下两个场景&#xff1a; 场景一: 一个网络爬虫&#xff0c;按顺序爬取花了一个小时&#xff0c;采用并发…

距新发牌制度生效不到1个月,我们和数位香港Web3er聊了聊

出品&#xff5c;欧科云链研究院 作者&#xff5c;Jason Jiang 4月20日&#xff0c;欧洲议会通过加密资产市场法规&#xff08;MiCA&#xff09;,使欧盟成为全球首个引入全面加密法的主要司法管辖区。与此同时&#xff0c;东方世界的香港也正加速拥抱Web3变革。香港特区立法会…

【线程安全】内存可见性问题及解决方案

1. 关于内存可见性的一段代码 import java.util.Scanner; public class ThreadDemo {public static int count 0;public static void main(String[] args) throws InterruptedException {Thread t1 new Thread(() -> {while (count 0) {}System.out.println("t1 线程…

双向链表及双向链表的常见操作和用js封装一个双向链表

书接上回&#xff0c;上一篇文章讲了单向链表以及用 js 封装一个单向链表&#xff0c;所以这节将介绍双向链表以及用 js 封装一个双向链表。待会我也会继续在文章后面附上视频学习链接地址&#xff0c;大家想学习的可以去看看 一、认识双向链表 首先来认识一下什么是双向链表&…

广和通发布5G智能模组SC151系列,助力AIoT应用更智能高效

2023年5月&#xff0c;广和通发布5G R16智能模组SC151系列。SC151系列基于4nm制程工艺的高通QCM4490解决方案设计&#xff0c;采用8核高性能处理器&#xff0c;为工业与商业物联网终端提供高性能处理能力。面对与日俱增的终端智能化需求&#xff0c;SC151系列将助力打造高生产力…

【致敬未来的攻城狮计划】第2期定向赠书《RT-Thread设备驱动开发指南》+ 《GD32 MCU原理及固件库开发指南》

开启攻城狮的成长之旅&#xff01;这是我参与的由 CSDN博客专家 架构师李肯&#xff08;超链接&#xff1a;http://yyds.recan-li.cn&#xff09;和 瑞萨MCU &#xff08;超链接&#xff1a;瑞萨电子 (Renesas Electronics Corporation)&#xff09; 联合发起的「 致敬未来的攻…