【Spark计算引擎----第三篇(RDD)---《深入理解 RDD:依赖、Spark 流程、Shuffle 与缓存》】

news2024/9/30 0:28:34

前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索大数据技术Spark—RDD,本篇文章主要讲述了:RDD的依赖、Spark 流程、Shuffle 与缓存等等。欢迎大家一起探索讨论!!!
💞💞代码是你的画笔,创新是你的画布,用它们绘出属于你的精彩世界,不断挑战,无限可能!

个人主页⭐: 书生♡
gitee主页🙋‍♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬

目录

  • 1. RDD缓存和checkpoint
    • 1.1 缓存机制
    • 1. 2 CheckPoint 机制
  • 2. RDD依赖
    • 2.1 窄依赖(Narrow Dependency)
    • 2.2 宽依赖(Wide Dependency)
    • 2.3 管理依赖
    • 2.4 日志查看依赖关系和计算流程
    • 2.5 划分stage
  • 3.Spark的运行流程(内核调度)
  • 4.spark的shuffle过程
    • 4.1 shuffle介绍
    • 4.2 Shuffle 的过程
      • Shuffle 的影响
    • 4.3 SparkShuffle配置

1. RDD缓存和checkpoint

  在 Apache Spark 中,缓存(也称为持久化)和 checkpoint 是两种用于优化性能和容错的重要机制。这两种机制可以帮助减少重复计算,提高应用程序的效率。
  RDD的缓存和checkpoint机制也是spark计算速度快的原因之一
在这里插入图片描述

1.1 缓存机制

定义:
  缓存是一种将 RDD 的计算结果存储在内存或磁盘上的机制。通过缓存,Spark 可以避免重新计算已经处理过的数据,从而显著提高应用程序的性能。

  • 缓存是将RDD存储到内存上或者是本地磁盘上(Linux)
  • 缓存是临时持久化操作

在这里插入图片描述

用途:

  • 提高性能:对于需要多次访问的数据集,缓存可以避免重复计算,显著加快执行速度。
  • 内存管理:可以根据可用内存和数据的重要性选择合适的存储级别。
  • 保证RDD容错性:应用程序运行过程中, 可能因为一些原因导致rdd计算失败需要重新计算

应用场景:

  • 计算时间长的rdd
  • 计算成本昂贵的RDD
  • 重复多次使用的RDD

注意点

  • 应用程序结束后会自动清空缓存RDD
  • 缓存不会切断RDD之间的依赖关系(缓存的rdd有可能丢失, 丢失后还可以通过依赖关系计算得到)
  • 缓存的RDD需要通过aciton算子触发缓存任务, 触发缓存任务后的RDD才是从缓存中获取的,触发缓存任务之前, 调用的rdd还是通过依赖关系计算得到的
  • 缓存级别:缓存的RDD存储在哪里 默认存储在内存中, 也可以设置存储在内存和本地磁盘
  • persist 只是定义了一个缓存任务,并不是执行
  • 使用unpersist:释放缓存

API

  • cache():默认将 RDD 存储在内存中,如果内存不足则溢出到磁盘。
  • persist(storageLevel):允许指定不同的存储级别,如 MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY 等。
  • is_cached :查看缓存状态,进行了缓存返回TRUE,反之返回False
  • unpersist:释放缓存

缓存的级别:缓存RDD默认存储在内存中
from pyspark.storagelevel import StorageLevel

  • StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
  • StorageLevel.DISK_ONLY_2 #将数据缓存到磁盘上 保存两份
  • StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
  • StorageLevel.MEMORY_ONLY # 将数据缓存到内存 默认
  • StorageLevel.MEMORY_ONLY_2 #将数据缓存到内存 保存两份
  • StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘 优先将数据缓存到内存上,内存不足可以缓存到磁盘
  • StorageLevel.MEMORY_AND_DISK_2 # 将数据缓存到内存和磁盘
  • StorageLevel.OFF_HEAP # 基本不使用 缓存在系统管理的内存上 jvm(内存)在系统上运行,系统内存
  • StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘 序列化操作,按照二进制存储,节省空间

