大数据 - Spark系列《一》- 分区 partition数目设置详解

news2024/10/6 1:39:34

目录

🐶3.2.1 分区过程

🐶3.2.2 SplitSize计算和分区个数计算

🐶3.2.3 Partition的数目设置

1. 🥙对于数据读入阶段,输入文件被划分为多少个InputSplit就会需要多少初始task.

2. 🥙对于转换算子产生的RDD的分区数

 3. 🥙如果指定了spark.default.parallelism,在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致

​编辑

4. 🥙repartition和coalesce操作会聚合成指定分区数。

🐶3.2.4 groupBy不一定会Shuffle


🐶3.2.1 分区过程

每一个过程的任务数,对应一个InputSplit,Paritition 输入可能以多个文件的形式存储在HDFS上面,,每个File都包含了很多块(128切分),称为block

当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,按照SplitSize切成一个个输入分片。随后将为这些输入分片生成具体的task. InputSplit与Task是一一对应的关系

注意:InputSplit不能跨越文件。

随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  • 每个节点可以起一个或多个Executor.

  • 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个task.

  • 每个task执行的结果就就是生成了目标rdd的一个partition.

注意:这里的core是虚拟的core而不是机器的物理CPU核,可以理解为Executor的一个工作线程。Task被执行的并发度=Executor数目*每个Executor核数(=core总个数)

🐶3.2.2 SplitSize计算和分区个数计算

🐶3.2.3 Partition的数目设置

1. 🥙对于数据读入阶段,输入文件被划分为多少个InputSplit就会需要多少初始task.
  • 集合

    • (优先等级1)指定分区数

    • (优先等级2)使用 set("spark.default.parallelism","8")

    • (优先等级3)所有的可用核数

  • 文件 根据计算来的任务切片大小和输入路径下的文件大小 ,至少2并行度

  • 数据库 指定的

2. 🥙对于转换算子产生的RDD的分区数
  • 默认和父RDD的分区数一致

  • 有些算子可以调用的时候指定分区个数 distinct groupBy groupByKey

  • 特殊的算子 有特殊规定 union(和) join

val rdd3 = rdd1.intersection(rdd2)  // 取大的
val rdd4 = rdd1.subtract(rdd2) // 前面的RDD分区数
println(rdd1.cartesian(rdd2).getNumPartitions) // 两个分区个数乘积

 注意: 可能产生Shuffle的算子可以指定分区个数的

//可能产生shuffle的操作
distinct(p)     减少
groupBy(_._1 , p)    Shuffle 
groupByKey( p)       Shuffle 
groupByKey(_+_, p)   Shuffle 
join( , p)
 3. 🥙如果指定了spark.default.parallelism,在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致
package com.doit.com.doit.day0128

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @日期: 2024/1/30
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 我是技术大牛
 * @Description:
 */

/** data/orders.txt
oid01,100,bj
oid02,100,bj
oid03,100,bj
oid04,100,nj
oid05,100,nj
*/

object Test06 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Starting...")
      .setMaster("local[*]")
      .set("spark.default.parallelism", "8")
    val sc = new SparkContext(conf)

    //设置spark-submit提交程序时不在控制台打印日志信息
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val rdd1 = sc.textFile("data/orders.txt")

    //将rdd1的分区设置为2
    rdd1.repartition(2)
    println("rdd1 partition为:"+rdd1.getNumPartitions)

    //将rdd1按照城市分组
    val rdd2 = rdd1.groupBy(tp=>{
      val arr = tp.split(",")
      arr(2)
    })

    println("rdd2 partition为:"+rdd2.getNumPartitions)
    sc.stop()
  }
}
4. 🥙repartition和coalesce操作会聚合成指定分区数。
println(rdd1.repartition(3).getNumPartitions) // 增加 
println(rdd1.repartition(1).getNumPartitions)  //减少
println(rdd1.coalesce(1, true).getNumPartitions)  //减少
println(rdd1.coalesce(3, true).getNumPartitions)  //增加
// 不允许Shuffle就不能增加分区
println(rdd1.coalesce(3, false).getNumPartitions)  //增加失败
println(rdd1.coalesce(1, false).getNumPartitions)  //减少  不会Shuffle

🐶3.2.4 groupBy不一定会Shuffle

Shuffle:上游一个分区的数据可能被下游所有分区引用

package com.doit.com.doit.day0128

import org.apache.spark.SparkContext.jarOfObject
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * @日期: 2024/1/29
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 我是技术大牛
 * @Description:
 */


