03-240605-Spark笔记

news2025/1/13 7:51:26

03-240605

1. 行动算子-1

  • reduce

聚合

格式:

def reduce(f: (T, T) => T): T

例子:

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // TODO - 行动算子
​
        //reduce
        val i: Int = rdd.reduce(_+_)
        println(i)

输出结果:

10

  • collect

采集

格式:

def collect(): Array[T]

例子:

        // collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
        val ints: Array[Int] = rdd.collect()
        println(ints.mkString(","))

输出结果:

1,2,3,4

  • count

计数

格式:

def count(): Long

例子:

        // count : 数据源中数据的个数
        val cnt = rdd.count()
        println(cnt)

运行结果:

4

  • first

获取数据源的第一个数据

格式:

def first(): T

例子:

        // first : 获取数据源中数据的第一个
        val first = rdd.first()
        println(first)

输出结果:

1

  • take

获取数据源的N个数据

格式:

def take(num: Int): Array[T]

例子:

        // take : 获取N个数据
        val ints: Array[Int] = rdd.take(3)
        println(ints.mkString(","))

输出结果:

1,2,3

  • takeOrdered

数据排序后.再取第N个数据

格式:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

例子:

        // takeOrdered : 数据排序后,取N个数据
        val rdd1 = sc.makeRDD(List(4,2,3,1))
        val ints1: Array[Int] = rdd1.takeOrdered(3)
        println(ints1.mkString(","))

输出结果:

1,2,3

  • aggregate

给定初始值,初始值参与分区内与分区间的计算

格式:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

例子:

        val rdd = sc.makeRDD(List(1,2,3,4),2)
        //10 + 13 + 17 = 40
        // aggregateByKey : 初始值只会参与分区内计算
        // aggregate : 初始值会参与分区内计算,并且和参与分区间计算
        val result = rdd.aggregate(10)(_+_._+_)

        println(result)

输出结果:

40

  • fold

折叠操作,aggregate的简化版操作

格式:

def fold(zeroValue: T)(op: (T, T) => T): T

例子:

        //10 + 13 + 17 = 40
        // aggregateByKey : 初始值只会参与分区内计算
        // aggregate : 初始值会参与分区内计算,并且和参与分区间计算
        //val result = rdd.aggregate(10)(_+_, _+_)
        val result = rdd.fold(10)(_+_)

        println(result)

输出结果:

40

  • countByKey 与 countByValue

都是统计每种Key或者Value出现的个数

格式:

def countByKey(): Map[K, Long]

例子:

image-20240604213641365

        val rdd = sc.makeRDD(List(
            ("a", 1),("a", 2),("a", 3)
        ))

       //val intToLong: collection.Map[Int, Long] = rdd.countByValue()
        //println(intToLong)
        val stringToLong: collection.Map[String, Long] = rdd.countByKey()
        println(stringToLong)

输出结果:

Map(a -> 3)

  • WordCount 不同的实现方式:

运用9种不同的方式实现WordCount

  1. 使用groupBy:

    // groupBy
    def wordcount1(sc : SparkContext): Unit = {

        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)
        val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
    }
  1. 使用groupByKey:

    // groupByKey
    def wordcount2(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
        val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)
    }
  1. 使用reduceByKey:

    // reduceByKey
    def wordcount3(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
    }
  1. 使用aggregateByKey

    // aggregateByKey
    def wordcount4(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)
    }
  1. 使用foldByKey:

    // foldByKey
    def wordcount5(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)
    }
  1. 使用combineByKey:

    // combineByKey
    def wordcount6(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
            v=>v,
            (x:Int, y) => x + y,
            (x:Int, y:Int) => x + y
        )
    }
  1. 使用countByKey:

    // countByKey
    def wordcount7(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordOne = words.map((_,1))
        val wordCount: collection.Map[String, Long] = wordOne.countByKey()
    }
  1. 使用countByValue:

    // countByValue
    def wordcount8(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))
        val wordCount: collection.Map[String, Long] = words.countByValue()
    }
  1. 使用reduce:

    def wordcount91011(sc : SparkContext): Unit = {
        val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
        val words = rdd.flatMap(_.split(" "))

        // 【(word, count),(word, count)】
        // word => Map[(word,1)]
        val mapWord = words.map(
            word => {
                mutable.Map[String, Long]((word,1))
            }
        )

       val wordCount = mapWord.reduce(
            (map1, map2) => {
                map2.foreach{
                    case (word, count) => {
                        val newCount = map1.getOrElse(word, 0L) + count
                        map1.update(word, newCount)
                    }
                }
                map1
            }
        )

        println(wordCount)
    }

