04-240606Spark笔记

news2024/11/24 5:07:51

04-240606Spark笔记

1.行动算子-2

  • save相关算子:

格式:

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
 path: String,
 codec: Option[Class[_ <: CompressionCodec]] = None): Unit

例子:

  val rdd = sc.makeRDD(List(
    ("a",1),("a",2),("a",3)
  ))
​
    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")
    // saveAsSequenceFile方法要求数据的格式必须为K-V类型
    rdd.saveAsSequenceFile("output2")

输出结果:

image-20240604225213130

  • foreach

格式:

def foreach(f: T => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

例子:

    val rdd = sc.makeRDD(List(1,2,3,4))
​
    //foreach 其实是Driver端内存集合的循环遍历方法
    rdd.collect().foreach(println) //Driver
    println("***************")
    // foreach 其实是Executor端内存数据打印
    rdd.foreach(println)    // Executor
    // 算子 : Operator(操作)
    //         RDD的方法和Scala集合对象的方法不一样
    //         集合对象的方法都是在同一个节点的内存中完成的。
    //         RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
    //         为了区分不同的处理效果,所以将RDD的方法称之为算子。
    //        RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

输出结果:

image-20240604232824753

2. 序列化

2.1 闭包检测
  • 闭包检测

因为Driver需要给两个Executor共享User方法,共享就需要序列化

案例:

  def main(args: Array[String]): Unit = {
​
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
​
    val rdd = sc.makeRDD(List[Int]())
​
    val user = new User()
​
    // SparkException: Task not serializable
    // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
​
    // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
    // 闭包检测
    rdd.foreach(
      num => {
        println("age = " + (user.age + num))
      }
    )
​
    sc.stop()
​
  }
  //class User extends Serializable {
  // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
  //case class User() {
  class User {
    var age : Int = 30
  }
  • RDD 的分区器

自己来写分区器:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(
            ("nba", "xxxxxxxxx"),
            ("cba", "xxxxxxxxx"),
            ("wnba", "xxxxxxxxx"),
            ("nba", "xxxxxxxxx"),
        ),3)
        val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner )
​
        partRDD.saveAsTextFile("output")
​
        sc.stop()
    }

自定义的分区器:

    class MyPartitioner extends Partitioner{
        // 分区数量
        override def numPartitions: Int = 3
​
        // 根据数据的key值返回数据所在的分区索引(从0开始)
        override def getPartition(key: Any): Int = {
            key match {
                case "nba" => 0
                case "wnba" => 1
                case _ => 2
            }
        }
    }
* 自定义分区器
* 1. 继承Partitioner
* 2. 重写方法

输出结果:

image-20240605170312913

image-20240605170321664

  • RDD 文件读取与保存

案例1:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.textFile("output1")
        println(rdd.collect().mkString(","))
​
        val rdd1 = sc.objectFile[(String, Int)]("output2")
        println(rdd1.collect().mkString(","))
​
        val rdd2 = sc.sequenceFile[String, Int]("output3")
        println(rdd2.collect().mkString(","))
​
        sc.stop()
    }

输出结果:

image-20240605170535800

案例2:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(
            List(
                ("a", 1),
                ("b", 2),
                ("c", 3)
            )
        )
​
        rdd.saveAsTextFile("output1")
        rdd.saveAsObjectFile("output2")
        rdd.saveAsSequenceFile("output3")
​
        sc.stop()
    }

输出结果:

image-20240605170643956

1. 数据结构:

image-20240605170954358

  • 累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。

![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png

image-20240605202424331

Acc,累加器可以把Excutor端的数据返回到Driver中去:

image-20240605202543334

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // reduce : 分区内计算,分区间计算
        //val i: Int = rdd.reduce(_+_)
        //println(i)
        var sum = 0
        rdd.foreach(
            num => {
                sum += num
            }
        )
        println("sum = " + sum)
​
        sc.stop()
​
    }
  • 系统累加器

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // 获取系统累加器
        // Spark默认就提供了简单数据聚合的累加器
        val sumAcc = sc.longAccumulator("sum")
