99-127-spark-核心编程-持久化-分区-io-累加器-广播变量

news2024/11/26 2:52:00

99-spark-核心编程-持久化-分区-io:

RDD持久化

1) RDD Cache 缓存 Spark02_RDD_Persist

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bI74D04a-1670771918745)(png/image-20211017094819447.png)]

RDD中不存储数据
如果一个RDD需要重复使用,那么需要从头再次执行来获取数据
RDD对象可以重用的,但是数据无法重用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RcPrx4l7-1670771918746)(png/image-20211017094920159.png)]

RDD对象的持久化操作不一定是为了重用
在数据执行较长,或数据比较重要的场合也可以采用持久化操作

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tFWrdyfM-1670771918746)(png/image-20211017095014271.png)]

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。

Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。

RDD CheckPoint 检查点 Spark02_RDD_Persist

检查点就是通过将 RDD 中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点

之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。

2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。

3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

RDD分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None

➢ 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

  1. Hash分区:对于给定的 key,计算其 hashCode,并除以分区个数取余
class HashPartitioner(partitions: Int) extends Partitioner {
 require(partitions >= 0, s"Number of partitions ($partitions) cannot be 
negative.")
 def numPartitions: Int = partitions
 def getPartition(key: Any): Int = key match {
 case null => 0
 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
 }
 override def equals(other: Any): Boolean = other match {
 case h: HashPartitioner =>
 h.numPartitions == numPartitions
 case _ =>
 false
 }
 override def hashCode: Int = numPartitions
}
  1. Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
class RangePartitioner[K : Ordering : ClassTag, V](
 partitions: Int,
 rdd: RDD[_ <: Product2[K, V]],
 private var ascending: Boolean = true)
 extends Partitioner {
 // We allow partitions = 0, which happens when sorting an empty RDD under the 
default settings.
 require(partitions >= 0, s"Number of partitions cannot be negative but found 
$partitions.")
 private var ordering = implicitly[Ordering[K]]
 // An array of upper bounds for the first (partitions - 1) partitions
 private var rangeBounds: Array[K] = {
 ...
 }
 def numPartitions: Int = rangeBounds.length + 1
 private var binarySearch: ((Array[K], K) => Int) = 
CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
 val k = key.asInstanceOf[K]
 var partition = 0
 if (rangeBounds.length <= 128) {
 // If we have less than 128 partitions naive search
 while (partition < rangeBounds.length && ordering.gt(k, 
rangeBounds(partition))) {
 partition += 1
 }
 } else {
 // Determine which binary search method to use only once.
 partition = binarySearch(rangeBounds, k)
 // binarySearch either returns the match location or -[insertion point]-1
 if (partition < 0) {
 partition = -partition-1
 }
 if (partition > rangeBounds.length) {
 partition = rangeBounds.length
 }
 }
 if (ascending) {
 partition
 } else {
 rangeBounds.length - partition
 }
 }
 override def equals(other: Any): Boolean = other match {
 ...
 }
 override def hashCode(): Int = {
 ...
 }
 @throws(classOf[IOException])
 private def writeObject(out: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
 ...
 }
 @throws(classOf[IOException])
 private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
 ...
 } }

自定义分区器 Spark01_RDD_Part

package spark.core.com.zh.operator.part

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

/**
 * 分区器
 */
object Spark01_RDD_Part {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("hPartitioner") //appname  应用名称
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[(String, String)] = sc.makeRDD(List(
      ("nba", "xx-nba-xxx"),
      ("cba", "xx-cba-xxx"),
      ("wnba", "xx-wnba-xxx"),
      ("nba", "xx-nba-xxx")
    ), 3)
    val partRdd: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
    partRdd.saveAsTextFile("part-ouput ")
    sc.stop()
  }

  //自定义分区器
  //1.继承Partitioner
  //2.实现方法
  class MyPartitioner extends Partitioner {
    //分区数量
    override def numPartitions: Int = 3

    //根据数据的key值返回数据的分区索引,从0开始,
    override def getPartition(key: Any): Int = {
      key match {
        case "nba" => 0
        case "wnba" => 1
        case _ => 0
      }
    }
  }
}

RDD 文件读取与保存

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;

文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。

text 文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")

sequence 文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFilekeyClass, valueClass。

// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

object 对象文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFileT: ClassTag函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

累加器:用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。注意累加器的少加和多加问题。(Spark02_Acc)

累加器实现wordcount Spark03_Acc_wordcount

未使用累加器图解,虚线表示sum并不会返回

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xG3JSkLP-1670771918747)(png/image-20211017124119557.png)]

使用累加器图解,会将executor中的sumacc进行返回,在driver进行merge

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QqJNFsRC-1670771918747)(png/image-20211017124255071.png)]

实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。如果需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