案例:
这个案例中我们对 rdd_cnt 这个RDD对象进行了缓存操作

rdd_cnt.persist(storageLevel=StorageLevel.MEMORY_ONLY)

# 创建SparkContext对象
sc = SparkContext()

# 从HDFS读取文本文件
rdd_file = sc.textFile("/data/stu.csv")
# 读取文件后,对文件进行分割,得到每个学生的信息
rdd_stu = rdd_file.map(lambda x: x.split(","))
rdd_map = rdd_stu.map(lambda x: (x[0], int(x[1])))

# 查看是否对rdd进行缓存操作, 返回True或False
print(rdd_map.is_cached)
# 统计不同年龄段的人数 0-30 为青年,30-60为中年,大于60为老年
rdd_age= rdd_map.map(lambda x: ('青年',1) if x[1]<30 else ('中年',1) if x[1]<=60 and x[1]>=30 else ('老年',1) )
rdd_cnt= rdd_age.reduceByKey(lambda x,y: x+y)

# 对rdd_reducebykey进行缓存操作, 只是定义了一个缓存任务
# 如果实现对rdd进行缓存, 需要调用action算子触发缓存任务
# 触发缓存任务之前, 调用的当前rdd还是通过依赖关系计算得到的
rdd_cnt.persist(storageLevel=StorageLevel.MEMORY_ONLY)
# 查看是否对rdd进行缓存操作, 返回True或False
print(rdd_cnt.is_cached)

# 手动释放
rdd_cnt.unpersist()
print(rdd_cnt.is_cached)

print(rdd_cnt.collect())

在这里插入图片描述

内存限制:确保有足够的内存来缓存数据,否则可能会导致 OutOfMemoryError。

1. 2 CheckPoint 机制

定义
  Checkpoint 是一种容错机制,用于将 RDD 的数据持久化到可靠的存储系统(如 HDFS、S3 等)。与缓存不同,Checkpoint 不仅用于提高性能,还可以在节点故障时恢复数据。

  • checkpoint将RDD存储到HDFS(分块多副本)中
  • checkpoint的RDD是不会随程序运行结果而被清空
  • checkpoint会永久存储RDD, RDD之间的依赖关系会被删除
  • checkpoint是永久持久化操作

用途

  • 提高计算效率
  • 容错:当 Spark 应用程序遇到故障时,可以从 Checkpoint 恢复数据,避免重新计算整个数据流。
  • 减少依赖:通过 Checkpoint,可以减少计算依赖的深度,从而降低故障时需要重新计算的数据量。

注意

  • 永久存储在HDFS上
  • 程序运行结束不会被删除
  • 会切断rdd之间的依赖关系
  • 需要通过action算子触发checkpoint任务

API

  • setCheckpointDir(path=):将 RDD 的数据持久化到指定的目录。
  • rdd.checkpoint() :对RDD进行checkpoint操作

使用

checkpoint:将rdd存储在HDFS中使用:

  • ①设置checkpoint目录路径 sc.setCheckpointDir()
  • ②rdd.checkpoint()

案例:

"""
checkpoint:将rdd存储在HDFS中
使用:①设置checkpoint目录路径 sc.setCheckpointDir() ②rdd.checkpoint()
注意点:①永久存储在HDFS上 ②程序运行结束不会被删除 ③会切断rdd之间的依赖关系 ④需要通过action算子触发checkpoint任务
"""
# 统计不同词出现的次数 -> 分组聚合操作
from pyspark import SparkContext
# 导入缓存级别类
from pyspark import StorageLevel

