Spark性能优化五 算子优化

news2025/1/12 17:22:05

文章目录

  • (一)map 和 mapPartitions
  • (二)foreach 和 foreachPartition
  • (三)repartition的使用
  • (四)reduceByKey 和 groupByKey的区别

(一)map 和 mapPartitions

  • map 操作:对 RDD 中的每个元素进行操作,一次处理一条数据
  • mapPartitions 操作:对 RDD 中每个 partition 进行操作,一次处理一个分区的数据

所以:

  • map 操作: 执行 1 次 map算子只处理 1 个元素,如果 partition 中的元素较多,假设当前已经处理了 1000 个元素,在内存不足的情况下,Spark 可以通过GC等方法回收内存(比如将已处理掉的1000 个元素从内存中回收)。因此, map 操作通常不会导致OOM异常;
  • mapPartitions 操作: 执行 1 次map算子需要接收该 partition 中的所有元素,因此一旦元素很多而内存不足,就容易导致OOM的异常,也不是说一定就会产生OOM异常,只是和map算子对比的话,相对来说容易产生OOM异常
    不过一般情况下,mapPartitions 的性能更高;初始化操作、数据库链接等操作适合使用 mapPartitions操作
    这是因为:
    假设需要将 RDD 中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在mapPartitions 中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在 map 中执行,将会频繁执行,比较耗时且影响数据库的稳定性。
    map 和mapPartition代码区别
object MapPartitionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("CheckpointOpScala")
      .setMaster("local")

    val sc = new SparkContext(conf)
    //设置分区数量为2
    val  dataRdd = sc.parallelize(Array(1,2,3,4,5),2)
//    val sum =dataRdd.map(item=>{
//      println("=======")
//      item*2
//    }).reduce(_+_)
    
    //mapParition一次处理一个分区的数据
    val sum  = dataRdd.mapPartitions(it=>{
    println("=======")
      val result = new ArrayBuffer[Int]()
      it.foreach(item=>{
        result.+=(item*2)
      })
      result.toIterator
    }).reduce(_+_)

    print("sum:"+sum)
    sc.stop()
  }
}
  1. 建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
    例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
  2. 注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
    数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以
  3. 数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样

(二)foreach 和 foreachPartition

  • foreach:一次处理一条数据

  • foreachPartition:一次处理一个分区的数据

  • foreachPartition的特性和mapPartitions 的特性是一样的,唯一的区别就是
    mapPartitions 是 transformation 操作(不会立即执行),foreachPartition是 action 操作(会立即执行)
    代码实现:

object ForeachPartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ForeachPartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似
//唯一的区是mapPartitions是transformation算子,foreachPartition是action算子
dataRDD.foreachPartition(it=>{
//在此处获取数据库链接
println("===============")
it.foreach(item=>{
//在这里使用数据库链接
println(item)
})
//关闭数据库链接
})
sc.stop()
}
}

(三)repartition的使用

对RDD进行重分区,repartition主要有两个应用场景:

  1. 可以调整RDD的并行度
    针对个别RDD,如果感觉分区数量不合适,想要调整,可以通过repartition进行调整,分区调整了之后,对应的并行度也就可以调整了
  2. 可以解决RDD中数据倾斜的问题
    如果RDD中不同分区之间的数据出现了数据倾斜,可以通过repartition实现数据重新分发,可以均匀分发到不同分区中

代码实现:Repatition的使用

object RepartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("RepartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//重新设置RDD的分区数量为3,这个操作会产生shuffle
//也可以解决RDD中数据倾斜的问题
dataRDD.repartition(3)
.foreachPartition(it=>{
println("=========")
it.foreach(println(_))
})
//通过repartition可以控制输出数据产生的文件个数
dataRDD.saveAsTextFile("hdfs://bigdata01:9000/rep-001")
dataRDD.repartition(1).saveAsTextFile("hdfs://bigdata01:9000/rep-002")
sc.stop()
}
}

(四)reduceByKey 和 groupByKey的区别

功能:实现分组聚合
原理
首先这两个算子在执行的时候都会产生shuffle
但是:
1:当采用reduceByKey时,数据在进行shuffle之前会先进行局部聚合
2:当使用groupByKey时,数据在shuffle之间不会进行局部聚合,会原样进行shuffle

这样的话reduceByKey就减少了shuffle的数据传送,所以效率会高一些。
在这里插入图片描述
总结 :能用reduceByKey优先使用reduceByKey

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决Visual C++ Redistributable安装找不到vc_runtimeMinimum_x86.msi最简单办法

今天在安装Fritzing的时候,遇到了需要Visual C Redistributable支持包,所以就动手安装,发现居然不能安装,安装几次居然错误提示所需要的安装包*.MSI的居然名称还不用。我也是下载了各种版本来试图靠运气过关,结果失败告…

Linux下安装prometheus grafana

1 安装prometheus1.1 下载prometheus下载地址https://prometheus.io/download/#prometheus下载wget https://github.com/prometheus/prometheus/releases/download/v2.42.0/prometheus-2.42.0.linux-amd64.tar.gz1.2 安装# 新建目录,并进入目标目录 mkdir -p /middl…

Java语言如何求平方根

问题 在编程时,会遇到求平方根的问题,本次问题讲到如何使用Java来求解平方根。 方法 使用java.lang.Math类的sqrt(double)方法求平方根。Math是java.lang包中的类,所以就可以直接使用这个类。Double为对象中的基本类型。例如求正整数16的平方…

Vue 2