​
        //sc.doubleAccumulator
        //sc.collectionAccumulator
​
        rdd.foreach(
            num => {
                // 使用累加器
                sumAcc.add(num)
            }
        )
​
        // 获取累加器的值
        println(sumAcc.value)
​
        sc.stop()
​
    }

累加器的一些特殊情况:

少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
一般情况下,累加器会放置在行动算子进
    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // 获取系统累加器
        // Spark默认就提供了简单数据聚合的累加器
        val sumAcc = sc.longAccumulator("sum")
​
        //sc.doubleAccumulator
        //sc.collectionAccumulator
​
        val mapRDD = rdd.map(
            num => {
                // 使用累加器
                sumAcc.add(num)
                num
            }
        )
​
        // 获取累加器的值
        // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        // 一般情况下,累加器会放置在行动算子进行操作
        mapRDD.collect()
        mapRDD.collect()
        println(sumAcc.value)
​
        sc.stop()
​
    }
  • 自定义累加器

分布式共享只写变量

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List("hello", "spark", "hello"))
​
        // 累加器 : WordCount
        // 创建累加器对象
        val wcAcc = new MyAccumulator()
        // 向Spark进行注册
        sc.register(wcAcc, "wordCountAcc")
​
        rdd.foreach(
            word => {
                // 数据的累加(使用累加器)
                wcAcc.add(word)
            }
        )
​
        // 获取累加器累加的结果
        println(wcAcc.value)
​
        sc.stop()
​
    }
    /*
      自定义数据累加器:WordCount
​
      1. 继承AccumulatorV2, 定义泛型
         IN : 累加器输入的数据类型 String
         OUT : 累加器返回的数据类型 mutable.Map[String, Long]
​
      2. 重写方法(6)
     */
    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
​
        private var wcMap = mutable.Map[String, Long]()
​
        // 判断是否初始状态
        override def isZero: Boolean = {
            wcMap.isEmpty
        }
​
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
            new MyAccumulator()
        }
​
        override def reset(): Unit = {
            wcMap.clear()
        }
​
        // 获取累加器需要计算的值
        override def add(word: String): Unit = {
            val newCnt = wcMap.getOrElse(word, 0L) + 1
            wcMap.update(word, newCnt)
        }
​
        // Driver合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
​
            val map1 = this.wcMap
            val map2 = other.value
​
            map2.foreach{
                case ( word, count ) => {
                    val newCount = map1.getOrElse(word, 0L) + count
                    map1.update(word, newCount)
                }
            }
        }
​
        // 累加器结果
        override def value: mutable.Map[String, Long] = {
            wcMap
        }
    }
  • 广播变量

实现原理:

广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务

分别发送。

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd1 = sc.makeRDD(List(
            ("a", 1),("b", 2),("c", 3)
        ))
//        val rdd2 = sc.makeRDD(List(
//            ("a", 4),("b", 5),("c", 6)
//        ))
        val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​
​
​
        // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
        //val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
        //joinRDD.collect().foreach(println)
        // (a, 1),    (b, 2),    (c, 3)
        // (a, (1,4)),(b, (2,5)),(c, (3,6))
        rdd1.map {
            case (w, c) => {
                val l: Int = map.getOrElse(w, 0)
                (w, (c, l))
            }
        }.collect().foreach(println)
​
​
​
        sc.stop()
​
    }

join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用

image-20240606162528164

Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中

Spark 中的广播变量不能更改 : 分布式共享只读变量

image-20240606162609035

封装广播变量1

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd1 = sc.makeRDD(List(
            ("a", 1),("b", 2),("c", 3)
        ))
        val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​
        // 封装广播变量
        val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
​
        rdd1.map {
            case (w, c) => {
                // 方法广播变量
                val l: Int = bc.value.getOrElse(w, 0)
                (w, (c, l))
            }
        }.collect().foreach(println)
​
​
​
        sc.stop()
​
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1796324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Next.js Tailwind CSS UI组件

