2023_Spark_实验十一:RDD高级算子操作

news2024/9/28 3:23:24


//checkpoint :

sc.setCheckpointDir("hdfs://Master:9000/ck") // 设置检查点

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 执行wordcount任务的转换



rdd.checkpoint // Mark this RDD for checkpointing.

rdd.isCheckpointed

rdd.count //触发计算,日志显示:ReliableRDDCheckpointData: Done checkpointing RDD 27 to hdfs://hadoop001:9000/ck/fce48fd4-d76f-

4322-8d23-6a48d1aed7b5/rdd-27, new parent is RDD 28

rdd.isCheckpointed // res61: Boolean = true

rdd.getCheckpointFile // Option[String] = Some(hdfs://Master:9000/ck/b9a5add8-18d8-4056-9e8e-271d9522a29c/rdd-4)

coalesce :

总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。

        repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。

        coalesce,默认shuffle=false,不会经过shuffle。

        当前仅针对coalesce算子考虑,我们看一下官方的定义:

        大概意思为:如果你想要从1000个分区到100个分区,并且不经过shuffle,近乎平均分配10个父分区到1个子分区。

        首先我说下我个人简单理解:不经过shuffle,就意味着coalesce算子前后都是在一个stage中的。从该stage开始到coalesce算子之前的任务的迭代执行的并行度都是1000,从coalesce算子开始到该stage结束的任务的迭代执行的并行度都是100。


val rdd1 = sc.parallelize(1 to 10, 10)

// 重新分区,分为两个2 ,不产生shuffle

val rdd2 = rdd1.coalesce(2, false)



// 获取新的RDD分区数

rdd2.partitions.length

def func1(index:Int,iter:Iterator[Int]):Iterator[String] = {

iter.toList.map(x=>"[PartID:"+index + ",value=" + x +"]").iterator

}

// 查看分区后的结果:

rdd2.mapPartitionsWithIndex(func1).collect



repartition:

val rdd1 = sc.parallelize(1 to 10, 4)

val rdd2 = rdd1.repartition(5)

collect、toArray

将RDD转换为Scala的数组。

collectAsMap

与collect、toArray相似。collectAsMap将key-value型的RDD转换为Scala的map。

注意:map中如果有相同的key,其value只保存最后一个值。


# 创建一个2分区的RDD

scala> var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

z: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at parallelize at <console>:21

# 输出所有分区的数据

scala> z.collect

res44: Array[(String, Int)] = Array((cat,2), (cat,5), (mouse,4), (cat,12), (dog,12), (mouse,2))



# 转化为字典

scala> z.collectAsMap

res45: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)

scala>


collectAsMap

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

//res2: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

combineByKey与aggregateByKey相比较:

  1. 1.相同点:

  • 两者都能映射key值分别进行分区内计算和分区间计算。

    2. 不同点:

  • combineByKey有三个参数列表而且不需要初始值,而aggregateByKey只有两个参数列表且需要初始值。

aggregateByKey分区内计算示意图

//aggregateByKey存在函数颗粒化,有两个参数列表

//第一个参数列表,需要传递一个参数,表示为初始值

// 主要当碰见第一个key时候,和value进行分区内计算

//第二个参数列表,需要传递2个参数

// 第一个参数表示分区内计算

// 第二个参数表示分区间计算



rdd.aggregateByKey(zeroValue = 0)(

(x, y) => math.max(x, y),

(x, y) => x + y

).collect().foreach(println)

combineByKey分区内计算示意图

//combineByKey方法需要三个参数:

//第一个参数表示:将相同key的第一个数据进行结构转换,实现操作

//第二个参数:分区内的计算规则

//第三个参数:分区间的计算规则

val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(

v => (v, 1),

(t: (Int, Int), v) => {

(t._1 + v, t._2 + 1)

},

(t1 Int: , t2: Int) => {

(t1._1 + t2._1, t1._2 + t2._2)

}

)

​

combineByKey与aggregateByKey两者的核心区别,就是在组内初始计算时少许不同。

combineByKey // 是在这个PairRDDFunctions类下的方法

val rdd1 = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 等价与reduceByKey(_ + _),结果:Array[(String, Int)] =

Array((is,1), (Giuyang,1), (love,2), (capital,1), (Guiyang,1), (I,2), (of,1), (Guizhou,2), (the,1))

rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 将每个key的value各自加10,结果:Array[(String, Int)]

rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