# 创建sc对象
sc = SparkContext()
# 设置checkpoint目录
sc.setCheckpointDir('/checkpoint')
# 读取hdfs文件数据,转换成rdd对象
rdd_words = sc.textFile('/data/words.txt')
rdd_flatmap = rdd_words.flatMap(lambda x: x.split(','))
rdd_map = rdd_flatmap.map(lambda x: (x, 1))
print(rdd_map.is_checkpointed)
rdd_reducebykey = rdd_map.reduceByKey(lambda x, y: x + y)
# 对rdd_reducebykey进行checkpoint操作, 只是定义了一个checkpoint任务
# 如果实现对rdd进行checkpoint, 需要调用action算子触发checkpoint任务
# 触发checkpoint任务之前, 调用的rdd还是通过依赖关系计算得到的
rdd_reducebykey.checkpoint()
# 查看是否对rdd进行checkpoint操作, 返回True或False
print(rdd_reducebykey.is_checkpointed)
# 根据词出现的次数进行降序操作
# rdd_reducebykey是通过依赖关系计算得到的, 不是从checkpoint中获取的
rdd_sortby = rdd_reducebykey.sortBy(lambda x: x[1], ascending=False)
print(rdd_sortby.collect())
# rdd_reducebykey是从checkpoint中获取的
rdd_sortby2 = rdd_reducebykey.sortBy(lambda x: x[1], ascending=True)
print(rdd_sortby2.collect())

在这里插入图片描述

注意事项
存储空间:确保 Checkpoint 目录有足够的存储空间。
性能开销:频繁地执行 Checkpoint 会增加额外的 I/O 开销。

2. RDD依赖

  在 Apache Spark 中,弹性分布式数据集(Resilient Distributed Dataset, RDD)之间的依赖关系是 Spark 计算模型的核心部分。依赖关系决定了数据的处理顺序和粒度,同时也影响着 Spark 作业的执行效率和容错性。

  1. 定义

    • RDD 之间的依赖关系指的是一个 RDD 如何依赖于另一个 RDD 的数据。(相邻RDD之间存在的因果关系, 可以称为依赖关系)----》新RDD一定是由旧RDD计算得到, RDD1->RDD2->RDD3
    • 这种依赖关系决定了数据流的方向和数据处理的顺序。
    • RDD特性之一
    • 依赖关系可以保证RDD计算的容错性, 如果rdd因为某些原因计算失败, 可以根据依赖关系重新计算
  2. 类型

    • 窄依赖(Narrow Dependency):每个父 RDD 分区最多被一个子 RDD 分区使用。
    • 宽依赖(Wide Dependency):一个子 RDD 分区可能依赖于多个父 RDD 分区。
  3. 影响

    • 窄依赖:允许 Spark 在流水线中并行执行任务。
    • 宽依赖:导致数据重分布,增加了计算成本。

在这里插入图片描述

2.1 窄依赖(Narrow Dependency)

  • 定义

    • 窄依赖指的是父 RDD 的每个分区最多被一个子 RDD 分区使用。(一对一或者多对一关系)
    • 窄依赖通常出现在 mapfilterunion 等操作中。
  • 触发窄依赖关系的算子

    • map()
    • flatMap()
    • filter()
    • mapValues()
    • mapPartitions()
  • 特点

    • 窄依赖操作可以并行执行,因为它们不需要重新分布数据。
    • 通常不会触发 shuffle 操作。

在这里插入图片描述
在这里插入图片描述

  • 示例
    • 使用 map 操作将每个元素乘以 2:
      rdd = sc.parallelize([1, 2, 3, 4, 5])
      doubled_rdd = rdd.map(lambda x: x * 2)
      

2.2 宽依赖(Wide Dependency)

  • 定义

    • 宽依赖指的是一个子 RDD 分区可能依赖于多个父 RDD 分区。
    • 宽依赖通常出现在 groupByKeyreduceByKeyjoin 等操作中。
  • 触发宽依赖关系的算子

  • groupBy()
  • groupByKey()
  • reduceByKey()
  • sortBy()
  • sortByKey()
  • distinct()
  • 特点

    • 宽依赖操作需要触发 shuffle 操作,即数据需要在节点间进行重分布。
    • 通常会导致较高的计算成本。
  • 示例

    • 使用 reduceByKey 对数据进行聚合:
      rdd = sc.parallelize([(1, 2), (1, 3), (2, 4)])
      result = rdd.reduceByKey(lambda x, y: x + y)
      

