Spark10-11

news2025/1/14 1:24:46

10. 广播变量

10.1 广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据

广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量

 

10.2 广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率

sc.broadcast这个方法是阻塞的(同步的)

广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据

广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据

10.3 案例:根据IP计算归属地

10.3.1 需求

根据IP规则数据,计算出给定日志中ip地址对应的省份信息,由于IP地址的规则数据相对较小,所以可以将IP规则数据先广播出去,以后关联IP规则数据,就可以在内存中进行关联了,这样可以避免shuffle,提高执行效率!

10.3.2 代码实现

11. 序列化问题

11.1 序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,

  • 封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
  • 函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)

11.2 数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!

Scala

object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用并行化的方式创建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //将数据封装到一个普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以后定义bean,建议使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

11.3 函数闭包问题

11.3.1 闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了

 

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法

为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误

Scala
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

Scala
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

11.3.2 在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
val rulesObj = RuleObjectSer

//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))

 

11.3.3 在Driver端初始化实现序列化的class

在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
val rulesClass = new RuleClassSer

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

 

11.3.4 在函数内部初始化未序列化的object

object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

 

11.3.5 在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

11.3.6 调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//处理数据,关联维度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一个分区的多条数据,使用同一个RuleClassNotSer实例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知")
    //获取当前线程ID
    val treadId = Thread.currentThread().getId
    //获取当前Task对应的分区编号
    val partitiondId = TaskContext.getPartitionId()
    //获取当前Task运行时的所在机器的主机名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/684646.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C/C++】使用类和对象 练习EasyX图形库

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

【关联式容器】之map和set

【关联式容器】之map和set 容器类型树形结构的关联式容器mapset&#xff0c;multiset&#xff0c;multimap的区别与联系 容器类型 在STL中&#xff0c;我们接触过许多容器&#xff0c;例如&#xff1a;vector&#xff0c;list&#xff0c;stack&#xff0c;queue&#xff0c;m…

conll2003数据集下载与预处理

CoNLL-2003 数据集包括 1,393 篇英文新闻文章和 909 篇德文新闻文章。我们将查看英文数据。 1. 下载CoNLL-2003数据集 https://data.deepai.org/conll2003.zip 下载后解压你会发现有如下文件。 打开train.txt文件&#xff0c; 你会发现如下内容。 CoNLL-2003 数据文件包含由单…

逍遥自在学C语言 | 指针陷阱-空指针与野指针

前言 在C语言中&#xff0c;指针是一种非常强大和灵活的工具&#xff0c;但同时也容易引发一些问题&#xff0c;其中包括空指针和野指针。 本文将带你了解这两个概念的含义、产生原因以及如何避免它们所导致的问题。 一、人物简介 第一位闪亮登场&#xff0c;有请今后会一直…

【玩转Docker小鲸鱼叭】理解DockerFile如此简单

DockerFile构建过程 DockerFile 是Docker的一个配置文件&#xff0c;本质上来说它只是一个文本文件&#xff0c;它是用来构建Docker镜像的。DockerFile配置文件中包含了一系列的指令和配置信息&#xff0c;用于描述如何构建镜像以及如何运行容器。通过编写 Dockerfile&#xf…

RISC-V处理器的设计与实现(二)—— CPU框架设计

前面我们选好了要实现的指令集&#xff0c;并且了解了每个指令的功能&#xff08;传送门&#xff1a;RISC-V处理器的设计与实现&#xff08;一&#xff09;—— 基本指令集_Patarw_Li的博客-CSDN博客&#xff09;&#xff0c;接下来我们就可以开始设计cpu了。当然我们不可能一上…

ChatGPT更新的使用指南,与其他类似的人工智能的软件和服务-更新版(2023-6-25)

文章目录 一、什么是ChatGPT二、如何使用三、如何使用ChatGPT帮助我们的工作和生活四、高阶用法1、角色扮演2、英语口语老师3、在搜索引擎中集成ChatGPT 五、常见问题五、其他类似的软件和服务 如果你还不知道如何注册和使用&#xff0c;可看末尾&#xff0c;手把手教你。 一、…

Linux线程同步

同步的几种方式&#xff1a;信号量&#xff0c;互斥锁&#xff0c;条件变量&#xff0c;读写锁 同步&#xff1a;对程序的执行过程进行控制&#xff0c;保证对临界资源的访问同一时刻只能有一个进程或线程访问。 2.1信号量 存在P操作&#xff1a;获取资源&#xff0c;信号量…

58.最后一个单词的长度

LeetCode-58.最后一个单词的长度 1、题目描述2、解题思路3、代码实现4、解题记录 1、题目描述 题目描述&#xff1a; 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任…

