Spark-Core

news2025/1/11 19:59:10

Spark简介
Spark-Core核心算子
Spark-Core


文章目录

    • 一、RDD 编程
      • 1、RDD序列化
        • 1.2 Kryo序列化框架
      • 2、RDD依赖关系
        • 2.1 查看血缘关系
        • 2.2 查看依赖关系
        • 2.3 窄依赖
        • 2.4 宽依赖
        • 2.5 Stage任务划分
      • 3、RDD 持久化
        • 3.1 Cache缓存
        • 3.2 CheckPoint检查点
        • 3.3 缓存和检查点区别
        • 3.4 检查点存储到HDFS集群
      • 4、键值对RDD数据分区
    • 二、累加器
    • 三、广播变量


一、RDD 编程

1、RDD序列化

初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

在这里插入图片描述

class User extends Serializable {
  var name: String = _
}

class Test04 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  @Test
  def test(): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((111, "aaa"), (222, "bbbb"), (333, "ccccc")), 3)


    val user01: User = new User()
    user01.name = "list"
    val user02: User = new User()
    user02.name = "lisi"
    val userRdd01: RDD[User] = sc.makeRDD(List(user01, user02))

    //  没有序列化(java.io.NotSerializableException: day04.User)
    userRdd01.foreach(user => println(user.name))
    sc.stop()

  }
}
1.2 Kryo序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。

Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class Test04 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  @Test
  def test(): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
      // 替换默认的序列化机制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // 注册需要使用kryo序列化的自定义类
      .registerKryoClasses(Array(classOf[Search]))
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello", "world"))
    val search: Search = new Search("hello")
    val result: RDD[String] = rdd.filter(search.isMatch)
    println(result.collect().toList)
  }


}

//  关键字封装在一个类里面
//  需要自己先让类实现序列化  之后才能替换使用kryo序列化
class Search(val query: String) extends Serializable {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
}

2、RDD依赖关系

2.1 查看血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

在这里插入图片描述

  • 圆括号中的数字表示RDD的并行度,也就是有几个分区
rdd03.toDebugString
@Test
def test(): Unit = {
  val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)
  val rdd01: RDD[String] = sc.textFile("input/1.txt")
  println(rdd01.toDebugString)
  println("rdd01===")
  val rdd02: RDD[String] = rdd01.flatMap(_.split(" "))
  println(rdd02.toDebugString)
  println("rdd02====")
  val rdd03: RDD[(String, Int)] = rdd02.map((_, 1))
  println(rdd03.toDebugString)
  println("rdd03====")
  val rdd04: RDD[(String, Int)] = rdd03.reduceByKey(_ + _)
  println(rdd04.toDebugString)
  sc.stop()
}

在这里插入图片描述

2.2 查看依赖关系

在这里插入图片描述

在这里插入图片描述

RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是 RDD的parent RDD(s)是什么(血缘); 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖(依赖)。

RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。

2.3 窄依赖

一对一、多对一

  • 窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一、多对一)。
  • 窄依赖我们形象的比喻为独生子女。

在这里插入图片描述

2.4 宽依赖

一对多,会引起Shuffle

  • 宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle。

  • 总结:宽依赖我们形象的比喻为超生。

  • 具有宽依赖的transformations包括:sortreduceByKeygroupByKeyjoin和调用rePartition函数的任何操作。

  • 宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。

  • 在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。

在这里插入图片描述

2.5 Stage任务划分

DAG有向无环图

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

DAG记录了RDD的转换过程和任务的阶段。

RDD任务切分

RDD任务切分中间分为:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即生成一个Application;

  • Job:一个Action算子就会生成一个Job;

  • Stage:Stage等于宽依赖的个数加1;

  • Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

@Test
def Test(): Unit = {
  val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  //  1、Application:初始化一个SparkContext即生成一个Application
  val sc: SparkContext = new SparkContext(conf)
  val lineRdd: RDD[String] = sc.textFile("input/1.txt")
  val rdd01: RDD[String] = lineRdd.flatMap(_.split(" "))
  val rdd02: RDD[(String, Int)] = rdd01.map((_, 1))
  //  3、Stage:reduceByKey算子会有宽依赖,stage阶段+1。一共2个stage
  val resultRdd: RDD[(String, Int)] = rdd02.reduceByKey(_ + _)
  //  2、Job:一个Action算子就会生成一个Job。一共2个Job
  resultRdd.collect().foreach(println)
  resultRdd.saveAsTextFile("output")
  Thread.sleep(Long.MaxValue)
  sc.stop()
}

Application个数:

//  1、Application:初始化一个SparkContext即生成一个Application
val sc: SparkContext = new SparkContext(conf)

Job个数

在这里插入图片描述

Stage个数

在这里插入图片描述

Task数量

  • 如果存在shuffle过程,系统会自动进行缓存,UI界面显示skipped的部分。
  • 从Stage中看有2个Task。

在这里插入图片描述

在这里插入图片描述

3、RDD 持久化

3.1 Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

//  cache底层调用的就是persist方法,缓存级别默认用的是MEMORY_ONLY
wortToOneRdd.cache()
//  可以更改缓存级别
wortToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

案例:

val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wortToOneRdd: RDD[(String, Int)] = wordRdd.map(word => (word, 1))
//  打印血缘关系(缓存前)
println(wortToOneRdd.toDebugString)
//  数据缓存
//  cache底层调用的就是persist方法,缓存级别默认用的是MEMORY_ONLY
wortToOneRdd.cache()
//  可以更改缓存级别
//    wortToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
wortToOneRdd.collect().foreach(println)
//  打印血缘关系(缓存后)
println(wortToOneRdd.toDebugString)

在这里插入图片描述

缓存枚举参数

默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。

SER:表示序列化。

在这里插入图片描述

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

自带缓存采用reduceByKey

// 采用reduceByKey,自带缓存
val wordByKeyRDD: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_+_)
3.2 CheckPoint检查点

检查点:是通过将RDD中间结果写入磁盘

原因: 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

检查点存储路径: Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统。

存储格式为: 二进制的文件。

检查点切断血缘: 在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

检查点触发时间: 对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

在这里插入图片描述

//	设置检查点数据存储路径:
sc.setCheckpointDir("./checkpoint1")
//	调用检查点方法:
wordToOneRdd.checkpoint()

代码:

val rdd: RDD[String] = sc.textFile("input/1.txt")
//  业务逻辑
val rdd01: RDD[String] = rdd.flatMap(line => line.split(" "))
val rdd02: RDD[(String, Long)] = rdd01.map(word => (word, System.currentTimeMillis()))
//  增加缓存,避免再重新跑一个job做checkpoint
rdd02.cache()
//  数据检查点:针对wordToOneRdd做检查点计算
rdd02.checkpoint()
//  会立即启动一个新的job来专门的做checkpoint运算(一共会有2个job)
rdd02.collect().foreach(println)
//  再次触发2次执行逻辑,用来对比
rdd02.collect().foreach(println)
rdd02.collect().foreach(println)

执行结果:

通过页面http://localhost:4040/jobs查看DAG图。可以看到检查点切断了血缘依赖关系。

只增加checkpoint,没有增加Cache缓存打印第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
增加checkpoint,也增加Cache缓存打印第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

在这里插入图片描述

在这里插入图片描述

3.3 缓存和检查点区别
  • Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
  • Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
  • 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
  • 如果使用完了缓存,可以通过unpersist()方法释放缓存。
3.4 检查点存储到HDFS集群

如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。

// 设置访问HDFS集群的用户名
System.setProperty("HADOOP_USER_NAME", "atguigu")

// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint")

//  数据检查点:针对wordToOneRdd做检查点计算
rdd02.checkpoint()

4、键值对RDD数据分区

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
  • 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//	数据源处理
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 2), (3, 3)))
//	打印分区器
println(rdd.partitioner)
//	使用HashPartitioner对RDD进行重新分区
val rdd02: RDD[(Int, Int)] = rdd.partitionBy(new HashPartitioner(2))
//	打印分区器
println(rdd02.partitioner)
sc.stop()

Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。

在这里插入图片描述

Ranger分区

  RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

实现过程为:

第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;

第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

  • 1)我们假设有100万条数据要分4个区
  • 2)从100万条中抽100个数(1,2,3, …… 100)
  • 3)对100个数进行排序,然后均匀的分为4段
  • 4)获取100万条数据,每个值与4个分区的范围比较,放入合适分区

二、累加器

分布式共享只写变量(Executor和Executor之间不能读数据)

  • 累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。
  • 注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

在这里插入图片描述

//	累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
//	累加器添加数据(累加器.add方法)
sum.add(count)
//	累加器获取数据(累加器.value)
sum.value
val dataRdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
//  设置新的累加器
val accSum: LongAccumulator = sc.longAccumulator("sum")
dataRdd.foreach(line => {
  //  使用累加器累加
  accSum.add(line._2)
})
//  获取累加器,累加后的值
println(accSum.value)

累加器要放在行动算子中

  • 因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。
  • 所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。
  • 对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。
val value: RDD[Unit] = dataRdd.map {
  case (a, count) => {
    accSum.add(count)
  }
}
//假如放在map中,调用两次行动算子,map执行两次,导致最终累加器的值翻倍
mapRDD.collect()
mapRDD.collect()

三、广播变量

分布式共享只读变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。

步骤:

  1. 调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
  2. 通过广播变量.value,访问该对象的值。
  3. 广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。
//	声明广播变量
val bdStr: Broadcast[Int] = sc.broadcast(num)
//	使用广播变量
bdstr.value
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
//  需要广播的值
val num: Int = 1
//  声明广播变量
val bdStr: Broadcast[Int] = sc.broadcast(num)
//  使用广播变量
val rdd02: RDD[Int] = rdd.filter(lin => {
  lin.equals(bdStr.value)
})
rdd02.foreach(println)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1128202.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

天锐绿盾加密软件——企业数据透明加密、防泄露系统

天锐绿盾是一种企业级数据透明加密、防泄密系统,旨在保护企业的核心数据,防止数据泄露和恶意攻击。它采用内核级透明加密技术,可以在不影响员工正常工作的前提下,对需要保护的数据进行加密操作。 PC访问地址: https:/…

基于springboot基于会员制医疗预约服务管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot基于会员制医疗预约服务管理系统演示 摘要 会员制医疗预约服务管理信息系统是针对会员制医疗预约服务管理方面必不可少的一个部分。在会员制医疗预约服务管理的整个过程中,会员制医疗预约服务管理系统担负着最重要的角色。为满足如今日益复杂的管理需…

公司新品上市,如何做好新品发布会宣传

公司新品上市不仅展现了公司的生命力与活力,还代表了公司与时俱进的创新力,积极听取用户反馈的服务精神,而公司新品上市时都会举办新品发布会,今天媒介盒子就来和大家分享,公司如何做好新品发布会的宣传。 一、 撰写活…

2023年中国潜水电机行业现状及前景分析[图]

潜水电机是一种特殊设计的电动机,通常用于水下应用。它们被设计成能够在液体环境中工作,通常是在水中或其他液体中,而且能够在潜水的情况下继续正常运行。潜水电机通常具有防水性能和耐腐蚀性,以适应恶劣的水下环境。 潜水电机行…

Java实现连接SQL Server解决方案及代码

下面展示了连接SQL Server数据库的整个流程: 加载数据库驱动建立数据库连接执行SQL语句处理结果关闭连接 在连接之前,前提是确保数据库成功的下载,创建,配置好账号密码。 运行成功的代码: import java.sql.*;publi…

点集合的三角剖分

点集合的三角剖分是指如何将一些离散的点集合组合成不均匀的三角形网格,使得每个点成为三角网中三角面的顶点。这个算法的用处很多,一个典型的意义在于可以通过一堆离散点构建的TIN实现对整个构网区域的线性控制,比如用带高程的离散点构建的T…

Windows网络监视工具

对于任何规模的企业来说,网络管理在信息技术中都起着至关重要的作用。管理、监控和密切关注网络基础设施对任何组织都至关重要。在Windows网络中,桌面,服务器,虚拟服务器和虚拟机(如Hyper-V)在Windows操作系…

医院电力系统智能能效监控平台的应用

0引言 随着社会和科学技术的发展,配电系统的智能化已经成为一种发展趋势。医院建设电力智能监控平台,可对供电系统进行集中管理和调度、实时控制和数据采集,监控供电系统设备的运行情况,及时掌握和处理供电系统的各种事故、报警事…

Day07 Stream流递归Map集合Collections可变参数

Stream 也叫Stream流,是Jdk8开始新增的一套API (java.util.stream.*),可以用于操作集合或者数组的数据。 Stream流大量的结合了Lambda的语法风格来编程,提供了一种更加强大,更加简单的方式操作 public class Demo1 {public stati…

【机器学习合集】标准化与池化合集 ->(个人学习记录笔记)

文章目录 标准化与池化1. 标准化/归一化1.1 归一化归一化的作用 1.2 标准化批标准化方法 Batch Normailzation标准化方法的对比自动学习标准化方法 2. 池化2.1 池化的作用2.2 常见的池化方法2.3 池化方法的差异2.4 池化的必要性 标准化与池化 1. 标准化/归一化 1.1 归一化 归…

django建站过程(3)定义模型与管理页

定义模型与管理页 定义模型[models.py]迁移模型向管理注册模型[admin.py]注册模型使用Admin.site.register(模型名)修改Django后台管理的名称定义管理列表页面应用名称修改管理列表添加查询功能 django shell交互式shell会话 认证和授权 定义模型[models.py] 模仿博客形式&…

kali查看wifi破解密码,实测有效

首先需要安装kali系统 这个系统是安装在虚拟机上的 还需要一个无线网卡(最好是kali系统免驱的 否则是无法识别的) 有着两个工具就可以pojie密码了 kali官网:Kali Linux | Penetration Testing and Ethical Hacking Linux Distribution 下载这里大家去比站上或者博客都可以…

某雀服务器崩溃,引发数据安全性讨论,应该选择私有化部署吗?

随着云计算技术的飞速发展,越来越多的企业和个人选择将数据存储于云端。然而,云服务的稳定性和数据安全性问题也成为了用户关注的焦点。昨天下午,语雀服务器崩溃事件引起了广泛关注。这一事件再次凸显了私有化的重要性。又一批人群开始考虑将…

12 结构型模式-桥接模式

1 桥接模式介绍 2 桥接模式原理

跟人一样,手机太烫也会“生病”!如何给太烫的手机降温

高温是你手机最大的敌人。现代智能手机在纤薄的外壳中装有强大的处理器和大容量电池,即使在正常工作条件下(看看你,Galaxy Note 7,也许还有iPhone 15),过热也会成为一个真正的问题。无论是充电、闲置还是执…

【每天学习一点新知识】安全设备IDS、IRS、IPS

IDS:入侵检测系统 对那些异常的、可能是入侵行为的数据进行检测和报警,告知使用者网络中的实时状况,并提供相应的解决、处理方法;是一种侧重于风险管理的安全产品。 IRS:入侵响应系统 深入网络数据内部,查…

项目结束需要经历的5个关键步骤

项目结束是项目管理不可或缺的一部分。这是项目的最后阶段,根据关键绩效指标和范围对交付成果进行测试,收尾,总结经验教训,完成交接,并签署项目。 项目结束与启动会议和一样重要。管理人员应为此留出时间,…

【剑指Offer】:循环有序列表的插入(涉及链表的知识)

给定循环单调非递减列表中的一个点,写一个函数向这个列表中插入一个新元素 insertVal ,使这个列表仍然是循环升序的 给定的可以是这个列表中任意一个顶点的指针,并不一定是这个列表中最小元素的指针 如果有多个满足条件的插入位置&#xff0c…

从VTI7064与W25Qxx了解SPI通信协议

在学习过程中记录。 学习背景 最近在做的项目需要设计电路包含外扩FLASH(W25Q128)与SRAM(VTI7064),二者都用到了SPI通信协议,之前没学过,学习记录一下。 顺便说一下这次学习中发现的好用工具WPS AI。可以对文档进行…