object Test03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("doe").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1 = sc.makeRDD(List("a b c d e f g"), 2)

    val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))

    val wordOne = rdd2.map(line=>{
      println("aaaaaa")
      (line,1)
    })   //2

    //对数据使用HashPartitioner在分区 2
    val rdd3 = wordOne.partitionBy(new HashPartitioner(3))

    rdd3.mapPartitionsWithIndex((p,iter)=>{
      iter.map(e=>(p,e))
    }).foreach(println)

    //底层默认是HashPartition分区 2
    val rdd4: RDD[(String, Iterable[(String, Int)])] = rdd3.groupBy(_._1, 3)

    val rdd5: RDD[(Int, (String, Iterable[(String, Int)]))] = rdd4.mapPartitionsWithIndex((p, iter) => {
      iter.map(e => (p, e))
    })

    rdd5.foreach(println)
    
    sc.stop()
  }
}

 结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1426774.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Bugs】Jmeter报错:NoSuchMethodError: org.apache.jmeter.samplers.

报错情况 Jmeter版本:5.4.3 报错场景:在线程组中添加了jpgc - PerfMon Metrics Collector性能监控组件后出现报错。 Jmeter中无法运行测试,cmd命令行中出现以下报错。 cmd报错详细内容: Uncaught Exception java.lang.NoSuchMe…

如何系统的自学Python?通义千问、讯飞星火、文心一言及ChatGPT的回答

如何系统的自学Python?来看看通义千问、讯飞星火、文心一言及ChatGPT的回答. 第一个是马老师的通义千问 系统地自学Python是一个循序渐进的过程,从基础语法到实践项目,再到专业领域的深入学习。下面是一个详细的步骤指南: 了解Py…

Qt5 基于OpenGL实现六轴机械臂三维仿真

需求 在Qt中通过OPenGL方式加载三维模型STL文件,然后将多个结构的STL文件类型的模型进行组装,形成6轴机械臂三维模型的显示,并且可以对每个关节进行关节角度的控制。 新建一个C类STLFileLoader,用于加载STL文件,并进…

电口模块的应用:实现高速网络通信

随着互联网的普及和信息时代的到来,网络通信已经成为了现代社会中不可或缺的一部分。电口模块作为网络通信中的重要组成部分,其应用也越来越广泛。在本文中,我们将详细介绍电口模块的优势以及应用领域。 首先,电口模块具有高速传…

JavaScript 与Java什么关系?为什么名字中带有Java?

JavaScript与Java关系 JavaScript和Java是两种不同的编程语言,它们之间没有直接关系。尽管它们都以“Java”命名,但是它们的语法、用途和应用场景都不同。 Java是一种面向对象的、静态类型的编程语言,主要用于开发独立应用程序、网络应用、…

unity 拖入文件 窗口大小

目录 unity 拖入文件插件 设置窗口大小 unity 拖入文件插件 GitHub - Bunny83/UnityWindowsFileDrag-Drop: Adds file drag and drop support for Unity standalong builds on windows. 设置窗口大小 file build

golang Cannot assign a value to the unexported field ‘xxxxx‘

最近学习golang,结果发现参考github的代码报错了 查了一下资料,这里记录加吐槽一下,这个设定真的让我感觉痛苦 go 实例化结构体报错 Cannot assign a value to the unexported field xxxxx 或者是报错implicit assignment of unexported fiel…

计算机系统体系结构

文章目录 计算机系统体系结构1. 什么是计算机体系结构术语解释计算机系统体系结构所涉及的内容简单通用计算机结构计算机指令程序执行过程时钟 2. 计算机的发展机械计算机机电式计算机早期电子计算机微机和PC革命移动计算和云计算摩尔定律乱序执行 3. 存储程序计算机寄存器传输…

qt学习:停车场管理系统+摄像头+http识别车牌+sqlite3数据库

目录 参考前面发的几篇文章http识别车牌,sqlite3数据库、摄像头的文章 步骤 部分代码 新建一个项目,加入前面用到的http和image两个文件,和加入用到的模块和头函数和成员,加入前面用到的全局变量 配置ui界面 在构造函数中初…

sql指南之null值用法

注明:参考文章: SQL避坑指南之NULL值知多少?_select null as-CSDN博客文章浏览阅读2.9k次,点赞7次,收藏21次。0 引言 SQL NULL(UNKNOW)是用来代表缺失值的术语,在表中的NULL值是显示…