广播变量的原因: Spark04_Broadcast

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xoKEQZRT-1670771918747)(png/image-20211017135537993.png)]

闭包数据,都是以Task为 单位发送的,每个任务中包含闭包数据
这样可能会导致,一个Executor中含有大量重复的数据,并且占用大量的内存
Executor其实就一个 JVM,所以在启动时,会自动分配内存
完全可以将任务中的闭包数据放置在Executor的内存中,达到共享的目的
Spark中的广播变量就可以将闭包的数据保存到Executor的内存中
Spark中的广播变量不能够更改:分布式共享只读变量

Spark案例测试 来源借鉴学习于(https://www.bilibili.com/video/BV11A411L7CK?p=110)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VgQQy0UB-1670771918748)(png/image-20211017191933506.png)]

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

➢ 数据文件中每行数据采用下划线分隔数据

➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

➢ 如果搜索关键字为 null,表示数据不是搜索数据

➢ 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

➢ 支付行为和下单行为类似

需求 1:Top10热门品类 Spark02_Req1_HotCategoryTop10Analysis2

需求 2:Top10热门品类中每个品类的Top10活跃 Session 统计 Spark03_Req2_HotCategoryTop10SessionAnalysis

需求 3:页面单跳转换率统计图解 Spark04_Req3_PageflowAnalysis

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eegAWYbE-1670771918748)(png/image-20211017191831538.png)]

工程化代码:架构模式-三层架构

三层架构:controller(控制层),service(服务层),dao(持久层)

ThreadLocal可以对线程的内存进行控制,存储数据,共享数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CywrOnbe-1670771918749)(png/image-20211018225402687.png)]

路径:big-data-study\Spark-demo\src\main\java\spark\core\com\zh\operator\framework

学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/86361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OS_@假脱机技术@设备分配@设备映射

文章目录OS_假脱机技术设备分配设备映射设备分配的策略1)设备分配原则2)设备分配方式静态分配动态分配3)设备分配算法设备分配的安全性1)安全分配方式2)不安全分配方式逻辑设备名到物理设备名的映射两种方式设置逻辑设备表假脱机(Spooling)系统SPOOLing系统的组成SPOOLing的工作…

足球一代又一代得青春

世界杯由来 世界杯&#xff08;World Cup&#xff09;即国际足联世界杯&#xff0c;是世界上最高水平的足球赛事。 众所周知&#xff0c;现代足球起源于英国&#xff0c;随后风靡世界。由于足球运动的迅速发展&#xff0c;国际比赛也随之出现。1896年&#xff0c;第一届现代奥…

[附源码]计算机毕业设计的疫苗接种管理系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

Educational Codeforces Round 139 (Rated for Div. 2) D. Lucky Chains

翻译&#xff1a; 让我们命名一对正整数(&#x1d465;&#xff0c;&#x1d466;)&#xff0c;幸运的是它们的最大公约数等于1 (gcd(&#x1d465;&#xff0c;&#x1d466;)1)。 让我们定义一个链(&#x1d465;&#x1d466;)引起的一系列双(&#x1d465;&#x1d466;)…

Vue中对获取数据、返回数据进行处理的wode总结

&#x1f4ad;&#x1f4ad; ✨&#xff1a;Vue中对获取数据、返回数据进行处理的总结   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: 也许&#xff0c;不负光阴就是最好的努力&#xff0c;而努力就是最好的自己。&#x1f49c;&#x1f49c;   &#x1f3…

Mybatis:MyBatis的分页插件(11)

Mybaits笔记框架&#xff1a;https://blog.csdn.net/qq_43751200/article/details/128154837 Mybatis中文官方文档&#xff1a; https://mybatis.org/mybatis-3/zh/index.html 分页插件1. 分页插件使用步骤2. 分页插件的使用2.1: 开启分页功能2.2: 分页相关数据2.3: 常用数据1.…

在小公司干测试5年,如今终于熬出头了,入职美团涨薪14K

你的努力&#xff0c;终将成就无可替代的自己&#xff0c;本科毕业后就一直从事软件测试的工作&#xff0c;和多数人一样&#xff0c;最开始从事功能测试的工作&#xff0c;我朋友看着自己的同学一步一步往上走&#xff0c;自己还是在原地踏步&#xff0c;说实话这不是自己想要…

TongWeb7微服务适配方案

先介绍一下我们微服务项目的部署情况&#xff1a; 之前使用的是内置的Tomcat容器部署方式&#xff0c;运行项目使用的 java -jar 项目文件 方式&#xff0c;然后使用k8sdocker容器化部署。 还没了解TongWeb部署的同学们&#xff0c;可以看看我前面写的几个关于TongWeb本地部…

【有营养的算法笔记】巧解蛇形矩阵