通讯录获取APP程序分析

前言 APP非法获取用户通讯录等隐私信息 我用技术分析APP是如何获取信息的 如果你不幸中招了&#xff0c;可以参考下方链接(有偿) 我的方法是替换掉通讯录数据&#xff0c;替换不成功包退&#xff01; 每日16:00-06:00在线&#xff0c;5分钟受理&#xff0c;2~3小时完成 点下面…

下载安装mysql与设置密码详细步骤(压缩包版本)

目录 一、前言 二、操作步骤 &#xff08;一&#xff09;下载与解压缩 &#xff08;二&#xff09;配置环境变量 &#xff08;三&#xff09;安装MySQL服务 &#xff08;四&#xff09;设置ini文件和data文件 &#xff08;五&#xff09;启动MySQL服务和设置密码 三、…

【C++ 程序设计】第 5 章:类的继承与派生

目录 一、类的继承与类的派生 &#xff08;1&#xff09;继承的概念 &#xff08;2&#xff09;派生类的定义与大小 ① 派生类的定义 ② 派生类的大小 &#xff08;3&#xff09;继承关系的特殊性 &#xff08;4&#xff09;有继承关系的类之间的访问 &#xff08;5&am…

多线程单例模式

1、单例模式 顾名思义&#xff0c;单例模式能保证某个类在程序中只存在唯一一份示例&#xff0c;而不会创建出多个实例。就像java的JDBC编程只需要创建一个单例类DataSourece从这个DataSorce中获取数据库连接。没必要创建多个对象。 单例模式具体实现方式分为“饿汉”和“懒汉…

java编译与反编译

参考&#xff1a; Idea 使用技巧记录_source code recreated from a .class file by intell_hresh的博客-CSDN博客 深入理解Java Class文件格式&#xff08;一&#xff09;_昨夜星辰_zhangjg的博客-CSDN博客 实践详解javap命令&#xff08;反编译字节码&#xff09;_天然玩家…

【运筹优化】元启发式算法详解:迭代局部搜索算法(Iterated Local Search,ILS)+ 案例讲解代码实现

文章目录 一、介绍二、迭代局部搜索2.1 总体框架2.2 随机重启2.3 在 S* 中搜索2.4 ILS 三、获得高性能3.1 初始解决方案3.2 Perturbation3.2.1 扰动强度3.2.2 自适应扰动3.2.3 更复杂的扰动方案3.2.4 Speed 3.3 接受准则3.4 Local Search3.5 ILS 的全局优化 四、ILS 的精选应用…

Windows PE怎么修复系统?使用轻松备份解决!

​什么是Windows PE? Windows预先安装环境&#xff08;英语&#xff1a;Microsoft Windows Preinstallation Environment&#xff09;&#xff0c;简称Windows PE或WinPE&#xff0c;是Microsoft Windows的轻量版本&#xff0c;主要提供个人电脑开发商&#xff08;主要为OEM厂…

electron+vue3全家桶+vite项目搭建【20】窗口事件广播,通用事件封装

引入 electron中的渲染进程与主进程之间的数据交互需要利用ipc通信&#xff0c;互相订阅/通知来实现&#xff0c;我们不妨封装一个通用事件广播&#xff0c;利用自定义的事件名称来让主进程遍历窗口挨个推送对应内容&#xff0c;来实现事件的广播。 demo项目地址 实现思路 …

【计算机视觉】MaskFormer:将语义分割和实例分割作为同一任务进行训练

文章目录 一、导读二、逐像素分类和掩码分类的区别2.1 逐像素分类2.2 掩码分类2.3 区别 三、DETR四、MaskFormer五、MaskFormer用于语义和实例分割六、总结 一、导读 目标检测和实例分割是计算机视觉的基本任务&#xff0c;在从自动驾驶到医学成像的无数应用中发挥着关键作用。…

模拟电路系列分享-运放的关键参数5

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 提示&#xff1a;这里可以添加技术概要 例如&#xff1a; 实际运放与理想运放具有很多差别。理想运放就像一个十全十美的人&#xff0c;他学习100 分&#xff0c;寿命无限长&#xff0c;长得没挑剔&#xff0c;而实…

【c++11】移动构造的性质 和 与拷贝构造的比较(详解)

文章目录 定义性质移动构造的定义实例代码分析移动构造 与 拷贝构造的比较移动赋值 和 拷贝赋值 应用场景 定义 移动构造&#xff08;Move Constructor&#xff09;是一种特殊的构造函数&#xff0c;它通过接收一个右值引用参数来创建新对象&#xff0c;并从传入的对象中“移动…