大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式

news2024/12/25 2:19:43

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • Spark RDD的依赖关系
  • 重回 WordCount
  • RDD 持久化
  • RDD 缓存

在这里插入图片描述

RDD容错机制

基本概念

涉及到的算子:checkpoint,也是Transformation

  • Spark中对于数据的保存除了持久化操作外,还提供了检查点的机制
  • 检查点本质是通过RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
  • Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销
  • cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放到内存中,但RDD的依赖链不能丢掉,当某个点某个Executor宕机了,上面cache的RDD就会丢掉,需要通过依赖链重新计算。不同的是,checkpoint是把RDD保存在HDFS中,是多副本的可靠存储,此时依赖链可以丢弃,所以斩断了依赖链。

适合场景

  • DAG中的Lineage过长,如果重新计算,开销会很大
  • 在宽依赖上做checkpoint获得的收益更大

启动Shell

# 启动 spark-shell
spark-shell --master local[*]

checkpoint

// 设置检查点目录
sc.setCheckpointDir("/tmp/checkpoint")

val rdd1 = sc.parallelize(1 to 1000)
val rdd2 = rdd1.map(_*2)
rdd2.checkpoint
// checkpoint是lazy操作
rdd2.isCheckpointed

可以发现,返回结果是False
在这里插入图片描述

RDD 依赖关系1

checkpoint之前的rdd依赖关系

  • rdd2.dependencies(0).rdd
  • rdd2.dependencies(0).rdd.collect

我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:
在这里插入图片描述

触发checkpoint

我们可以通过执行 Action 的方式,来触发 checkpoint
执行一次action,触发checkpoint的执行

  • rdd2.count
  • rdd2.isCheckpointed

此时观察,可以发现 checkpoint 已经是 True 了:
在这里插入图片描述

RDD依赖关系2

我们再次观察RDD的依赖关系:
再次查看RDD的依赖关系。可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始

  • rdd2.dependencies(0).rdd
  • rdd2.dependencies(0).rdd.collect

此时观察到,已经不是最开始的 rdd1 了:
在这里插入图片描述

查看checkpoint

我们可以查看对应的保存的文件,查看RDD所依赖的checkpoint文件

  • rdd2.getCheckpointFile
    运行的结果如下图:
    在这里插入图片描述

RDD的分区

基本概念

spark.default.paralleism: 默认的并发数 2

本地模式

# 此时 spark.default.paralleism 为 N
spark-shell --master local[N]
# 此时 spark.default.paralleism 为 1
spark-shell --master local

伪分布式

  • x为本机上启动的Executor数
  • y为每个Executor使用的core数
  • z为每个Executor使用的内存
  • spark.default.paralleism 为 x * y
spark-shell --master local-cluster[x,y,z]

分布式模式

spark.default.paralleism = max(应用程序持有Executor的core总数, 2)

创建RDD方式

集合创建

简单的说,RDD分区数等于cores总数

val rdd1 = sc.paralleize(1 to 100)
rdd.getNumPartitions

textFile创建

如果没有指定分区数:

  • 本地文件: rdd的分区数 = max(本地文件分片数,sc.defaultMinPartitions)
  • HDFS文件:rdd的分区数 = max(HDFS文件block数,sc.defaultMinPartitions)

需要额外注意的是:

  • 本地文件分片数 = 本地文件大小 / 32M
  • 读取 HDFS 文件,同时指定了分区数 < HDFS文件的Block数,指定的数将不会生效
val rdd = sc.textFile("data/1.txt")
rdd.getNumPartitions

RDD分区器

判断分区器

以下RDD分别是否有分区器,是什么类型的分区器

val rdd1 = sc.textFile("/wcinput/wc.txt")
rdd1.partitioner

val rdd2 = sc.flatMap(_.split("\\s+"))
rdd2.partitioner

val rdd3 = rdd2.map((_, 1))
rdd3.partitioner

val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.partitioner

val rdd5 = rdd4.sortByKey()
rdd5.partitioner

分区器作用与分类

在PairRDD(key,value)中,很多操作都是基于Key的,系统会按照Key对数据进行重组,如 GroupByKey
数据重组需要规则,最常见的就是基于Hash的分区,此外还有一种复杂的基于抽样Range分区方法:
在这里插入图片描述

HashPartitioner

最简单、最常用,也是默认提供的分区器。
对于给定的Key,计算HashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个Key所属的分区ID。
该分区方法可以保证Key相同的数据出现在同一个分区中。
用户可以通过 partitionBy主动使用分区器,通过 partitions参数指定想要分区的数量。

默认情况下的分区情况是:

val rdd1 = sc.makeRDD(1 to 100).map((_, 1))
rdd1.getNumPartitions

执行结果如下图所示:
在这里插入图片描述
执行结果如下图所示,分区已经让我们手动控制成10个了:

val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2.getNumPartitions
rdd2.glom.collect.foreach(x => println(x.toBuffer))

RangePartitioner

简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。
在这里插入图片描述
进行代码的测试:

val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))
rdd3.glom.collect.foreach(x => println(x.toBuffer))

执行结果如下图所示:
在这里插入图片描述
但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。

  • Spark中的RangePartitioner在对数据采样的过程中使用了 “水塘采样法”
  • 水塘采样法是:在包含N个项目的集合S中选取K个样本,其中N为1或者很大的未知的数量,尤其适用于不能把所有N个项目都存放到主内存的情况。
  • 在采样过程中执行了 collect() 操作,引发了 Action 操作。

自定义分区器

Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。
我们需要实现自定义分区器,按照以下的规则进行分区:

  • 分区 0 < 100
  • 100 <= 分区1 < 200
  • 200 <= 分区2 < 300
  • 300 <= 分区3 < 400
  • 900 <= 分区9 < 1000

编写代码

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable


class MyPartitioner(n: Int) extends Partitioner {

  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    k / 100
  }
}

object UserDefinedPartitioner {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("UserDefinedPartitioner")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val random = scala.util.Random
    val arr: immutable.IndexedSeq[Int] = (1 to  100)
      .map(idx => random.nextInt(1000))

    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))

    println("=========================================")

    val rdd2 = rdd1.partitionBy(new MyPartitioner(10))
    rdd2.glom.collect().foreach(x => println(x.toBuffer))
    
    sc.stop()
    
  }

}

打包上传

这里之前已经重复过多次,就跳过了

mvn clean package

运行测试

spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount-1.0-SNAPSHOT.jar

可以看到如下的运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2054251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python】AttributeError: module ‘PIL.Image‘ has no attribute ‘ANTIALIAS‘

【Python】成功解决AttributeError: module ‘PIL.Image‘ has no attribute ‘ANTIALIAS‘ 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博…

MySQL集群+Keepalived实现高可用部署

Mysql高可用集群-双主双活-myqlkeeplived 一、特殊情况 常见案例&#xff1a;当生产环境中&#xff0c;当应用服务使用了mysql-1连接信息&#xff0c;在升级打包过程中或者有高频的数据持续写入【对数据一致性要求比较高的场景】&#xff0c;这种情况下&#xff0c;数据库连接…

STM32之继电器与震动传感器的使用,实现震动灯

在STM32的外设应用中&#xff0c;继电器扮演着重要的角色。继电器作为一种电控制器件&#xff0c;其主要作用是通过小电流控制大电流的通断&#xff0c;实现电路的自动控制和保护。具体来说&#xff0c;继电器在STM32外设中的作用可以归纳为以下几点&#xff1a; 电路隔离与保…

在线学习考试设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图详细视频演示技术栈系统测试为什么选择我官方认证玩家&#xff0c;服务很多代码文档&#xff0c;百分百好评&#xff0c;战绩可查&#xff01;&#xff01;入职于互联网大厂&#xff0c;可以交流&#xff0c;共同进步。有保障的售后 代码参考数据库参…

“CSS”第一步——WEB开发系列13

