Spark-Scala语言实战(9)

news2024/12/28 3:02:27

之前的文章中,我们学习了如何在spark中使用RDD方法的flatMap,take,union。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(8)-CSDN博客文章浏览阅读675次,点赞16次,收藏10次。​今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的flatMap,take,union三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137157697?今天的文章,我会继续带着大家如何在spark的中使用我们的RDD方法。今天学习RDD方法中的filter,distinct,intersection三种方法,并做一道相关例题。

一、知识回顾

昨天我们学习了RDD的三种方法,分别是flatMap,take,union。

flatMap的一般作用是用来切分我们的单词

它会构建一个新的RDD 

take方法是用来获取我们RDD中前n个元素,n可以自行设置

union可以将我们的两个RDD进行合并操作

但使用我们的union方法时,需保证两个RDD的数据类型相同,否则无法运行。

现在,开始今天的学习吧。

二、RDD方法

1.filter

  • filter()方法是一种转换操作,用于过滤RDD中的元素。
  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd = sc.parallelize(p)
    // 使用filter操作过滤出所有偶数
    val pp = rdd.filter(x => x % 2 == 0)
    // 收集结果并打印
    val ppp = pp.collect()
    ppp.foreach(println)
  }
}

可以看到我们的代码创建了一个1到10的数组,也可以看到注释中我们的需求是筛出里面包括的偶数,那么我们运行代码得到的就应该是2,4,6,8,10,现在,运行我们的代码看看是否得到预期的值吧。

可以看到左下角成功输出代码预期值。

2.distinct

  •  distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p = Array(1, 2, 2, 3, 4, 4, 5, 5, 5)
    val pp = sc.parallelize(p)
    // 使用distinct操作去除重复元素
    val ppp = pp.distinct()
    // 收集结果并打印
    val pppp = ppp.collect()
    pppp.foreach(println)

  }
}

可以看到我们的代码给了一组重复数据特别多的数组,那么我们的distinct方法肯定就是要将它进行降重操作了,那么我们现在运行代码来看一下。

可以看到我们预期的降重实现了,但是它的输出顺序特别混乱,这是因为Spark 的分布式计算模型决定了数据在不同分区之间可能会被打乱,并且在执行 distinct 操作时可能会进行重分区。因此,即使你的输入数组  是有序的,经过 distinct 处理后的输出数组很可能不是有序的。

那么要解决这个问题,我们肯定需要手动排序了

在这里我们就可以使用到sorted进行排序。

    val ppppp=pppp.sorted
    ppppp.foreach(println)

把这两行代码加到末尾,运行代码

可以看到输出预期中降重并排序的结果了。 

3.intersection

  •  intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val p1 = sc.parallelize(Array(1, 2, 3, 4, 5))
    val p2 = sc.parallelize(Array(4, 5, 6, 7, 8))
    // 计算两个RDD的交集
    val ppp = p1.intersection(p2)
    // 收集结果并打印
    val ppppp = ppp.collect()
    ppppp.foreach(println)

  }
}

 看代码,我们定义了两个数组,那么既然intersection是求交集,那么运行代码输出的肯定是两个数组中的共同元素,即4,5。运行代码

可以看到成功输出我们交集4与5

三、任务实现

现在,我们有两个csv文件,里面有我们大量的薪资信息,我们现在需要做的事情如下: 

  • 输出上半年或下半年实际薪资大于20万元的员工姓名。
  • 首先需要过滤出两个RDD中实际薪资大于20万元的员工姓名。
  • 再将两个RDD得到的员工姓名合并到一个RDD中,对员工姓名进行去重。
  • 即可得到上半年或下半年实际薪资大于20万元的员工姓名。

想要完成它,并不困难,现在我们把文件放在C盘的根目录下,方便寻找,当然这个位置可以自己随便放。

然后编写我们的代码:

import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    // 从C盘根目录读取第一个CSV文件
    val p1 = sc.textFile("C:\\Employee_salary_first_half.csv")
    // 从C盘根目录读取第二个CSV文件
    val p2 = sc.textFile("C:\\Employee_salary_second_half.csv")
    // 使用mapPartitionsWithIndex方法跳过CSV文件的标题行
    val pp1 = p1.mapPartitionsWithIndex((ix,it) => {
      if (ix ==0) it.drop(1)
      it
    })
    val pp2 = p2.mapPartitionsWithIndex((ix, it) => {
      if (ix == 0) it.drop(1)
      it
    })
    // 将pp1中的每一行转换为(员工名, 工资)元组
    val ppp1 = pp1.map(
      Line => {val data = Line.split(",");(data(1),data(6).toInt)})//使用逗号分割每行数据, 提取第二列和第七列数据,并将第七列转换为整数
    val ppp2 = pp2.map(
      Line => {val data = Line.split(",");(data(1),data(6).toInt)})
    val pppp1=ppp1.filter(x => x._2 > 200000).map(x => x._1)// 找出ppp1中工资超过200,000的元组,并只保留员工名
    val pppp2=ppp2.filter(x => x._2 > 200000).map(x => x._1)//x._n,n即使你要找的元素,通过 ._1 来访问第一个元素 a,通过 ._2 来访问第二个元素 b。
    val ppppp=pppp1.union(pppp2).distinct()//合并并降重
    ppppp.collect().foreach(println)//逐行打印
  }
}

我们先读取了两个文件,在将文件的标题行进行跳过,再分割数据找出需要的两行,最后找出工资大于200000的数据打印

来看看运行效果

可以看到我们预期的输出效果达到了。

拓展-方法参数设置

方法参数描述使用例子不同参数/效果
filterfunc对RDD中的每个元素应用函数func,返回True的元素保留,返回False的元素被过滤掉rdd.filter(lambda x: x > 3)通过修改func,可以定义不同的过滤条件,从而保留或过滤掉不同的元素。例如,lambda x: x % 2 == 0会保留偶数。
distinct返回一个包含RDD中所有不同元素的新RDD,去重rdd.distinct()此方法没有参数,它直接返回一个新的RDD,其中包含了原始RDD中的所有不同元素。这对于去除重复项非常有用。
intersectionother返回当前RDD与另一个RDDother的交集,结果中不包含重复元素rdd1.intersection(rdd2)other参数指定了另一个RDD,该方法将返回两个RDD中共有的元素。改变other的值将会影响交集的结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1559549.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

keepalived+LVS高可用部署

目录 一.两台设备(2.130和2.133)作为调度器,前主后备 1.部署keepalived 2.修改配置文件准备启动 3.配置keepalived的系统日志并启动 二.模拟调度器掉点和web服务进程丢失 1.调度器掉点 2.当类似于httpd这种网站服务掉点 三.以三种健康…

【A-008】基于SSH的员工信息管理系统(含论文)

员工信息管理系统主要功能如下: (1)员工方面: ①人事管理:能够看到自己的基本信息,对自己的合同进行下载,可以看公司的培训信息; ②招聘管理:查看企业的招聘信息,包括应聘者&#x…

基于SpringBoot和Vue的学生笔记共享平台的设计与实现

今天要和大家聊的是一款基于SpringBoot和Vue的学生笔记共享平台的设计与实现 !!! 有需要的小伙伴可以通过文章末尾名片咨询我哦!!! 💕💕作者:李同学 💕&…

【Qt】常用控件(输入类)

目录 一、Line Edit二、Text Edit三、ComBo四、DateTimeEdit五、Slider 一、Line Edit QLineEdit 用来表示单行输入框,可以输入一段文本,但是不能换行。 属性说明test输入框中的文本inputMask输入内容格式约束maxLength最大长度frame是否添加边框echoM…

AES加密解密算法

一,AES算法概述 AES属于分组加密,算法明文长度固定为128位(单位是比特bit,1bit就是1位,128位等于16字节) 而密钥长度可以是128、192、256位 当密钥为128位时,需要循环10轮完成加密&#xff0…

OpenEuler华为欧拉系统安装教程及联网配置

OpenEuler简介 openEuler是一款开源操作系统。当前openEuler内核源于Linux,支持鲲鹏及其它多种处理器,能够充分释放计算芯片的潜能,是由全球开源贡献者构建的高效、稳定、安全的开源操作系统,适用于数据库、大数据、云计算、人工智…

人脸检测项目 | 基于C++在英特尔+ARM-CPU上部署人脸检测算法_推理速度可达1000fps