影响

  1. 计算效率

    • 窄依赖通常比宽依赖更高效,因为它们不需要 shuffle 数据。
    • 宽依赖可能会导致更多的磁盘 I/O 和网络传输,从而降低性能。
  2. 容错性

    • RDD 的容错性是通过 lineage 信息来实现的。
    • 当数据丢失时,Spark 可以根据依赖关系重新计算丢失的数据。
  3. 执行计划

    • Spark 的 DAGScheduler 会根据依赖关系构建执行计划。
    • 宽依赖会导致新的 stage 的形成,而窄依赖则可以在同一个 stage 内执行。
      在这里插入图片描述
      在这里插入图片描述

示例

假设我们有一个简单的 RDD rdd1,并执行了一系列的操作来创建新的 RDD。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# 窄依赖
rdd2 = rdd1.map(lambda x: x * 2)

# 宽依赖
rdd3 = rdd2.groupByKey()

# 窄依赖
rdd4 = rdd3.flatMap(lambda x: x)

# 宽依赖
rdd5 = rdd4.reduceByKey(lambda x, y: x + y)

在这个例子中,rdd2rdd4 之间的依赖关系是窄依赖,而 rdd3rdd5 之间的依赖关系是宽依赖。宽依赖操作(如 groupByKeyreduceByKey)会导致数据重分布,而窄依赖操作(如 mapflatMap)则不需要重分布数据。

2.3 管理依赖

通过DAG有向无环图图计算算法管理RDD之间的依赖关系

在这里插入图片描述

  • DAG称为有向无环图(有方向没有闭环), 是图计算中的一种算法
  • DAG有向无环图作用
    • 管理RDD之间的依赖关系, 保证RDD按照依赖关系进行有序地计算
    • 根据RDD之间的依赖关系对计算任务划分成多个计算步骤, 每个步骤称为stage阶段
      • 触发宽依赖关系的算子会产生新的stage阶段
      • 窄依赖关系的算子计算步骤是在同一个stage阶段进行

在这里插入图片描述

2.4 日志查看依赖关系和计算流程

  • app spark应用程序
    • appID 就是这个程序的ID
    • APP Name 就是这个spark程序的名字(别名)

在这里插入图片描述

  • job -> 计算任务(一个app中是可能有多个job), 执行action算子时才会产生job
    在这里插入图片描述
    • stage -> 计算步骤/阶段, DAG根据宽依赖关系划分成多个stage
      在这里插入图片描述
    • task -> task线程任务, 真正执行的计算任务,有多少个分区就有多个task线程任务

在这里插入图片描述

2.5 划分stage

怎么划分stage:DAG根据宽依赖关系划分成多个stage

为甚要划分成多个stage呢?

  • spark的task任务是以线程方式实现多任务计算, 线程多任务会有一个资源抢夺问题, 导致计算不准确
  • spark中同一个stage中的多个task任务是并行计算的, 下一个stage中的多个task任务要想并行计算, 需要等上一个stage计算步骤完成后才能并行计算
  • 为什么要等待上一个stage计算完成?
    • 宽依赖是会进行shuffle过程, 数据需要重新洗牌, 等待过程就是洗牌过程
  • 如何划分stage?
    • 查看rdd之间是否存在宽依赖关系
    • 触发宽依赖关系的算子
    • 通过日志查看DAG有向无环图‘

注意:在一个stage中,会有多个线程也就是多个task任务是并行计算,那么就会有资源竞争,有的任务执行快,有的任务执行慢,当执行快的任务执行完的时候,慢的任务刚刚执行了,一旦这个时候通过宽依赖进行计算就会出现数据缺失的的问题,因此划分成多个stage,让执行快的任务等慢的任务执行完之后一起一起执行宽依赖的算子计算,这样子数据就不会缺失了。

在这里插入图片描述

3.Spark的运行流程(内核调度)

Spark框架中封装了三个Scheduler类完成整个spark的计算过程

  Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

  • DAGScheduler
    • 根据rdd间的依赖关系,将提交的job划分成多个stage。
    • 对每个stage中的task进行描述(task编号,task执行的rdd算子)
  • TaskScheduler
    • 获取DAGScheduler提交的task
    • 调用SchedulerBackend获取executor的资源信息
    • 给task分配资源,维护task和executor对应关系
    • 管理task任务队列
    • 将task给到SchedulerBackend,然后由SchedulerBackend分发对应的executor执行
  • SchedulerBackend
    • 向RM申请资源
    • 获取executor信息
    • 分发task任务

