大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解

news2025/1/5 10:43:01

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • RDD的介绍
  • RDD的特点、特点介绍
  • Spark 编程模型的介绍

在这里插入图片描述

RDD 的创建

SparkContext

SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互。

  • 如果把Spark集群当做服务端,那么Driver就是客户端,SparkContext是客户端的核心
  • SparkContext是Spark对外的接口,负责向调用者提供Spark的各种功能
  • SparkContext用于连接Spark集群、创建RDD、累加器、广播变量

从集合创建RDD

我们在集群的节点上启动 Spark-Shell 进行学习和测试

spark-shell --master local[*]

如果顺利启动,你就可以看到如下的画面:
在这里插入图片描述

尝试运行如下的指令,感受一下

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_412)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd2.getNumPartitions
res1: Int = 2

scala> rdd2.partitions.length
res2: Int = 2

scala> val rdd3 = sc.makeRDD(List(1,2,3,4,5))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24

scala> val rdd4 = sc.makeRDD(1 to 100)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd4.getNumPartitions
res3: Int = 2

scala> 

对应的截图如下:
在这里插入图片描述

从文件系统创建RDD

用 textFile() 方法来从文件系统中加载数据创建RDD,方法将文件的URI作为参数:

  • 本地文件系统
  • 分布式文件系统 HDFS
  • Amazon S3的地址
# 本地系统 注意文件要确保存在
val lines = sc.textFile("file:///opt/wzk/1.txt")
# 从分布式文件系统加载
val lines = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")

运行结果如下图所示:
在这里插入图片描述

从RDD创建RDD

本质是将一个RDD转换为另一个RDD,从 Transformation

Transformation

RDD的操作算子分为两类:

  • Transformation,用来对RDD进行转换,这个操作时延迟执行的(或者是Lazy),Transformation,返回一个新的RDD
  • Action,用来触发RDD的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回int、double、集合(不会返回新的RDD)

每一个Transformation操作都会产生新的RDD,供给下一个“转换”使用
转换得到RDD是惰性求值,也就是说,整个转换过程只有记录了转换的轨迹,并不会发生真正的计算,只有遇到Action操作时,才会发生真正的计算,开始从学院关系(lineage)源头开始,进行物理的转换操作。

在这里插入图片描述

常见转换算子1

  • map(func):对数据集中的每个元素都用func,然后返回一个新的RDD
  • filter(func):对数据集中的每个元素都是用func,然后返回一个包含使func为true的元素构成RDD
  • flatMap(func):与map类似,每个输入元素被映射为0或多个输出元素
  • mapPartitions(func):和map很像,但是map是将func作用在每个元素上,而mapPartitions是func作用在整个分区上。假设一个RDD有N个元素,M个分区(N >> M),那么map的函数将被调用N次,而mapPartitions中的函数仅被调用M次,一次处理一个分区中的所有元素
  • mapPartitionsWithIndex(func):与mapPartitions类似,多了分区索引值信息

转换算子1测试

map filter

测试如下的代码:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = rdd1.map(_*2)
val rdd3 = rdd2.filter(_>10)

执行结果如下图:
在这里插入图片描述
我们可以查看当前的结果,但是当前的操作都是Transformation的,并没有真正的执行。
我们需要通过 collect 触发执行,拿到最终的结果

rdd2.collect
rdd3.collect

将会触发执行,可以看到结果为:
在这里插入图片描述

flatMap

我们从HDFS加载一个文件过来

val rdd4 = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")
rdd4.collect

执行结果如下图:

我们使用“a”作为分隔符,对这段内容进行分割:

rdd4.flatMap(_.split("a")).collect

执行结果如下图:
在这里插入图片描述

mapPartitions

val rdd5 = rdd1.mapPartitions(iter => iter.map(_*2))

执行结果如下
在这里插入图片描述

对比 map 和 mapPartitions

上面我们用:

  • rdd1.map(_*2)
  • rdd1.mapPartitions(iter => iter.map(_*2))

那么这两种有什么区别呢?

  • map:每次只处理一条数据
  • mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易OOM
  • 当资源充足时,建议使用 mapPartitions,充分提高处理效率

常见转换算子2

  • groupBy(func):按照传入函数的返回值进行分组,将key相同的值放入一个迭代器
  • glom():将每一个分区形成一个数组,形成新的RDD类型RDD[Array[T]]
  • sample(withReplacement,fraction,seed):采样算子,以指定的随机数种子seed随机抽样出数量为fraction的数据,withReplacenent表示抽出数据是否放回,true则放回,false不放回
  • distinct([numTasks]):对RDD元素去重后,返回一个新的RDD,可传入numTasks参数改变RDD分区数
  • coalesce(numPartitions):缩减分区数,没有shuffle
  • repartition(numPartitions):增加或减少分区数,有shuffle
  • sortBy(func,[ascending], [numTasks]):使用func对数据进行处理,对处理后的结果进行排序