项目应用场景 面向在英特尔ARM CPU 平台部署轻量级的人脸检测算法,要求人脸检测算法的速度是能够达到实时的的场景,项目采用 C 开发,支持跨平台移植到包括 Linux、Windows 等 项目效果: 项目细节 > 具体参见项目 README.md (1…

【LeetCode: 330. 按要求补齐数组 + 贪心 + 构造区间】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

linux进程退出之exit与_exit

linux进程退出之exit与_exit _exitexit流程清理函数atexit()函数:on_exit()函数: _exit /* Terminate program execution with the low-order 8 bits of STATUS. */ /** status参数定义了进程的终止状态,父进程可以通过wait(&am…

使用open3d分离背景和物体点云

一、代码 方法简单介绍 RANSAC(随机采样一致性)是一种常用的分割算法,通常用于从点云中分割出最大的平面(如地面、墙壁等)。RANSAC速度相对较快,特别是当点云数据量不是很大时。在物体与背景之间存在明显…

蓝桥杯 本质上升序列

题目描述: 小蓝特别喜欢单调递增的事物。 在一个字符串中,如果取出若干个字符,将这些字符按照在字符串中的顺序排列后是单调递增的,则成为这个字符串中的一个单调递增子序列。 例如,在字符串 lanqiao 中,如果取出字符…

蓝桥杯省赛刷题——题目 2656:刷题统计

刷题统计OJ链接:蓝桥杯2022年第十三届省赛真题-刷题统计 - C语言网 (dotcpp.com) 题目描述 小明决定从下周一开始努力刷题准备蓝桥杯竞赛。他计划周一至周五每天做 a 道题目,周六和周日每天做 b 道题目。请你帮小明计算,按照计划他将在第几…

Segger Embedded Studio IDE使用体验——默认的Section和Linker的设置

Segger Embedded Studio IDE使用体验之一——默认的Section和Linker的设置 一、简介二、操作2.1 编译后代码分析2.1.1 符号浏览器2.1.2 读取elf文件和map文件 2.2 调试2.2.1 查看变量2.2.2 设置供电 2.3 运行环境设置2.3.1 编译器2.3.2 汇编器2.3.3 包含其他文件2.3.4 .bss和.d…

iOS问题记录 - App Store审核新政策:隐私清单 SDK签名(持续更新)

文章目录 前言开发环境问题描述问题分析1. 隐私清单 & SDK签名1.1. 隐私清单 - 数据使用声明1.2. 隐私清单 - 所用API原因描述1.3. SDK签名 2. 即将发布的第三方SDK要求 解决方案最后 前言 前段时间用Flutter开发的iOS App提交了新版本,结果刚过两分钟就收到了…

基于springboot实现旅游网站系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现旅游网站系统演示 摘要 随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势,旅游网站当然也不能排除在外,随着旅游网站的不断成熟,它彻底改变了过去传统的旅游…

魔改一个过游戏保护的CE

csdn审核不通过 网易云课堂有配套的免费视频 int0x3 - 主页 文章都传到github了 Notes/外挂/魔改CE at master MrXiao7/Notes GitHub 为什么要编译自己的CE 在游戏逆向的过程中,很多游戏有保护,我们运行原版CE的时候会被检测到 比如我们开着CE运…

【AXIS】AXI-Stream FIFO设计实现(四)——异步时钟

前文介绍了几种同步时钟情况下的AXI Stream FIFO实现方式,一般来说,FIFO也需要承担异步时钟域模块间数据传输的功能,本文介绍异步AXIS FIFO的实现方式。 如前文所说,AXI-Stream FIFO十分类似于FWFT异步FIFO,推荐参考前…

AtCoder Beginner Contest 347 (ABCDEF题)视频讲解

A - Divisible Problem Statement You are given positive integers N N N and K K K, and a sequence of length N N N, A ( A 1 , A 2 , … , A N ) A(A_1,A_2,\ldots,A_N) A(A1​,A2​,…,AN​). Extract all elements of A A A that are multiples of K K K, divi…

2-HDFS常用命令及上传下载流程

HDFS NameNode 安全模式(safemode) 当NameNode被重启的时候,自动进入安全模式 在安全模式中,NameNode首先会触发edits_inprogress文件的滚动。滚动完成之后,更新fsimage文件 更新完成之后,NameNode会将fsimage文件中的元数据加…

新闻管理系统(源码+文档)

新闻管理系统(小程序、ios、安卓都可部署) 文件包含内容程序简要说明含有功能项目截图客户端新闻详情新闻首页分类退出登录个人中心拨打客服热线注册界面个人资料新闻评论成功 管理端用户管理分类管理新闻管理 文件包含内容 1、搭建视频 2、流程图 3、开…