val rdd6 = rdd5.zip(rdd4)

​

一、Rdd行动算子

1、【countByKey】统计存储在rdd中元组的key的个数,key是相同的就会进行计数+1。通过这个key 会生成一个Map Map中的key是原有中的key,value是原有key的个数;

2、【countByValue】统计在Rdd中存储元素的个数。会将rdd中每一个元组看作为一个value,若这个元组中元素是相同的,此时就会将生成Map中的value+1;

3、【filterByRange】对rdd中的元素过滤,并返回指定内容的数据。该函数作用于键值对RDD,对RDD中的元素进行过滤,返回键在指定范围中的元素;

4、【flatMapValues】主要是对存在元组中的value进行扁平化处理;

countByKey // 计算每个键出现的次数

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd1.countByKey

rdd1.countByValue  // countByValue返回每个值的出现次数

filterByRange  // 【filterByRange】对rdd中的元素过滤,并返回指定范围的内容数据

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))

val rdd2 = rdd1.filterByRange("b", "d")

rdd2.collect

flatMapValues

val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

rdd3.flatMapValues(_.split(" "))

foldByKey

函数原型:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

再根据Key按照func对V进行调用。

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> rdd1.foldByKey(0)(_+_).collect

res3: Array[(String, Int)] = Array((A,2), (B,3))

说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

foldByKey

函数原型:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

再根据Key按照func对V进行调用。

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> rdd1.foldByKey(0)(_+_).collect

res3: Array[(String, Int)] = Array((A,2), (B,3))

说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

foldByKey

val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

val rdd3 = rdd2.foldByKey("")(_+_)

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

rdd.foldByKey(0)(_+_)

foreachPartition  // foreachPartition是spark-core的action算子,该算子源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

rdd1.foreachPartition(x => println(x.reduce(_ + _)))

keyBy

val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val rdd2 = rdd1.keyBy(_.length)

rdd2.collect

keys values

val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

rdd2.keys.collect

rdd2.values.collect

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1026159.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows虚拟机访问网页证书错误问题

问题&#xff1a; 显示证书错误&#xff0c;图片加载不出来&#xff0c;看着很别扭&#xff0c;如下&#xff1a; 方法: 1.先导出可用的证书&#xff1a; 可以将自己正常环境的证书导出来&#xff08;google浏览器为例&#xff09; 浏览器右上角三个竖点——设置——隐私设…

阿里云服务器开放的一个新端口,重启防火墙,端口未启动

问题&#xff1a; 阿里云网页开放的一个新端口后&#xff0c;重启防火墙&#xff0c;端口未启动&#xff0c;之前配置的也都停止了。 解决&#xff1a; 原因可能是阿里的服务控制了&#xff0c;只能一个个端口开启了。把新配置新端口也单独启用。 开启80端口指令 firewall-cm…

mysql启动不了问题

突然昨天早上起来&#xff0c;就发生了这一幕&#xff1a; 启动MySQL服务时出现&#xff02;mysql本地计算机上的MySQL服务启动后停止。某些在未由其他服务或程序使用时将自动停止&#xff02; 几经周折&#xff0c;终于在一个大佬的贴下求得了启动成功的经验&#xff0c;其中我…

Qt---day4---9.20

qt完成时钟&#xff1a; 头文件&#xff1a; #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent> #include <QtDebug> #include <QPainter> #include <QTimerEvent> #include <QTime>QT_BEGIN_NAMESPACE names…

VirtualBox安装RockyLinux并使用ssh访问

文章目录 1 前言2 安装 Rocky Linux2.1 新建虚拟机2.2 设置虚拟机内存和CPU数量2.3 设置虚拟机硬盘大小2.4 完成设置2.5 启动虚拟机2.6 Rocky Linux 的安装2.6.1 直接回车2.6.2 等待 check 完成2.6.3 设置语言2.6.4 设置最小化安装2.6.5 去除分区设置的感叹号2.6.7 设置 root 账…

Hive 优化建议与策略

目录 ​编辑 一、Hive优化总体思想 二、具体优化措施、策略 2.1 分析问题得手段 2.2 Hive的抓取策略 2.2.1 策略设置 2.2.2 策略对比效果 2.3 Hive本地模式 2.3.1 设置开启Hive本地模式 2.3.2 对比效果 2.3.2.1 开启前 2.3.2.2 开启后 2.4 Hive并行模式 2.5 Hive…

