Spark【RDD编程(二)RDD编程基础】

news2024/11/28 20:55:38

前言

接上午的那一篇,下午我们学习剩下的RDD编程,RDD操作中的剩下的转换操作和行动操作,最好把剩下的RDD编程都学完。

Spark【RDD编程(一)RDD编程基础】

RDD 转换操作

6、distinct

对 RDD 集合内部的元素进行去重,然后把去重后的其他元素放到一个新的 RDD 集合内。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDTransForm {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext对象
    val conf = new SparkConf()
    conf.setAppName("spark core rdd transform").setMaster("local")
    val sc = new SparkContext(conf)

    // 通过并行集合创建RDD对象
    val arr = Array("Spark","Flink","Spark","Storm")
    val rdd1: RDD[String] = sc.parallelize(arr)

    val rdd2: RDD[String] = rdd1.distinct()

    rdd2.foreach(println)
    //关闭SparkContext
    sc.stop()
  }
}

运行输出:

Flink
Spark
Storm

可以看到,重复的元素"Spark"被去除掉。 

7、union

        对 两个 RDD 集合进行并集运算,并返回新的 RDD集合,虽然是并集运算,但整个过程不会把重复的元素去除掉。
// 通过并行集合创建RDD对象
    val arr1 = Array("Spark","Flink","Storm")
    val arr2 = Array("Spark","Flink","Hadoop")
    val rdd1: RDD[String] = sc.parallelize(arr1)
    val rdd2: RDD[String] = sc.parallelize(arr2)

    val rdd3: RDD[String] = rdd1.union(rdd2)

    rdd3.foreach(println)

运行结果:

Spark
Flink
Storm
Spark
Flink
Hadoop
可以看到,重复的元素"Spark"和"Flink"没有被去除。

8、intersection

对两个RDD 集合进行交集运算。

// 通过并行集合创建RDD对象
    val arr1 = Array("Spark","Flink","Storm")
    val arr2 = Array("Spark","Flink","Hadoop")
    val rdd1: RDD[String] = sc.parallelize(arr1)
    val rdd2: RDD[String] = sc.parallelize(arr2)

    val rdd3: RDD[String] = rdd1.intersection(rdd2)

    rdd3.foreach(println)

运行结果:

Spark
Flink

"Spark"和"Flink"是两个RDD集合都有的。 

9、subtract

对两个RDD 集合进行差集运算,并返回新的RDD 集合。

rdd1.substract(rdd2) 返回的是 rdd1有而rdd2中没有的元素,并不会把rdd2中有rdd1中没有的元素也包进来。

// 通过并行集合创建RDD对象
    val arr1 = Array("Spark","Flink","Storm")
    val arr2 = Array("Spark","Flink","Hadoop")
    val rdd1: RDD[String] = sc.parallelize(arr1)
    val rdd2: RDD[String] = sc.parallelize(arr2)

    val rdd3: RDD[String] = rdd1.subtract(rdd2)

    rdd3.foreach(println)

运算结果:

Storm

"Storm"是rdd1中有的二rdd2中没有的,并不会返回"Hadoop"。 

10、zip

把两个 RDD 集合中的元素以键值对的形式进行合并,所以需要确保两个RDD 集合的元素个数必须是相同的。

// 通过并行集合创建RDD对象
    val arr1 = Array("Spark","Flink","Storm")
    val arr2 = Array(1,3,5)
    val rdd1: RDD[String] = sc.parallelize(arr1)
    val rdd2: RDD[Int] = sc.parallelize(arr2)

    val rdd3: RDD[(String,Int)] = rdd1.zip(rdd2)

    rdd3.foreach(println)

运行结果:

(Spark,1)
(Flink,3)
(Storm,5)

RDD 行动操作

RDD 的行动操作是真正触发计算的操作,计算过程十分简单。

1、count

返回 RDD 集合中的元素数量。

2、collect

以数组的形式返回 RDD 集合中所有元素。

3、first

返回 RDD 集合中的第一个元素。

4、take(n)

返回 RDD 集合中前n个元素。

5、reduce(func)

以规则函数func对RDD集合中的元素进行循环处理,比如将所有元素加到一起或乘起来。

6、foreach

对RDD 集合进行遍历,输出RDD集合中所有元素。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDAction {

  def main(args: Array[String]): Unit = {
    // 创建SparkContext对象
    val conf = new SparkConf()
    conf.setAppName("spark core rdd transform").setMaster("local")
    val sc = new SparkContext(conf)

    //通过并行集合创建 RDD 对象
    val arr: Array[Int] = Array(1,2,3,4,5)
    val rdd: RDD[Int] = sc.parallelize(arr)

    val size: Long = rdd.count()

    val nums: Array[Int] = rdd.collect()

    val value: Int = rdd.first()

    val res: Array[Int] = rdd.take(3)

    val sum: Int = rdd.reduce((v1, v2) => v1 + v2)
    
    println("size = " + size)
    println("The all elements are ")
    nums.foreach(println)
    println("The first element in rdd is " + value)
    println("The first three elements are ")
    res.foreach(println)
    println("sum is " + sum)
    rdd.foreach(print)
    //关闭SparkContext
    sc.stop()
  }

}

运行结果:

size = 5
The all elements are 
1
2
3
4
5
The first element in rdd is 1
The first three elements are 
1
2
3
sum is 15
12345
Process finished with exit code 0

文本长度计算案例

计算 data 目录下的文件字节数(文本总长度)。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FileLength {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("spark core rdd transform").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.textFile("data")
    val rdd2: RDD[Int] = rdd1.map(line => line.length)
    val fileLength: Int = rdd2.reduce((len1, len2) => len1 + len2)
    println("File length is " + fileLength)
    
    sc.stop()
  }
}

持久化

在Spark 中,RDD采用惰性机制,每次遇到行动操作,就会从头到尾开始执行计算,这对于迭代计算代价是很大的,因为迭代计算经常需要多次重复使用相同的一组数据。

  • 使用cache() 方法将需要持久化的RDD对象持久化进缓存中
  • 使用unpersist() 方法将持久化rdd从缓存中释放出来
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDCache {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("spark core rdd transform").setMaster("local")
    val sc = new SparkContext(conf)

    val list = List("Hadoop","Spark","Hive","Flink")
    val rdd: RDD[String] = sc.parallelize(list)

    rdd.cache()

    println(rdd.count())  //第一次行动操作

    println(rdd.collect.mkString(",")) //第二次行动操作

    rdd.unpersist() //把这个持久化的rdd从缓存中移除,释放内存空间
    
    sc.stop()
  }
}

分区

分区的作用

        RDD 是弹性分布式数据集,通过 RDD 都很大,会被分成多个分区,分别保存在不同的节点上。进行分区的好处:  

  1. 增加并行度。一个RDD不分区直接进行计算的话,不能充分利用分布式集群的计算优势;如果对RDD集合进行分区,由于一个文件保存在分布式系统中不同的机器节点上,可以就近利用本分区的机器进行计算,从而实现多个分区多节点同时计算,并行度更高。
  2. 减少通信开销。通过数据分区,对于一些特定的操作(如join、reduceByKey、groupByKey、leftOuterJoin等),可以大幅度降低网络传输。

分区的原则

        使分区数量尽量等于集群中CPU核心数目。可以通过设置配置文件中的 spark.default.parallelism 这个参数的值,来配置默认的分区数目。

设置分区的个数 

1、创建 RDD对象时指定分区的数量

1.1、通过本地文件系统或HDFS加载

sc.textFile(path,partitionNum)

1.2、通过并行集合加载 

 对于通过并行集合来创建的RDD 对象,如果没有在参数中指定分区数量,默认分区数目为 min(defaultParallelism,2) ,其中defaultParallelism就是配置文件中的spark.default.parallelism。如果是从HDFS中读取文件,则分区数目为文件分片的数目。

2、使用repartition()方法重新设置分区个数

val rdd2 = rdd1.repartition(1)    //重新设置分区为1

自定义分区函数

继承 org.apache.spark.Partitioner 这个类,并实现下面3个方法:

  1. numPartitions: Int ,用于返回创建出来的分区数。
  2. getPartition(key: Any),用于返回给定键的分区编号(0~paratitionNum-1)。
  3. equals(),Java中判断相等想的标准方法。

注意:Spark 的分区函数针对的是(key,value)类型的RDD,也就是说,RDD中的每个元素都是(key,value)类型的,然后函数根据 key 对RDD 元素进行分区。所以,当要对一些非(key,value)类型的 RDD 进行自定义分区时,需要首先把 RDD 元素转换为(key,value)类型,然后再使用分区函数。

案例

将奇数和偶数分开写到不同的文件中去。

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class MyPartitioner(numParts: Int = 2) extends Partitioner{
  //覆盖默认的分区数目
  override def numPartitions: Int = numParts
  //覆盖默认的分区规则
   override def getPartition(key: Any): Int = {
    if (key.toString.toInt%2==0) 1 else 0
  }
}
object MyPartitioner{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("partitioner").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val data: Array[Int] = (1 to 100).toArray
    val rdd: RDD[Int] = sc.parallelize(data,5)
    val savePath:String = System.getProperty("user.dir")+"/data/rdd/out"
    rdd.map((_,1)).partitionBy(new MyPartitioner()).map(_._1).saveAsTextFile(savePath)

    sc.stop()
  }
}

我们在代码中创建RDD 对象的时候,我们指定了分区默认的数量为 5,然后我们使用我们自定义的分区,观察会不会覆盖掉默认的分区数量: 

运行结果:

我们可以看到,除了校验文件,一共生成了两个文件,其中一个保存了1~100的所有奇数,一个保存了1~100的所有偶数; 

综合案例

在上一篇博客中,我们已经做过WordCount了,但是明显篇幅比较长,这里我们简化后只需要两行代码:

    //使用本地文件作为数据加载创建RDD 对象
    val rdd: RDD[String] = sc.textFile("data/word.txt")
    //RDD("Hadoop is good","Spark is better","Spark is fast")
    val res_rdd: RDD[(String,Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //flatMap:
    //RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
    //RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

运行结果:

(Spark,2)
(is,3)
(fast,1)
(good,1)
(better,1)
(Hadoop,1)

总结

至此,我们RDD基础编程部分就结束了,但是RDD编程还没有结束,接下来我会继续学习键值对RDD、数据读写,最后总结性低做一个大的综合案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/968018.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows Update Blocker,windows系统关闭自动更新工具

今天打开电脑发现系统又自动更新了 这一天天更新真的太烦了 然后我从网上找到一个工具 可以自由开启和关闭系统自动更新 这里分享一下网址:https://www.filehorse.com/download-windows-update-blocker/ 若网址失效,蓝奏云盘链接 https://wwgw.lanzouc.c…

mapboxGL3新特性介绍

概述 8月7日,mapboxGL发布了3版本的更新,本文带大家一起来看看mapboxGL3有哪些新的特性。 新特新 如上图所示,是mapboxGL官网关于新版的介绍,大致翻译如下: 增强了web渲染的质量、便捷程度以及开发人员体验&#xff…

一篇文章教会你如何编写一个简单的Shell脚本

文章目录 简单Shell脚本编写1. 简单脚本编写2. Shell脚本参数2.1 Shell脚本参数判断2.1.1 文件测试语句2.1.2 逻辑测试语句2.1.3 整数值测试语句2.1.4 字符串比较语句 3. Shell流程控制语句3.1 if 条件测试语句3.1.1 if...3.1.2 if...else...3.1.3 if...elif...else 4. Shell脚…

目标检测模型推理实验记录

在进行目标检测算法的学习过程中,需要进行对比实验,这里可以直接使用MMDetection框架来完成,该框架集成了许多现有的目标检测算法,方便我们进行对比实验。 环境配置 首先是环境配置,先前博主曾经有过相关方面的配置&…

【数据结构Java版】 初识泛型和包装类

目录 1.包装类 1.1基本数据类型以及它们所对应的包装类 1.2装箱和拆箱 1.3自动装箱和自动拆箱 2.什么是泛型 3.引出泛型 4.泛型类的使用 4.1语法 4.2示例 4.3类型推导 5.泛型是如何编译的 5.1擦除机制 5.2正确的写法 6.泛型的上届 6.1语法 6.2示例 …

腾讯云、阿里云、华为云便宜云服务器活动整理汇总

云服务器的选择是一个很重要的事情,避免产生不必要的麻烦,建议选择互联网大厂提供的云计算服务,腾讯云、阿里云、华为云就是一个很不错的选择,云服务器稳定性、安全性以及售后各方面都更受用户认可,下面小编给大家整理…

Kitchen Hook

双扛厨房排钩:挂刀具

linux 内存一致性

linux 出现内存一致性的场景 1、编译器优化 ,代码上下没有关联的时候,因为编译优化,会有执行执行顺序不一致的问题(多核单核都会出现) 2、多核cpu乱序执行,cpu的乱序执行导致内存不一致(多核出…

[二分查找] 旋转数组

1. &#xff08;严格递增序列&#xff09;旋转数组的元素查找 简单来说分为三种情况进行分析 1. 整个旋转数组单调递增 根据x和A[mid]的大小关系&#xff0c;更迭范围。 // 1. 整个旋转数组单调递增if (A[left]<A[right]){if (A[mid] x)return mid;else if (x < A[mid]…

C语言枚举类型enum详解、枚举变量。枚举函数

文章目录 枚举定义枚举应用枚举函数枚举函数2 枚举定义 关键字&#xff1a;enum 用途&#xff1a;定义一个取值受限制的整型变量&#xff0c;用于限制变量取值范围&#xff1b;宏定义的集合 定义枚举变量&#xff1a; enum{FALSE 0, TRUE 1} EnumName; 因为枚举变量类型较长…

矢量图片转换 Vector Magic for mac

Vector Magic会帮你进行自动识别和分析&#xff0c;转换过程中用户可选择相应的转换级别&#xff0c;从而达到自已所需的效果。 只需上传即可在线自动将 JPG、PNG、BMP 和 GIF 位图图像转换为真正的 SVG、Eps 和 PDF 矢量图像。真正的全彩描摹&#xff0c;无需安装软件&#xf…

java 对IP地址进行排序,或类ip地址的字符串进行排序

java 对IP地址进行排序&#xff0c;或类ip地址的字符串进行排序 排序前先认识一下这个拆分字符串非常好用的类 1.StringTokenizer类 1.1 构造方法 StringTokenizer(String str) &#xff1a;构造一个用来解析 str 的 StringTokenizer 对象。java 默认的分隔符是空格(“”)、…

PHP NBA球迷俱乐部系统Dreamweaver开发mysql数据库web结构php编程计算机网页

一、源码特点 PHP NBA球迷俱乐部系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 基于PHP的NBA球迷俱乐部 二、功能介绍 1、前台主要功能&#xff1a; 系统首页 网站介…

项目(补充2):智慧教室

一。emWin环境的搭建 1.codeBlock下载 开源免费。 2.使用stm的Cubemx提供的作图软件 &#xff08;1&#xff09;在C盘下找到第三方的固件库&#xff0c;旁边有个ST文件夹 注意&#xff1a;我在下载cubemx为默认的路径 &#xff08;2&#xff09;STemWin中的Soft提供了绘图…

【STM32】学习笔记(EXTI)

EXTI外部中断 中断&#xff1a;在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转而去处理中断程序&#xff0c;处理完成后又返回原来被暂停的位置继续运行 中断优先级&#x…

字符串颜色

字体颜色 30&#xff1a;黑 31&#xff1a;红 32&#xff1a;绿 33&#xff1a;黄 34&#xff1a;蓝色 35&#xff1a;紫色 36&#xff1a;深绿 37&#xff1a;白色 字体背景颜色 40&#xff1a;黑 41&#xff1a;深红 42&#xff1a;绿 43&#xff1a;黄色 44&#xff1a;蓝…

使用 Tkinter 在 Python 中构建井字游戏!

一、说明 做你还记得小时候玩井字游戏吗&#xff1f;这是一个简单的游戏&#xff0c;只需一支笔或铅笔就可以在一张纸上玩。但是你知道你也可以使用Python的Tkinter库创建一个井字游戏吗&#xff1f;在本文中&#xff0c;我们将介绍使用 Tkinter 创建井字游戏的过程。在本文结束…

vue的 ECMAScript 6的学习

一 ECMAScript 6 1.1 ECMAScript 6 ECMAScript 和 JavaScript 的关系是&#xff0c;前者是后者的规格&#xff0c;后者是前者的一种实现&#xff08;另外的 ECMAScript 方言还有 Jscript 和 ActionScript&#xff09;。 因此&#xff0c;ES6 既是一个历史名词&#xff0c;也…

word添加字体库

1001 Fonts ❤ Free Fonts Baby!51044 free fonts in 28637 families Free licenses for commercial use Direct font downloads Mac Windows Linuxhttps://www.1001fonts.com/ 下载字体后复制粘贴到下面的位置&#xff1a;

IDEA自定义模板

IDEA自定义模板 &#xff08;1&#xff09;定义sop模板 ①在Live Templates中增加模板 ②先定义一个模板的组 ③在模板组里新建模板 ④定义模板 Abbreviation:模板的缩略名称Description:模板的描述Template text:模板的代码片段应用范围。比如点击Define。选择如下&…