大数据 - Spark系列《八》- 闭包引用

news2025/1/12 6:03:00

 Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客


目录

8.1.🐶闭包引用的原理

1. 闭包引用的概念

2. 闭包引用的副本

3. 🧀实例代码1

4. 🧀实例代码2

8.2 闭包引用的应用场景

🍠Source.fromFile和sc.textFile的辨析

1. 使用 Source.fromFile 读取数据:

2. 使用 sc.textFile 读取数据:

8.3 🐶闭包引用的注意事项

1.🥙序列化检查

2. 🥙“副本”数量

8.4 🐶闭包变量的问题

🥙BT传输协议:基本原理


8.1.🐶闭包引用的原理

1. 闭包引用的概念

  • 算子函数中引用了一个算子外部的变量 , 这个变量就是闭包变量 ;

  • 这些引用会随着任务的序列化而被发送到各个 Executor 上,并在 Executor 上被反序列化。

  • 闭包变量定义在Driver端 ,使用在任务实例端 , 变量需要序列化

2. 闭包引用的副本

  • 在 Executor 上执行的任务中的闭包对象是完全独立的。

  • 修改任务中的闭包对象不会影响到 Driver 端的原对象,因为它们是独立的副本。

3. 🧀实例代码1

对于在 RDD 算子函数中引用的外部对象,其修改仅影响到任务执行所在的 Executor 上的局部副本,而不会影响到 Driver 端的原对象。

package com.doit.day0217

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}

/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test06 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中
    rdd.foreach(x=>{
      cnt+=1  // 此处的cnt是在worker端反序列化出的Task中,与driver端已无联系
      println(x)
    })
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    // 关闭SparkContext对象
    sc.stop()
  }
}

  

4. 🧀实例代码2

Spark 的转换操作是惰性求值的,只有在调用结果算子(如 foreachcountcollect 等)时才会触发实际的作业执行。

因此cnt的取值为调用结果算子之前的值

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

  

8.2 闭包引用的应用场景

可以使用闭包引用避免shuffle

在我们之前讲解join算子的案例中,需要将两个数据集进行连接,通常会触发 shuffle 操作,这会带来一定的性能开销。

  

 我们可以将user数据集转换成hashmap,通过闭包引用传入进去,此时可以避免shuffle

  

package com.doit.day0217

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

/**
 * 示例代码:演示了如何使用闭包变量避免shuffle
 */
object Test08 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 加载用户数据,将其转换为Map
    val user = Source.fromFile("data/join/user.txt")
    val userMap: Map[String, String] = user.getLines().map(line => {
      val arr = line.split(",")
      (arr(0), arr(1))
    }).toMap // 将用户数据转换为Map,存储在内存中 一般数据集控制大小为1G以内

    // 加载订单数据
    val orders = sc.textFile("data/join/orders.txt")

    // 使用闭包变量userMap避免shuffle,将用户名直接关联到订单数据中
    val rdd2 = orders.map(iter => {
      val arr2 = iter.split(",")
      val name = userMap.getOrElse(arr2(4), "unknown") // 使用闭包变量userMap关联订单数据中的用户ID
      (arr2(0), arr2(1), arr2(2), arr2(3), arr2(4), name)
    })

    // 打印处理后的订单数据
    rdd2.foreach(println)

    // 关闭SparkContext对象
    sc.stop()
  }
}

   

如果需要让每个task都用有一份“只读”的“小量”数据,比如一个字典,则可以利用闭包引用;

如果共享的这分只读数据比较大,则应该使用“广播变量”效率更高!

🍠Source.fromFilesc.textFile的辨析

1. 使用 Source.fromFile 读取数据:
  • 何时使用:

    • 当数据量较小,可以完全装载到内存中并进行处理时,适合使用 Source.fromFile 方法读取数据。例如,对于文件大小在几十兆到几百兆之间的数据集,可以考虑使用该方法。

    • 特点:

      • 将文件内容一次性读取到内存中,适合对数据进行全量处理。

      • 读取的数据直接转换为内存中的集合类型(如Map),方便进行后续处理。

    • 注意事项:

      • 对于较大的数据集,可能会导致内存溢出,因此在处理大规模数据时需要谨慎使用。