微服务入门篇:Ribbon负载均衡(原理,均衡策略,饥饿加载)

目录 1.负载均衡原理2.负载均衡策略3.饥饿加载 1.负载均衡原理 在使用 LoadBalanced 注解后,Spring Cloud Ribbon 将会为 RestTemplate 添加负载均衡的能力。 负载均衡的流程如下: 当使用 RestTemplate 发送请求时,会先判断请求的 URL 是否包…

03、全文检索 -- Solr -- Solr 身份验证配置(给 Solr 启动身份验证、添加用户、删除用户)

目录 全文检索 -- Solr -- Solr 身份验证配置启用身份验证&#xff1a;添加用户&#xff1a;删除用户&#xff1a; 全文检索 – Solr – Solr 身份验证配置 学习之前需要先启动 Solr 执行如下命令即可启动Solr&#xff1a; solr start -p <端口>如果不指定端口&#xf…

保姆级的指针详解(超详细)

目录 一.内存和地址  1.初识指针 2.如何理解编址 二. 指针变量 三.指针的解引用操作符 1.指针变量的大小 四.指针变量类型的意义 五.指针的运算 1.指针加减整数 2.指针减指针 3.野指针 3.1指针未初始化 3.2指针越界访问 3.3指针指向的空间被提前释放 3.4如何规…

05:容器镜像技术揭秘|发布容器服务器|私有镜像仓库

容器镜像技术揭秘&#xff5c;发布容器服务器&#xff5c;私有镜像仓库 创建镜像使用commit方法创建自定义镜像。Dockerfile打包镜像创建apache服务镜像制作 php 镜像 微服务架构创建nginx镜像 发布服务通过映射端口发布服务容器共享卷 docker私有仓库 创建镜像 使用commit方法…

Mac用Crossover玩《幻兽帕鲁》手柄不能用怎么办? Mac电脑玩《幻兽帕鲁》怎么连接手柄? 幻兽帕鲁玩家超1900万

2024年首款爆火Steam平台的游戏《幻兽帕鲁》&#xff0c;在使用Crossover后可以用Mac系统玩了&#xff0c;很多玩家喜欢通过手柄玩游戏&#xff0c;它拥有很好的握持体验&#xff0c;长时间玩也不会很累&#xff0c;所以很多《幻兽帕鲁》玩家都喜欢用手柄来操作&#xff0c;很多…

idea 中 tomcat 乱码问题修复

之前是修改 Tomcat 目录下 conf/logging.properties 的配置&#xff0c;将 UTF-8 修改为 GBK&#xff0c;现在发现不用这样修改了。只需要修改 IDEA 中 Tomcat 的配置就可以了。 修改IDEA中Tomcat的配置&#xff1a;添加-Dfile.encodingUTF-8 本文结束

大路灯有必要买吗?五款年度好用大路灯推荐

随着人们生活水平上升&#xff0c;对健康的关注度也不断提高&#xff0c;护眼灯的需求也越来越多。而护眼落地灯作为一种新型的照明产品&#xff0c;具有独特的优点。护眼落地灯采用柔和的自然光源&#xff0c;能有效减少眼睛疲劳和视力损伤&#xff0c;提高工作和学习的效率。…

消息中间件特性

一&#xff1a;消息队列的主要作用是什么&#xff1f; 1.消息队列的特性&#xff1a; 业务无关&#xff0c;一个具有普适性质的消息队列组件不需要考虑上层的业务模型&#xff0c;只做好消息的分发就可以了&#xff0c;上层业务的不同模块反而需要依赖消息队列所定义的规范进行…

亚信安全助力宁夏首个人工智能数据中心建成 铺设绿色算力安全底座

近日&#xff0c;由宁夏西云算力科技有限公司倾力打造&#xff0c;亚信安全科技股份有限公司&#xff08;股票代码&#xff1a;688225&#xff09;全力支撑&#xff0c;总投资达数十亿元人民币的宁夏智算中心项目&#xff0c;其一期工程——宁夏首个采用全自然风冷技术的30KW机…

软考高项十大管理49个过程记忆口诀

一、十大管理口诀 口诀&#xff1a;范进整狗子&#xff0c;成人风采干 内容&#xff1a;范围管理、进度管理、整合管理、沟通管理、质量管理、成本管理、资源管理、风险管理、采购管理、干系人管理 二、49个过程口诀 1、整合管理 口诀&#xff1a;按章程计划指导知识、监控…