详解C++静态多态和动态多态的区别

目录 1.多态的概念与分类 2.多态的作用 3.静态多态 4.动态多态 5.总结 1.多态的概念与分类 多态&#xff08;Polymorphisn&#xff09;是面向对象程序设计&#xff08;OOP&#xff09;的一个重要特征。多态字面意思为多种状态。在面向对象语言中&#xff0c;一个接口&…

【深度学习实验】前馈神经网络(三):自定义多层感知机(激活函数logistic、线性层算Linear)

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 构建数据集 2. 激活函数logistic 3. 线性层算子 Linear 4. 两层的前馈神经网络MLP 5. 模型训练 一、实验介绍 本实验实现了一个简单的两层前馈神经网络 激活函数…

一、【漏洞复现系列】Tomcat文件上传 (CVE-2017-12615)

1.1、漏洞原理 描述: Tomcat 是一个小型的轻量级应用服务器&#xff0c;在中小型系统和并发访问用户不是很多的场合下被普遍使用&#xff0c;是开发和调试JSP 程序的首选。 攻击者将有可能可通过精心构造的攻击请求数据包向服务器上传包含任意代码的 JSP 的webshell文件&#x…

100G QSFP28 100km光模块最新解决方案

随着信息时代的到来&#xff0c;数据传输的速度和距离要求越来越高。目前&#xff0c;易天光通信发布了具有超低成本、可实现100G超长距离传输新方案——100G QSFP28 100km光模块&#xff0c;该方案是在100G ZR4 80km光模块上的全面升级。 一、产品概述 100G ZR4 100km是专为…

requests模块高级用法练习

文章目录 模拟浏览器指纹发送get请求发送post请求文件上传服务器超时 模拟浏览器指纹 打开http://10.9.75.164/php/functions/setcookie.php网页&#xff0c;找到请求头的UA字段&#xff0c;这段信息是浏览器的指纹&#xff08;包括当前系统、浏览器名称和版本&#xff09;&am…

【再识C进阶3(上)】详细地认识字符串函数、进行模拟字符串函数以及拓展内容

小编在写这篇博客时&#xff0c;经过了九一八&#xff0c;回想起了祖国曾经的伤疤&#xff0c;勿忘国耻&#xff0c;振兴中华&#xff01;加油&#xff0c;逐梦少年&#xff01; 前言 &#x1f493;作者简介&#xff1a; 加油&#xff0c;旭杏&#xff0c;目前大二&#xff0c;…

【短文】sambe添加用户时报错Failed to add entry for user

2023年9月20日&#xff0c;周三晚上 Samba fails to add a user entry, how do I fix this? - Ask Ubuntu 也就是说&#xff0c;添加的sambe用户必须是Linux操作系统的用户

2023/09/20 day4 qt

做一个动态指针钟表 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPainter> //绘制事件类 #include <QPaintEvent> //画家类 #include <QTime> #include <QTimer> #include <QTimerEvent> QT_BEGIN…

k8s使用时无法ping通服务器From IP地址 icmp_seq=1 Destination Host Unreachable

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

canvas-绘图库fabric.js简介

一般情况下简单的绘制&#xff0c;其实canvas原生方法也可以满足&#xff0c;比如画个线&#xff0c;绘制个圆形、正方形、加个文案。 let canvas document.getElementById(canvas);canvas.width 1200;canvas.height 600;canvas.style.width 1200px;canvas.style.height 6…

Canal实现Mysql数据同步至Redis、Elasticsearch

文章目录 1.Canal简介1.1 MySQL主备复制原理1.2 canal工作原理 2.开启MySQL Binlog3.安装Canal3.1 下载Canal3.2 修改配置文件3.3 启动和关闭 4.SpringCloud集成Canal4.1 Canal数据结构![在这里插入图片描述](https://img-blog.csdnimg.cn/c64b40c2231a4ea39a95aac81d771bd1.pn…

kafka消费者多线程开发

目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前&#xff0c;计算机的硬件条件已经大大改善&#xff0c;即使是在普通的笔记本电脑上&#xff0c;多核都已经是标配了&#xff0c;更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…

java框架-Spring-容器创建过程

java框架-Spring-容器创建源码

pip pip3安装库时都指向python2的库

当在python3的环境下使用pip3安装库时&#xff0c;发现居然都指向了python2的库 pip -V pip3 -V安装命令更改为&#xff1a; python3 -m pip install <package>