2. 使用 sc.textFile 读取数据:
  • 何时使用:

    • 当数据量较大,无法一次性装载到内存中进行处理时,应该使用 Spark 的 sc.textFile 方法读取数据。例如,对于几百兆到几十亿的大型数据集,应该使用该方法。

  • 特点:

    • Spark 的 sc.textFile 方法将文件分布式地加载到集群中的各个节点上,并返回一个分布式数据集(RDD)。

    • 可以有效地处理大规模数据,具有良好的扩展性和性能。

  • 使用方式:

    • 使用 sc.textFile 方法加载文件后,可以使用各种 Spark 的转换操作对数据进行处理,如 map、filter、reduceByKey 等。

  • 注意事项:

    • 使用 sc.textFile 方法加载的数据集是分布式的,无法直接转换为集合类型,因此需要结合 Spark 的各种操作进行处理。

8.3 🐶闭包引用的注意事项

1.🥙序列化检查

闭包引用的对象,必须实现序列化接口,否则会导致task序列化失败,从而快速报错

   class Per extends Serializable {
      val id:Int = 0
    }
    val p = new Per()
    val rd = sc.makeRDD(1 to 10, 2)
    // 闭包引用的对象,必须实现序列化接口
    rd.map((_,p)).foreach(println)

2. 🥙“副本”数量

如果闭包引用的是普通对象,则每个task中都有一份“copy”

如果闭包引用的是一个object对象(单例对象),则其实在整个executor中只有一份,如下

Yarn :resourcemanager nodemanager

Nodemanager 提供计算资源 : container对资源的隔离

    object Per extends Serializable {
      val id:Int = 0
    }
    val p = Per
    val rd = sc.makeRDD(1 to 10, 2)
    rd.map((_,p)).foreach(println)

如果使用闭包引用object对象,有可能产生线程安全问题(因为有多个task线程共享这个对象);

看似闭包,实非闭包

下面的代码,其实并不是“闭包引用”