CSS (Cascading Style Sheets&#xff0c;层叠样式表&#xff09;&#xff0c;是一种用来为结构化文档&#xff08;如 HTML 文档或 XML 应用&#xff09;添加样式&#xff08;字体、间距和颜色等&#xff09;的计算机语言&#xff0c;CSS 文件扩展名为 .css。 一、什么是 CSS&a…

ubuntu x86_64系统上安装运行aarch系统的虚拟机

安装qemu-system-aarch64 创建sda.qcow2 虚拟磁盘 运行命令启动虚拟机 sudo qemu-system-aarch64 -M virt-4.0 -m 4G -cpu cortex-a57 -bios /usr/share/qemu-efi-aarch64/QEMU_EFI.fd -cdrom ~/下载/openEuler-24.03-LTS-aarch64-dvd.iso -drive ifnone,filesda.qcow2,idhd0…

王老师 linux c++ 通信架构 笔记(五)编译后生成的 nginx 可执行程序的启动

&#xff08;22&#xff09; 启动 nginx &#xff1a; 上网测试一下&#xff1a; 端口号 介绍&#xff1a; &#xff08;23&#xff09; 因为 nginx 监听知名端口号 80 &#xff0c;http 服务。也可以知名端口号&#xff0c;格式如下&#xff1a; 生产环境下可以设置 ngi…

Pulsar官方文档学习笔记——架构概览

架构概览 在最高配置下&#xff0c;pulsar服务应该由一个或多个pulsar集群组成。 一个pulsar集群可以包括如下组件 一个或多个broker。broker会将生产者 的消息分派给消费者。与pulsar配置存储通信来协调各种任务。将消息 存储在 BookKeeper实例中 &#xff08;也可以叫book…

计算机毕业设计选什么题目好?springboot 基于Java的学院教学工作量统计系统

✍✍计算机毕业编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java、…

java生成随机数字,生成随机ID

java在代码中生成随机数字和ID的两个方法 import java.util.UUID; import java.util.Random; public class randomID {public static void main(String[] args) {// TODO Auto-generated method stubUUID uuid UUID.randomUUID();String randomId uuid.toString();System.ou…

Qt自定义控件之提升法

1、参考&#xff1a;Qt之实现自定义控件的两种方式——提升法 2、概述&#xff1a;自定义控件是常需要使用到的技能&#xff0c;在既有的Qt控件不能满足开发的前提下&#xff0c;自定义控件给了程序员很多的发挥空间和便利。自定义控件有两种方式&#xff0c;一种是通过提升法来…

记录win10下 yolov8 tensorrt模型部署

前言 我的环境是 CUDA11.6 cudnn8.4 python3.8 vs2022 tensorRT8.4.2.4 实现 参考了下面这个视频和文章&#xff08;跟着视频做为主&#xff0c;文章为辅&#xff09;一遍成功&#xff0c;因为这个博主写的很详细&#xff0c;很赞&#xff0c;我就不再重复去写了。 视频&…

C++ Primer 总结索引 | 第十八章:用于大型程序的工具

1、大规模应用程序的特殊要求包括&#xff1a; 在独立开发的子系统之间 协同处理错误的能力使用各种库&#xff08;可能包含独立开发的库&#xff09;进行 协同开发的能力对比较复杂的应用 概念建模的能力 对应 异常处理、命名空间和多重继承 1、异常处理 1、异常处理机制 …

【算法】令牌桶算法

一、引言 令牌桶算法&#xff08;Token Bucket Algorithm, TBA&#xff09;是一种流行于网络通信领域的流量控制和速率限制算法。它允许一定程度的突发传输&#xff0c;同时限制长时间内的传输速率。令牌桶算法广泛应用于网络流量管理、API请求限流等场景。其基本原理是通过一个…

vue3 响应式 API:computed()

介绍 基本概念&#xff1a; computed()接收一个 getter 函数或者一个包含 getter 和 setter 函数的对象作为参数&#xff0c;并返回一个基于原始响应式数据计算得到的新的响应式数据对象。计算属性的值会根据其依赖的响应式数据自动更新&#xff0c;当依赖的数据发生变化时&am…

设计模式23-职责链

设计模式23-职责链 动机定义与结构定义结构职责链模式图中元素解释工作流程 C 代码推导优缺点应用场景总结 动机 在软件构建过程中&#xff0c;一个请求可能被多个对象处理。但是每个请求在运行时只能有一个接受者。如果显示指定将必不可少的带来请求发送者与接受者的紧耦合。…

lvs+keepalive大战haproxy+keepalive实现高可用集群

华子目录 lvskeepalive实验架构实验前的准备工作1.主机准备2.KA1和KA2上安装lvskeepalive3.webserver1和webserver2上安装httpd4.制作测试效果网页内容5.所有主机关闭firewalld和selinux6.开启httpd服务 实验步骤1.webserver1和webserver2上配置vip2.webserver1和webserver2上关…

【C语言】 作用域和存储期

C语言的作用域和存储期 一、作用域1、概念&#xff1a;2、函数声明作用域3、局部作用域4、全局作用域5、作用域的临时掩盖6、static关键字 二、存储期1、概念2、自动存储期3、静态存储期4、自定义存储期 一、作用域 1、概念&#xff1a; \quad C语言中&#xff0c;标识符都有一…

《小迪安全》学习笔记03

须知少时凌云志&#xff0c;曾许人间第一流。 静态页面&#xff08;HTML&#xff09;是没有漏洞的&#xff0c;没有数据传递。 动态网站一般来说&#xff0c;有开发组合&#xff0c;即开发语言和数据库&#xff0c;两者兼容性比较好&#xff1a; 而且有的数据库不支持windows或…

网站自动化锚文本的实现逻辑

锚文本&#xff0c;‌即超链接的文本部分&#xff0c;‌它在网页中扮演着至关重要的角色。‌通过点击锚文本&#xff0c;‌用户可以方便地在网页间进行跳转&#xff0c;‌从而极大地提升了用户体验。‌同时&#xff0c;‌在搜索引擎优化&#xff08;‌SEO&#xff09;‌领域&am…