&#x1f451;作者主页&#xff1a;进击的安度因 &#x1f3e0;学习社区&#xff1a;进击的安度因&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;有营养的算法笔记 ✉️分类专栏&#xff1a;题解 文章目录一、题目描述二、思路讲解三、代码实现一、题目描…

DevExpress ASP.NET and Blazor图表编制

DevExpress ASP.NET and Blazor图表编制 .NET 6现在是受支持的最低框架版本-此版本需要.NET 6和Microsoft Visual Studio 2022(v17.0)或更高版本。 图表编制 范围条形图-最小条形图大小-您现在可以使用新的minBarSize属性为范围条形图中显示的条形图指定最小大小。 数据可视化组…

RabbitMQ如何保证消息的可靠性

文章目录可靠性分析可靠性方案可靠性实现确认Exchange接收到消息确认Queue接收到消息保证Queue及其数据持久化保证消费者的正常消费重复消费问题消息丢失问题可靠性分析 RabbitMQ如何保证消息的可靠&#xff1f;如RabbitMQ基础概念中的架构模型 可以看到一条消息的传递过程&a…

还在用HttpUtil?SpringBoot 3.0全新HTTP客户端工具来了,用起来够优雅~

我们平时开发项目的时候&#xff0c;经常会需要远程调用下其他服务提供的接口&#xff0c;于是我们会使用一些HTTP工具类比如Hutool提供的HttpUtil。前不久SpringBoot 3.0发布了&#xff0c;出了一个Http Interface的新特性&#xff0c;它允许我们使用声明式服务调用的方式来调…

PreScan快速入门到精通第四十讲目标边界传感器

边界矩形传感器提供了关于传感器可检测物体的边界矩形的信息,并作为对摄像机输入的边界矩形算法的参考。一个例子是行人识别算法,该算法用于检测夜间、雾、雨或雪等恶劣照明条件下的行人。输出的检测到的边界矩形是按距离排序的--最近的在前。 注意:边界矩形传感器不检测(或…

Qt扫盲-QDoubleSpinBox理论总结

QDoubleSpinBox理论总结1. 简述2. 调值与值转换3. 信号4. 修饰&外观1. 简述 QDoubleSpinBox 主要是对于浮点数据的输入进行便捷的封装。QDoubleSpinBox和QSpinBox的使用基本一致&#xff0c;只是有些控制有些不同嘛。比如对于浮点数的小数点精度位数的控制啦。 QDoubleSpi…

第五章. 可视化数据分析图表—常用图表的绘制4—箱形图,3D图表

第五章. 可视化数据分析图 5.3 常用图表的绘制4—箱形图&#xff0c;3D图表 本节主要介绍常用图表的绘制&#xff0c;主要包括箱形图&#xff0c;3D柱形图&#xff0c;3D曲面图。 1.箱形图&#xff08;matplotlib.pyplot.boxplot&#xff09; 箱形图又称箱线图、盒须图或盒式…

你还在为 “动态规划” 发愁吗?看完本秘籍,带你斩杀这类题~

目录 前言 一、动态规划——解题思路 二、动态规划——模板以及题目 2.1、Fibonacci 2.2、字符串分割(Word Break) 2.3、三角矩阵(Triangle&#xff09; 2.4、路径总数(Unique Paths&#xff09; 2.5、最小路径和(Minimum Path Sum) 2.6、背包问题 2.7、回文串分割(Pa…

第08讲:使用脚手架创建vue项目

一、安装NodeJS 二、配置环境变量 2.1、软件安装完成之后配置npm的环境变量 第1步&#xff1a;获取npm安装位置 使用管理员身份打开CMD&#xff0c;用如下命令获取npm的安装位置&#xff1a; npm config list第2步&#xff1a;配置环境变量 将以上获取的路径保存到path变…

flask请求与响应、session执行流程

目录 请求对象 响应对象 session的使用和原理 闪现(flash) 请求扩展 蓝图 请求对象 请求对象request是全局的&#xff0c;需要导入这个全局的request&#xff0c;在哪个视图函数中就是当次的request对象 请求数据&#xff1a; request.method # 获取提交的方法 …

文件包含漏洞简介

今天继续给大家介绍渗透测试相关知识&#xff0c;本文主要内容是文件包含漏洞简介。 免责声明&#xff1a; 本文所介绍的内容仅做学习交流使用&#xff0c;严禁利用文中技术进行非法行为&#xff0c;否则造成一切严重后果自负&#xff01; 再次强调&#xff1a;严禁对未授权设备…

一些好玩的js小作品

今天小编给大家带来了一些很实用的js小作品&#xff0c;下面请一看究竟。 1、计算详细年龄工具js脚本 2、检测是否安装Flash插件及版本号js脚本 3、无法查看源码的页面 4、面积换算js脚本 5、体积和容积换算js脚本 6、长度换算js脚本 7、重量换算js脚本 8、只能输入汉字…