摘要&#xff1a; 官网 今天公司使用到一个前端ui框架——Next.js Tailwind CSS UI组件&#xff01;这从头构建一个AI驱动的前端UI组件生成器&#xff0c;生成Next.js Tailwind CSS UI组件&#xff1a; 1、用Next.js、ts和Tailwind CSS构建UI组件生成器Web应用程序。 2、用Copi…

【MySQL】聊聊order by 是如何排序的

CREATE TABLE t (id int(11) NOT NULL,city varchar(16) NOT NULL,name varchar(16) NOT NULL,age int(11) NOT NULL,addr varchar(128) DEFAULT NULL,PRIMARY KEY (id),KEY city (city) ) ENGINEInnoDB;构建一个表结构&#xff0c;以及数据。 本篇主要来分析下order by是如何进…

前端 JS 经典:图片裁剪上传原理

前言&#xff1a;图片裁剪一般都是用户选择头像时用到&#xff0c;现在很多插件都可以满足这个功能&#xff0c;但是我们不仅要会用插件&#xff0c;还要自己懂的裁剪原理。 1. 流程 流程分为&#xff1a;1. 预览本地图片 2. 选择裁剪区域 3. 上传裁剪图像 2. 如何预览图片 …

作业06 递推算法1

作业&#xff1a; #include <iostream> using namespace std; int main(){long long a[110];short n;cin>>n;a[0]1;a[1]1;for(int i2;i<n;i){a[i]a[i-1]a[i-2];}cout<<a[n-1];return 0; } #include <iostream> using namespace std; int main(){lon…

【机器学习】机器学习与大模型在人工智能领域的融合应用与性能优化新探索

文章目录 引言机器学习与大模型的基本概念机器学习概述监督学习无监督学习强化学习 大模型概述GPT-3BERTResNetTransformer 机器学习与大模型的融合应用自然语言处理文本生成文本分类机器翻译 图像识别自动驾驶医学影像分析 语音识别智能助手语音转文字 大模型性能优化的新探索…

41【Aseprite 作图】粉红宫灯——拆解

1 宫灯轮廓 上面三角&#xff0c;下面3 3 3 &#xff08;粉色在后面&#xff0c;做轮廓&#xff09;&#xff0c;棕色在外面&#xff0c;看做是灯骨&#xff08;竖着更长&#xff09;&#xff1b;中间是横着做灯骨 尾部的彩带&#xff0c;下面粉色更浅&#xff0c;上面绿色更浅…

Eclipse添加C和C++编译成汇编文件的选项

在miscellaneous中添加assemble listing选项就可以生成汇编文件了

12_JavaWebAjax

文章目录 Ajax1. 同步请求异步请求2. Ajax实现方式3. 日程管理第四期4. 响应JSON串4.1 响应JSON串格式的一般格式 Appendix Ajax 发送请求的一些方式 1.输入浏览器回车 2.html>head>script/link ​ img标签 3.a标签form表单标签等 用户手动控制提交产生&#xff1b…

【MyBatis】MyBatis操作数据库(二):动态SQL、#{}与${}的区别

目录 一、 动态SQL1.1 \<if>标签1.2 \<trim>标签1.3 \<where>标签1.4 \<set>标签1.5 \<foreach>标签1.6 \<include>标签 二、 #{}与${}的区别2.1 #{}是预编译sql&#xff0c;${}是即时sql2.2 SQL注入2.3 #{}性能高于${}2.4 ${}用于排序功能…

MATLAB基础应用精讲-【数模应用】二元Logit分析(最终篇)(附python、MATLAB和R语言代码实现)

目录 算法原理 SPSSAU 1、二元logistic分析思路说明 2、如何使用SPSSAU进行二元logistic操作 3、二元logistic相关问题 算法流程 一、分析前准备 1、确定分析项 2.多重共线性判断 3.数据预处理 二、回归基本情况分析 三、模型拟合评价 1、似然比检验 2、拟合优…

C语言(数据存储)

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#xff0c;在这里撰写成文一…

htb_Mailing