在这里插入图片描述

  • 执行spark应用程序, 创建driver进程, driver进程调用三个scheduler类创建三个scheduler对象
  • schedulerbackend向资源调度工具主服务申请计算资源, 创建executor进程
  • 执行了action算子触发计算任务, DAGscheduler根据宽依赖关系划分stage, 同时分析stage中的task描述, 将taskSet提交给Taskscheduler
  • Taskscheduler将计算资源分配给task, 同时维护task和executor之间关系, 管理task执行顺序, 将分配计算资源的task交给schedulerbackend
  • schedulerbackend将task线程任务给到executor进程执行

4.spark的shuffle过程

4.1 shuffle介绍

  在 Apache Spark 中,Shuffle 是一个关键的概念,它涉及到数据的重新分布,通常发生在宽依赖操作中,例如 groupByKey, reduceByKey, join 等。

mapreduce的shuffle作用: 将map计算后的数据传递给reduce使用

mapreduce的shuffle过程: 分区,排序,合并(规约)

  • Shuffle 的定义
    Shuffle 是指在 Spark 中对数据进行重新分布的过程,通常涉及到将数据从一个节点移动到另一个节点。这个过程发生在宽依赖操作中,因为这些操作需要将具有相同 key 的数据聚集在一起,而这些数据可能最初分布在不同的节点上。

  • Shuffle 的原因
    Shuffle 发生的主要原因是需要将数据重新分布到不同的分区中,以便进行聚合或连接等操作。例如,在 groupByKey 操作中,具有相同 key 的所有元素需要被聚集在一起以进行聚合计算。

  • 作用:不同阶段的数据传递

  • 无论是spark shuffle还是mapreduce shuffle,本质都是传递数据

  • spark的shuffle分成两个阶段

    • map阶段: shuffle write, 将上一个stage的数据保存到磁盘文件中
    • reduce阶段: shuffle read, 将磁盘文件中的数据保存到下一个stage中

在这里插入图片描述

spark的shuffle方法类:
是spark封装好的处理shuffle的方法

  • hashshuffle
    • spark1.2版本之前使用, 在spark2.0版本删除
    • hash(key)%分区数=结果值…余数, 余数相同的数据放到一起
    • 未优化的hashshuffle -> 有多少个buffer有多少个磁盘小文件
    • 优化后的hashshuffle -> 有多少个分区有多少个磁盘小文件

在这里插入图片描述
在这里插入图片描述

  • sortshuffle
    • spark2.0/3.0版本使用的都是sortshuffle
    • 普通模式 -> 使用排序方式将数据划分
      • 将分区数据存储在5M大小的memory中, 从memory取1w条数据进行排序
    • bypass模式
      • 类似于优化后的hashshuffle
      • hash(key)%分区数=结果值…余数, 余数相同的数据放到一起
        在这里插入图片描述
        在这里插入图片描述

无论是hash还是排序都是将相同key值放在一起处理

  • [(‘a’,1),(‘b’,2),(‘a’,1)]
  • hash(key)%分区数,相同的key数据余数是相同的,会放一起,交给同一个分区进行处理
  • 按照key排序,相同key的数据也会放在一起 ,然后交给同一分区处理

4.2 Shuffle 的过程

Shuffle 的过程可以分为以下几个主要阶段:

  1. Map 阶段

    • Map 阶段通常涉及对输入数据进行转换,例如应用 mapflatMap 等操作。
    • 在宽依赖操作中,数据会被标记为需要参与 shuffle。
    • Map 阶段还会进行一些优化,例如将部分结果写入本地磁盘。
  2. Shuffle write

    • Map 阶段产生的数据会被写入本地磁盘上的 shuffle 文件。
    • 每个 map 任务都会产生一个或多个 shuffle 文件,这些文件按 key 进行分区。
    • Shuffle 文件通常会被压缩以节省存储空间和传输时间。
  3. Shuffle read

    • 在 shuffle 读阶段,reduce 任务会从所有 map 任务产生的 shuffle 文件中读取数据。
    • 读取数据时,reduce 任务会根据 key 去定位相应的 shuffle 文件,并从中读取数据。
    • 数据可能需要在网络上传输,这取决于数据的存储位置。
  4. Reduce 阶段

    • Reduce 阶段处理从 shuffle 文件中读取的数据。
    • 对于每个 key,reduce 任务会执行相应的聚合或连接操作。
    • 最终结果会被输出到内存或磁盘上。

Shuffle 的影响

Shuffle 过程可能会显著影响 Spark 应用程序的性能,因为它涉及到大量的磁盘 I/O 和网络传输。为了减少 shuffle 的影响,可以采取以下措施:

  • 减少 shuffle 的数量:尽量使用窄依赖操作来减少 shuffle 的需求。
  • 调整并行度:通过设置 spark.sql.shuffle.partitions 来调整 shuffle 的并行度。
  • 优化数据分布:确保数据在节点之间均匀分布,以减少数据倾斜。
  • 启用压缩:通过启用 shuffle 文件的压缩来减少传输的数据量。
  • 使用高效的序列化方式:例如使用 Kryo 序列化器来提高序列化和反序列化的效率。

示例
假设我们有一个简单的 RDD,我们想要使用 reduceByKey 来计算每个 key 的总和。

from pyspark import SparkContext

sc = SparkContext("local", "Shuffle Example")

# 创建一个 RDD
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4)])

# 使用 reduceByKey 进行聚合
result = rdd.reduceByKey(lambda x, y: x + y)

print("Result:", result.collect())

在这个例子中,reduceByKey 操作会导致 shuffle,因为需要将具有相同 key 的元素聚集在一起。

4.3 SparkShuffle配置

  • spark.shuffle.file.buffer

参数说明:

该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 2倍 3倍

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升

  • spark.reducer.maxSizeInFlight

参数说明:

该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

  • spark.shuffle.io.maxRetries and spark.shuffle.io.retryWai

spark.shuffle.io.maxRetries :

shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)

spark.shuffle.io.retryWait:

该参数代表了每次重试拉取数据的等待间隔。(默认为5s)

调优建议:一般的调优都是将重试次数调高,不调整时间间隔。

  • spark.shuffle.memoryFraction=10

参数说明:

该参数代表了Executor 1G内存中,分配给shuffle read task进行聚合操作内存比例。

  • spark.shuffle.manager

参数说明:该参数用于设置shufflemanager的类型(默认为sort)
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制。当shuffle read task 的数量小于等于200采用bypass机制

  • spark.shuffle.sort.bypassMergeThreshold=200
    • 根据task数量决定sortshuffle的模式
    • task数量小于等于200 就采用bypass task大于200就采用普通模式

参数说明:

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些

  1. 交互式开发中使用
pyspark --master yarn --name shuffle_demo --conf 'spark.shuffle.sort.bypassMergeThreshold=300'

通过--conf = ''来配置参数

  1. 脚本中配置参数
  • 创建conf对象, 实现spark参数设置
  • 对象名=类名()
  • 调用set()返回对象本身
conf = (SparkConf().
		set('spark.shuffle.sort.bypassMergeThreshold', '300').
		set('spark.shuffle.io.maxRetries', '5'))
  • 创建sc对象
  • 传递conf对象
  • sc = SparkContext(master=‘yarn’, appName=‘shuffle_demo’, conf=conf)
from pyspark import SparkContext
from pyspark import SparkConf

# 创建conf对象, 实现spark参数设置
# 对象名=类名()
# 调用set()返回对象本身
conf = (SparkConf().
		set('spark.shuffle.sort.bypassMergeThreshold', '300').
		set('spark.shuffle.io.maxRetries', '5'))

# 创建sc对象
# 传递conf对象
sc = SparkContext(master='yarn', appName='shuffle_demo', conf=conf)
# 读取hdfs文件数据,转换成rdd对象
rdd_words = sc.textFile('/data/words.txt')
print(rdd_words.take(num=3))
rdd_flatmap = rdd_words.flatMap(lambda x: x.split(','))
print(rdd_flatmap.collect())
rdd_map = rdd_flatmap.map(lambda x: (x, 1))
print(rdd_map.collect())
rdd_reducebykey = rdd_map.reduceByKey(lambda x, y: x + y)
print(rdd_reducebykey.collect())
# 根据词出现的次数进行降序操作
rdd_sortby = rdd_reducebykey.sortBy(lambda x: x[1], ascending=False)
print(rdd_sortby.collect())

查看历史服务,发现配置生效
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1980059.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Gold菜鸟】Linux知识回忆(8)——进程和计划任务

前言 这部分让我们来继续了解Linux中进程和计划任务的相关知识吧~ 相关技术交流欢迎添加VX: wenjinworkon 目录 进程和内存管理 什么是进程 进程结构 进程相关概念 物理地址空间和虚拟地址空间 用户和内核空间 进程使用内存问题 进程状态 内存淘汰数据机制&#xff1a;…

数学建模评价类—Topsis法

目录 文章目录 前言 切记&#xff1a;以下内容仅用于参考理解&#xff0c;不可用于数模竞赛&#xff01;&#xff01;&#xff01; 一、Topsis的基本原理 二、Topsis的建模过程 1.判断矩阵是否需要正向化 2.原始矩阵正向化 3.矩阵标准化 4.计算距离&#xff0c;给出得…

Can Large Language Models Provide Feedback to Students? A Case Study on ChatGPT

文章目录 题目摘要相关工作方法结果讨论意义 题目 大型语言模型能为学生提供反馈吗&#xff1f;ChatGPT 案例研究 论文地址&#xff1a;https://ieeexplore.ieee.org/abstract/document/10260740 摘要 摘要——教育反馈已被广泛认为是提高学生学习能力的有效方法。然而&#x…

Python | Leetcode Python题解之第322题零钱兑换

题目&#xff1a; 题解&#xff1a; class Solution:def coinChange(self, coins: List[int], amount: int) -> int:dp [float(inf)] * (amount 1)dp[0] 0for coin in coins:for x in range(coin, amount 1):dp[x] min(dp[x], dp[x - coin] 1)return dp[amount] if d…

Python的if语句及其运用

一、条件测试 每条if语句的核心都是一个值为True或False的表达式&#xff0c;这种表达式称为条件测试。如果测试的条件满足if语句则为True&#xff0c;接着执行if里的语句&#xff1b;如果测试的条件不满足if语句则为False&#xff0c;则不执行if里的语句。 1.1、检查是否相等…

C++ | Leetcode C++题解之第322题零钱兑换

题目&#xff1a; 题解&#xff1a; class Solution { public:int coinChange(vector<int>& coins, int amount) {int Max amount 1;vector<int> dp(amount 1, Max);dp[0] 0;for (int i 1; i < amount; i) {for (int j 0; j < (int)coins.size();…

二叉树(真题)

1.用非递归遍历求二叉树结点个数【计学2020】 算法思想:用先序非递归遍历 当前指针不为空或栈不为空进行循环&#xff1b; 当前指针不为空访问当前结点&#xff0c;当前节点入栈&#xff0c;进入左子树 当前指针为空&#xff0c;栈顶元素出栈&#xff08;回溯&#xff09;&…

【kickstart+pxe批量安装linux系统】

目录 一、实验环境准备二、安装kickstart1、kickstart自动安装脚本的制作 三、安装web服务器&#xff0c;提供网络源四、安装dhcp五、安装syslinux&#xff0c;tftp-server六、虚拟机中新建新主机 一、实验环境准备 1、rhel7主机 2、开启主机图形 init 5 开图形 3、配置网络可…

ESP8266 烧录,待坑

ets Jan 8 2013,rst cause:1, boot mode:(7,0)waiting for host 空芯片&#xff0c;未加SPI FLASH 显示 下载模式(IO15 10k下拉 &#xff0c; IO0下拉 &#xff08;直接GND),IO2上拉&#xff08;文档上说是有内部上拉的&#xff0c;先上拉&#xff09;&#xff09; &#xff…

jdbc(mysql)

1.概述 jdbc&#xff1a;java database connection&#xff08;java与数据库连接&#xff09; java可以连接不同数据库&#xff0c;不同数据库连接细节不同&#xff0c;具体细节都由数据库自己实现 由java设计出一系列连接数据库的接口规范&#xff0c;然后由不同的数据库开发…

C语言程序设计26

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.3 上机运行下面的程序&#xff0c;分析输出结果 代码 //《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 //习题2.3 上机运行下面的程序&#xff0c;分析输出结果#include <stdio.h> int …

【MYSQL】MYSQL逻辑架构

mysql逻辑架构分为3层 mysql逻辑架构分为3层 1). 连接层&#xff1a;主要完成一些类似连接处理&#xff0c;授权认证及相关的安全方案。 2). 服务层&#xff1a;在 MySQL据库系统处理底层数据之前的所有工作都是在这一层完成的&#xff0c;包括权限判断&#xff0c;SQL接口&…

GD 32 IIC通信协议

前言&#xff1a; ... 通信方式 通信方式分为串行通信和并行通信。常见的串口就是串行通信的方式 常用的串行通信接口 常用的串行通信方式有USART,IIC,USB,CAN总线 同步与异步 同步通信&#xff1a;IIC是同步通信&#xff0c;有两个线一个是时钟信号线&#xff0c;一个数数据…

rocketMq-5.2.0双主双从搭建

最近在研究rocketmq5.x的运行机制&#xff0c;研究到高可用章节&#xff0c;看到rocketMq采用了主从机制实现高可用&#xff0c;将broker分成了master和slave。为了更好的理解主从源码&#xff0c;我觉着需要先搭建一个主从的集群&#xff0c;先了解主从集群是怎么使用的。 这篇…

【practise】只出现一次的数字

现在给你一个数组&#xff0c;里面放了一些数字&#xff0c;里面都是两两成对&#xff0c;只有一个数字是单独的&#xff0c;要求找出其中只出现一次的数字。相必这道题是非常简单了&#xff0c;有很多解法比如说用暴力求解&#xff1f;比如说用位运算&#xff1f;甚至说用哈希…

使用Docker+ollama部署大模型

Docker的安装----在 Ubuntu 系统上安装 Docker 一&#xff1a;配置系统的 APT 软件包管理器 首先添加 Docker 的官方 GPG 密钥 # Add Dockers official GPG key: sudo apt-get update sudo apt-get install ca-certificates curl gnupg sudo install -m 0755 -d /etc/apt/ke…

使用 宝塔面板 部署 php网站

【语料库网站】宝塔面板 在线部署全过程 代码仓库&#xff1a;https://github.com/talmudmaster/RedCorpus 网站介绍 语料库提供双语文本检索和分享功能。供英语、翻译相关专业的爱好者&#xff0c;学生和老师学习使用。 该网站是对BiCorpus开源项目的二次开发。 技术栈&am…

DA14695 printf没办法打印浮点数

是因为没有打开浮点数库&#xff0c;添加了这个库也会导致堆内存的增加

基于Kahn算法|动态线程池,支持扩展点并发执行|召回|过滤

背景 在《分布式领域扩展点设计稿》一文中&#xff0c;我们提到针对业务横向扩展点和纵向扩展点的编排能力。 那有这样的一种场景&#xff1a;针对于一次会话&#xff0c;同时会调很多外部服务&#xff0c;同时这些RPC服务会有多种直接或间接的关系&#xff0c;是否有更高效的…

【Spring】Bean详细解析

1.Spring Bean的生命周期 整体上可以简单分为四步&#xff1a;实例化 —> 属性赋值 —> 初始化 —> 销毁。初始化这一步涉及到的步骤比较多&#xff0c;包含 Aware 接口的依赖注入、BeanPostProcessor 在初始化前后的处理以及 InitializingBean 和 init-method 的初始…