🚀 作者 :“大数据小禅”
🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪
常用算子合集
- Spark中的算子概述
- 转换算子与行动算子的区别于联系
- 常见的转换算子汇总
- map算子
- flatMap算子
- filter算子
- mapPartitions算子
- reduceByKey算子
- groupByKey算子
- sample算子
- sortBy 算子
- distinct 算子
- union算子
- foldByKey算子
- subtract算子
- join算子
- 常见的行动算子汇总
- reduce 算子
- collcet算子
- count算子
- take算子
- foreach算子
Spark中的算子概述
RDD 中的算子从功能上分为两大类
- 1.Transformation(转换算子) 它会在一个已经存在的 RDD 上创建一个新的 RDD,这也使得RDD之间存在了血缘关系与联系
- 2.Action(动作算子) 执行各个分区的计算任务, 结果返回到 Driver 中
特点
- 1.Spark 中所有的 Transformations 是 惰性 的, 不会立即执行获得结果. 只会记录在数据集上要应用的操作.当需要返回结果给 Driver 时, 才会执行这些操作, 这个特性叫做 惰性求值
- 2.每一个 Action 运行的时候, 所关联的所有 Transformation RDD 都会重新计算,
转换算子与行动算子的区别于联系
转换算子是spark中的一种操作,用于从一个RDD转换成另一个RDD,它可以被用来创建新的RDD,也可以被用来转换已有的RDD。它们提供了一种通用的方法来完成RDD的转换,如map、filter、groupByKey等。
行动算子是spark中的另一种操作,它们用于从一个RDD中收集数据,或者从一个RDD中计算结果,如collect、reduce、count等。行动算子可以基于RDD的转换算子的结果来进行计算,也可以基于一组RDD来进行计算。
总之,转换算子和行动算子之间有着紧密的联系,转换算子用于创建RDD,行动算子用于从RDD中收集数据和计算结果。
常见的转换算子汇总
map算子
- Map 将RDD的数据进行以一对一的关系转换成其他形式 输入分区与输出分区一对一
- collect: 收集一个弹性分布式数据集的所有元素到一个数组中,便于观察 适用于小型数据
- take : 取出对应数据的显示条数
- foreach(println(_)) : 遍历查看数据
- 结果: 1,4,9,16 (yo,1) (pai,1) (xc,1)
def mapTest(): Unit ={
val value = sc.parallelize(List(1, 2, 3, 4)).map(
value=>value*2
).collect().foreach(println(_))
val works = sc.parallelize(List("yo", "pai", "xc")).map(
work => (work, 1)
).collect().take(2).foreach(println(_))
}
flatMap算子
- flatMap算子的作用是将一行数据拆分成多个元素,并将所有元素放在一个新的集合中,返回一个新的RDD。
- 它与map算子的区别在于,map算子只是将一行数据拆分成一个元素,并将其放在新的集合中,
- 而flatMap算子可以将一行数据拆分成多个元素,并将所有元素放在一个新的集合中。
- 结果:y o p a i x c‘’
@Test
def flatmapTest(): Unit ={
val works = sc.parallelize(List("yo", "pai", "xc")).flatMap(
work=>(work)
).collect().foreach(println(_))
}
filter算子
- spark中的filter算子用于对RDD中的每个元素应用一个函数,根据函数的返回值是true还是false来决定是否将该元素放入新的RDD中。
- 也就是说,filter算子可以根据自定义函数中的逻辑,从源RDD中过滤出一个新的RDD。
- 结果:pai xc
@Test
def filterTest(): Unit ={
val works = sc.parallelize(List("yo", "pai", "xc")).filter(
//删选出不包含yo字段的
work=>(!work.contains("yo"))
).collect().foreach(println(_))
}
mapPartitions算子
- map算子是一对一的操作,会将一个RDD中的每一个元素都映射到另一个RDD中;
- 而mapPartitions算子是一对多的操作,它会将一个RDD中的每一个分区都映射到另一个RDD中,每个分区中的元素会被一次性处理,减少了操作次数,提高了处理效率。
- mapPartitions和map算子是一样的,只不过map是针对每一条数据进行转换,mapPartitions针对一整个分区近进行转换
- 场景:
- 1.如果说map后面有数据库的访问语句的话那如果说有几万条数据要查询就得进行几万次的连接建立这显然不符合逻辑
- 2.而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库
- map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
- 结果:10,20,30,40,50
@Test
def mapPartitionsTest(): Unit ={
val works = sc.parallelize(List(1,2,3,4,5)).mapPartitions(
//里面接收一个函数,函数里面接收一个集合,传递一个集合要求返回一个集合
//函数里面要求接收一个集合,并且把集合里的Iterator[T]每一条数据转换之后再返回一个集合回去
//Iterator[T] => Iterator[U]
item=>{
val ele=item.map(item=>item*10)
ele
}
)
println(works.collect().mkString(","))
}
reduceByKey算子
- reduceByKey((V, V) ⇒ V, numPartition)
- reduceByKey算子是spark中用于对pairRDD中key相同的元素进行聚合的算子。
- 它的作用是对pairRDD中的每个key的元素都进行reduce操作,将key对应的value值聚合到一起,从而实现对pairRDD的聚合操作。
- 结果:(勇哥,198) (小明,97)
@Test
def reduceByKeyTest(): Unit ={
val works = sc.parallelize(Seq(("勇哥", 100), ("勇哥", 98), ("小明", 97))).reduceByKey(
(a,b)=>a+b
)
println(works.collect().foreach(println(_)))
}
groupByKey算子
- groupByKey是Spark中的一个重要的转换操作,它的作用是对每个key对应的元素进行分组,然后将分组后的结果以key-value的形式返回,
- 其中key是原来的key,value是一个迭代器,迭代器中存放的是key对应的所有元素。
- groupByKey算子可用于对RDD中的元素进行分组,有时也可以用于聚合操作,但它的性能要比其他聚合函数低得多,因此一般情况下不推荐使用。
- 结果:
- (勇哥,CompactBuffer(100))
- (小红,CompactBuffer(98))
- (小明,CompactBuffer(97, 77))
//从本地集合创建RDD
val rdd = sc.parallelize(Seq(("勇哥", 100), ("小红", 98), ("小明", 97), ("小明", 77))).groupByKey()
println(rdd.collect().foreach(println(_)))
}
sample算子
- sample(withReplacement, fraction, seed)
- sample算子是spark中用来从一个RDD中抽样的算子,它可以根据指定的比例或数量从RDD中抽取一部分样本出来,可以用来做数据探索、模型开发等。
@Test
def sampleTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4,5,6,7)).sample(
//随机抽3个数字
withReplacement = true,2
)
println(rdd.collect().foreach(println(_)))
}
sortBy 算子
- sortBy 算子是将RDD中的元素按照指定的规则排序,其返回类型为排序后的RDD
- 结果: (Bob,70) (John,80) (Tom,90)
@Test
def sortByTest(){
val rdd = sc.parallelize(Array(("Tom",90),("Bob",70),("John",80))).sortBy(_._2)
println(rdd.collect().mkString(" "))
}
distinct 算子
- distinct 去除RDD中重复的元素。
- 结果: 4 6 2 1 3 5
@Test
def distinctTest(){
val rdd = sc.parallelize(List(1,2,3,4,4,5,6,6))
val distinctRDD = rdd.distinct()
println(distinctRDD.collect().mkString(" "))
}
union算子
- union算子是spark中用于将多个RDD合并成一个RDD的算子,结果RDD中包含了所有输入RDD中的元素,且不去重。
- subtract 可以从一个RDD中减去另一个RDD中的元素,以得到一个新的RDD。
- 结果: 1 2 3 1 2 3
@Test
def unionTest(){
val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List(1,2,3))
val rdd = rdd1.union(rdd2)
println(rdd.collect().mkString(" "))
}
foldByKey算子
- foldByKey(zeroValue)((V, V) ⇒ V)
- 算子是对RDD中的key-value类型的数据按key进行聚合操作,将每个key对应的value进行聚合,
- 将聚合后的结果与zeroValue进行combine操作,返回一个新的RDD,
- 新的RDD中的每个元素是一个key-value对,其中key是原RDD中的key,value是zeroValue与原RDD中key对应的value的聚合结果。
- 结果: (勇哥,21) (小明,22)
@Test
def foldByKeyTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(Seq(("勇哥", 1), ("小明", 1), ("小明", 1))).foldByKey(zeroValue = 20)((a,b)=>(a+b))
println(rdd.collect().foreach(println(_)))
}
subtract算子
- subtract算子是spark中的一种RDD操作,它可以接收两个RDD作为参数,并返回一个新的RDD
- 新RDD中包含第一个RDD中存在,但是第二个RDD中不存在的元素。
- 结果: 1 2
@Test
def subtractTest(){
val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
val rdd2 = sc.parallelize(Seq(3,4,5,6,7))
val rdd3 = rdd1.subtract(rdd2)
println(rdd3.collect().foreach(println(_)))
}
join算子
- join算子是spark中的一种内连接算子,它可以将两个数据集中的相同键的元组连接起来。它可以在RDD、DataFrame和Dataset之间使用,
- 其中RDD和DataFrame可以使用join算子连接,而Dataset则可以使用joinWith算子连接。
- 结果: (2,(xc,xc1)) (1,(yo,yo1)) (3,(yong,yong1))
@Test
def joinTest(){
val rdd1 = sc.makeRDD(Array((1, "yo"), (2, "xc"), (3, "yong")))
val rdd2= sc.makeRDD(Array((1, "yo1"), (2, "xc1"), (3, "yong1")))
val rdd3=rdd1.join(rdd2)
println(rdd3.collect().mkString(" "))
}
常见的行动算子汇总
reduce 算子
- reduce 先聚合分区内数据,再聚合分区间数据
- 结果:10
@Test
def reduceTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4)).reduce(_+_)
println(rdd)
}
collcet算子
- collcet 先将结果数据集以数组Array的方式返回
- 结果:10 1 2 3 4
@Test
def collcetTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4))
println(rdd.collect().mkString(" "))
}
count算子
- count 返回RDD的元素个数
- 结果: 4
@Test
def countTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4))
println(rdd.count())
}
take算子
- take 返回RDD的前n个元素所组合而成的数组
- 结果: 1 2
@Test
def takeTest(){
//从本地集合创建RDD
val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
println(rdd.take(2).mkString(" "))
}
foreach算子
- foreach 遍历RDD中的元素
- 结果: 1 2
@Test
def foreachTest(){
//从本地集合创建RDD
val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
println(rdd.take(2).foreach(println(_)))
}
到这里spark的常用算子就总结完了,其实在Spark还有很多不同的算子本篇列举了一些日常开发中会比较常用的一些操作。对大数据感兴趣的小伙伴可看下方公众号一起交流!