宽依赖的算子(shuffle):groupBy,distinct、repartition、sortBy

转换算子2测试

group by

val rdd1 = sc.parallelize(1 to 10)
val group = rdd1.groupBy(_%3)
group.collect

执行的结果如下图:
在这里插入图片描述

glom.map

将 RDD 中元素的每10个元素分组

val rdd1 = sc.parallelize(1 to 101)
val rdd2 = rdd1.glom.map(_.sliding(10, 10).toArray)
rdd2.collect

执行结果如下图:
在这里插入图片描述

sample

对数据采样,fraction表示采样的百分比

rdd1.sample(true, 0.2, 2).collect
rdd1.sample(false, 0.2, 2).collect
rdd1.sample(true, 0.2).collect

执行结果如下图:
在这里插入图片描述

distinct

对数据进行去重,我们生成一些随机数,然后对这些数值进行去重。

val random = scala.util.Random
val arr = (1 to 20).map(x => random.nextInt(10))
val rdd = sc.makeRDD(arr)
rdd.distinct.collect

执行结果如下图:
在这里插入图片描述

numSlices

对RDD重分区,我们需要多分一些区出来

val rdd1 = sc.range(1, 1000, numSlices=10)
val rdd2 = rdd1.filter(_%2==0)
rdd2.getNumPartitions

执行结果如下图:
在这里插入图片描述

repartition & coalesce

增加或者减少分区

rdd2.getNumPartitions
# repartition 是增加和缩减分区数
val rdd3 = rdd2.repartition(5)
# coalesce 是缩减分区数
val rdd4 = rdd2.coalesce(5)

执行结果如下图:
在这里插入图片描述

sortBy

rdd.sortBy(x => x).collect
rdd.sortBy(x => x).collect

执行结果如下:
在这里插入图片描述

coalesce & repartition

  • repartition:增大或者减少分区数,有shuffle
  • coalesce:一般用于减少分区数(此时无shuffle)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2040802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS 伪类和伪元素

也是选择器的一种&#xff0c;被称为伪类和伪元素。这一类选择器的数量众多&#xff0c;通常用于很明确的目的。 伪类 什么是伪类 伪类是选择器的一种&#xff0c;它用于选择处于特定状态的元素。 比如当它们是这一类型的第一个元素时&#xff08;:first-child&#xff09;&…

Sentinel入门与进阶:微服务流量控制的最佳实践 ( 六 )

8.Gateway 整合 Sentinel&#xff08;熔断、限流&#xff09; 8.1.引入依赖 在 Spring Cloud Alibaba 2.1.6之前的版本&#xff0c;引入的是 sentinel-spring-cloud-gateway-adapter 包&#xff0c;并且需要自己实现好多配置类&#xff0c;2.1.6 之后的版本内部已经帮我们实现…

php 在app中唤起微信app进行支付,并处理回调通知

