水善利万物而不争,处众人之所恶,故几于道💦
文章目录
- 1. collect()
- 2. count()
- 3. first()
- 4. take()
- 5. takeOrdered()
- 6. countByKey()
- 7. saveAS...()
- 8. foreach()
- 9. foreachPartition() ***
1. collect()
收集RDD每个分区的数据以数组封装之后发给Driver
如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G
这样设置
@Test
def collect(): Unit ={
val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))
val arr = rdd1.collect()
println(arr.toList)
}
结果:
2. count()
返回RDD中元素的个数
@Test
def count(): Unit ={
val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))
println(rdd1.count())
}
结果:
3. first()
返回RDD中的第一个元素
他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取
@Test
def first(): Unit ={
val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)
// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个job
val i = rdd1.first()
println(i)
Thread.sleep(10000000)
}
结果:
4. take()
返回RDD中前n个元素组成的数组
take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取
@Test
def take(): Unit ={
val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)
println(rdd1.take(3).toList)
Thread.sleep(10000000)
}
结果:
5. takeOrdered()
这个是取排序之后的前几个元素
takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了
@Test
def takeOrdered(): Unit ={
val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)
val ints = rdd1.takeOrdered(3)
println(ints.toList)
Thread.sleep(1000000)
}
结果:
6. countByKey()
统计每个key出现的次数,返回的结果是(key,次数)
@Test
def countByKey(): Unit ={
val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))
val rdd2 = rdd1.countByKey()
println(rdd2.toList)
}
结果:
7. saveAS…()
saveAsTextFile(path)
将数据保存成text文件,有几个task就保存几个文件
saveAsSequenceFile(path)
将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
saveAsObjectFile(path)
将数据序列化成对象保存到文件
@Test
def save(): Unit ={
val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))
rdd1.saveAsTextFile("output/text") // 为啥保存出来8个文件因为有8个task
rdd1.saveAsObjectFile("output/ObjectFile")
rdd1.saveAsSequenceFile("output/SequenceFile")
}
结果:
8. foreach()
遍历RDD中的每个元素
@Test
def foreach(): Unit = {
val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))
rdd1.foreach(println)
}
结果:
9. foreachPartition() ***
对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率
@Test
def foreachPartition(): Unit ={
val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))
rdd1.foreachPartition(it=>{
var connection:Connection = null
var statement:PreparedStatement = null
try{
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
statement = connection.prepareStatement("insert into wc values(?,?)")
// 计数器
var count = 0
it.foreach(x=>{
statement.setString(1,x._1)
statement.setInt(2,x._2)
// 添加到批中,一批一批的执行
statement.addBatch()
// 满1000条执行一批
if(count % 1000 == 0){
statement.executeBatch()
// todo 执行完批后要记得clearBatch !!!!!
statement.clearBatch()
}
count = count+1
})
// 最后不满1000条的也执行一次
statement.executeBatch()
}catch {
case e:Exception => e.printStackTrace()
}finally {
if (connection != null) {
connection.close()
}
if (statement != null) {
statement.close()
}
}
})
}
结果: