学习spark笔记

news2024/9/20 15:06:36

✨ 学习 Spark 和 Scala

一 ​ 🐦Spark 算子

spark常用算子详解(小部分算子使用效果与描述不同)

Spark常用的算子以及Scala函数总结

Spark常用Transformations算子(二)

  1. Transformation 算子(懒算子):不会提交spark作业,从一个RDD转换成另一个RDD
  2. Action 算子:触发 SparkContext 提交job作业,将数据输出到Spark系统。返回类型是一个其他的数据类型

ps.show 属于action算子

一 Transformation 算子(Value数据类型)

1. 输入分区与输出分区 一对一

map,flatMap,mapPartirions

glom:算子将一个RDD分区中的元素 组成Array数组的形式 eg.RDD[Int] => RDD[Arrray[Int]]

请添加图片描述
请添加图片描述
在这里插入图片描述

2. 输入分区与输出分区多对一

union:我测试代码分区是叠加 网上别人文档是取最大分区 存疑。输出分区应大于输入分区,或者说输入2+2输出4。

cartesian:取笛卡儿积。问题同union

在这里插入图片描述在这里插入图片描述

3. 输入分区与输出分区多对多

groupby Key 算子 也是shuffle类算子。 (默认分区数与原RDD分区数一致可以重新指定)

sortby key 算子 也是shuffle类算子。 (默认分区数与原RDD分区数一致可以重新指定,默认使用范围分区器) 在spark ui界面查看 job sortby算子会触发一个job 这个Job是用于评估数据分布,评估结果用于后续的排序操作,并不是真正的排序操作。

4. 输出分区为输入分区子集

distnct 算子:也是 shuffle类算子 。map reducebykey map (默认分区数与原RDD分区数一致可以重新指定)

filter 算子:过滤算子

subtract 算子:也是 shuffle类算子 。RDD1 去除 RDD1与RDD2交集的 剩下的RDD1 (默认分区数与原RDD分区数一致可以重新指定)

intersection 算子: 也是 shuffle类算子 。返回两个RDD的交集并去重 (默认分区数与原RDD分区数一致可以重新指定)

sample 算子。采样 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。函数参数设置:withReplacement=true,表示有放回的抽样。withReplacement=false,表示无放回的抽样。
关于spark的sample()算子参数详解

takeSample:算子 不使用相对比例采样,而是按设定的采样个数进行采样

    println("distinct")
    val d1:RDD[String] = sparkSession1.sparkContext.makeRDD(Array("hello","hello","world"))
    val distinctRDD:RDD[String] = d1.map((_,1))
        .reduceByKey(_+_)
        .map(_._1)
    distinctRDD.foreach(println)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. Cache 型算子

cache:算子将RDD元素从磁盘缓存到内存 相当于 persist(MEMORY_ONLY)函数。分区一对一的

persist:算子有缓存 内存 磁盘 压缩等多个参数可选

二 Transformation 算子(Key - Value数据类型)

1. 输入分区与输出分区一对一

mapValues:算子 对value处理 可以用map代替

在这里插入图片描述

2. 单个RDD聚集算子

combineByKey:combineByKey的使用 也是 shuffle类算子 。

reduceBykey:通过key值相同会去聚合 也是 shuffle类算子 。reuduceByKey 会在map端 先进行本地combine (预聚合),减少传输到reduce端的数据量,减少网络传输的开销。只有reduceByKey处理不了的时候,会用 groupByKey().map() 代替

eg.SparkStreaming 算子:reduceByKeyAndWindow 窗口函数 每10秒计算一下前15秒的内容

  1. 存储上一个window的reduce值
  2. 计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
  3. 重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
  4. 重复时间段的值等于上一个window的值减去oldRDD =》coincodeRDD
  5. coincodeRDD + newRDD
//PairRDDFunction ,PairRDD属于RDD RDD的方法也通用 PairRDD 就是键值对的RDD 也是 RDD[Tuple2[]]   
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
//入参的func 必须是(V,V)=>V的类型 操作两个Value;返回的结果必须是(Key,Value) 
//可以用self和this访问自身成员
//withScope 源码参照
  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)


Spark源码之withScope方法的理解

partitionBy:自定义分区器 也是 shuffle类算子 。

repartitionAndSortWithinPartitions(partitioner):该方法根据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序。也是 shuffle类算子 。

coalesce(numPartitions):重新分区,减少RDD中分区的数量到numPartitions。也是 shuffle类算子 。

repartition(numPartitions):repartition是coalesce接口中shuffle为true的简易实现,即Reshuffle RDD并随机分区,使各分区数据量尽可能平衡。若分区之后分区数远大于原分区数,则需要shuffle。

aggregateBykey :也是 shuffle类算子 。没有使用过 对所有分区的元素先聚合再fold操作?

3. 两个RDD聚集算子

cogroup :合并两个RDD,生成一个新的RDD。实例中包含两个Iterable值,第一个表示RDD1中相同值,第二个表示RDD2中相同值(key值),这个操作需要通过partitioner进行重新分区,因此需要执行一次shuffle操作。(若两个RDD在此之前进行过shuffle,则不需要)

4. 连接

join:对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。
  下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }
图 20是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。

leftOutJoin和rightOutJoin: LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并
返回结果。
下面代码是leftOutJoin的实现。
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
}

zip:拉齐算子 包括 zipWithIndex(下标为value) 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对,zipWithUniqueId 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

所以分区不用值也不一样

通过SparkContext 提交作业 触发RDD DAG 的执行

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
//collect 算子的方法 
  def collect(): Array[T] = withScope {
      //提交Job
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      //_* 变长参数
    Array.concat(results: _*)
  }


三 Action 算子

1. 无输出算子

foreach:对RDD的每个元素应用 f 函数操作,不返回 RDD 和 Array ,返回的Unit

2. 输出到HDFS

saveAsTextFile : 算子 通过调用 saveAsHadoopFile 进行实现:
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFileTextOutputFormat[NullWritable, Text]
将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS

 /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
//第一个参数:Path为保存的路径;第二个参数:codec为压缩编码格式;
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    val nullWritableClassTag: ClassTag[NullWritable] = implicitly[ClassTag[NullWritable]]
    val textClassTag: ClassTag[Text] = implicitly[ClassTag[Text]]
    val r : RDD[(NullWritable,Text)] = this.mapPartitions { iter =>
      val text:Text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }

saveAsObjectFile 算子:将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。
  下面代码为函数内部实现。
  map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
  图24中的左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

3. Scala集合和数据类型

collect 算子 相当于 toArray, toArray 已经过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。
  图 25中左侧方框代表 RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到 Driver 程序所在的节点,以数组形式存储。

collectAsMap: 对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。

lookup(key:K):Seq[V]查找
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。 如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。
 在这里插入图片描述

count : 返回整个RDD的元素个数

top:返回最大的K个元素 ·top返回最大的k个元素。·take返回最小的k个元素。·takeOrdered返回最小的k个元素,并且在返回的数组中保持元素的顺序。·first相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T]。返回的是一个含前k个元素的数组。

reduceByKeyLocally: 实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

在这里插入图片描述

reduce: 规约操作 scala当中的reduce可以对集合当中的元素进行归约操作。
reduce包含reduceLeft和reduceRight。reduceLeft就是从左向右归约,reduceRight就是从右向左归约。

(1 to 9).reduceLeft( _ * _) //相当于1 * 2 * 3 * 4 * 5 * 6 * 7 * 8 * 9 
(1 to 9).reduceLeft( _ + _) //相当于1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 
(1 to 9).reduce(_ + _) //默认是reduceLeft

fold :RDD牵涉到多个分区时,每个分区的初始值都会被累加一次 在累加的时候会再加一次

Spark算子 - fold

2*(2*1*2)*(2*3*4) = 192

在这里插入图片描述

四 有状态算子

val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.
 */
val count: DStream[(String, Int)] = 
        wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))

countByWindow(windowLength, slideInterval): 返回一个滑动窗口的元素个数

countByValueAndWindow(windowLength, slideInterval, [numTasks]): 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

// Shuffle 性能优化
new SparkConf().set("spark.shuffle.consolidateFiles","true")

spark.shuffle.consolidateFiles: 是否开启 shuffle block file 的合并,默认为 false;
 spark.reducer.maxSizeInFlight: reduce task 的拉取缓存,默认 48M
 spark.shuffle.file.buffer : map task 的写磁盘缓存,默认 32k;
 spark.shuffle.io.maxRetries: 拉取失败的最大重试次数,默认 3 次;
 spark.shuffle.io.retryWait : 拉取失败的重试间隔,默认 5s
 spark.shuffle.memoryFraction: 用于 reduce 端聚合的内存比例,默认 0.2, 超过比例就会溢出到磁盘上;

StructuredStreaming

🎋 Spark 内存模型

在这里插入图片描述

https://blog.csdn.net/j904538808/article/details/78854742?utm_medium=distribute.pc_relevant.none-task-blog-baidulandingword-2&spm=1001.2101.3001.4242

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/458675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AWT——事件处理机制

事件处理&#xff1a; 定义&#xff1a; 当某个组件上发生某些操作的时候&#xff0c;会自动地触发一段代码的执行 在GUI事件处理机制中涉及到4个重要的概念需要理解。 事件源&#xff1a;操作发生的场所&#xff0c;通常指某个组件&#xff0c;例如按钮、窗口等 事件&…

Spring Boot Web请求

在上一讲&#xff0c;学习了Spring Boot Web的快速入门以及Web开发的基础知识&#xff0c;包括HTTP协议以及Web服务器Tomcat等内容。基于SpringBoot的方式开发一个web应用&#xff0c;浏览器发起请求 /hello 后 &#xff0c;给浏览器返回字符串 “Hello World ~”。运行启动类启…

不能接受就滚,某外卖企业在汕尾的蛮横做法,或是它衰退的开始

近期某外卖平台因为大举降低外卖骑手的单价&#xff0c;引发高度关注&#xff0c;直到汕尾的外卖骑手集体抵制&#xff0c;引发与该外卖平台的激烈博弈&#xff0c;而外卖平台也显示了它的强硬手段&#xff0c;此举只会进一步激化矛盾&#xff0c;进而导致该外卖企业的衰落。 据…

Nginx的优化和防盗链

一、隐藏版本号 1、隐藏版本号的操作步骤 可以使用 Fiddler 工具抓取数据包&#xff0c;查看 Nginx版本&#xff0c; 也可以在 CentOS 中使用命令 curl -I http://192.168.229.90 显示响应报文首部信息。 curl -I http://192.168.2.661.1、方法一&#xff1a;修改配置文件方…

上海车展:预售价109.8万元,仰望U8见证国产品牌崛起

如果要评选2023上海车展上比亚迪展台“最亮的星”&#xff0c;估计很多媒体和观众都会毫不迟疑地把票投给仰望U8。 没办法&#xff0c;因为在本届车展上&#xff0c;仰望U8的表现实在是太吸睛了。 作为比亚迪旗下的高端新能源品牌&#xff0c;仰望汽车在上海车展上携两款车型—…

element+vue小技巧和报错解决(持续更新)

目录 1-关于el-table复选框中表头和内容不对齐问题 2-日期选择器传值给后端格式不对 3-获取表格的当前行数据#default"{row}" 1-关于el-table复选框中表头和内容不对齐问题 <el-table:data"tableData"stripestyle"width: 100%"tooltip-ef…

Flink IntervalJoin 笔记

Flink Join 一、Fink IntervalJoin 1、简要说明 Flink 中基于 DataStream 的join&#xff0c;只能实现在同一个窗口的两个数据流进行Join。但是实际中会存在数据乱序或者延时情况&#xff0c;导致两个流的数据进度不一致。无法在同一个窗口内Join。 Flink 基于 KeyedStream…

详论YUM仓库的部署和NFS共享服务

目录 一:YUM仓库服务 1.YUM概述 2.安装源的准备 &#xff08;1&#xff09;软件仓库的提供方式 &#xff08;2&#xff09;RPM软件包的来源 &#xff08;3&#xff09;创建Centos7软件仓库 &#xff08;4&#xff09;在软件仓库加入非官方RPM包 3.访问YUM仓库 4.安装FT…

商品页面翻页功能--购物车拓展

之前我们在mvc练习中曾经写过翻页功能&#xff0c;现在我们给购物车产品显示界面也加一个 1、把productlist中dao的sql语句做出修改&#xff0c;并传递需要用到的参数 再来一个返回product总数的方法 2、 对productlist的servlet拓展相关操作&#xff0c;准备好翻页的功能 3、…

访问图像像素