<?phpnamespace app\api\controller;use think\facade\Db; use think\facade\Log;class Wxzf {

计算机网络之分组交换时延的计算

一.类型 分组交换的时延包括一下几种&#xff1a; 1.1发送时延 发送时延&#xff0c;也叫传输时延&#xff0c;结点将分组的所有比特推向链路所需要的时间&#xff0c;即从发送分组的第一个比特算起&#xff0c;到该分组的最后一个比特发送完为止。 发送时延 分组长度 / 发…

Web Image scr图片从后端API获取基本实现

因系统开发中需求&#xff0c;会有页面显示图片直接从后端获取后显示&#xff0c;代码如下&#xff1a; 后端&#xff1a; /*** 获取图片流* param response* param fileName*/RequestMapping(value"getImgStream",method RequestMethod.GET)public void getImgStr…

【JAVA】深入理解守护线程与非守护线程:概念、应用及示例

文章目录 介绍1. 线程的基础知识2. 守护线程与非守护线程2.1 什么是守护线程&#xff1f;特点&#xff1a; 2.2 什么是非守护线程&#xff1f;特点&#xff1a; 3. 为什么需要守护线程&#xff1f;示例&#xff1a;后台任务处理示例&#xff1a;日志记录 4. 非守护线程的应用场…

Scrapy 项目部署问题及解决方案

部署 Scrapy 项目时可能会遇到一些常见问题。以下是几个常见的部署问题及其解决方案&#xff1a; 1、依赖问题 问题&#xff1a;部署后爬虫运行失败&#xff0c;通常是由于缺少依赖库。 2、配置问题 问题&#xff1a;爬虫在部署环境中无法正常运行&#xff0c;可能是由于配…

stm32智能颜色送餐小车(openmv二维码识别+颜色识别+oled显示)

大家好啊&#xff0c;我是情谊&#xff0c;今天我们来介绍一下我最近设计的stm32产品&#xff0c;我们在今年七月份的时候参加了光电设计大赛&#xff0c;我们小队使用的就是stm32的智能送餐小车&#xff0c;虽然止步于省赛&#xff0c;但是还是一次成长的经验吧&#xff0c;那…

深度学习基础之反向传播算法

目录 原理与过程 1. 前向传播&#xff08;Forward Pass&#xff09; 2. 计算误差&#xff08;Error Calculation&#xff09; 3. 反向传播&#xff08;Backpropagation&#xff09; 4. 参数更新&#xff08;Parameter Update&#xff09; 应用与实例 总结 反向传播算法…

1秒出图,全球最快的开源Stable Diffusion出炉

前言 OneFlow 将 Stable Diffusion 的推理性能推向了一个全新的 SOTA。 第一辆汽车诞生之初&#xff0c;时速只有 16 公里&#xff0c;甚至不如马车跑得快&#xff0c;很长一段时间&#xff0c;汽车尴尬地像一种“很酷的玩具”。人工智能作图的出现也是如此。 AI 作图一开始的…

大数据面试SQL(八):求连续段的起始位置和结束位置

文章目录 求连续段的起始位置和结束位置 一、题目 二、分析 三、SQL实战 四、样例数据参考 求连续段的起始位置和结束位置 一、题目 有一张表t2_id记录了id&#xff0c;id不重复&#xff0c;但是会存在间断&#xff0c;求出连续段的起始位置和结束位置。 样例数据&…

两个若依系统,不能同时登录问题解决方案

原因&#xff1a; 问题根源在于两个独立的系统&#xff08;A系统与B系统&#xff09;共享了同一cookie键名来存储各自用户的认证令牌&#xff08;token&#xff09;。这种设计导致了以下情形&#xff1a; 当用户在A系统登录后&#xff0c;一个token被存储在cookie中&#xff0…

【LeetCode每日一题】——623.在二叉树中增加一行

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 广度优先遍历 二【题目难度】 中等 三【题目编号】 623.在二叉树中增加一行 四【题目描述】…

c语言学习,memset()函数分析

1&#xff1a;memset() 函数说明&#xff1a; 将字符c&#xff08;unsigned char&#xff09;复制到str字符串的前n个字符 2&#xff1a;函数原型&#xff1a; void * memset(void * str,int c,size_t n) 3&#xff1a;函数参数&#xff1a; 参数str要填充的指针,c 要设置的值…

2024下半年EI学术会议一览表

2024下半年将举办多个重要的EI学术会议&#xff0c;涵盖了从机器视觉、图像处理与影像技术到感知技术、绿色通信、计算机、大数据与人工智能等多个领域。 2024下半年EI学术会议一览表 第二届机器视觉、图像处理与影像技术国际会议&#xff08;MVIPIT 2024&#xff09;将于2024…

threejs webgl效果 功能特效

雷达效果 ​飘扬的红旗 光柱效果 OD线 下雪 下雨 光墙效果 能源球 烟火效果 threejs烟花效果 光圈效果 threejs 光圈 波动 function initScene() {scene new THREE.Scene();}function initCamera() {camera new THREE.PerspectiveCamera(45, window.innerWidth / window.inne…

培训学校课程管理系统-计算机毕设Java|springboot实战项目

&#x1f34a;作者&#xff1a;计算机毕设残哥 &#x1f34a;简介&#xff1a;毕业后就一直专业从事计算机软件程序开发&#xff0c;至今也有8年工作经验。擅长Java、Python、微信小程序、安卓、大数据、PHP、.NET|C#、Golang等。 擅长&#xff1a;按照需求定制化开发项目、 源…

MiniCPM-V 2.6 面壁“小钢炮”,多图、视频理解多模态模型,部署和推理实战教程

MiniCPM-V 2.6是清华和面壁智能最新发布的多模态模型&#xff0c;亦称面壁“小钢炮”&#xff0c;它是 MiniCPM-V 系列中最新、性能最佳的模型。该模型基于 SigLip-400M 和 Qwen2-7B 构建&#xff0c;仅 8B 参数&#xff0c;但却取得 20B 以下单图、多图、视频理解 3 SOTA 成绩…

leetcode300. 最长递增子序列,动态规划附状态转移方程

leetcode300. 最长递增子序列 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2…

【扩散模型(七)】Stable Diffusion 3 diffusers 源码详解2 - DiT 与 MMDiT 相关代码(上)

系列文章目录 【扩散模型&#xff08;一&#xff09;】中介绍了 Stable Diffusion 可以被理解为重建分支&#xff08;reconstruction branch&#xff09;和条件分支&#xff08;condition branch&#xff09;【扩散模型&#xff08;二&#xff09;】IP-Adapter 从条件分支的视…