端口扫描 主页有一个download按键 点击后下载了一个pdf 大致看了一下&#xff0c;就是一个使用邮件服务器的说明书 不过最后有一个用户名Maya 目录扫描 /assests/ 一个存图片的地方 /download.php 需要指定下载文件&#xff0c;大概就是刚刚那个pdf 返回主页再次下…

编译等底层知识

目录 一. GCC命令语句大全 二. GCC编译4个阶段 三. makefile的使用 四. CMake 五. GNU工具链开发流程图 六. Keil中的地址段 七. 静态库和动态库 一. GCC命令语句大全 -c只编译源文件&#xff0c;生成目标文件&#xff08;.o 文件&#xff09;&#xff0c;不进行链接。…

学习使用Opentelemetry python SDK

前言 &#x1f4e2;博客主页&#xff1a;程序源⠀-CSDN博客 &#x1f4e2;欢迎点赞&#x1f44d;收藏⭐留言&#x1f4dd;如有错误敬请指正&#xff01; 一、什么是 OpenTelemetry OpenTelemetry 由 OpenTracing 和 OpenCensus 项目合并而成&#xff0c;是一组规范、工具、API…

DxO ViewPoint v4.8 解锁版安装教程 (校正几何和透视的图像处理)

前言 DxO ViewPoint中文版是一款能够校正几何和透视的图像处理软件,摄影师通过ViewPoint破解版修复构图和光学缺陷并恢复拍摄对象平衡,重新调整如弯曲架构和扭曲图案等细节,让图像具备更强冲击力和更优平衡性。 一、下载地址 下载链接&#xff1a;http://dygod/source 点击搜…

VMware Workstation虚拟机固定IP配置(主机互通、外网可访问)

VMware Workstation虚拟机固定IP配置 环境问题配置过程配置虚拟机网络适配器配置虚拟机网络配置虚拟网卡网络适配器配置虚拟机固定IP 结果验证结束语参考 环境 主机&#xff1a;Windows 11 VMware Workstation: 17.5.2 虚拟机&#xff1a;Ubuntu 24.02 LTS 注&#xff1a; 主…

VRRP----虚拟路由器冗余协议(技术专题)

目录 一、VRRP的基本原理 1.1 技术背景 1.2 VRRP带来了什么 1.2.1 VRRP的作用 1.2.2 VRRP工作的过程 1.2.3 VRRP报文: 1.3 VRRP术语 1.3.1 虚拟IP地址、MAC地址 1.3.2 Master、Backup路由器 二、VRRP的基础配置 实例一 需求 配置 一、VRRP的基本原理 1.1 技术背景…

Spring Cloud工程添加子模块打包后文件为war包而非jar包

Spring Cloud工程添加子模块打包后文件为war包而非jar包 Spring Cloud子模块打出的包通常是JAR包而非WAR包&#xff0c;这是因为Spring Cloud主要基于Spring Boot构建&#xff0c;而Spring Boot默认打包为可执行JAR包。然而&#xff0c;如果遇到了Spring Cloud子模块打成了WAR…

计算机毕业设计Spark+Flink+Hive地铁客流量预测 交通大数据 地铁客流量大数据 交通可视化 大数据毕业设计 深度学习 机器学习

项目说明​ ​ 1该项目主要分析通刷卡数据&#xff0c;通过大数据技术来研究地铁客运能力及探索优化服务的方向​ 2主要讲解Flink流处理实时分析部分&#xff0c;离线部分较简单&#xff0c;暂时略过​ ​ 技术架构​ ​项目流程&#xff1a;​ 采用python请求深圳地铁数…

每天五分钟深度学习:逻辑回归算法的单样本的梯度下降计算

本文重点 上节课我们已经知道了如何利用计算图通过链式法则来求解输出J对变量的梯度或者导数。本节课程我们将通过逻辑回归这一个具体的例子,来演示如何使用计算图完成逻辑回归的梯度下降算法。 逻辑回归 逻辑回归算法的目标函数,损失函数,代价函数,以及参数更新的方式如…