【Spark分布式内存计算框架——Spark Core】4. RDD函数(中)Transformation函数、Action函数

news2025/1/12 12:07:50

3.2 Transformation函数

在Spark中Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是Transformation操作并不会触发真正的计算,只会建立RDD间的关系图。

如下图所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从 V1、V2、U1、U2、U3、U4 采样出数据 V1、U1 和 U4,形成新的RDD。
在这里插入图片描述
常用Transformation转换函数,加上底色为重要函数,重点讲解常使用函数:
在这里插入图片描述

3.3 Action函数

不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的 RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立计算关系,而Action 操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob 方法向集群正式提交请求,所以每个Action操作对应一个Job。

常用Action执行函数,加上底色为重要函数,后续重点讲解。
在这里插入图片描述

3.4 重要函数

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。
在这里插入图片描述

主要常见使用函数如下,一一通过演示范例讲解
在这里插入图片描述
基本函数
RDD中map、filter、flatMap及foreach等函数为最基本函数,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。

  • map 函数:
    map(f:T=>U) : RDD[T]=>RDD[U],表示将 RDD 经由某一函数 f 后,转变为另一个RDD。
  • flatMap 函数:
    flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示将 RDD 经由某一函数 f 后,转变为一个新的 RDD,但是与 map 不同,RDD 中的每一个元素会被映射成新的 0 到多个元素(f 函数返回的是一个序列 Seq)。
  • filter 函数:
    filter(f:T=>Bool) : RDD[T]=>RDD[T],表示将 RDD 经由某一函数 f 后,只保留 f 返回为 true 的数据,组成新的 RDD。
  • foreach 函数:
    foreach(func),将函数 func 应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。关于 foreach,在后续章节中还会使用,到时会详细介绍它的使用方法及注意事项。
  • saveAsTextFile 函数:
    saveAsTextFile(path:String),数据集内部的元素会调用其 toString 方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS 等。

上述函数基本上都使用过,在后续的案例中继续使用,此处不再单独演示案例。

分区操作函数
每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替、foreache函数使用foreachPartition代替。
在这里插入图片描述
针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
/**
* 分区操作函数:mapPartitions和foreachPartition
*/
object SparkIterTest {
	def main(args: Array[String]): Unit = {
		// 创建应用程序入口SparkContext实例对象
		val sc: SparkContext = {
			// 1.a 创建SparkConf对象,设置应用的配置信息
			val sparkConf: SparkConf = new SparkConf()
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.setMaster("local[2]")
			// 1.b 传递SparkConf对象,构建Context实例
			new SparkContext(sparkConf)
		}
		sc.setLogLevel("WARN")
		// TODO: 1、从文件系统加载数据,创建RDD数据集
		val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
		// TODO: 2、处理数据,调用RDD集合中函数(类比于Scala集合类中列表List)
		/*
			def mapPartitions[U: ClassTag](
			f: Iterator[T] => Iterator[U],
			preservesPartitioning: Boolean = false
			): RDD[U]
		*/
		val wordcountsRDD: RDD[(String, Int)] = inputRDD
			// 将每行数据按照分隔符进行分割,将数据扁平化
			.flatMap(line => line.trim.split("\\s+"))
			// TODO: 针对每个分区数据操作
			.mapPartitions{ iter =>
			// iter 表示RDD中每个分区中的数据,存储在迭代器中,相当于列表List
				iter.map(word => (word, 1))
			}
			// 按照Key聚合统计, 先按照Key分组,再聚合统计(此函数局部聚合,再进行全局聚合)
			.reduceByKey((a, b) => a + b )
		// TODO: 3、输出结果RDD到本地文件系统
		wordcountsRDD.foreachPartition{ datas =>
			// 获取各个分区ID
			val partitionId: Int = TaskContext.getPartitionId()
			// val xx: Iterator[(String, Int)] = datas
			datas.foreach{ case (word, count) =>
				println(s"p-${partitionId}: word = $word, count = $count")
			}
		}
		// 应用程序运行结束,关闭资源
		sc.stop()
	}
}

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

  • 应用场景:处理网站日志数据,数据量为10GB,统计各个省份PV和UV。
    • 假设10GB日志数据,从HDFS上读取的,此时RDD的分区数目:80 分区;
    • 但是分析PV和UV有多少条数据:34,存储在80个分区中,实际项目中降低分区数目,比如设置为2个分区。
      在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/331727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