Opencv访问图像像素 预备知识: 图像矩阵的大小取决于所用的颜色模型(或者说通道数)&#xff0c;灰度图矩阵如下: 多通道图像&#xff0c;如RGB颜色模型的矩阵如下&#xff1a; 注&#xff1a;opencv的通道顺序是BGR&#xff0c;而不是RGB 访问图像中像素方法: import num…

新建虚拟机更改ip(连接xshell)

# 查看网络设备 [rootcentos79 ~]# nmcli device DEVICE TYPE STATE CONNECTION ens32 ethernet 已连接 ens32 ens33 ethernet 已连接 ens33 virbr0 bridge 已连接 virbr0 lo loopback 未托管 -- # 查看…

Unity之OpenXR+XR Interaction Toolkit实现 抓取物体

前言 我们今天来说一下如何使用XR Interaction Toolkit来实现和3D物体的交互之&#xff1a;抓取&#xff0c;简单说就是通过VR手柄拿起来一个物体。 二.准备工作 有了前两篇的配置介绍,我们就不在详细说明这些了&#xff0c;大家自行复习 Unity之OpenXRXR Interaction Toolk…

PyCharm连接远程服务器配置过程

目录 背景 一、建立远程服务器连接 1.创建远程连接 2.进行本地项目与远程项目之间的文夹路径映射 3.设置自动上传项目&#xff08;如有需要&#xff0c;可设置&#xff09; 4.验证是否连接成功&#xff08;调出服务器的文件目录&#xff09; 二、本地配置Python解释器 …

【社区图书馆】-《科技服务与价值链》总结

【为什么研究价值链】 价值链及价值链协同体系是现代产业集群的核心枢纽&#xff0c;是推进城市群及产业集群化、服务化、生态化发展的纽带。因而推进价值链协同&#xff0c;创新发展价值链协同业务科技资源体系&#xff0c;既是科技服务业创新的重要方向&#xff0c;也是重塑生…

NetApp ONTAP: 企业级数据管理软件,为无缝混合云奠定基础

为何选择 NetApp ONTAP 进行企业数据管理 NetApp ONTAP 数据管理软件可帮助您快速应对新的业务挑战&#xff0c;简化日常活动并给您的团队留下深刻印象。无论您在内部环境和云中有着怎样的数据管理需求&#xff0c;ONTAP 都能满足您。 1、支持当今的数据驱动型企业 当今的企…

升级底座、打破壁垒、消灭报销,让业财融合一触即发!

一个平台 一个入口 一站服务 以移动互联网、云计算、大数据、人工智能、5G与物联网、区块链为代表的新一代信息通信技术&#xff08;ICT&#xff09;的集群式、交互式发展&#xff0c;驱动企业进入数智化新阶。商业创新是打造企业竞争力的必然选择&#xff0c;在数字化转型大潮…

p68 内网安全-域横向 PTHPTKPTT 哈希票据传递

数据来源 ​ ​ Kerberos 协议具体工作方法&#xff0c;在域中&#xff0c;简要介绍一下&#xff1a; 客户机将明文密码进行 NTLM 哈希,然后和时间戳一起加密(使用krbtgt 密码 hash 作为密钥)&#xff0c;发送给 kdc&#xff08;域控&#xff09;&#xff0c;kdc 对用户进行检…

C语言从入门到精通第11天(数组的基本操作)

数组的基本操作 数组的概念一维数组二维数组 数组的概念 在程序设计中&#xff0c;为了方便处理数据把具有相同类型的若干变量按有序形式集合在一起&#xff0c;这些按序排列的同类数据元素的集合称为数组。 在C语言中&#xff0c;数组属于构造数据类型&#xff0c;一个数组可…

聊聊如何通过APT+AST来实现AOP功能

前言 如果有使用过spring aop功能的小伙伴&#xff0c;应该都会知道spring aop主要是通过动态代理在运行时&#xff0c;对业务进行切面拦截操作。今天我们就来实现一下如何通过APTAST在编译期时实现AOP功能。不过在此之前先科普一下APT和AST相关内容 APT&#xff08;注解处理…

Nginx的重写功能

一、常用的Nginx 正则表达式 字符涵义以及示例^匹配输入字符串的起始位置$匹配输入字符串的结束位置*匹配前面的字符零次或多次&#xff1b;如“ol*”能匹配“o”及“ol”、“oll”匹配前面的字符一次或多次&#xff1b;如“ol”能匹配“ol”及“oll”、“olll”&#xff0c;但…