2. 序列化

算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。

  • RDD序列化

案例:

object Spark01_RDD_Serial {

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)

        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))

        val search = new Search("h")

        //search.getMatch1(rdd).collect().foreach(println)
        search.getMatch2(rdd).collect().foreach(println)

        sc.stop()
    }
    // 查询对象
    // 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
    class Search(query:String){

        def isMatch(s: String): Boolean = {
            s.contains(this.query)
        }

        // 函数序列化案例
        def getMatch1 (rdd: RDD[String]): RDD[String] = {
            rdd.filter(isMatch)
        }

        // 属性序列化案例
        def getMatch2(rdd: RDD[String]): RDD[String] = {
            val s = query
            rdd.filter(x => x.contains(s))
        }
    }
}

输出结果:

image-20240605133427336

  • Kryo序列化框架

Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

了解一下就行

案例:

 def main(args: Array[String]): Unit = {
 val conf: SparkConf = new SparkConf()
 .setAppName("SerDemo")
 .setMaster("local[*]")
 // 替换默认的序列化机制
 .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
 // 注册需要使用 kryo 序列化的自定义类
 .registerKryoClasses(Array(classOf[Searcher]))
 val sc = new SparkContext(conf)
 val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
"atguigu", "hahah"), 2)
 val searcher = new Searcher("hello")
 val result: RDD[String] = searcher.getMatchedRDD1(rdd)
 result.collect.foreach(println)
 }
}
case class Searcher(val query: String) {
 def isMatch(s: String) = {
 s.contains(query)
 }
 def getMatchedRDD1(rdd: RDD[String]) = {
 rdd.filter(isMatch) 
 }
 def getMatchedRDD2(rdd: RDD[String]) = {
 val q = query
 rdd.filter(_.contains(q))
 }

Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储

  • RDD 血缘关系

toDebugString查看血缘关系

image-20240605135251669

多个连续的RDD的依赖关系,称之为血缘关系

演示:

image-20240605135543501

关于如何将RDD间的关系保存下来:

image-20240605135759577

血缘关系演示:

image-20240605140042850

image-20240605140110991

image-20240605140124464

  • RDD的依赖关系

dependencies查看依赖关系

image-20240605140238899

OneToOne依赖(窄依赖)

image-20240605140706460

窄依赖我们形象的比喻为独生子女。

image-20240605141335525

Shuffle依赖(宽依赖):

image-20240605140820212

宽依赖我们形象的比喻为多生。

image-20240605141533442

  • RDD 阶级划分

image-20240605141721424

image-20240605142320693

  • RDD 任务划分

image-20240605142405972

源码演示:

image-20240605142747592

  • RDD 的持久化

这样的复用在底层不是很好用:

image-20240605143051395

image-20240605143137900

应该这样:

image-20240605143221932

image-20240605143241039

image-20240605143253432

放在内存中 mapRDD.cache()

放在磁盘中 mapRDD.persist()

Cache缓存:

image-20240605143410471

  • RDD CheckPoint 检查点

image-20240605143502870

image-20240605143516625

checkpoint 需要落盘,需要指定检查点保存路径

检查点路径保存的文件,当作业执行完毕后,不会被删除

一般保存路径都是在分布式存储系统: HDFS

  • checkpoint、Cache、Persist的区别:

以上三个都可以存储,关于他们的区别:

cache : 将数据临时存储在内存中进行数据重用

会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据

persist : 将数据临时存储在硬盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint : 将数据长久地保存在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

为了保证数据安全,所以一般情况下,会独立执行作业

为了能够提高效率,一般情况下,是需要和cache联合使用

执行过程中,会切断血缘关系,重新建立新的血缘关系

checkpoint等同于改变数据源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810913.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最好用的搜题软件大学?8个公众号和软件推荐清单! #知识分享#知识分享#经验分享

今天,我将分享一些受欢迎的、被大学生广泛使用的日常学习工具,希望能给你的学习生活带来一些便利和启发。 1.彩虹搜题 这个是公众号 一款专供大学生使用的搜题神器专注于大学生校内学习和考研/公考等能力提升 下方附上一些测试的试题及答案 1、行大量…

通过 CartPole 游戏详细说明 PPO 优化过程

CartPole 介绍 在一个光滑的轨道上有个推车,杆子垂直微置在推车上,随时有倒的风险。系统每次对推车施加向左或者向右的力,但我们的目标是让杆子保持直立。杆子保持直立的每个时间单位都会获得 1 的奖励。但是当杆子与垂直方向成 15 度以上的…

Vue3【十七】props的作用和组件之间的传值限定类型和默认值

Vue3【十七】props的作用和组件之间的传值限定类型和默认值 Vue3【十七】props的作用和组件之间的传值限定类型和默认值 父组件传值给子组件 多个值传递 传值限定类型和 默认值 实例截图 目录结构 代码 person.vue <template><div class"person"><p…

硬件开发笔记(十七):RK3568底板电路串口、485、usb原理图详解

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/139589308 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

格式工厂 v5 解锁版 (免费多媒体文件转换工具)

前言 格式工厂是免费多功能的多媒体文件转换工具&#xff0c;轻松转换一切你想要的格式。利器在手&#xff0c;转换不愁&#xff01;支持几乎所有类型格式的相互转换&#xff0c;各种视频、音频、图片、PDF文档等格式&#xff0c;转换视频过程中&#xff0c;可以修复损坏的文件…

Cesium离线部署影像+地形:从0到1

Cesium加载本地影像地形 本教程记录的是小白从0-1搭建本地cesium服务的过程&#xff0c;踩的各种坑通过查找资料都一一填补&#xff0c;最终达到的效果是在本地上能够跑官网飞机航线的例子。效果如下&#xff1a; 主要流程如下&#xff1a; 1、下载离线地图和地形2、nginx部署…

工业机器人远程运维,增强智慧工厂运营管理

1、需求背景 随着工业自动化技术的普及和工业机器人应用的增加&#xff0c;制造业对于生产线稳定性和效率的要求不断提高。然而&#xff0c;传统的现场监控方式存在着地理位置限制、实时监控难度大以及诊断能力有限等问题&#xff0c;迫切需要一种更具灵活性和效率的监控方式。…

FFMpeg解复用流程

文章目录 解复用流程图复用器与解复用器小结 解复用流程图 流程图&#xff0c;如上图所示。 复用器与解复用器 复用器&#xff0c;就是视频流&#xff0c;音频流&#xff0c;字幕流&#xff0c;其他成分&#xff0c;按照一定规则组合成视频文件&#xff0c;视频文件可以是mp4…

“百变换装师”之证照之星

拍证件照是一件很麻烦的事吗&#xff1f;证件照编辑是一件复杂的事吗&#xff1f;只有专业人员才能对证件照进行编辑吗&#xff1f;以前可能是&#xff0c;但今天小编将给大家分享一个证件照编辑软件证照之星&#xff0c;它将使每一个人都能具备简单的证件照编辑技能。 证照之星…

cve_2017_12635-CouchDB垂直权限绕过

1.采用参考 https://www.cnblogs.com/mlxwl/p/16577781.html vulfocus&#xff1a;Vulfocus 漏洞威胁分析平台 2.产生原因 在2017年11月15日&#xff0c;CVE-2017-12635和CVE-2017-12636披露&#xff0c;CVE-2017-12635是由于Erlang和JavaScript对JSON解析方式的不同&#…

优优嗨聚集团:卤味市场新风向,创新融合与品质升级引领未来发展

卤味市场作为中国传统美食文化的重要组成部分&#xff0c;近年来呈现出蓬勃发展的态势。随着消费者口味的不断变化和市场的日益竞争&#xff0c;卤味行业正面临着前所未有的机遇与挑战。那么&#xff0c;卤味市场的未来发展将何去何从&#xff1f;本文将从创新融合和品质升级两…

Python 深度学习和机器学习的模型评估库之torchmetrics使用详解

概要 在深度学习和机器学习项目中,模型评估是一个至关重要的环节。为了准确地评估模型的性能,开发者通常需要计算各种指标(metrics),如准确率、精确率、召回率、F1 分数等。torchmetrics 是一个用于 PyTorch 的开源库,提供了一组方便且高效的评估指标计算工具。本文将详…

第26讲:Ceph集群OSD扩缩容中Reblanceing数据重分布

文章目录 1.Reblanceing数据重分布的概念2.验证Reblanceing触发的过程3.Reblanceing细节4.临时关闭Reblanceing机制 1.Reblanceing数据重分布的概念 当集群中OSD进行扩缩容操作后&#xff0c;会触发一个Reblanceing数据重分布的机制&#xff0c;简单的理解就是将扩缩容前后OSD…

2_1 Linux基础操作

2_1 Linux基础操作 文章目录 2_1 Linux基础操作0. 参考1. 装机后的一些小命令查看系统的信息2. 基础命令2.1 初识基本命令2.2 日期和时间 3. 帮助命令4. 关机、重启5. 设置主机名6. rm删除7. 软件包的管理RPM、 YUM8. IP知识9. 查看一些linux的信息10. 命令行快捷键11. 光盘挂载…

配置响应拦截器,全局前置导航守卫

1&#xff1a;配置响应拦截器 响应拦截器&#xff0c;统一处理接口的错误 问题&#xff1a;每次请求&#xff0c;都会有可能会错误&#xff0c;就都需要错误提示 说明&#xff1a;响应拦截器是咱们拿到数据的 第一个 数据流转站&#xff0c;可以在里面统一处理错误。 // 添…

Lua搭建网站后台教程

本文讲解如何使用二进制发布包和FastWeb网站管理工具搭建站点 FastWeb网站管理工具 使用该工具可快速在Windows平台部署。支持官方或三方模块的自动安装、日志调试、版本更新等。 1、下载最新版本压缩包 2、解压到任意目录(建议英文) 3、运行 ①点击 [设置]->[安装] 部…

微信小程序使用 “云函数“ 获取 “openid“

文章目录 1.前期准备2.具体操作步骤 1.前期准备 必须使用云开发已经配置好云开发 2.具体操作步骤 1.进入小程序开发工具→在云函数目录上右键→选中新建云函数 创建结束&#xff0c;自动上传&#xff08;必须确认已经上传才生效&#xff09; 2.进入对应页面的js文件&#…

【qt】坐标系变换

坐标系变换 一.物理坐标二.逻辑坐标1.平移2.旋转3.扭转4.缩放 三.案例结合画一个五角星四.总结 一.物理坐标 物理坐标系&#xff0c;就是我们上节课说的&#xff0c;坐标的原点在窗口的左上角。这节课我们可以通过改变原点的位置来达到我们想姚的逻辑坐标。 二.逻辑坐标 1.平…

Java程序设计————从控制台输入

向控制台输入信息可以借助Scanner扫描器类来实现 语法&#xff1a; Scanner input new Scanner(System.in); 提示 &#xff08;1&#xff09;在使用Scanner类型之前&#xff0c;需要首先指明Scanner类所在的位置&#xff0c;既通过代码 import java.util.Scanner; &…

利用 HTML5 Canvas 实现在线签字功能

目录 前言 一、HTML5 Canvas 简介 二、签字功能的实现 效果演示 完整代码 前言 在现代互联网应用中&#xff0c;有时我们需要让用户在网页上进行签字操作&#xff0c;比如确认文件、填写电子表格或者签署合同。利用 HTML5 的 canvas 画布&#xff0c;我们可以轻松地实现这一…