int、uint类型的比较与加减

uint与int的比较 int与uint比较时会把int转换成uint&#xff0c;一个负的int转换成uint会溢出。所以uint与int比较大小时容易得到错误的结果&#xff0c;如&#xff1a; #include <iostream> using namespace std;int main(int, char**) {cout << "compare …

IC真题 —— 刷题记录(1)

引言 记录一些 我自己刷的 IC行业招聘真题&#xff0c;不是每题记录&#xff0c;只记录一些值得记录的&#xff0c;写下自己的看法。主要是一些数字IC行业题目&#xff0c;偏前端。 1、有一个逐次逼近型 8位A/D 转换器&#xff0c;若时钟频率为250KHz&#xff0c;完成一次转换…

2023备战金三银四,自动化软件测试面试宝典合集

1.软件测试的定义是什么&#xff1f; 参考答案&#xff1a; 用手工或者自动化的方式执行测试用例的一个过程 2.软件测试的对象包括哪些&#xff1f; 参考答案&#xff1a; 源程序、目标程序、数据和相关文档 3.试结合软件开发流程模型&#xff0c;描述对应不同的阶段测试需要…

Linux系统

Linux系统 Linux操作系统&#xff1a;Windows、Mac Linux一切皆文件&#xff1a;文件就 读、写、&#xff08;权限&#xff09; Linux——》Redis——》Docker 学习方式&#xff1a; 认识Linux 基本的命令&#xff08;重点&#xff1a; git 讲了一些基本的命令&#xff0…

Windows上实现 IOS 自动化测试

本文介绍如何使用tideviceWDAairtest/facebook-wda实现在Windows上进行IOS APP自动化测试 环境准备 Windows Python环境 Python 3.6 WebDriverAgent安装 下载最新的项目到Mac&#xff1a;https://github.com/appium/WebDriverAgent $ git clone https://github.com/appiu…

求你了,不要再在对外接口中使用枚举类型了!

最近&#xff0c;我们的线上环境出现了一个问题&#xff0c;线上代码在执行过程中抛出了一个IllegalArgumentException&#xff0c;分析堆栈后&#xff0c;发现最根本的的异常是以下内容&#xff1a; java.lang.IllegalArgumentException: No enum constant com.a.b.f.m.a.c.A…

GEE遥感云大数据在林业中的应用

近年来遥感技术得到了突飞猛进的发展&#xff0c;航天、航空、临近空间等多遥感平台不断增加&#xff0c;数据的空间、时间、光谱分辨率不断提高&#xff0c;数据量猛增&#xff0c;遥感数据已经越来越具有大数据特征。遥感大数据的出现为相关研究提供了前所未有的机遇&#xf…

STM32开发(8)----CubeMX配置串口通讯(中断方式和DMA方式)

CubeMX配置串口通讯&#xff08;中断方式和DMA方式&#xff09;前言一、中断方式1.CubeMX配置2.代码实现3.实验结果二、DMA方式1.CubeMX配置2.代码实现3.实验结果总结前言 本章继续介绍使用STM32CubeMX对串口进行配置的方法&#xff0c;串口通讯有三种方式&#xff1a;轮询&am…

看完这篇 教你玩转渗透测试靶机vulnhub——Source:1

Vulnhub靶机Source:1渗透测试详解Vulnhub靶机介绍&#xff1a;Vulnhub靶机下载&#xff1a;Vulnhub靶机安装&#xff1a;Vulnhub靶机漏洞详解&#xff1a;①&#xff1a;信息收集&#xff1a;②&#xff1a;远程命令执行漏洞 CVE-2019-15017&#xff1a;③&#xff1a;获取FLAG…

MySQL篇02-三大范式,多表查询

数据入库时,由于数据设计不合理&#xff0c;会存在数据重复、更新插入异常等情况, 故数据库中表的设计遵循的设计规范&#xff1a;三大范式1.第一范式(1NF)要求数据库的每一列都是不可分割的原子数据项&#xff0c;即原子性。强调的是列的原子性&#xff0c;即数据库中每一列的…