文章目录1. 简介2. 第一个Vue程序3. 指令3.1 判断循环3.2 操作属性3.3 绑定事件3.4 表单中数据双向绑定3.5 其他内置指令3.6 自定义指令4. 组件4.1 全局注册4.2 局部注册4.3 组件通讯4.4 单文件组件5. 组件插槽5.1 单个插槽5.2 具名插槽5.3 作用域插槽6. 内置组件6.1 component…

智能客服系统:为企业提升客户满意度

随着科技的不断进步,电话营销、呼叫中心机器人、语音自助服务等领域的智能客服系统也得到了飞速的发展。这些技术的出现,让企业能够更加高效地管理客户服务,提高客户满意度,从而在市场竞争中占据优势。 电话营销是企业推广产品、服…

g2o源码阅读

之前写的g2o源码阅读笔记,分享给有需要的人 整个文档请自行下载,这里只贴一个图片。

微信小程序第一节 —— 自定义顶部、底部导航栏以及获取胶囊体位置信息。

一、前言 大家好!我是 是江迪呀。我们在进行微信小程序开发时,常常需要自定义一些东西,比如自定义顶部导航、自定义底部导航等等。那么知道这些自定义内容的具体位置、以及如何适配不同的机型就变得尤为重要。下面让我以在iPhone机型&#x…

Word处理控件Aspose.Words功能演示:使用 C# 拆分 MS Word 文档

Aspose.Words 是一种高级Word文档处理API,用于执行各种文档管理和操作任务。API支持生成,修改,转换,呈现和打印文档,而无需在跨平台应用程序中直接使用Microsoft Word。此外, Aspose API支持流行文件格式处…

Python——列表排序和赋值

(1)列表排序: 列表排序方法 ls.sort() 对列表ls 中的数据在原地进行排序 ls [13, 5, 73, 4, 9] ls.sort()ls.sort(reverseFalse) 默认升序,reverseTrue,降序 ls [13, 5, 73, 4, 9] ls.sort(reverseTrue)key指定排序时…

小红书「高效达人筛选攻略」

三八女神节降临,诸多品牌纷纷开启铺垫预热,在各大平台借势宣传。而聚集庞大年轻女性消费群体的小红书,对“她营销”的重要性不言而喻。节点序幕拉开,面对海量达人信息,如何提前积草屯粮、高效备战? 本期千瓜…

【数据结构】链表:看我如何顺藤摸瓜

👑专栏内容:数据结构⛪个人主页:子夜的星的主页💕座右铭:日拱一卒,功不唐捐 文章目录一、前言二、链表1、定义2、单链表Ⅰ、新建一个节点Ⅱ、内存泄漏Ⅲ、插入一个节点Ⅳ、销毁所有节点Ⅴ、反转一个链表3、…

云his系统源码 SaaS应用 基于Angular+Nginx+Java+Spring开发

云his系统源码 SaaS应用 功能易扩 统一对外接口管理 一、系统概述: 本套云HIS系统采用主流成熟技术开发,软件结构简洁、代码规范易阅读,SaaS应用,全浏览器访问前后端分离,多服务协同,服务可拆分&#xff…

【Linux要笑着学】进程创建 | 进程终止 | slab分派器

爆笑教程《看表情包学Linux》👈 猛戳订阅!​​​​​​​​​​​​💭 写在前面:本章我们主要讲解进程的创建与终止。首先讲解进程创建,fork 函数是我们早在讲解 "进程的概念" 章节就提到过的一个函数&#…

总结篇 字符串设备(一)

简介 1、字符设备是Linux驱动中最基本的一类设备驱动,字符设备就是一个个字节,按照字节流进行读写操作的设备。(例:按键,电池等,IIC,SPI,LCD)。这些设备的驱动就叫字符设备驱动。 在…

八股文(二)

一、 实现深拷贝和浅拷贝 1.深拷贝 function checkType(any) {return Object.prototype.toString.call(any).slice(8, -1) }//判断拷贝的要进行深拷贝的是数组还是对象,是数组的话进行数组拷贝,对象的话进行对象拷贝 //如果获得的数据是可遍历的&#…

小白推荐!必定成功的python的安装流程?

目录 1.安装教程 2.使用cmd测试是否安装成功,快捷键WinR 3.如果测试失败,如何卸载? 4.如何在pycharm中指定下载的python解释器路径? 5.第一条python语句 1.安装教程 1.前往python的官网(弄个梯子可能会快一点&#xf…

spring boot + rabbitMq整合之死信队列(DL)

rabbit mq 死信队列 什么是死信队列? DL-Dead Letter 死信队列 死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢? “死信”是RabbitMQ中的一种消息机制,当你在消费消息时&…

Qt 解决程序全屏运行弹窗引发任务栏显示

文章目录摘要在VM虚拟机器中测试setWindowFlags()关键字: Qt、 Qt::WindowStayOnTopHint、 setWindowFlags、 Qt::Window、 Qt::Tool摘要 今天眼看项目就要交付了,结果在测试程序的时候,发现在程序全品情况下,点击输入框&#x…

【Android Studio】【学习笔记】【2023春】

文章目录零、常用一、界面布局疑问&报错零、常用 一、界面布局 Android——六大基本布局总结/CSDN小马 同学 【Android】线性布局(LinearLayout)最全解析/CSDNTeacher.Hu 一个不错的计算器界面👇 Android Studio App LinearLayout多层…

数据资产管理建设思考(二)

关于数据资产管理,近两年是数据治理行业中一个热点话题,当然有我们前面提到的国家的政策支持及方向指引的原因。另一方面我们做数据治理的同行们从学习吸收国外优秀的数据治理理论,进一步在实践中思考如何应用理论,并结合我们国家…