当然,里面用到了自定义类型,也还是要注意序列化问题

    class Phone(var brand: String, var price: Double)
    val resRdd = rdd.map(tp => {
      new Phone(tp._1, tp._2) // 这里并没有引用外部的对象,所以不存在f序列化检查失败的问题    })
      //.map(phone => (phone.brand, phone.price))
      //.reduceByKey(_ + _) // 这里有shuffle,但shuffle写出的是 2元组,它能序列化,所以不会报错
      .groupBy(p=>p.brand)  // 这里有shuffle,而且shuffle写出phone对象,它不能序列化,所以报错
      .mapValues(_.size)

8.4 🐶闭包变量的问题

  • 每个任务实例中都会保存一份完整的闭包变量

  • 如果一条节点同时运行当前任务的多个任务实例 , 存储多份闭包变量数据

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

 

解决方案: 运行当前任务的节点只存一份数据 

🥙BT传输协议:基本原理

BitTorrent(简称BT)是一种用于大规模文件分享的通信协议。它被广泛用于分发大型文件和数据集,例如软件、电影、音乐等。BitTorrent协议的主要原理是将文件分成小块,并且允许用户同时上传和下载这些文件块,从而实现高效的分发。

以下是BitTorrent协议的主要特点和工作原理:

  1. 分布式架构:

    1. BitTorrent是一种分布式协议,没有单一的中心服务器,所有参与者都可以直接交换文件块。

    2. 参与者之间通过Tracker服务器或者DHT网络(分布式哈希表)进行通信,用于发现其他参与者并交换文件块的信息。

  2. 分块下载:

    1. 将文件分成固定大小的块(一般为256KB或512KB)。

    2. 下载者可以选择下载文件的哪些块,而不是整个文件,从而实现灵活的下载策略。

  3. 种子文件:

    1. 种子文件(Torrent文件)包含了文件的元数据信息,包括文件名、大小、哈希值等。

    2. 种子文件可以通过文件共享网站或者其他方式进行传播,从而让其他用户获取文件的元数据信息并加入下载。

  4. 优化的上传下载策略:

    1. BitTorrent协议实现了一种基于位掩码的上传下载策略,优先下载缺失的文件块,并且优先上传稀缺的文件块,从而提高下载速度和整体的网络效率。

  5. 健壮性和自我修复:

    1. BitTorrent协议具有较强的健壮性,即使某些参与者离线或者退出下载,其他参与者仍然可以通过其他方式获取丢失的文件块。

    2. 通过校验和哈希校验值,BitTorrent协议可以检测到下载的文件块是否损坏或者被篡改,并且自动请求重新下载。

  6. Tracker服务器和DHT网络:

    1. Tracker服务器用于管理下载者和上传者的信息,帮助下载者找到可用的上传者。

    2. DHT网络是一种分布式哈希表,允许下载者通过哈希值查询其他下载者的IP地址和端口信息,从而实现去中心化的Peer发现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459618.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring-security 过滤器

spring-security过滤器 版本信息过滤器配置过滤器配置相关类图过滤器加载过程创建 HttpSecurity Bean 对象创建过滤器 过滤器作用ExceptionTranslationFilter 自定义过滤器 本章介绍 spring-security 过滤器配置类 HttpSecurity,过滤器加载过程,自定义过…

如何进行 Github 第三方登录详细讲解 (Java 版本)

如何进行 Github 第三方登录详细讲解 (Java 版本) 文章目录 如何进行 Github 第三方登录详细讲解 (Java 版本)创建一个 Github 应用定义一个跳转按钮,进行 Github 的授权通过授权拿到一个随机的 code通过 code 进行后端…

【MySQL】Navicat/SQLyog连接Ubuntu中的数据库(MySQL)

🏡浩泽学编程:个人主页 🔥 推荐专栏:《深入浅出SpringBoot》《java对AI的调用开发》 《RabbitMQ》《Spring》《SpringMVC》 🛸学无止境,不骄不躁,知行合一 文章目录 前言一、安装…

消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

九、发布确认 1、发布确认逻辑 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给…

Input Output模型

一、I/O介绍 I/O在计算机中指Input/Output, IOPS (Input/Output Per Second)即每秒的输入输出量(或读写次数),是衡量磁盘性能的主要指标之一。IOPS是指单位时间内系统能处理的I/O请求数量,一般以每秒处理的I/O请求数量为单位,I/O…

ETL、ELT区别以及如何正确运用

一、 浅谈ETL、ELT ETL与ELT的概念 ETL (Extract, Transform, Load) 是一种数据集成过程,通常用于将数据从一个或多个源系统抽取出来,经过清洗、转换等处理后,加载到目标数据存储中。这种方法适用于需要对数据进行加工和整合后再加载到目标…

react实现转盘抽奖功能

看这个文章不错,借鉴 这个博主 的内容 样式是背景图片直接,没有设置。需要的话应该是 #bg { width: 650px; height: 600px; margin: 0 auto; background: url(turntable-bg.jpg) no-repeat; position: relative; } img[src^"pointer"] {positi…

redis的搭建 RabbitMq搭建

官网 Download | Redis wget https://github.com/redis/redis/archive/7.2.4.tar.gz 编译安装 yum install gcc g tar -zxvf redis-7.2.4.tar.gz -C /usr/localcd /usr/local/redis make && make install 常见报错 zmalloc.h:50:10: fatal error: jemalloc/jemal…

[office] excel图表怎么发挥IF函数的威力 #微信#媒体

excel图表怎么发挥IF函数的威力 IF函数应该是最常用的Excel函数之一了,在公式中经常能够看到她的“身影”。IF函数的基本使用如图1所示。 图1 IF函数之美 IF函数是一个逻辑函数,通过判断提供相应操作,让Excel更具智能。 然而,…

js设计模式:装饰者模式

作用: 可以给原有对象的身上添加新的属性方法 可以让对象或者组件进行扩展 示例: class Person{constructor(name,selfSkill){this.name namethis.selfSkill selfSkill}run 会走路}//所有人类都有的共同特性和技能let wjt new Person(王惊涛,写代码)let mashi new Pers…

2024.02.20作业

1. 使用多进程完成两个文件的拷贝&#xff0c;父进程拷贝前一半&#xff0c;子进程拷贝后一半&#xff0c;父进程回收子进程的资源 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <time.h> #includ…

Sora的原理,中国小学生游戏在践行

大家龙年好呀&#xff0c;春节假期和家人出去浪了&#xff0c;旅行期间&#xff0c;几乎没刷社交媒体信息。等我17号回到家仔细看手机&#xff0c;Sora的消息铺面而来&#xff0c;什么“新革命”、“划时代”、“新纪元”说的挺神呼。 任何新事物出现&#xff0c;讨论热烈是好…

AS-V1000 视频监控平台产品介绍:客户端功能介绍(四)

目 录 一、引言 1.1 AS-V1000视频监控平台介绍 1.2平台服务器配置说明 二、软件概述 2.1 客户端软件用途 2.2 客户端功能 三、客户端功能说明 3.1告警管理 3.1.1告警联动 &#xff08;1&#xff09;告警联动显示 &#xff08;2&#xff09;告警联动处理 3…

unity学习(31)——跳转到角色选择界面(打勾?手滑挂错脚本)

There are 2 audio listeners in the scene. Please ensure there is always exactly one audio listener in the scene. 是因为后来创建了一个camera&#xff0c;因为camera中自带一个组件Audio Listener。所以有两个camera就有两个audio listener导致报错。 一个简单的解决…

C++(18)——适配器概念以及stack、queue、优先队列的模拟实现

上篇文章中&#xff0c;给出了对于模拟实现中功能的补全&#xff0c;本篇文章将优先介绍一个新的容器之后引入什么是适配器&#xff0c;以及适配器的使用方法&#xff0c;再通过适配器的思想来完成对于&#xff0c;、优先级队列_的实现。 目录 1. deque: 1.1 什么是deque&…

【嵌入式-Keil】keil代码提示快捷键

CTRL空格 如果没有提示&#xff0c;可能跟输入法的快捷键冲突&#xff0c; 右键->设置->按键->勾掉第一个就行了 再按CTRL空格就有提示了 参考&#xff1a;串口发送&串口发送接收

SAP PP学习笔记02 - PP中配置品目Master时的顺序

配置品目Master的时候&#xff0c;最佳实践是要遵循什么顺序呢&#xff1f; 一般而言是如下顺序 - 新规物料类型&#xff08;或利用现有类型也可以&#xff09; - 设定料号范围 - 设定物料状态&#xff08;比如准备好之前&#xff0c;要先锁住&#xff0c;等准备好了之后再…

CTFshow web(SQL注入176-179)

web176 没啥好说的&#xff0c;直接上万能密码&#xff1a; 1 or usernameflag 当然了还有别的方法&#xff1a; 1 union Select 1,2,group_concat(password) from ctfshow_user where username flag -- web177 没啥好说的&#xff0c;直接上万能密码&#xff1a; 1 or user…

《VitePress 简易速速上手小册》第1章:VitePress 入门(2024 最新版)

文章目录 1.1 VitePress 简介与架构1.1.1 基础知识点解析1.1.2 重点案例&#xff1a;企业文档站点1.1.3 拓展案例 1&#xff1a;个人博客1.1.4 拓展案例 2&#xff1a;产品展示网站 1.2 安装与初次运行1.2.1 基础知识点解析1.2.2 重点案例&#xff1a;公司内部知识分享平台1.2.…

关于VIT(Vision Transformer)的架构记录

在VIT模型设计中&#xff0c;尽可能地紧密遵循原始的Transformer模型&#xff08;Vaswani等人&#xff0c;2017年&#xff09;。这种刻意简化的设置的一个优势是&#xff0c;可扩展的NLP Transformer架构及其高效的实现几乎可以即插即用。 图&#xff1a;模型概述。我们将图像分…