TOUGH系列软件建模实践方法及在地下水、CO2地质封存、水文地球化学、地热等多相多组分系统多过程耦合

查看原文>>> https://mp.weixin.qq.com/s?__bizMzAxNzcxMzc5MQ&mid2247578057&idx7&sn75f8d2c1c6edb28af76a8db4bb773de3&chksm9be2aed9ac9527cf0081082cdcf781e6c37f9f3ba383332ed1116abcbee0f05c0593187e964d&token2070450548&langzh_CN#r…

PostgreSQL查询引擎——General Expressions Grammar之restricted expression

General expressions语法规则定义在src/backend/parser/gram.y文件中&#xff0c;其是表达式语法的核心。有两种表达式类型&#xff1a;a_expr是不受限制的类型&#xff0c;b_expr是必须在某些地方使用的子集&#xff0c;以避免移位/减少冲突。例如&#xff0c;我们不能将BETWE…

TOOM舆情监测方案关键词设置,网络舆情监测方案有哪些举措?

网络舆情监测是通过在线社交媒体平台和其他网络渠道收集、分析和评估公众对某一话题的看法和反应的过程。目的是了解舆论趋势&#xff0c;提高社会影响力&#xff0c;帮助公司或组织了解公众对其产品或服务的评价&#xff0c;TOOM舆情监测方案关键词设置&#xff0c;网络舆情监…

docker快速部署xxjob2.3.0-SpringBoot快速集成示例

xxjob 2.3.0 部署 参考资料 docker安装xxl-job-admin步骤_JEECG低代码平台的技术博客_51CTO博客 run前准备 1 新建数据库 xxl_job 2 建表sql(可以直接使) https://github.com/xuxueli/xxl-job/blob/master/doc/db/tables_xxl_job.sql建库sql # # XXL-JOB v2.4.0-SNAPSHOT…

编译原理—栈式存储分配、有参函数的活动记录、参数传递与x86汇编

编译原理—参数传递与x86汇编-1.栈式存储分配0.有参函数的活动记录1. swap1(int p , int q)2. swap2(int *p,int *q)3. swap3(int *p, int *q)4. swap4(int &p, int &q)-1.栈式存储分配 0.有参函数的活动记录 参数分别是整型、指针、引用时的参数传递及其汇编代码 1. …

OpenResty(4):OpenResty快速入门

1 hello world openresty1.9.3.1及以下版本&#xff0c;请使用content_by_lua命令&#xff1b;在 openresty1.9.3.2以上&#xff0c;content_by_lua 改成了 content_by_lua_block。可使用 nginx -V 命令查看版本号 方法一&#xff1a; 修改openresty中nginx配置文件nginx.con…

【Django】云笔记项目

一、介绍 用户可在系统中记录自己的笔记&#xff0c;用户的数据被存储在云笔记平台&#xff1b;用户和用户之间的数据为隔离存储&#xff08;登陆后才能使用相关笔记功能&#xff0c;且只能查阅自己的笔记&#xff09; 二、功能拆解 1、用户模块 注册&#xff1a;成为平台…

Python学习------起步7(字符串的连接、删除、修改、查询与统计、类型判断及字符串字母大小写转换)

目录 前言&#xff1a; 1.字符串的连接 join() 函数 2.字符串的删除&取代 replace()函数 3.字符串的修改&切割 &#xff08;1&#xff09;strip() 函数 &#xff08;2&#xff09;lstrip()函数 和 rstrip()函数 &#xff08;3&#xff09;split()函数-->…

Dubbo3简单使用

Dubbo3简单使用 &#x1f449; 使用Spring Boot实现Dubbo3&#xff0c;请参见以下地址。 # Dubbo3官网地址 https://cn.dubbo.apache.org/zh/# 使用SpringBoot实现Dubbo3的地址 https://cn.dubbo.apache.org/zh/docs3-v2/java-sdk/quick-start/spring-boot/# 该项目的git地址…

C语言位域(位段)详解

有些数据在存储时并不需要占用一个完整的字节&#xff0c;只需要占用一个或几个二进制位即可。例如开关只有通电和断电两种状态&#xff0c;用 0 和 1 表示足以&#xff0c;也就是用一个二进位。正是基于这种考虑&#xff0c;C语言又提供了一种叫做位域的数据结构。在结构体定义…